# 运行结果为
生产者生产了包子0
生产者生产了包子0
生产者生产了包子0
生产者生产了包子1
生产者生产了包子1
生产者生产了包子1
生产者生产了包子2
消费者吃了包子0
生产者生产了包子2
消费者吃了包子0
生产者生产了包子2
主进程
消费者吃了包子0
消费者吃了包子1
消费者吃了包子1
消费者吃了包子1
消费者吃了包子2
消费者吃了包子2
消费者吃了包子2
.......
# 但是程序在运行之后,会一直处于阻塞状态,因为消费者还在不停的取数据,但是生产者已经把数据生产完了
如何去解决上诉的问题呢?能不能思考,如果消费者去取数据的时候取了一个None,那么就停止:
import time
from multiprocessing import Process,Queue
def producer(q): # 生产者
for i in range(3): # 3个人生产包子
res = '包子%s'%i
time.sleep(1)
print('生产者生产了%s'%res)
q.put(res) # 生产完把包子丢到消息队列里面去
def consumer(q): # 消费者
while True:
res = q.get() # 从消息队列中取数据赋值给res
if res == None:break # 如果取的数据是空,那么就结束
time.sleep(2)
print('消费者吃了%s'%res)
if __name__ == '__main__':
q = Queue() # 如果不写大小,那么默认是无限制的
# 生产者们
p1 = Process(target=producer,args=(q,))
p2 = Process(target=producer,args=(q,))
p3 = Process(target=producer,args=(q,))
# 消费者们
c1 = Process(target=consumer,args=(q,))
c2 = Process(target=consumer,args=(q,))
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
q.put(None) # 因为有两个消费者,那么就需要再往消息队列里面传递两个None
q.put(None)
print('主进程')
其实我们的思路无非就是发送结束信号而已,有另外一种队列提供了这种机制
JoinableQueue([maxsize])
这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享和条件变量实现的。
参数介绍
maxsize是队列中允许最大项数,省略则代表无大小限制
方法介绍
JoinableQueue的实例p除了与Queue对象相同的方法之外,还有:
1.q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发异常
2.q.join():生产者使用此方法发出信号,直到队列中所有的项目都被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
基于JoinableQueue实现生产者和消费者模型
import time
from multiprocessing import Process,JoinableQueue
def producer(q): # 生产者
for i in range(3): # 3个人生产包子
res = '包子%s'%i
time.sleep(2)
print('生产者生产了%s'%res)
q.put(res) # 生产完把包子丢到消息队列里面去
q.join() # 等待消息队列的数据都被取完
def consumer(q): # 消费者
while True:
res = q.get() # 从消息队列中取数据赋值给res
time.sleep(1)
print('消费者吃了%s'%res)
q.task_done() # 消费者给生产者发送结束信号,但是还是在做q.get()
if __name__ == '__main__':
q = JoinableQueue() # 如果不写大小,那么默认是无限制的
# 生产者们
p1 = Process(target=producer,args=(q,))
p2 = Process(target=producer,args=(q,))
p3 = Process(target=producer,args=(q,))