加锁可以保证多个进程在修改同一块数据的时,同一个时间只能有一个任务可以进行修改,即串行的修改,虽然效率下来了,但是却可以保证数据的安全性。
虽然可以用文件共享数据实现进程间通信,但问题是:
•效率低(共享数据基于文件,而文件是硬盘上的数据)
•需要自己加锁处理
因此我们需要找一种解决办法能够兼顾:
•效率高(多个进程共享一块内存的数据)
•帮我们处理好锁的问题
这就是mutliprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
队列和管道都是将数据存放在内存中,而队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,因而队列才是进程间通信的最佳选择。
我们应该尽量避免使用共享数据,尽可能的使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数据增多时,往往可以获得更好的可扩展性。
队列
队列彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。
创建队列的类(底层就是以管道和锁定的方式实现的)
Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递
参数介绍
maxsize是队列中允许最大项数,省略则无大小限制
但需要明确:
1.队列内存放的是消息而非大数据
2.队列占用的是内存空间,因而maxsize即便是无大小限制也受限于内存大小
主要方法接收
q.put()方法用于插入数据到队列中
q.get()方法可以从队列读取并且删除一个元素
队列的使用
from multiprocessing import Process,Queue
# 存放数据
q = Queue(3) # 在队列里不应该存放大的文件,因为占用的是内存
q.put('123')
q.put([1,2,3,])
q.put({'name':'xiaoyafei'})
print(q.full()) # 可以查询是否已经满了
# 取数据
print(q.get())
print(q.get())
print(q.get())
print('Queue是否为空:',q.empty()) # 因为此时队列已经空了,所以接下来取数据就会堵塞
print(q.get())
# 运行结果为
True
123
[1, 2, 3]
{'name': 'xiaoyafei'}
Queue是否为空: True # 程序会在一直等待中
......
在这段代码中,我们可以发现:由Queue实例化出来的对象就是一个容器,而在我们存数据的时候就把数据丢到这个容器中,如果想要取数据的话就要到这个数据里面直接拿。
生产者消费者模型
为什么要使用生产者消费者模型
生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,如果生产者生产的很快,而消费者处理的速度却很慢,那么生产者就必须等待消费者处理完后才能继续生产数据。同样的道理,如果消费者的处理速度大于生产者,那么消费者就需要等待生产者。为了解决这个问题于是引入了生产者和消费者模型。
什么是生产者和消费者模型
生产者消费者模式是通过一个容器来解决生产者和消费者的耦合问题的。生产者和消费者之间不直接通信,而通过阻塞队列来进行通信,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列中去取,阻塞队列就相当于是一个缓冲区,平衡了生产者和消费者的处理能力。
这个阻塞队列就是用来给生产者和消费者解耦的。
关系图:
生产者和消费者模型实现
import time
from multiprocessing import Process,Queue
def producer(q): # 生产者
for i in range(3): # 3个人生产包子
res = '包子%s'%i
time.sleep(1)
print('生产者生产了%s'%res)
q.put(res) # 生产完把包子丢到消息队列里面去
def consumer(q): # 消费者
while True:
res = q.get() # 从消息队列中取数据赋值给res
time.sleep(2)
print('消费者吃了%s'%res)
if __name__ == '__main__':
q = Queue() # 如果不写大小,那么默认是无限制的
# 生产者们
p1 = Process(target=producer,args=(q,))
p2 = Process(target=producer,args=(q,))
p3 = Process(target=producer,args=(q,))
# 消费者们
c1 = Process(target=consumer,args=(q,))
c2 = Process(target=consumer,args=(q,))
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
print('主进程')
讲解一下这段代码:
1.生产者来生产包子,消费者来吃包子。
2.生产者每1秒生产3个包子,消费者每2秒吃一个包子,备注为:每个生产者/每个消费者
3.创建消息队列Queue,无限制大小
4.p1/p2/p3.join()方法保证了子进程先运行完之后再运行主进程
5.生产者把包子丢到消息队列里面之后就不用管了
接下来,这段程序的运行结果为: