一个基于RabbitMQ的可复用的事务消息方案 (3)

因为此模块有可能扩展出一个后台管理模块,所以要把消息的管理和状态相关字段和大体积的消息内容分别存放在两个表,从而避免大批量查询消息记录的时候MySQL服务IO使用率过高的问题(这是和上一个公司的DBA团队商讨后得到的一个比较合理的方案)。预留了两个业务字段business_module和business_key用于标识业务模块和业务键(一般是唯一识别号,例如订单号)。

一个基于RabbitMQ的可复用的事务消息方案

一般情况下,如果服务通过配置自行提前声明队列和交换器的绑定关系,那么发送RabbitMQ消息的时候其实只依赖于exchangeName和routingKey两个字段(header类型的交换器是特殊的,也比较少用,这里暂时不用考虑),考虑到服务可能会遗漏声明操作,发送消息的时候会基于队列进行首次绑定声明并且缓存相关的信息(RabbitMQ中的队列-交换器绑定声明只要每次声明绑定关系的参数一致,则不会抛出异常)。

方案代码设计

下面的方案设计描述中,暂时忽略了消息事务管理后台的API设计,这些可以在后期补充。

定义贫血模型实体类TransactionalMessage和TransactionalMessageContent:

@Data public class TransactionalMessage { private Long id; private LocalDateTime createTime; private LocalDateTime editTime; private String creator; private String editor; private Integer deleted; private Integer currentRetryTimes; private Integer maxRetryTimes; private String queueName; private String exchangeName; private String exchangeType; private String routingKey; private String businessModule; private String businessKey; private LocalDateTime nextScheduleTime; private Integer messageStatus; private Long initBackoff; private Integer backoffFactor; } @Data public class TransactionalMessageContent { private Long id; private Long messageId; private String content; }

然后定义dao接口(这里暂时不展开实现的细节代码,存储使用MySQL,如果要替换为其他类型的数据库,只需要使用不同的实现即可):

public interface TransactionalMessageDao { void insertSelective(TransactionalMessage record); void updateStatusSelective(TransactionalMessage record); List<TransactionalMessage> queryPendingCompensationRecords(LocalDateTime minScheduleTime, LocalDateTime maxScheduleTime, int limit); } public interface TransactionalMessageContentDao { void insert(TransactionalMessageContent record); List<TransactionalMessageContent> queryByMessageIds(String messageIds); }

接着定义事务消息服务接口TransactionalMessageService:

// 对外提供的服务类接口 public interface TransactionalMessageService { void sendTransactionalMessage(Destination destination, TxMessage message); } @Getter @RequiredArgsConstructor public enum ExchangeType { FANOUT("fanout"), DIRECT("direct"), TOPIC("topic"), DEFAULT(""), ; private final String type; } // 发送消息的目的地 public interface Destination { ExchangeType exchangeType(); String queueName(); String exchangeName(); String routingKey(); } @Builder public class DefaultDestination implements Destination { private ExchangeType exchangeType; private String queueName; private String exchangeName; private String routingKey; @Override public ExchangeType exchangeType() { return exchangeType; } @Override public String queueName() { return queueName; } @Override public String exchangeName() { return exchangeName; } @Override public String routingKey() { return routingKey; } } // 事务消息 public interface TxMessage { String businessModule(); String businessKey(); String content(); } @Builder public class DefaultTxMessage implements TxMessage { private String businessModule; private String businessKey; private String content; @Override public String businessModule() { return businessModule; } @Override public String businessKey() { return businessKey; } @Override public String content() { return content; } } // 消息状态 @RequiredArgsConstructor public enum TxMessageStatus { /** * 成功 */ SUCCESS(1), /** * 待处理 */ PENDING(0), /** * 处理失败 */ FAIL(-1), ; private final Integer status; }

TransactionalMessageService的实现类是事务消息的核心功能实现,代码如下:

@Slf4j @Service @RequiredArgsConstructor public class RabbitTransactionalMessageService implements TransactionalMessageService { private final AmqpAdmin amqpAdmin; private final TransactionalMessageManagementService managementService; private static final ConcurrentMap<String, Boolean> QUEUE_ALREADY_DECLARE = new ConcurrentHashMap<>(); @Override public void sendTransactionalMessage(Destination destination, TxMessage message) { String queueName = destination.queueName(); String exchangeName = destination.exchangeName(); String routingKey = destination.routingKey(); ExchangeType exchangeType = destination.exchangeType(); // 原子性的预声明 QUEUE_ALREADY_DECLARE.computeIfAbsent(queueName, k -> { Queue queue = new Queue(queueName); amqpAdmin.declareQueue(queue); Exchange exchange = new CustomExchange(exchangeName, exchangeType.getType()); amqpAdmin.declareExchange(exchange); Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs(); amqpAdmin.declareBinding(binding); return true; }); TransactionalMessage record = new TransactionalMessage(); record.setQueueName(queueName); record.setExchangeName(exchangeName); record.setExchangeType(exchangeType.getType()); record.setRoutingKey(routingKey); record.setBusinessModule(message.businessModule()); record.setBusinessKey(message.businessKey()); String content = message.content(); // 保存事务消息记录 managementService.saveTransactionalMessageRecord(record, content); // 注册事务同步器 TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { @Override public void afterCommit() { managementService.sendMessageSync(record, content); } }); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyjddj.html