Python中的多进程、多线程和协程 (8)

为此需要使用 asyncio.Queue 。它相比普通的队列的区别是,其put/get操作会在无法执行时阻塞(这一点和multiprocessing.Queue很像),而且这些操作都是协程(注意到,这使得你调用它们时只会返回协程对象而不会实际执行),可以await。

import time import asyncio start = time.perf_counter() async def producer(q): for i in range(5): await asyncio.sleep(1) # producing takes 1 sec await q.put(i) # will wait if q is full print(f'put {i} at {time.perf_counter() - start}') await q.join() # will wait until all objects produced are **taken out** and **consumed**. async def consumer(q): for i in range(5): item = await q.get() # will wait if q is empty. BTW we see that "await XXX" is an expression not a command. print(f'get {item} at {time.perf_counter() - start}') await asyncio.sleep(1) # consuming takes 1 sec q.task_done() # tells the queue that [the object just taken out] has been consumed. just taking out is not enough! print(f'consumed {item} at {time.perf_counter() - start}') async def main(): q = asyncio.Queue() P = asyncio.create_task(producer(q)) C = asyncio.create_task(consumer(q)) await P await C print(f'done at {time.perf_counter() - start}') # asyncio.run(main()) # use outside IPython await main() # use inside IPython put 0 at 1.0108397000003606 get 0 at 1.0112231999955839 put 1 at 2.017216499996721 consumed 0 at 2.0176210000063293 get 1 at 2.0177472999930615 put 2 at 3.0279211000015493 consumed 1 at 3.0283254999958444 get 2 at 3.028457599997637 put 3 at 4.039952199993422 consumed 2 at 4.041183299996192 get 3 at 4.041302300000098 put 4 at 5.0465819999953965 consumed 3 at 5.04690839999239 get 4 at 5.047016099997563 consumed 4 at 6.047789799995371 done at 6.048323099996196 import time import asyncio start = time.perf_counter() async def sleep_and_put(q): await asyncio.sleep(1) await q.put(1) async def main(): q = asyncio.Queue() C = asyncio.create_task(q.get()) P = asyncio.create_task(sleep_and_put(q)) await C await P print(f'finished at {time.perf_counter() - start}') # asyncio.run(main()) # use outside IPython await main() # use inside IPython finished at 1.01112650000141

由上例可见,Queue.get()(其实Queue.put()等其他方法也一样)是一个协程,因此也可以给它创建任务以进行并发。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpgdgd.html