Celery 源码解析六:Events 的实现 (2)

Celery 源码解析六:Events 的实现

看一下 _publish 的代码,感觉没了意思,又是使用 AMQP,Celery 这是讲 MQ 贯彻到底啊!那似乎没办法了,这里就算完了,但是,我们的事情却还没完,因为这里都是针对的旧的任务,我们希望看到的 worker-online 还没看到呢,但是 Bootstep 的工作却是完成了,似乎这里线索就断了。

但是,同样细心的同学可能会记得,我们之前曾经说过一个叫做 HeartBootstep,它的职责是干啥来着?如果忘记了,不妨回到第一篇回顾一下,记得的话,我们进代码看看,哈哈哈

Celery 源码解析六:Events 的实现

nice,你会发现,这个 Bootstep 是依赖于 Events 的,同时在 Line 29 中给你会发现就用到了我们刚刚初始化的 event_dispatcher,然后就调用 start 了,所以不妨一起看看:

Celery 源码解析六:Events 的实现

嘿嘿,看到没,这里就是 worker-online 的发生地了,而且我们还顺便捕捉到了 worker-heartbeat 这个 Event,so lucky,但是有个地方我们不明的,那就是这个 _send 干了什么,如果不出意料的话,应该是调用的 dispatcher._publish,走,看看去:

Celery 源码解析六:Events 的实现

好,并没有按照我的套路来,调用的居然是 event_dispatcher 的 send,那么它和 _publish 有什么区别呢?不妨一起看一看:

Celery 源码解析六:Events 的实现

这里和 _publish 的唯一不一样的地方就是做了缓存处理,就是在 Line 185 这里,如果需要缓存,那么缓存一波,在 Line 192 这里如果缓存满了,那么就发送呗。有一点值得注意的就是在 Line 198,这里调用的是 publish 而不是 _publish,搞那么多事,那么这里有有什么不一样?

Celery 源码解析六:Events 的实现

好呗,从这里看似乎除了对 clock 进行一个操作之外,没有其他特殊之处,那么这个 clock 又是什么,起到什么作用呢?略微查找就知道了,这又是 Kombu 的东西,然后看解释就知道了这是一个 Counter,可以用来给 Consumer 判定是否接受这个 Event 用的,所以我们可以先 pass。所以,总得来说,我们可以发现,这里已经对 Event 的产生有了一定了解了,这里可以产生的一个比较明显的问题点就是:Celery 中 Event 的 send、publish 和 _publish 的区别是啥?

消息的传递机制

在跟踪 Event 的产生的过程中我们已经顺便将 Event 的发送给看了,其实还是 Kombu 的 AMQP 在作用,然后通过 Connection 发送到对应的 MQ 中,再后面就被 Consumer 接收,全链路就是这样:

Event<Producer> ------> MQ ---------> Event<Consumer>(Receiver)

前半部分我们已经清楚了,但是后半部分还不清楚,所以我们的重点就是看看后半部分具体是怎么做的。但是后半部分要从何处入手这是个问题,我这里省去了查找的过程,直接说一下入口吧,位置就在 celery/bin/events.py,对于任一一种 Events,我们需要关注的是 run_evtop 这个函数,所以先来看看:

Celery 源码解析六:Events 的实现

这里很简短,继续跟下去看看咯:

Celery 源码解析六:Events 的实现

这里有点意思了,但是还是可以比较简单得看到 Line 529 是关键所在:

Celery 源码解析六:Events 的实现

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpwfdd.html