一个基于RabbitMQ的可复用的事务消息方案

分布式事务是微服务实践中一个比较棘手的问题,在笔者所实施的微服务实践方案中,都采用了折中或者规避强一致性的方案。参考Ebay多年前提出的本地消息表方案,基于RabbitMQ和MySQL(JDBC)做了轻量级的封装,实现了低入侵性的事务消息模块。本文的内容就是详细分析整个方案的设计思路和实施。环境依赖如下:

JDK1.8+

spring-boot-start-web:2.x.x、spring-boot-start-jdbc:2.x.x、spring-boot-start-amqp:2.x.x

HikariCP:3.x.x(spring-boot-start-jdbc自带)、mysql-connector-java:5.1.48

redisson:3.12.1

方案设计思路

事务消息原则上只适合弱一致性(或者说最终一致性)的场景,常见的弱一致性场景如:

用户服务完成了注册动作,向短信服务推送一条营销相关的消息。

信贷体系中,订单服务保存订单完毕,向审批服务推送一条待审批的订单记录信息。

......

强一致性的场景一般不应该选用事务消息

一个基于RabbitMQ的可复用的事务消息方案

一般情况下,要求强一致性说明要严格同步,也就是所有操作必须同时成功或者同时失败,这样就会引入同步带来的额外消耗。如果一个事务消息模块设计合理,补偿、查询、监控等等功能都完毕,由于系统交互是异步的,整体吞吐要比严格同步高。在笔者负责的业务系统中基于事务消息使用还定制了一条基本原则:消息内容正确的前提下,消费方出现异常需要自理

简单来说就是:上游保证了自身的业务正确性,成功推送了正确的消息到RabbitMQ就认为上游义务已经结束。

为了降低代码的入侵性,事务消息需要借助Spring的编程式事务或者声明式事务。编程式事务一般依赖于TransactionTemplate,而声明式事务依托于AOP模块,依赖于注解@Transactional。

接着需要自定义一个事务消息功能模块,新增一个事务消息记录表(其实就是本地消息表),用于保存每一条需要发送的消息记录。事务消息功能模块的主要功能是:

保存消息记录。

推送消息到RabbitMQ服务端。

消息记录的查询、补偿推送等等。

事务执行的逻辑单元

在事务执行的逻辑单元里面,需要进行待推送的事务消息记录的保存,也就是:本地(业务)逻辑和事务消息记录保存操作绑定在同一个事务

一个基于RabbitMQ的可复用的事务消息方案

发送消息到RabbitMQ服务端这一步需要延后到事务提交之后,这样才能保证事务提交成功和消息成功发送到RabbitMQ服务端这两个操作是一致的。为了把保存待发送的事务消息发送消息到RabbitMQ两个动作从使用者感知角度合并为一个动作,这里需要用到Spring特有的事务同步器TransactionSynchronization,这里分析一下事务同步器的主要方法的回调位置,主要参考AbstractPlatformTransactionManager#commit()或者AbstractPlatformTransactionManager#processCommit()方法:

一个基于RabbitMQ的可复用的事务消息方案

上图仅仅演示了事务正确提交的场景(不包含异常的场景)。这里可以明确知道,事务同步器TransactionSynchronization的afterCommit()和afterCompletion(int status)方法都在真正的事务提交点AbstractPlatformTransactionManager#doCommit()之后回调,因此可以选用这两个方法其中之一用于执行推送消息到RabbitMQ服务端,整体的伪代码如下:

@Transactional public Dto businessMethod(){ business transaction code block ... // 保存事务消息 [saveTransactionMessageRecord()] // 注册事务同步器 - 在afterCommit()方法中推送消息到RabbitMQ [register TransactionSynchronization,send message in method afterCommit()] business transaction code block ... }

上面伪代码中,保存事务消息注册事务同步器两个步骤可以安插在事务方法中的任意位置,也就是说与执行顺序无关。

事务消息的补偿

虽然之前提到笔者建议下游服务自理自身服务消费异常的场景,但是有些时候迫于无奈还是需要上游把对应的消息重新推送,这个算是特殊的场景。另外还有一个场景需要考虑:事务提交之后触发事务同步器TransactionSynchronization的afterCommit()方法失败。这是一个低概率的场景,但是在生产中一定会出现,一个比较典型的原因就是:事务提交完成后尚未来得及触发TransactionSynchronization#afterCommit()方法进行推送服务实例就被重启。如下图所示:

一个基于RabbitMQ的可复用的事务消息方案

为了统一处理补偿推送的问题,使用了有限状态判断消息是否已经推送成功:

在事务方法内,保存事务消息的时候,标记消息记录推送状态为处理中

事务同步器接口TransactionSynchronization的afterCommit()方法的实现中,推送对应的消息到RabbitMQ,然后更变事务消息记录的状态为推送成功

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyjddj.html