producer:
生产者如何保证幂等性,感觉这个话题没什么好讨论的,如果发生失败就重新发送,否则就正常发送就好了。
consumer:
消费者保证消息幂等性,最主要是利用中间表来保存消费记录:
本地新增表来保存消息消费记录
在消息消费前,先判断MessageId判断是否存在消费记录
如果存在,直接响应
如果不存在,则开启本地事务,接着进行消息消费
当成功消费时提交事务,否则回滚
2.2.3、代码实战如何利用消费记录表和本地事务来完成消息消费的幂等性,看下面代码:
发送消息
** * * @author winfun **/ @Slf4j public class FourthProducerDemo { public static void main(String[] args) throws PulsarClientException { PulsarClient client = PulsarClient.builder() .serviceUrl("pulsar://127.0.0.1:6650") .build(); ProducerBuilder<String> productBuilder = client.newProducer(Schema.STRING).topic("winfun/study/test-topic4") .blockIfQueueFull(Boolean.TRUE).batchingMaxMessages(100).enableBatching(Boolean.TRUE).sendTimeout(3, TimeUnit.SECONDS); Producer<String> producer = productBuilder.create(); for (int i = 0; i < 20; i++) { MsgDTO msgDTO = new MsgDTO(); String content = "hello"+i; String key; if (content.contains("1")){ key = "k213e434y1df"; }else if (content.contains("2")){ key = "keasdgashgfy2"; }else { key = "other"; } msgDTO.setId(key); msgDTO.setContent(content); producer.send(JSONUtil.toJsonStr(msgDTO)); } producer.close(); } }消费消息
package com.github.howinfun.consumer.idempotent; import cn.hutool.json.JSONUtil; import com.github.howinfun.core.entity.MessageConsumeRecord; import com.github.howinfun.core.service.MessageConsumeRecordService; import com.github.howinfun.dto.MsgDTO; import io.github.howinfun.listener.BaseMessageListener; import io.github.howinfun.listener.PulsarListener; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; import jodd.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClientException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.transaction.annotation.Transactional; /** * 幂等性消费-消费者demo * @author: winfun * @date: 2021/9/2 12:49 下午 **/ @Slf4j @PulsarListener(topics = {"test-topic4"}) public class IdempotentConsumerListener extends BaseMessageListener { List<ExecutorService> executorServiceList = new ArrayList<>(); @Autowired private MessageConsumeRecordService service; /** * 初始化自定义线程池列表 */ @PostConstruct public void initCustomThreadPool(){ for (int i = 0; i < 10; i++) { /** * 1、核心线程数和最大线程数都为1,避免多线程消费导致顺序被打乱 * 2、使用有界队列,设定最大长度,避免无限任务数导致OOM * 3、使用CallerRunsPolicy拒绝策略,让当前线程执行,避免消息丢失,也可以直接让消费者执行当前任务,阻塞住其他任务,也能保证顺序性 */ ExecutorService threadPoolExecutor = new ThreadPoolExecutor( 1, 1, 60, TimeUnit.MINUTES, new LinkedBlockingDeque<>(100), new ThreadFactoryBuilder().setNameFormat(String.format("custom-thread-pool-%d",i)).get(), new ThreadPoolExecutor.CallerRunsPolicy() ); this.executorServiceList.add(threadPoolExecutor); } } /** * 消费消息 * 自定义监听器实现方法 * 消息如何响应由开发者决定: * Consumer#acknowledge * Consumer#reconsumeLater * Consumer#negativeAcknowledge * * @param consumer 消费者 * @param msg 消息 */ @Override protected void doReceived(Consumer<String> consumer, Message<String> msg) { boolean flag = preReceived(msg); if (Boolean.FALSE.equals(flag)){ String value = msg.getValue(); MsgDTO msgDTO = JSONUtil.toBean(value, MsgDTO.class); int index = (this.executorServiceList.size()-1)&this.spreed(msgDTO.getId().hashCode()); log.info("成功获取线程池列表索引,msgId is {}, index is {}",msgDTO.getId(),index); ExecutorService executorService = this.executorServiceList.get(index); executorService.execute(()->{ try { this.doInnerReceived(consumer,msg); } catch (PulsarClientException e) { log.error("消息消费失败",e); } }); }else { log.info("此消息的消费记录已存在,直接响应,messageId is {}", msg.getMessageId().toString()); try { consumer.acknowledge(msg); } catch (PulsarClientException e) { log.error("消息提交失败",e); } } } /** * 消费前判断,messageId是否存在对应的消费记录 * @param msg 消息 * @return 存在结果 */ private boolean preReceived(Message<String> msg){ MessageConsumeRecord record = this.service.getByMessageId(msg.getMessageId().toString()); if (Objects.isNull(record)){ return false; } return true; } /** * 消息消费 * @param consumer 消费者 * @param msg 消息 */ @Transactional(rollbackFor = Exception.class) public void doInnerReceived(Consumer<String> consumer,Message<String> msg) throws PulsarClientException { String messageContent = msg.getValue(); String messageId = msg.getMessageId().toString(); log.info("成功消费消息,threadName is {},msg is {}",Thread.currentThread().getName(),messageContent); this.service.save(new MessageConsumeRecord() .setMessageId(messageId) .setMessageContent(messageContent) .setCreateTime(new Date())); // 模拟重复消费,如果消息内容包含8,则插入数据库,但是不响应 if (messageContent.contains("8")){ log.info("消息已被消费入库,但不响应,模拟重复消费,messageId is {},messageContent is {}",messageId,messageContent); }else { consumer.acknowledge(msg); } } /** * hashCode扩展,保证hashCode前后十六位都能完美计算 * @param hashCode * @return */ private int spreed(int hashCode){ return (hashCode ^ (hashCode >>> 16)) & hashCode; } /*** * 是否开启异步消费,默认开启 * @return {@link Boolean } **/ @Override public Boolean enableAsync() { // 首先关闭线程池异步并发消费 return Boolean.FALSE; } } 2.3、可靠性 2.3.1、活动图生产者: