您好,登录后才能下订单哦!
密码登录
            
            
            
            
        登录注册
            
            
            
        点击 登录注册 即表示同意《亿速云用户服务条款》
        如何分析Python全栈中的队列,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
知识点:
lock.acquire()# 上锁 lock.release()# 解锁 #同一时间允许一个进程上一把锁 就是Lock 加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲速度却保证了数据安全。 #同一时间允许多个进程上多把锁 就是[信号量Semaphore] 信号量是锁的变形: 实际实现是 计数器 + 锁,同时允许多个进程上锁 # 互斥锁Lock : 互斥锁就是进程的互相排斥,谁先抢到资源,谁就上锁改资源内容,为了保证数据的同步性 # 注意:多个锁一起上,不开锁,会造成死锁.上锁和解锁是一对.
程序实现:
# ### 锁 lock 互斥锁
from multiprocessing import Process,Lock
""" 上锁和解锁是一对, 连续上锁不解锁是死锁 ,只有在解锁的状态下,其他进程才有机会上锁 """
"""
# 创建一把锁
lock = Lock()
# 上锁
lock.acquire()
# lock.acquire() # 连续上锁,造成了死锁现象;
print("我在袅袅炊烟 ..  你在焦急等待 ... 厕所进行时 ... ")
# 解锁
lock.release()
"""
# ### 12306 抢票软件
import json,time,random
# 1.读写数据库当中的票数
def wr_info(sign , dic=None):
	if sign == "r":
		with open("ticket",mode="r",encoding="utf-8") as fp:
			dic = json.load(fp)
		return dic
	elif sign == "w":
		with open("ticket",mode="w",encoding="utf-8") as fp:
			json.dump(dic,fp)
# dic = wr_info("w",dic={"count":0})
# print(dic , type(dic) )
# 2.执行抢票的方法
def get_ticket(person):
	# 先获取数据库中实际票数
	dic = wr_info("r")
	# 模拟一下网络延迟
	time.sleep(random.uniform(0.1,0.7))
	# 判断票数
	if dic["count"] > 0:
		print("{}抢到票了".format(person))
		# 抢到票后,让当前票数减1
		dic["count"] -= 1
		# 更新数据库中的票数
		wr_info("w",dic)
	else:
		print("{}没有抢到票哦".format(person))
# 3.对抢票和读写票数做一个统一的调用
def main(person,lock):
	# 查看剩余票数
	dic = wr_info("r")
	print("{}查看票数剩余: {}".format(person,dic["count"]))
	# 上锁
	lock.acquire()
	# 开始抢票
	get_ticket(person)
	# 解锁 
	lock.release()
if __name__ == "__main__":
	lock = Lock()
	lst = ["梁新宇","康裕康","张保张","于朝志","薛宇健","韩瑞瑞","假摔先","刘子涛","黎明辉","赵凤勇"]
	for i in lst:
		p = Process(    target=main,args=(  i  , lock  )   )
		p.start()
"""
创建进程,开始抢票是异步并发程序
直到开始抢票的时候,变成同步程序,
先抢到锁资源的先执行,后抢到锁资源的后执行;
按照顺序依次执行;是同步程序;
抢票的时候,变成同步程序,好处是可以等到数据修改完成之后,在让下一个人抢,保证数据不乱。
如果不上锁的话,只剩一张票的时候,那么所有的人都能抢到票,因为程序执行的速度太快,所以接近同步进程,导致数据也不对。
"""	ticket文件
{"count": 0}
# ### 信号量 Semaphore 本质上就是锁,只不过是多个进程上多把锁,可以控制上锁的数量
"""Semaphore = lock + 数量 """
from multiprocessing import Semaphore , Process
import time , random
"""
	# 同一时间允许多个进程上5把锁
	sem = Semaphore(5)
	#上锁
	sem.acquire()
	print("执行操作 ... ")
	#解锁
	sem.release()
"""
def singsong_ktv(person,sem):
	# 上锁
	sem.acquire()
	print("{}进入了唱吧ktv , 正在唱歌 ~".format(person))
	# 唱一段时间
	time.sleep( random.randrange(4,8) ) # 4 5 6 7
	print("{}离开了唱吧ktv , 唱完了 ... ".format(person))
	# 解锁
	sem.release()
if __name__ == "__main__":
	sem = Semaphore(5)
	lst = ["赵凤勇" , "沈思雨", "赵万里" , "张宇" , "假率先" , "孙杰龙" , "陈璐" , "王雨涵" , "杨元涛" , "刘一凤"   ]
	for i  in lst:
		p = Process(target=singsong_ktv , args = (i , sem)		)
		p.start()
"""
# 总结: Semaphore 可以设置上锁的数量 , 同一时间上多把锁
创建进程时,是异步并发,执行任务时,是同步程序;
"""
# 赵万里进入了唱吧ktv , 正在唱歌 ~
# 赵凤勇进入了唱吧ktv , 正在唱歌 ~
# 张宇进入了唱吧ktv , 正在唱歌 ~
# 沈思雨进入了唱吧ktv , 正在唱歌 ~
# 孙杰龙进入了唱吧ktv , 正在唱歌 ~# ### 事件 (Event)
"""
# 阻塞事件 :
	e = Event()生成事件对象e   
	e.wait()动态给程序加阻塞 , 程序当中是否加阻塞完全取决于该对象中的is_set() [默认返回值是False]
    # 如果是True  不加阻塞
    # 如果是False 加阻塞
# 控制这个属性的值
    # set()方法     将这个属性的值改成True
    # clear()方法   将这个属性的值改成False
    # is_set()方法  判断当前的属性是否为True  (默认上来是False)
"""
from multiprocessing import Process,Event
import time , random
# 1
'''
e = Event()
# 默认属性值是False.
print(e.is_set()) 
# 判断内部成员属性是否是False 
e.wait()
# 如果是False , 代码程序阻塞
print(" 代码执行中 ...  ")
'''
# 2
'''
e = Event()
# 将这个属性的值改成True
e.set()
# 判断内部成员属性是否是True
e.wait()
# 如果是True , 代码程序不阻塞
print(" 代码执行中 ...  ")
# 将这个属性的值改成False
e.clear()
e.wait()
print(" 代码执行中 .... 2")
'''
# 3
"""
e = Event()
# wait(3) 代表最多等待3秒;
e.wait(3)
print(" 代码执行中 .... 3")
"""
# ### 模拟经典红绿灯效果
# 红绿灯切换
def traffic_light(e):
	print("红灯亮")
	while True:
		if e.is_set():
			# 绿灯状态 -> 切红灯
			time.sleep(1)
			print("红灯亮")
			# True => False
			e.clear()
		else:
			# 红灯状态 -> 切绿灯
			time.sleep(1)
			print("绿灯亮")
			# False => True
			e.set()
# e = Event()
# traffic_light(e)
# 车的状态
def car(e,i):
	# 判断是否是红灯,如果是加上wait阻塞
	if not e.is_set():
		print("car{} 在等待 ... ".format(i))
		e.wait()
	# 否则不是,代表绿灯通行;
	print("car{} 通行了 ... ".format(i))
"""	
# 1.全国红绿灯
if __name__ == "__main__":
	e = Event()
	# 创建交通灯
	p1 = Process(target=traffic_light , args=(e,))
	p1.start()
	# 创建小车进程
	for i in range(1,21):
		time.sleep(random.randrange(2))
		p2 = Process(target=car , args=(e,i))
		p2.start()
"""
# 2.包头红绿灯,没有车的时候,把红绿灯关了,省电;
if __name__ == "__main__":
	lst = []
	e = Event()
	# 创建交通灯
	p1 = Process(target=traffic_light , args=(e,))
	# 设置红绿灯为守护进程
	p1.daemon = True
	p1.start()
	# 创建小车进程
	for i in range(1,21):
		time.sleep(random.randrange(2))
		p2 = Process(target=car , args=(e,i))
		lst.append(p2)
		p2.start()
	# 让所有的小车全部跑完,把红绿灯炸飞
	print(lst)
	for i in lst:
		i.join()
	print("关闭成功 .... ")事件知识点:
# 阻塞事件 : e = Event()生成事件对象e e.wait()动态给程序加阻塞 , 程序当中是否加阻塞完全取决于该对象中的is_set() [默认返回值是False] # 如果是True 不加阻塞 # 如果是False 加阻塞 # 控制这个属性的值 # set()方法 将这个属性的值改成True # clear()方法 将这个属性的值改成False # is_set()方法 判断当前的属性是否为True (默认上来是False)
# ### 进程队列(进程与子进程是相互隔离的,如果两者想要进行通信,可以利用队列实现)
from multiprocessing import Process,Queue
# 引入线程模块; 为了捕捉queue.Empty异常;
import queue
# 1.基本语法
"""顺序: 先进先出,后进后出"""
# 创建进程队列
q = Queue()
# put() 存放
q.put(1)
q.put(2)
q.put(3)
# get() 获取
"""在获取不到任何数据时,会出现阻塞"""
# print(  q.get()  )
# print(  q.get()  )
# print(  q.get()  )
# print(  q.get()  )
# get_nowait() 拿不到数据报异常
"""[windows]效果正常  [linux]不兼容"""
try:
	print(  q.get_nowait()  )
	print(  q.get_nowait()  )
	print(  q.get_nowait()  )
	print(  q.get_nowait()  )
except : #queue.Empty
	pass
# put_nowait() 非阻塞版本的put
# 设置当前队列最大长度为3 ( 元素个数最多是3个 )
"""在指定队列长度的情况下,如果塞入过多的数据,会导致阻塞"""
# q2 = Queue(3)
# q2.put(111)
# q2.put(222)
# q2.put(333)
# q2.put(444)
"""使用put_nowait 在队列已满的情况下,塞入数据会直接报错"""
q2 = Queue(3)
try:
	q2.put_nowait(111)
	q2.put_nowait(222)
	q2.put_nowait(333)
	q2.put_nowait(444)
except:
	pass
# 2.进程间的通信IPC
def func(q):
	# 2.子进程获取主进程存放的数据
	res = q.get()
	print(res,"<22>")
	# 3.子进程中存放数据
	q.put("刘一缝")
if __name__ == "__main__":
	q3 = Queue()
	p = Process(target=func,args=(q3,))
	p.start()
	# 1.主进程存入数据
	q3.put("赵凤勇")
	# 为了等待子进程把数据存放队列后,主进程在获取数据;
	p.join()
	# 4.主进程获取子进程存放的数据
	print(q3.get() , "<33>")小提示: 一般主进程比子进程执行的快一些

队列知识点:
# 进程间通信 IPC # IPC Inter-Process Communication # 实现进程之间通信的两种机制: # 管道 Pipe # 队列 Queue # put() 存放 # get() 获取 # get_nowait() 拿不到报异常 # put_nowait() 非阻塞版本的put q.empty() 检测是否为空 (了解) q.full() 检测是否已经存满 (了解)
# ### 生产者和消费者模型 """ # 爬虫案例 1号进程负责抓取其他多个网站中相关的关键字信息,正则匹配到队列中存储(mysql) 2号进程负责把队列中的内容拿取出来,将经过修饰后的内容布局到自个的网站中 1号进程可以理解成生产者 2号进程可以理解成消费者 从程序上来看 生产者负责存储数据 (put) 消费者负责获取数据 (get) 生产者和消费者比较理想的模型: 生产多少,消费多少 . 生产数据的速度 和 消费数据的速度 相对一致 """ # 1.基础版生产着消费者模型 """问题 : 当前模型,程序不能正常终止 """ """ from multiprocessing import Process,Queue import time,random # 消费者模型 def consumer(q,name): while True: # 获取队列中的数据 food = q.get() time.sleep(random.uniform(0.1,1)) print("{}吃了{}".format(name,food)) # 生产者模型 def producer(q,name,food): for i in range(5): time.sleep(random.uniform(0.1,1)) # 展示生产的数据 print( "{}生产了{}".format( name , food+str(i) ) ) # 存储生产的数据在队列中 q.put(food+str(i)) if __name__ == "__main__": q = Queue() p1 = Process( target=consumer,args=(q , "赵万里") ) p2 = Process( target=producer,args=(q , "赵沈阳" , "香蕉" ) ) p1.start() p2.start() p2.join() """ # 2.优化模型 """特点 : 手动在队列的最后,加入标识None, 终止消费者模型""" """ from multiprocessing import Process,Queue import time,random # 消费者模型 def consumer(q,name): while True: # 获取队列中的数据 food = q.get() # 如果最后一次获取的数据是None , 代表队列已经没有更多数据可以获取了,终止循环; if food is None: break time.sleep(random.uniform(0.1,1)) print("{}吃了{}".format(name,food)) # 生产者模型 def producer(q,name,food): for i in range(5): time.sleep(random.uniform(0.1,1)) # 展示生产的数据 print( "{}生产了{}".format( name , food+str(i) ) ) # 存储生产的数据在队列中 q.put(food+str(i)) if __name__ == "__main__": q = Queue() p1 = Process( target=consumer,args=(q , "赵万里") ) p2 = Process( target=producer,args=(q , "赵沈阳" , "香蕉" ) ) p1.start() p2.start() p2.join() q.put(None) # 香蕉0 香蕉1 香蕉2 香蕉3 香蕉4 None """ # 3.多个生产者和消费者 """ 问题 : 虽然可以解决问题 , 但是需要加入多个None , 代码冗余""" from multiprocessing import Process,Queue import time,random # 消费者模型 def consumer(q,name): while True: # 获取队列中的数据 food = q.get() # 如果最后一次获取的数据是None , 代表队列已经没有更多数据可以获取了,终止循环; if food is None: break time.sleep(random.uniform(0.1,1)) print("{}吃了{}".format(name,food)) # 生产者模型 def producer(q,name,food): for i in range(5): time.sleep(random.uniform(0.1,1)) # 展示生产的数据 print( "{}生产了{}".format( name , food+str(i) ) ) # 存储生产的数据在队列中 q.put(food+str(i)) if __name__ == "__main__": q = Queue() p1 = Process( target=consumer,args=(q , "赵万里") ) p1_1 = Process( target=consumer,args=(q , "赵世超") ) p2 = Process( target=producer,args=(q , "赵沈阳" , "香蕉" ) ) p2_2 = Process( target=producer,args=(q , "赵凤勇" , "大蒜" ) ) p1.start() p1_1.start() p2.start() p2_2.start() # 等待所有数据填充完毕 p2.join() p2_2.join() # 把None 关键字放在整个队列的最后,作为跳出消费者循环的标识符; q.put(None) # 给第一个消费者加一个None , 用来终止 q.put(None) # 给第二个消费者加一个None , 用来终止 # ...

# ### JoinableQueue 队列
"""
put 存放  
get 获取  
task_done 计算器属性值-1  
join 配合task_done来使用 , 阻塞
put 一次数据, 队列的内置计数器属性值+1
get 一次数据, 通过task_done让队列的内置计数器属性值-1
join: 会根据队列计数器的属性值来判断是否阻塞或者放行
	队列计数器属性是 等于 0 ,  代码不阻塞放行
	队列计数器属性是 不等 0 ,  意味着代码阻塞
"""
from multiprocessing  import JoinableQueue
jq = JoinableQueue()
jq.put("王同培") # +1
jq.put("王伟")   # +2
print(jq.get())
print(jq.get())
# print(jq.get()) 阻塞
jq.task_done()   # -1
jq.task_done()   # -1
jq.join()
print(" 代码执行结束 .... ")
# ### 2.使用JoinableQueue 改造生产着消费者模型
from multiprocessing import Process,Queue
import time,random
# 消费者模型
def consumer(q,name):
	while True:
		# 获取队列中的数据
		food = q.get()
		time.sleep(random.uniform(0.1,1))
		print("{}吃了{}".format(name,food))
		# 让队列的内置计数器属性-1
		q.task_done()
# 生产者模型
def producer(q,name,food):
	for i in range(5):
		time.sleep(random.uniform(0.1,1))
		# 展示生产的数据
		print(  "{}生产了{}".format(  name , food+str(i)  )   )
		# 存储生产的数据在队列中
		q.put(food+str(i))
if __name__ == "__main__":
	q = JoinableQueue()
	p1 = Process(  target=consumer,args=(q , "赵万里")  )
	p2 = Process(  target=producer,args=(q , "赵沈阳" , "香蕉" )  )
	p1.daemon = True
	p1.start()
	p2.start()
	p2.join()
	# 必须等待队列中的所有数据全部消费完毕,再放行
	q.join()
	print("程序结束 ... ")ipc可以让进程之间进行通信 lock其实也让进程之间进行通信了,多个进程去抢一把锁,一个进程抢到 这 把锁了,其他的进程就抢不到这把锁了,进程通过socket底层互相发 消息,告诉其他进程当前状态已经被锁定了,不能再强了。 进程之间默认是隔离的,不能通信的,如果想要通信,必须通过ipc的 方式(lock、joinablequeue、Manager)
看完上述内容,你们掌握如何分析Python全栈中的队列的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注亿速云行业资讯频道,感谢各位的阅读!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。