Pulsar の 保证消息的顺序性、幂等性和可靠性

原文链接:Pulsar保证消息顺序性、幂等性和可靠性 一、背景

前面两篇文章,已经介绍了关于Pulsar消费者的详细使用和自研的Pulsar组件。

接下来,将简单分析如何保证消息顺序性、幂等性和可靠性;但并不会每个分析都会进行代码实战,进行代码实战的都是比较有意思的点,如消费消息如何保证顺序性和幂等性,而其他的其实都是比较简单的,就不做代码实战了。

二、特性分析 2.1、顺序性

保证消息是按顺序发送,按顺序消费,一个接着一个。

2.1.1、活动图

在这里插入图片描述

2.1.2、分析

producer:

发送者保证消息的顺序性其实是比较简单的:

利用单队列发送

一个业务对应一个队列

一个队列只能由一个消费者监听消费

利用 Pulsar 的分区Topic

producer发送消息时需要指定key属性,Pulsar自动会根据Key值将消息分配到指定的分区中

支持多个消费者消费,多个消费者可以监听同一个分区,但是相同的Key只会分配给同一个消费者

生产者这里就不做什么实战的,都是比较简单的点,没啥好说的。

consumer:

消费者保证消息的顺序性有下面两种方式:

当前线程执行

单线程执行保证了消费的顺序性

消费效率低

自定义线程池列表异步并发消费

如果直接使用线程池,那么虽然能提高消费效率,但是并不能保证顺序性

这里我们会自定义线程池列表,列表中的线程池的核心线程数和最大线程数都是1,保证顺序消费

Producer发送的消息体中,需指定key,我们会根据key#hashCode定位到对应的线程池,这里参考HashMap的做法。

2.1.3、代码实战

消费者保证消息顺序性的第二点的实现还是比较有意思的:如何自定义线程池列表、如何根据消息的key来定位线程池。

代码如下:

发送消息:

/** * 指定key发送消息 * @author winfun **/ @Slf4j public class ThirdProducerDemo { public static void main(String[] args) throws PulsarClientException { PulsarClient client = PulsarClient.builder() .serviceUrl("pulsar://127.0.0.1:6650") .build(); ProducerBuilder<String> productBuilder = client.newProducer(Schema.STRING).topic("winfun/study/test-topic3") .blockIfQueueFull(Boolean.TRUE).batchingMaxMessages(100).enableBatching(Boolean.TRUE).sendTimeout(3, TimeUnit.SECONDS); Producer<String> producer = productBuilder.create(); for (int i = 0; i < 100; i++) { MsgDTO msgDTO = new MsgDTO(); String content = "hello"+i; String key; if (content.contains("1")){ key = "k213e434y1df"; }else if (content.contains("2")){ key = "keasdgashgfy2"; }else { key = "other"; } msgDTO.setId(key); msgDTO.setContent(content); producer.send(JSONUtil.toJsonStr(msgDTO)); } producer.close(); } }

消费消息

/** * 顺序性消费-消费者demo * @author: winfun **/ @Slf4j @PulsarListener(topics = {"test-topic3"}) public class SuccessionConsumerListener extends BaseMessageListener { List<ExecutorService> executorServiceList = new ArrayList<>(); /** * 初始化自定义线程池列表 */ @PostConstruct public void initCustomThreadPool(){ for (int i = 0; i < 10; i++) { /** * 1、核心线程数和最大线程数都为1,避免多线程消费导致顺序被打乱 * 2、使用有界队列,设定最大长度,避免无限任务数导致OOM * 3、使用CallerRunsPolicy拒绝策略,让当前线程执行,避免消息丢失,也可以直接让消费者执行当前任务,阻塞住其他任务,也能保证顺序性 */ ExecutorService threadPoolExecutor = new ThreadPoolExecutor( 1, 1, 60, TimeUnit.MINUTES, new LinkedBlockingDeque<>(100), new ThreadFactoryBuilder().setNameFormat(String.format("custom-thread-pool-%d",i)).get(), new ThreadPoolExecutor.CallerRunsPolicy() ); this.executorServiceList.add(threadPoolExecutor); } } /** * 消费消息 * 自定义监听器实现方法 * 消息如何响应由开发者决定: * Consumer#acknowledge * Consumer#reconsumeLater * Consumer#negativeAcknowledge * * @param consumer 消费者 * @param msg 消息 */ @Override protected void doReceived(Consumer<String> consumer, Message<String> msg) { String value = msg.getValue(); MsgDTO msgDTO = JSONUtil.toBean(value, MsgDTO.class); // 匹配列表中对应的线程池 int index = (this.executorServiceList.size()-1)&this.spreed(msgDTO.getId().hashCode()); log.info("成功获取线程池列表索引,msgId is {}, index is {}",msgDTO.getId(),index); ExecutorService executorService = this.executorServiceList.get(index); executorService.execute(()->{ log.info("成功消费消息,threadName is {},msg is {}",Thread.currentThread().getName(),msg); consumer.acknowledgeAsync(msg); }); } /** * hashCode扩展,保证hashCode前后十六位都能完美进行位运算 * @param hashCode * @return */ private int spreed(int hashCode){ return (hashCode ^ (hashCode >>> 16)) & hashCode; } /*** * 是否开启异步消费,默认开启 * @return {@link Boolean } **/ @Override public Boolean enableAsync() { // 首先关闭线程池异步并发消费 return Boolean.FALSE; } } 2.2、幂等性

幂等性的话,我们主要是分析一下消费者的,如何保证消费者只正确消费一次消息还是非常重要的。

2.2.1、活动图

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzjggg.html