看到这里我们就该偷笑了,看到 while 1 就意味差不多到最后了,哈哈,Line 508 使用的是 read 的 Connection,然后 Line 512 创建了一个 Receiver,在 Line 515 进行 capture,所以我们可以断定,我们想找的就在这两句里面了,直接看 Line 515 吧:
这里有点意思的就是又是遇到 Kombu 的锅:
class kombu.mixins.ConsumerMixin[source]
Convenience mixin for implementing consumer programs.
It can be used outside of threads, with threads, or greenthreads (eventlet/gevent) too.
The basic class would need a connection attribute which must be a Connection instance, and define a get_consumers() method that returns a list of kombu.Consumer instances to use. Supporting multiple consumers is important so that multiple channels can be used for different QoS requirements.
这里其实是有多个 EventReceiver 绑定了这个 Connection,然后 ConsumerMixin 帮助协调这些 Receiver,每个 Receiver 都可以收到这些 Event,但是能不能处理就看他们的 routing_key 设置得好不好了。
Event 的处理机制看完 Event 的接收机制我们知道了 Event 是以 AMQP 的形式接收的,那么毫无以为,处理逻辑应该在回调机制上回调的,所以关键还是在于 Line 512 中的 handlers,我们来看着:
在 Receiver 中的 process 我们发现了他是这么用 handlers 的,那么没问题,state.event 才是最后的关键,state 中间做了两层的封装,到最后就成了 _create_dispatcher,但是同样得,这个函数也是比较复杂,所以我这里对他进行精简一下,概括起来是这样的:
先找 group 的 handler,有的话就用这个了,否则看下面;这个默认是没东西的,所以可以先pass
如果是 worker 的 Event,就执行 worker 对应的处理
如果是 task 的 Event,就执行 task 的对应处理
OK,这差不多就是 Event 的内容啦,关于 Event,后面有更精彩的应用会说到,不知道用 Celery 的同学平时对这个特性有没有使用的场景?