为了进行回调,我们需要实现一个接口 RabbitTemplate.ReturnCallback。
@Slf4j @Component public class BusinessMsgProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{ @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct private void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(this); } public void sendCustomMsg(String exchange, String msg) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); log.info("消息id:{}, msg:{}", correlationData.getId(), msg); rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData); correlationData = new CorrelationData(UUID.randomUUID().toString()); log.info("消息id:{}, msg:{}", correlationData.getId(), msg); rabbitTemplate.convertAndSend(exchange, "key2", msg, correlationData); } @Override public void confirm(CorrelationData correlationData, boolean b, String s) { String id = correlationData != null ? correlationData.getId() : ""; if (b) { log.info("消息确认成功, id:{}", id); } else { log.error("消息未成功投递, id:{}, cause:{}", id, s); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息被服务器退回。msg:{}, replyCode:{}. replyText:{}, exchange:{}, routingKey :{}", new String(message.getBody()), replyCode, replyText, exchange, routingKey); } }然后我们再来重新运行一次:
消息id:2e5c336a-883a-474e-b40e-b6e3499088ef, msg:1 消息id:85c771cb-c88f-47dd-adea-f0da57138423, msg:1 消息确认成功, id:2e5c336a-883a-474e-b40e-b6e3499088ef 消息无法被路由,被服务器退回。msg:1, replyCode:312. replyText:NO_ROUTE, exchange:rabbitmq.tx.demo.simple.business.exchange, routingKey :key2 消息确认成功, id:85c771cb-c88f-47dd-adea-f0da57138423 收到业务消息:1可以看到,我们接收到了被退回的消息,并带上了消息被退回的原因:NO_ROUTE。但是要注意的是, mandatory 参数仅仅是在当消息无法被路由的时候,让生产者可以感知到这一点,只要开启了生产者确认机制,无论是否设置了 mandatory 参数,都会在交换机接收到消息时进行消息确认回调,而且通常消息的退回回调会在消息的确认回调之前。
备份交换机有了 mandatory 参数,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。
而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?
前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。
不要慌,在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。
什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会将这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。
听的不太明白?没关系,看个图就知道是怎么回事了。
(emmm,调整了一下配色,感觉还是很丑- - 。急需一个UI来拯救我。)
接下来,我们就来设置一下备份交换机:
@Configuration public class RabbitMQConfig { public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.backup.test.exchange"; public static final String BUSINESS_QUEUE_NAME = "rabbitmq.backup.test.queue"; public static final String BUSINESS_BACKUP_EXCHANGE_NAME = "rabbitmq.backup.test.backup-exchange"; public static final String BUSINESS_BACKUP_QUEUE_NAME = "rabbitmq.backup.test.backup-queue"; public static final String BUSINESS_BACKUP_WARNING_QUEUE_NAME = "rabbitmq.backup.test.backup-warning-queue"; // 声明业务 Exchange @Bean("businessExchange") public DirectExchange businessExchange(){ ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(BUSINESS_EXCHANGE_NAME) .durable(true) .withArgument("alternate-exchange", BUSINESS_BACKUP_EXCHANGE_NAME); return (DirectExchange)exchangeBuilder.build(); } // 声明备份 Exchange @Bean("backupExchange") public FanoutExchange backupExchange(){ ExchangeBuilder exchangeBuilder = ExchangeBuilder.fanoutExchange(BUSINESS_BACKUP_EXCHANGE_NAME) .durable(true); return (FanoutExchange)exchangeBuilder.build(); } // 声明业务队列 @Bean("businessQueue") public Queue businessQueue(){ return QueueBuilder.durable(BUSINESS_QUEUE_NAME).build(); } // 声明业务队列绑定关系 @Bean public Binding businessBinding(@Qualifier("businessQueue") Queue queue, @Qualifier("businessExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("key"); } // 声明备份队列 @Bean("backupQueue") public Queue backupQueue(){ return QueueBuilder.durable(BUSINESS_BACKUP_QUEUE_NAME).build(); } // 声明报警队列 @Bean("warningQueue") public Queue warningQueue(){ return QueueBuilder.durable(BUSINESS_BACKUP_WARNING_QUEUE_NAME).build(); } // 声明备份队列绑定关系 @Bean public Binding backupBinding(@Qualifier("backupQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange exchange){ return BindingBuilder.bind(queue).to(exchange); } // 声明备份报警队列绑定关系 @Bean public Binding backupWarningBinding(@Qualifier("warningQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange exchange){ return BindingBuilder.bind(queue).to(exchange); } }这里我们使用 ExchangeBuilder 来创建交换机,并为其设置备份交换机:
.withArgument("alternate-exchange", BUSINESS_BACKUP_EXCHANGE_NAME);为业务交换机绑定了一个队列,为备份交换机绑定了两个队列,一个用来存储不可投递消息,待之后人工处理,一个专门用来做报警用途。