欢迎访问我的GitHub
https://github.com/zq2599/blog_demos
内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;
《disruptor笔记》系列链接快速入门
Disruptor类分析
环形队列的基础操作(不用Disruptor类)
事件消费知识点小结
事件消费实战
常见场景
等待策略
知识点补充(终篇)
本篇概览通过前文的实战,咱们对Disruptor有了初步认识,借助com.lmax.disruptor.dsl.Disruptor类可以轻松完成以下操作:
环形队列初始化
指定事件消费者
启动消费者线程
接下来要面对两个问题:
深入了解Disruptor类是如何完成上述操作的;
对Disruptor类有了足够了解时,尝试不用Disruptor,自己动手操作环形队列,实现消息的生产和消费,这样做的目的是加深对Disruptor内部的认识,做到知其所以然;
接下来咱们先解决第一个问题吧,结合Disruptor对象的源码来看看上述三个操作到底做了什么;
环形队列初始化环形队列初始化发生在实例化Disruptor对象的时候,即Disruptor的构造方法:
public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory) { this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory)); }RingBuffer.createMultiProducer方法内部实例化了RingBuffer,如下图红框:
记下第一个重要知识点:创建RingBuffer对象;
指定事件消费者在前文中,下面这行代码指定了事件由StringEventHandler消费:
disruptor.handleEventsWith(new StringEventHandler(eventCountPrinter));查看handleEventsWith方法的内部:
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) { return createEventProcessors(new Sequence[0], handlers); }展开createEventProcessors方法,如下图,请重点关注创建SequenceBarrier和BatchEventProcessor等操作:
展开上图红框四中的updateGatingSequencesForNextInChain方法,如下图,红框中的ringBuffer.addGatingSequences需要重点关注:
小结一下,disruptor.handleEventsWith方法涉及到四个重要知识点:
创建SequenceBarrier对象,用于接收ringBuffer中的可消费事件
创建BatchEventProcessor,负责消费事件
绑定BatchEventProcessor对象的异常处理类
调用ringBuffer.addGatingSequences,将消费者的Sequence传给ringBuffer
启动消费者线程前文已通过日志确定了消费事件的逻辑是在一个独立的线程中执行的,启动消费者线程的代码如下:
disruptor.start();展开start方法,如下可见,关键代码是consumerInfo.start(executor):
public RingBuffer<T> start() { checkOnlyStartedOnce(); for (final ConsumerInfo consumerInfo : consumerRepository) { consumerInfo.start(executor); } return ringBuffer; }ConsumerInfo是接口,对应的实现类有EventProcessorInfo和WorkerPoolInfo两种,这里应该是哪种呢?既然来源是consumerRepository,这就要看当初是怎么存入consumerRepository的,前面在分析createEventProcessors方法时,下图红框中的consumerRepository.add被忽略了,现在需要进去看看:
进去后一目了然,可见ConsumerInfo的实现是EventProcessorInfo:
所以,回到前面对consumerInfo.start(executor)方法的分析,这里要看的就是EventProcessorInfo的start方法了,如下图,非常简单,就是启动一个线程执行eventprocessor(这个eventprocessor是BatchEventProcessor对象):
小结一下,disruptor.start方法涉及到一个重要知识点:
启动独立线程,用来执行消费事件的业务逻辑;
消费事件的逻辑