最后是定时任务的配置类:
@Slf4j @RequiredArgsConstructor @Configuration @EnableScheduling public class ScheduleJobAutoConfiguration { private final TransactionalMessageManagementService managementService; /** * 这里用的是本地的Redis,实际上要做成配置 */ private final RedissonClient redisson = Redisson.create(); @Scheduled(fixedDelay = 10000) public void transactionalMessageCompensationTask() throws Exception { RLock lock = redisson.getLock("transactionalMessageCompensationTask"); // 等待时间5秒,预期300秒执行完毕,这两个值需要按照实际场景定制 boolean tryLock = lock.tryLock(5, 300, TimeUnit.SECONDS); if (tryLock) { try { long start = System.currentTimeMillis(); log.info("开始执行事务消息推送补偿定时任务..."); managementService.processPendingCompensationRecords(); long end = System.currentTimeMillis(); long delta = end - start; // 以防锁过早释放 if (delta < 5000) { Thread.sleep(5000 - delta); } log.info("执行事务消息推送补偿定时任务完毕,耗时:{} ms...", end - start); } finally { lock.unlock(); } } } }基本代码编写完,整个项目的结构如下:
最后添加两个测试类:
@RequiredArgsConstructor @Component public class MockBusinessRunner implements CommandLineRunner { private final MockBusinessService mockBusinessService; @Override public void run(String... args) throws Exception { mockBusinessService.saveOrder(); } } @Slf4j @RequiredArgsConstructor @Service public class MockBusinessService { private final JdbcTemplate jdbcTemplate; private final TransactionalMessageService transactionalMessageService; private final ObjectMapper objectMapper; @Transactional(rollbackFor = Exception.class) public void saveOrder() throws Exception { String orderId = UUID.randomUUID().toString(); BigDecimal amount = BigDecimal.valueOf(100L); Map<String, Object> message = new HashMap<>(); message.put("orderId", orderId); message.put("amount", amount); jdbcTemplate.update("INSERT INTO t_order(order_id,amount) VALUES (?,?)", p -> { p.setString(1, orderId); p.setBigDecimal(2, amount); }); String content = objectMapper.writeValueAsString(message); transactionalMessageService.sendTransactionalMessage( DefaultDestination.builder() .exchangeName("tm.test.exchange") .queueName("tm.test.queue") .routingKey("tm.test.key") .exchangeType(ExchangeType.DIRECT) .build(), DefaultTxMessage.builder() .businessKey(orderId) .businessModule("SAVE_ORDER") .content(content) .build() ); log.info("保存订单:{}成功...", orderId); } }某次测试结果如下:
2020-02-05 21:10:13.287 INFO 49556 --- [ main] club.throwable.cm.MockBusinessService : 保存订单:07a75323-460b-42cb-aa63-1a0a45ce19bf成功...模拟订单数据成功保存,而且RabbitMQ消息在事务成功提交后正常发送到RabbitMQ服务端中,如RabbitMQ控制台数据所示。
小结事务消息模块的设计仅仅是使异步消息推送这个功能实现趋向于完备,其实一个合理的异步消息交互系统,一定会提供同步查询接口,这一点是基于异步消息没有回调或者没有响应的特性导致的。一般而言,一个系统的吞吐量和系统的异步化处理占比成正相关(这一点可以参考Amdahl's Law),所以在系统架构设计实际中应该尽可能使用异步交互,提高系统吞吐量同时减少同步阻塞带来的无谓等待。事务消息模块可以扩展出一个后台管理,甚至可以配合Micrometer、Prometheus和Grafana体系做实时数据监控。