原文链接:Pulsar の 保证消息的顺序性、幂等性和可靠性 一、背景
前面两篇文章,已经介绍了关于Pulsar消费者的详细使用和自研的Pulsar组件。
接下来,将简单分析如何保证消息的顺序性、幂等性和可靠性;但并不会每个分析都会进行代码实战,进行代码实战的都是比较有意思的点,如消费消息如何保证顺序性和幂等性,而其他的其实都是比较简单的,就不做代码实战了。
二、特性分析 2.1、顺序性保证消息是按顺序发送,按顺序消费,一个接着一个。
2.1.1、活动图 2.1.2、分析producer:
发送者保证消息的顺序性其实是比较简单的:
利用单队列发送
一个业务对应一个队列
一个队列只能由一个消费者监听消费
利用 Pulsar 的分区Topic
producer发送消息时需要指定key属性,Pulsar自动会根据Key值将消息分配到指定的分区中
支持多个消费者消费,多个消费者可以监听同一个分区,但是相同的Key只会分配给同一个消费者
生产者这里就不做什么实战的,都是比较简单的点,没啥好说的。
consumer:
消费者保证消息的顺序性有下面两种方式:
当前线程执行
单线程执行保证了消费的顺序性
消费效率低
自定义线程池列表异步并发消费
如果直接使用线程池,那么虽然能提高消费效率,但是并不能保证顺序性
这里我们会自定义线程池列表,列表中的线程池的核心线程数和最大线程数都是1,保证顺序消费
Producer发送的消息体中,需指定key,我们会根据key#hashCode定位到对应的线程池,这里参考HashMap的做法。
2.1.3、代码实战消费者保证消息顺序性的第二点的实现还是比较有意思的:如何自定义线程池列表、如何根据消息的key来定位线程池。
代码如下:
发送消息:
/** * 指定key发送消息 * @author winfun **/ @Slf4j public class ThirdProducerDemo { public static void main(String[] args) throws PulsarClientException { PulsarClient client = PulsarClient.builder() .serviceUrl("pulsar://127.0.0.1:6650") .build(); ProducerBuilder<String> productBuilder = client.newProducer(Schema.STRING).topic("winfun/study/test-topic3") .blockIfQueueFull(Boolean.TRUE).batchingMaxMessages(100).enableBatching(Boolean.TRUE).sendTimeout(3, TimeUnit.SECONDS); Producer<String> producer = productBuilder.create(); for (int i = 0; i < 100; i++) { MsgDTO msgDTO = new MsgDTO(); String content = "hello"+i; String key; if (content.contains("1")){ key = "k213e434y1df"; }else if (content.contains("2")){ key = "keasdgashgfy2"; }else { key = "other"; } msgDTO.setId(key); msgDTO.setContent(content); producer.send(JSONUtil.toJsonStr(msgDTO)); } producer.close(); } }消费消息
/** * 顺序性消费-消费者demo * @author: winfun **/ @Slf4j @PulsarListener(topics = {"test-topic3"}) public class SuccessionConsumerListener extends BaseMessageListener { List<ExecutorService> executorServiceList = new ArrayList<>(); /** * 初始化自定义线程池列表 */ @PostConstruct public void initCustomThreadPool(){ for (int i = 0; i < 10; i++) { /** * 1、核心线程数和最大线程数都为1,避免多线程消费导致顺序被打乱 * 2、使用有界队列,设定最大长度,避免无限任务数导致OOM * 3、使用CallerRunsPolicy拒绝策略,让当前线程执行,避免消息丢失,也可以直接让消费者执行当前任务,阻塞住其他任务,也能保证顺序性 */ ExecutorService threadPoolExecutor = new ThreadPoolExecutor( 1, 1, 60, TimeUnit.MINUTES, new LinkedBlockingDeque<>(100), new ThreadFactoryBuilder().setNameFormat(String.format("custom-thread-pool-%d",i)).get(), new ThreadPoolExecutor.CallerRunsPolicy() ); this.executorServiceList.add(threadPoolExecutor); } } /** * 消费消息 * 自定义监听器实现方法 * 消息如何响应由开发者决定: * Consumer#acknowledge * Consumer#reconsumeLater * Consumer#negativeAcknowledge * * @param consumer 消费者 * @param msg 消息 */ @Override protected void doReceived(Consumer<String> consumer, Message<String> msg) { String value = msg.getValue(); MsgDTO msgDTO = JSONUtil.toBean(value, MsgDTO.class); // 匹配列表中对应的线程池 int index = (this.executorServiceList.size()-1)&this.spreed(msgDTO.getId().hashCode()); log.info("成功获取线程池列表索引,msgId is {}, index is {}",msgDTO.getId(),index); ExecutorService executorService = this.executorServiceList.get(index); executorService.execute(()->{ log.info("成功消费消息,threadName is {},msg is {}",Thread.currentThread().getName(),msg); consumer.acknowledgeAsync(msg); }); } /** * hashCode扩展,保证hashCode前后十六位都能完美进行位运算 * @param hashCode * @return */ private int spreed(int hashCode){ return (hashCode ^ (hashCode >>> 16)) & hashCode; } /*** * 是否开启异步消费,默认开启 * @return {@link Boolean } **/ @Override public Boolean enableAsync() { // 首先关闭线程池异步并发消费 return Boolean.FALSE; } } 2.2、幂等性幂等性的话,我们主要是分析一下消费者的,如何保证消费者只正确消费一次消息还是非常重要的。
2.2.1、活动图