从头造轮子:python3 asyncio之 gather (3) (3)

result = coro.send(None),进入用户定义函数

async def helloworld(): print('enter helloworld') ret = await wilsonasyncio.gather(hello(), world()) print('exit helloworld') return ret

ret = await wilsonasyncio.gather(hello(), world()),这里没啥可说的,进入gather函数

def gather(*coros_or_futures, loop=None): loop = get_event_loop() def _done_callback(fut): nonlocal nfinished nfinished += 1 if nfinished == nfuts: results = [] for fut in children: res = fut.result() results.append(res) outer.set_result(results) children = [] nfuts = 0 nfinished = 0 for arg in coros_or_futures: fut = tasks.ensure_future(arg, loop=loop) nfuts += 1 fut.add_done_callback(_done_callback) children.append(fut) outer = _GatheringFuture(children, loop=loop) return outer

loop = get_event_loop()获取事件循环

def _done_callback(fut)这个函数是回调函数,细节后面分析,现在只需要知道任务(hello()与world())执行完之后就会回调就行

for arg in coros_or_futures for循环确保每一个任务都是Future对象,并且add_done_callback将回调函数设置为_done_callback ,还有将他们加入到_ready队列等待下一次循环调度

3个重要的变量:
       children 里面存放的是每一个异步任务,在本例是hello()与world()
       nfuts 存放是异步任务的数量,在本例是2
       nfinished 存放的是异步任务完成的数量,目前是0,完成的时候是2

继续往下,来到了_GatheringFuture ,看看源码:

class _GatheringFuture(Future): def __init__(self, children, *, loop=None): super().__init__(loop=loop) self._children = children

_GatheringFuture 最主要的作用就是将多个异步任务放入self._children,然后用_GatheringFuture 这个对象来管理。需要注意,这个对象继承了Future

至此,gather完成初始化,返回了outer,其实就是_GatheringFuture

总结一下gather,初始化了3个重要的变量,后面用来存放状态;给每一个异步任务添加回调函数;将多个异步子任务合并,并且使用一个Future对象去管理

3.3.1)gather完成,回到helloworld()

async def helloworld(): print('enter helloworld') ret = await wilsonasyncio.gather(hello(), world()) print('exit helloworld') return ret

ret = await wilsonasyncio.gather(hello(), world()) gather返回_GatheringFuture ,随后使用await,就会进入Future.__await__

def __await__(self): if self._state == _PENDING: self._asyncio_future_blocking = True yield self return self.result()

由于_GatheringFuture 的状态是_PENDING,所以进入if,遇到yield self,将self,也就是_GatheringFuture 返回(这里注意yield的用法,流程控制的功能)

那yield回到哪儿去了呢?从哪儿send就回到哪儿去,所以,他又回到了task.__step函数里面去

def __step(self, exc=None): coro = self._coro try: if exc is None: result = coro.send(None) else: result = coro.throw(exc) except StopIteration as exc: super().set_result(exc.value) else: blocking = getattr(result, '_asyncio_future_blocking', None) if blocking: result._asyncio_future_blocking = False result.add_done_callback(self.__wakeup, result) finally: self = None

这里是本函数的第一个核心点,流程控制/跳转,需要非常的清晰,如果搞不清楚的同学,再详细的去阅读有关yield/yield from的文章

继续往下走,由于用户函数helloworld()没有结束,所以不会抛异常,所以来到了else分支

blocking = getattr(result, '_asyncio_future_blocking', None)这里有一个重要的状态,那就是_asyncio_future_blocking ,只有调用__await__,才会有这个参数,默认是true,这个参数主要的作用:一个异步函数,如果调用了多个子异步函数,那证明该异步函数没有结束(后面详细讲解),就需要添加“唤醒”回调

result._asyncio_future_blocking = False将参数置位False,并且添加self.__wakeup回调等待唤醒

__step函数完成

这里需要详细讲解一下_asyncio_future_blocking 的作用

如果在异步函数里面出现了await,调用其他异步函数的情况,就会走到Future.__await__将_asyncio_future_blocking 设置为true

async def helloworld(): print('enter helloworld') ret = await wilsonasyncio.gather(hello(), world()) print('exit helloworld') return ret class Future: def __await__(self): if self._state == _PENDING: self._asyncio_future_blocking = True yield self return self.result()

这样做了之后,在task.__step中就会把该任务的回调函数设置为__wakeup

为啥要__wakeup,因为helloworld()并没有执行完成,所以需要再次__wakeup来唤醒helloworld()

这里揭示了,在Eventloop里面,只要使用await调用其他异步任务,就会挂起父任务,转而去执行子任务,直至子任务完成之后,回到父任务继续执行

先喝口水,休息一下,下面更复杂。。。

3.4)第二次循环run_forever --> run_once

eventloops.py

def run_once(self): ntodo = len(self._ready) for _ in range(ntodo): handle = self._ready.popleft() handle._run()

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzfgdz.html