一个基于RabbitMQ的可复用的事务消息方案 (2)

还有一种极为特殊的情况是RabbitMQ服务端本身出现故障导致消息推送异常,这种情况下需要进行重试(补偿推送),经验证明短时间内的反复重试是没有意义的,故障的服务一般不会瞬时恢复,所以可以考虑使用指数退避算法进行重试,同时需要限制最大重试次数。

一个基于RabbitMQ的可复用的事务消息方案

指数值、间隔值和最大重试次数上限需要根据实际情况设定,否则容易出现消息延时过大或者重试过于频繁等问题。

方案实施

引入核心依赖:

<properties> <spring.boot.version>2.2.4.RELEASE</spring.boot.version> <redisson.version>3.12.1</redisson.version> <mysql.connector.version>5.1.48</mysql.connector.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring.boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>${mysql.connector.version}</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>${redisson.version}</version> </dependency> </dependencies>

spring-boot-starter-jdbc、mysql-connector-java和spring-boot-starter-aop是MySQL事务相关,而spring-boot-starter-amqp是RabbitMQ客户端的封装,redisson主要使用其分布式锁,用于补偿定时任务的加锁执行(以防止服务多个节点并发执行补偿推送)。

表设计

事务消息模块主要涉及两张表,以MySQL为例,建表DDL如下:

CREATE TABLE `t_transactional_message` ( id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY, create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, edit_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, creator VARCHAR(20) NOT NULL DEFAULT 'admin', editor VARCHAR(20) NOT NULL DEFAULT 'admin', deleted TINYINT NOT NULL DEFAULT 0, current_retry_times TINYINT NOT NULL DEFAULT 0 COMMENT '当前重试次数', max_retry_times TINYINT NOT NULL DEFAULT 5 COMMENT '最大重试次数', queue_name VARCHAR(255) NOT NULL COMMENT '队列名', exchange_name VARCHAR(255) NOT NULL COMMENT '交换器名', exchange_type VARCHAR(8) NOT NULL COMMENT '交换类型', routing_key VARCHAR(255) COMMENT '路由键', business_module VARCHAR(32) NOT NULL COMMENT '业务模块', business_key VARCHAR(255) NOT NULL COMMENT '业务键', next_schedule_time DATETIME NOT NULL COMMENT '下一次调度时间', message_status TINYINT NOT NULL DEFAULT 0 COMMENT '消息状态', init_backoff BIGINT UNSIGNED NOT NULL DEFAULT 10 COMMENT '退避初始化值,单位为秒', backoff_factor TINYINT NOT NULL DEFAULT 2 COMMENT '退避因子(也就是指数)', INDEX idx_queue_name (queue_name), INDEX idx_create_time (create_time), INDEX idx_next_schedule_time (next_schedule_time), INDEX idx_business_key (business_key) ) COMMENT '事务消息表'; CREATE TABLE `t_transactional_message_content` ( id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY, message_id BIGINT UNSIGNED NOT NULL COMMENT '事务消息记录ID', content TEXT COMMENT '消息内容' ) COMMENT '事务消息内容表';

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyjddj.html