这周RocketMQ发布了4.3.0版本,New Feature中最受关注的一点就是支持了事务消息:
今天花了点时间看了下具体的实现内容,下面是简单的总结。
RocketMQ事务消息概要通过冯嘉发布的《RocketMQ 4.3正式发布,支持分布式事务》一文可以看到RocketMQ采用了2PC的方案来提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息。
这张图说明了事务消息的大致方案,分为两个逻辑:正常事务消息的发送及提交、事务消息的补偿流程
事务消息发送及提交:
发送消息(half消息)
服务端响应消息写入结果
根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)
根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
补偿流程:
对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
Producer收到回查消息,检查回查消息对应的本地事务的状态
根据本地事务状态,重新Commit或者Rollback
补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
以上RocketMQ事务消息的整体方案,对于了解Notify的同学应该是很熟悉的,下面是之前Notify相关的资料:
整体方案是完全相同的,只是两者的Storage不同。
RocketMQ事务消息设计 一阶段的消息如何对用户不可见事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的。
如何做到写入了消息但是对用户不可见?——写入消息数据,但是不创建对应的消息的索引信息。
熟悉RocketMQ的同学应该都清楚,消息在服务端的存储结构如上,每条消息都会有对应的索引信息,Consumer通过索引读取消息。
那么实现一阶段写入的消息不被用户消费(需要在Commit后才能消费),只需要写入Storage Queue,但是不构建Index Queue即可。
RocketMQ中具体实现策略是:写入的如果事务消息,对消息的Topic和Queue等属性进行替换,同时将原来的Topic和Queue信息存储到消息的属性中。
上图即RocketMQ替换事务消息属性的代码实现,替换属性后这条消息被写入到TransactionalMessageUtil.buildHalfTopic()的Queue 0中。
(RocketMQ将事务消息一阶段发送的消息称为Half消息让人费解,采用的2PC的方式,一阶段消息称为Prepare Message或者Pending Message更能体现它的含义)
在完成Storage Queue的写入后,在appendCallback中,普通消息会去构建消息索引,而如果发现是事务消息,则跳过了创建索引的逻辑。
如果让一阶段的消息对用户可见在完成一阶段写入一条对用户不可见的消息后,二阶段如果是Commit操作,则需要让消息对用户可见;如果是Rollback则需要撤销一阶段的消息。
先说Rollback的情况。对于Rollback,本身一阶段的消息对用户是不可见的,其实不需要真正撤销消息(实际上RocketMQ也无法去真正的删除一条消息,因为是顺序写文件的)。
但是区别于这条消息没有确定状态(Pending状态,事务悬而未决),需要一个操作来标识这条消息的最终状态。
RocketMQ事务消息方案中引入了Op消息的概念,用Op消息标识事务消息是否状态已经确定(Commit或者Rollback)。如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。
引入Op消息后,事务消息无论是Commit或者Rollback都会记录一个Op操作。
Commit相对于Rollback只是在写入Op消息前创建Half消息的索引。
Op消息的存储
RocketMQ将Op消息写入到全局一个特定的Topic中:TransactionalMessageUtil.buildOpTopic()
这个Topic是一个内部的Topic(像Half消息的Topic一样),不会被用户消费。
Op消息的内容为对应的Half消息的存储的Offset,这样通过Op消息能索引到Half消息进行后续的回查操作。
Half消息的索引构建
在执行二阶段的Commit操作时,需要构建出Half消息的索引。