单线程、多线程之间、进程之间、协程之间很多时候需要协同完成工作,这个时候它们需要进行通讯。或者说为了解耦,普遍采用Queue,生产消费模式。
系列文章
python并发编程之multiprocessing进程(二)
同步deque和多线程Queue
程序有时需要在列表的端点进行操作,比list更加优化的数据结构有Queue和deque。
dequedeque一般用在定长队列,多余的数据会被丢弃,这个队列是线程非安全的。
from queue import Queue, deque # 大于会截取后面的一段 q = deque(iterable=[1,2,3,4], maxlen=5) # 参数iterable可以是任何可迭代对象,maxlen代表定长 # 添加与取出 q.append(3) # 从尾部添加 q.pop() # 从尾部弹出一个 q.appendleft(4) # 从首部添加 q.popleft() # 从首部弹出 q.clear() # 清空队列 q.extend([1, 3, 3]) # 将原来的队列从右侧扩展 q.extendleft() # 将原来的队列从左侧扩展 q.insert(2, 3) # 在索引为2的位置插入3,如果队列已达到最大,抛出异常 # 复制 q1 = q.copy() # 完全符合一份队列 # 统计 n = q.count(3) # 统计某个值的数目 x = q.index(3) # 查找某个值的位置 # 变换 q.reverse() # 将原来的q翻转 q.remove(3) # 删除队列中的所有的3 q.rotate(2) # 向右旋转两步 QueueQueue提供更加完整成熟的队列操作,相对于deque来说偏重型,他是线程安全的队列。
方法和属性分析
from queue import Queue, deque q = Queue(maxsize=5) #maxsize<=0,队列长度没有限制,这个Queue是线程安全的,通过锁机制保证 print(q.queue) # 一个deque队列 print(q.mutex) # 队列的线程锁 print(q.not_empty) # 非空通知,用在多线程 print(q.not_full) # 非满通知,用在多线程 print(q.all_tasks_done) # 完成的任务 print(q.maxsize) print(q.unfinished_tasks) # 队列未完成的任务数量,即队列目前的数目 # 数据存取 q.put(3, block=True, timeout=3) # 向队列左边添加数据,block为True队列满了阻塞等待,block为false则直接抛出异常 q.get(block=True, timeout=3) # 队列取出数据,超时抛出异常,block为false忽略timeout # q.get_nowait() # 立即获取,没有抛出异常 q.put_nowait(4) # 立即插入,已满抛出异常 # 判断 q.full() # 判断当前队列是否已满,满了返回True q.empty() # 判断当前队列是否为空,空返回True # 统计 q.task_done() # 用来通知队列任务完成 q.qsize() # 当前队列的任务数量,不绝对可靠 q.join() # 阻塞直到所有的任务完成,即q.unfinished_tasks降为0实例
from threading import Thread from queue import Queue, deque import time def get_from_queue(queue:Queue): while True: if not queue.empty(): print(queue.get_nowait()) queue.task_done() # 任务完成 def put_to_queue(queue:Queue): for i in range(100): if not queue.full(): queue.put_nowait(i) else: time.sleep(0.1) q = Queue(5) th1 = Thread(target=get_from_queue, args=(q,)) th2 = Thread(target=put_to_queue, args=(q,)) th1.start() th2.start() 进程间通讯multiprocessing的Queue对象可以作为进程间通讯的第三者。
from multiprocessing import Queue, Process, Pool import time def get_from_queue(queue:Queue): while True: if not queue.empty(): print(queue.get_nowait()) def put_to_queue(queue:Queue): for i in range(100): if not queue.full(): queue.put_nowait(i) else: time.sleep(0.1) if __name__ == \'__main__\': q = Queue(9) # 这个Queue可以在多个进程之间共享 p1 = Process(target=get_from_queue, args=(q,)) p2 = Process(target=put_to_queue, args=(q,)) p1.start() p2.start() multiprocessing.Queue对象Queue对象的大部分方法和Queue.Queue的方法相同,用法也一样,但有几个特殊的方法:
q = Queue(9) # 这个Queue可以在多个进程之间共享 # q.close() # 关闭队列,不再接收数据 # q.cancel_join_thread() # 取消阻塞等待 q.join_thread() # 线程阻塞等待 gevent协程的Queuegevent.queue.Queue基于协程,Queue在多个协程间共享,Queue实现了迭代器协议,可以使用for循环遍历。
from gevent.queue import Queue import gevent import time def get_from_queue(queue:Queue, n): i = 0 print(\'start---get--{}\'.format(n)) while True: print(str(queue.get()) + \'get\' + str(n)) i += 1 if i == 100: break def put_to_queue(queue:Queue, n): i = 0 print(\'start---put--{}\'.format(n)) while True: queue.put(i) print(str(i) + \'put\' + str(n)) i += 1 if i == 100: break if __name__ == \'__main__\': q = Queue(9) # 这个Queue可以在多个进程之间共享 job1 = [gevent.spawn(put_to_queue, q,i) for i in range(2)] job2 = [gevent.spawn(get_from_queue, q,i) for i in range(2)] job1.extend(job2) gevent.joinall(job1)