Python中多进程深入解析(8)

# 运行结果为
生产者生产了包子0
生产者生产了包子0
生产者生产了包子0
生产者生产了包子1
生产者生产了包子1
生产者生产了包子1
生产者生产了包子2
消费者吃了包子0
生产者生产了包子2
消费者吃了包子0
生产者生产了包子2
主进程
消费者吃了包子0
消费者吃了包子1
消费者吃了包子1
消费者吃了包子1
消费者吃了包子2
消费者吃了包子2
消费者吃了包子2
.......

# 但是程序在运行之后,会一直处于阻塞状态,因为消费者还在不停的取数据,但是生产者已经把数据生产完了

如何去解决上诉的问题呢?能不能思考,如果消费者去取数据的时候取了一个None,那么就停止:


import time
from multiprocessing import Process,Queue

def producer(q):  # 生产者
    for i in range(3):  # 3个人生产包子
        res = '包子%s'%i
        time.sleep(1)
        print('生产者生产了%s'%res)

q.put(res)  # 生产完把包子丢到消息队列里面去

def consumer(q):  # 消费者
    while True:
        res = q.get()  # 从消息队列中取数据赋值给res
        if res == None:break  # 如果取的数据是空,那么就结束
        time.sleep(2)
        print('消费者吃了%s'%res)

if __name__ == '__main__':
    q = Queue()  # 如果不写大小,那么默认是无限制的

# 生产者们
    p1 = Process(target=producer,args=(q,))
    p2 = Process(target=producer,args=(q,))
    p3 = Process(target=producer,args=(q,))

# 消费者们
    c1 = Process(target=consumer,args=(q,))
    c2 = Process(target=consumer,args=(q,))

p1.start()
    p2.start()
    p3.start()

c1.start()
    c2.start()

p1.join()
    p2.join()
    p3.join()
    q.put(None)  # 因为有两个消费者,那么就需要再往消息队列里面传递两个None
    q.put(None)
    print('主进程')

其实我们的思路无非就是发送结束信号而已,有另外一种队列提供了这种机制

JoinableQueue([maxsize])

这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享和条件变量实现的。

参数介绍

maxsize是队列中允许最大项数,省略则代表无大小限制

方法介绍

JoinableQueue的实例p除了与Queue对象相同的方法之外,还有:
1.q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发异常
2.q.join():生产者使用此方法发出信号,直到队列中所有的项目都被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

基于JoinableQueue实现生产者和消费者模型

import time
from multiprocessing import Process,JoinableQueue

def producer(q):  # 生产者
    for i in range(3):  # 3个人生产包子
        res = '包子%s'%i
        time.sleep(2)
        print('生产者生产了%s'%res)

q.put(res)  # 生产完把包子丢到消息队列里面去
    q.join()  # 等待消息队列的数据都被取完

def consumer(q):  # 消费者
    while True:
        res = q.get()  # 从消息队列中取数据赋值给res
        time.sleep(1)
        print('消费者吃了%s'%res)
        q.task_done()  # 消费者给生产者发送结束信号,但是还是在做q.get()


if __name__ == '__main__':
    q = JoinableQueue()  # 如果不写大小,那么默认是无限制的

# 生产者们
    p1 = Process(target=producer,args=(q,))
    p2 = Process(target=producer,args=(q,))
    p3 = Process(target=producer,args=(q,))

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/942b8ff73ff4d41252705a0d573d82eb.html