JUC并发编程与高性能内存队列disruptor实战-下 (5)

Sequencer:是Disruptor的真正核心。该接口的两种实现(单一生产者和多生产者)实现了所有并行算法,以便在生产者和消费者之间快速、正确地传递数据。

Sequence Barrier:产生了一个序列屏障,其中包含了对Sequencer中发布的主序列的引用和任何依赖消费者的序列的引用。它包含确定是否有任何事件可供使用者处理的逻辑。

Wait Strategy:等待策略决定了消费者将如何等待事件被生产者放置到破坏者,如SleepingWaitStrategyYieldingWaitStrategyBlockingWaitStrategyBusySpinWaitStrategy等。

Event:从生产者传递到消费者的数据单位。事件没有特定的代码表示,因为它完全由用户定义。

Event Processor:处理来自Disruptor的事件的主事件循环,并拥有消费者序列的所有权。有一种称为BatchEventProcessor的表示,它包含事件循环的有效实现,并将回调到使用过的EventHandler接口的提供实现。

Event Handler:由用户实现的接口,代表Disruptor的消费者。

Producer:这是用户代码调用Disruptor来排队事件。

image-20220117170729044

设计要点

内存分配更加合理,使用RingBuffer数据结构,数组元素在初始化时一次性全部创建,提升缓存命中率。

对象循环利用,避免频繁 GC。

能够避免伪共享,提升缓存利用率。Disruptor为了解决伪共享问题,使用的方法是缓存行填充,这是一种以空间换时间的策略,主要思想就是通过往对象中填充无意义的变量,来保证整个对象独占缓存行。而JDK8之后也提供了一个@Contended注解,使用它就可以进行自动填充,使用时需要在启动时增加一个JVM参数。

采用无锁算法,避免频繁加锁、解锁的性能消耗。支持批量消费,消费者可以无锁方式消费多个消息。

有相对更多的等待策略实现。

示例代码(多生产者多消费者)

pom文件引入disruptor的依赖

<dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.4.4</version> </dependency>

事件类LongEvent.java

package cn.itxs.disruptor; public class LongEvent { private long value; public void set(long value) { this.value = value; } @Override public String toString() { return "LongEvent{" + "value=" + value + '}'; } }

事件工厂类EventFactory.java

package cn.itxs.disruptor; import com.lmax.disruptor.EventFactory; public class LongEventFactory implements EventFactory<LongEvent> { @Override public LongEvent newInstance() { return new LongEvent(); } }

事件处理实现类,也即是消费者,这里实现EventHandler接口,也即是每个消费者都消费相同数量的生产者数据,LongEventHandler.java

package cn.itxs.disruptor; import com.lmax.disruptor.EventHandler; public class LongEventHandler implements EventHandler<LongEvent> { public static long count = 0; @Override public void onEvent(LongEvent event, long sequence, boolean endOfBatch) { count ++; System.out.println("[" + Thread.currentThread().getName() + "]" + event + "消费序号:" + sequence + ",event=" + event.toString()); } }

测试类

package cn.itxs.disruptor; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SleepingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import java.util.concurrent.*; public class DisruptorMain { public static void main(String[] args) throws InterruptedException { // The factory for the event LongEventFactory factory = new LongEventFactory(); // Specify the size of the ring buffer, must be power of 2. int bufferSize = 1024*1024; // Construct the Disruptor Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory(), ProducerType.MULTI, new SleepingWaitStrategy()); // Connect the handlers LongEventHandler h1 = new LongEventHandler(); LongEventHandler h2 = new LongEventHandler(); disruptor.handleEventsWith(h1, h2); // Start the Disruptor, starts all threads running disruptor.start(); // Get the ring buffer from the Disruptor to be used for publishing. RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); //================================================================================================ final int threadCount = 3; CyclicBarrier barrier = new CyclicBarrier(threadCount); ExecutorService service = Executors.newCachedThreadPool(); for (long i = 0; i < threadCount; i++) { final long threadNum = i; service.submit(()-> { System.out.printf("Thread %s ready to start!\n", threadNum ); try { barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } for (int j = 0; j < 2; j++) { final int seq = j; ringBuffer.publishEvent((event, sequence) -> { event.set(seq); System.out.println(threadNum + "线程生产了序号为" + sequence + ",消息为" + seq); }); } }); } service.shutdown(); TimeUnit.SECONDS.sleep(3); System.out.println(LongEventHandler.count); } }

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VpEC2SPp-1642435544540)(image-20220117183016502.png)]

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzjdzy.html