disruptor笔记之八:知识点补充(终篇) (2)

在consume-mode中,上述代码有对应的服务类TranslatorPublishServiceImpl.java,并且有对应的单元测试代码(ConsumeModeServiceTest.testTranslatorPublishService),这里就不占篇幅了,您若有兴趣可以自行查阅;

看看ringBuffer.publishEvent的内部实现,是如何帮咱们省去之前那几行的,首先是调用了sequencer.next:

@Override public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0) { final long sequence = sequencer.next(); translateAndPublish(translator, sequence, arg0); }

再打开translateAndPublish看看,ringBuffer.get、try-finally代码块、sequencer.publish都在,这下放心了,以前咱们做的事情,现在disruptor帮我们做了,咱们可以专心业务逻辑了:

private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0) { try { translator.translateTo(get(sequence), sequence, arg0); } finally { sequencer.publish(sequence); } } Lambda风格

disruptor的重要API也支持Lambda表达式作为入参,这里将几处常用的API整理如下:

Disruptor类实例化(LambdaServiceImpl.java):

// lambda类型的实例化 disruptor = new Disruptor<OrderEvent>(OrderEvent::new, BUFFER_SIZE, DaemonThreadFactory.INSTANCE);

设置事件消费者的时候,可以用Lambda取代之前的对象(LambdaServiceImpl.java):

// lambda表达式指定具体消费逻辑 disruptor.handleEventsWith((event, sequence, endOfBatch) -> { log.info("lambda操作, sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event); // 这里延时100ms,模拟消费事件的逻辑的耗时 Thread.sleep(100); // 计数 eventCountPrinter.accept(null); });

发布事件的操作,也支持Lambda表达式,如下所示,我在父类ConsumeModeService.java中新增publistEvent方法,里面调用的disruptor.getRingBuffer().publishEvent的入参就是Lambda表达式和事件所需的业务数据,这样就省区了以前发布事件的类OrderEventProducer.java:

public void publistEvent(EventTranslatorOneArg<OrderEvent, String> translator, String value) { disruptor.getRingBuffer().publishEvent(translator, value); }

如下所示,现在拿到业务数据后发布事件的操作变得非常轻了,Lambda表达式中做好业务数据转事件的逻辑即可,最终,不再需要OrderEventProducer.java,一行代码完成事件发布(ConsumeModeServiceTest.java):

for(int i=0;i<EVENT_COUNT;i++) { log.info("publich {}", i); final String content = String.valueOf(i); lambdaService.publistEvent((event, sequence, value) -> event.setValue(value), content); } 清理数据

由于存储的数据结构是环形队列,对于每个事件的实例,会一直保存在队列中,直到再次在这个位置写入时才会被新的事件实例覆盖,考虑到可能有的场景要求数据被消费后就立即被清除,disruptor官方提供了以下建议:

事件定义的类中,增加一个清理业务数据的方法(假设是ObjectEvent类的clear方法);

新增一个事件处理类(假设是ClearingEventHandler),在里面主动调用事件定义类的清理业务数据的方法;

在编写事件消费逻辑时,最后添加上述事件处理类ClearingEventHandler,这样就会调用ObjectEvent实例的clear方法,将业务数据清理掉;

官方给出的代码如下:

在这里插入图片描述

至此,整个《disruptor笔记》就完成了,感谢您的关注,希望这个系列的内容能给您带来帮助,在开发中多一些选择和参考;

你不孤单,欣宸原创一路相伴

Java系列

Spring系列

Docker系列

kubernetes系列

数据库+中间件系列

DevOps系列

欢迎关注公众号:程序员欣宸

微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界...
https://github.com/zq2599/blog_demos

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zggzsx.html