为了理解消息处理逻辑,还要重点关注BatchEventProcessor.processEvents方法,如下图所示,其实也很简单,就是不停的从环形队列取出可用的事件,然后再更新自己的Sequence,相当于标记已经消费到哪里了:
总结最后总结Disruptor类的重要功能:
创建环形队列(RingBuffer对象)
创建SequenceBarrier对象,用于接收ringBuffer中的可消费事件
创建BatchEventProcessor,负责消费事件
绑定BatchEventProcessor对象的异常处理类
调用ringBuffer.addGatingSequences,将消费者的Sequence传给ringBuffer
启动独立线程,用来执行消费事件的业务逻辑
聪明的您一定会发现,本文并没有全面分析Disruptor类的源码,例如after、shutdown等方法都没有提到,确实如此,欣宸在此给您道歉了,本篇的重点是找出那些与基本功能有关代码,为后面的实战提供理论指导(不用Disruptor类实现消息生产消费的实战),因此很多高级功能都跳过了;
理解官方流程图此时再看官方流程图,聪明的您应该很快就能理解此图表达的意思:每个消费者都有自己的Sequence,通过此Sequence取得自己在环形队列中消费的位置,再通过SequenceBarrier来等待可用事件的出现,等到事件出现了就用get方法取出具体的事件,给EventHandler来处理:
后续预告此时,咱们对Disruptor类已经有了比较深入的理解,接下来的文章,咱们会尝试不用Disruptor类,仅凭着对RingBuffer对象的操作来实现以下三种功能:
100个事件,单个消费者消费;
100个事件,三个消费者,每个都独自消费这个100个事件;
100个事件,三个消费者共同消费这个100个事件;
你不孤单,欣宸原创一路相伴Java系列
Spring系列
Docker系列
kubernetes系列
数据库+中间件系列
DevOps系列
欢迎关注公众号:程序员欣宸微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界...
https://github.com/zq2599/blog_demos