一个基于RabbitMQ的可复用的事务消息方案 (4)

消息记录状态和内容持久化的管理统一放在TransactionalMessageManagementService中:

@Slf4j @RequiredArgsConstructor @Service public class TransactionalMessageManagementService { private final TransactionalMessageDao messageDao; private final TransactionalMessageContentDao contentDao; private final RabbitTemplate rabbitTemplate; private static final LocalDateTime END = LocalDateTime.of(2999, 1, 1, 0, 0, 0); private static final long DEFAULT_INIT_BACKOFF = 10L; private static final int DEFAULT_BACKOFF_FACTOR = 2; private static final int DEFAULT_MAX_RETRY_TIMES = 5; private static final int LIMIT = 100; public void saveTransactionalMessageRecord(TransactionalMessage record, String content) { record.setMessageStatus(TxMessageStatus.PENDING.getStatus()); record.setNextScheduleTime(calculateNextScheduleTime(LocalDateTime.now(), DEFAULT_INIT_BACKOFF, DEFAULT_BACKOFF_FACTOR, 0)); record.setCurrentRetryTimes(0); record.setInitBackoff(DEFAULT_INIT_BACKOFF); record.setBackoffFactor(DEFAULT_BACKOFF_FACTOR); record.setMaxRetryTimes(DEFAULT_MAX_RETRY_TIMES); messageDao.insertSelective(record); TransactionalMessageContent messageContent = new TransactionalMessageContent(); messageContent.setContent(content); messageContent.setMessageId(record.getId()); contentDao.insert(messageContent); } public void sendMessageSync(TransactionalMessage record, String content) { try { rabbitTemplate.convertAndSend(record.getExchangeName(), record.getRoutingKey(), content); if (log.isDebugEnabled()) { log.debug("发送消息成功,目标队列:{},消息内容:{}", record.getQueueName(), content); } // 标记成功 markSuccess(record); } catch (Exception e) { // 标记失败 markFail(record, e); } } private void markSuccess(TransactionalMessage record) { // 标记下一次执行时间为最大值 record.setNextScheduleTime(END); record.setCurrentRetryTimes(record.getCurrentRetryTimes().compareTo(record.getMaxRetryTimes()) >= 0 ? 
record.getMaxRetryTimes() : record.getCurrentRetryTimes() + 1); record.setMessageStatus(TxMessageStatus.SUCCESS.getStatus()); record.setEditTime(LocalDateTime.now()); messageDao.updateStatusSelective(record); } private void markFail(TransactionalMessage record, Exception e) { log.error("发送消息失败,目标队列:{}", record.getQueueName(), e); record.setCurrentRetryTimes(record.getCurrentRetryTimes().compareTo(record.getMaxRetryTimes()) >= 0 ? record.getMaxRetryTimes() : record.getCurrentRetryTimes() + 1); // 计算下一次的执行时间 LocalDateTime nextScheduleTime = calculateNextScheduleTime( record.getNextScheduleTime(), record.getInitBackoff(), record.getBackoffFactor(), record.getCurrentRetryTimes() ); record.setNextScheduleTime(nextScheduleTime); record.setMessageStatus(TxMessageStatus.FAIL.getStatus()); record.setEditTime(LocalDateTime.now()); messageDao.updateStatusSelective(record); } /** * 计算下一次执行时间 * * @param base 基础时间 * @param initBackoff 退避基准值 * @param backoffFactor 退避指数 * @param round 轮数 * @return LocalDateTime */ private LocalDateTime calculateNextScheduleTime(LocalDateTime base, long initBackoff, long backoffFactor, long round) { double delta = initBackoff * Math.pow(backoffFactor, round); return base.plusSeconds((long) delta); } /** * 推送补偿 - 里面的参数应该根据实际场景定制 */ public void processPendingCompensationRecords() { // 时间的右值为当前时间减去退避初始值,这里预防把刚保存的消息也推送了 LocalDateTime max = LocalDateTime.now().plusSeconds(-DEFAULT_INIT_BACKOFF); // 时间的左值为右值减去1小时 LocalDateTime min = max.plusHours(-1); Map<Long, TransactionalMessage> collect = messageDao.queryPendingCompensationRecords(min, max, LIMIT) .stream() .collect(Collectors.toMap(TransactionalMessage::getId, x -> x)); if (!collect.isEmpty()) { StringJoiner joiner = new StringJoiner(",", "(", ")"); collect.keySet().forEach(x -> joiner.add(x.toString())); contentDao.queryByMessageIds(joiner.toString()) .forEach(item -> { TransactionalMessage message = collect.get(item.getMessageId()); sendMessageSync(message, item.getContent()); }); } } }

这里有一点尚待优化:目前每推送一条消息,都会单独调用一次updateStatusSelective更新事务消息记录的状态,可以优化为批量更新。在LIMIT比较大的时候,批量更新的效率会更高。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyjddj.html