接下来,分别为业务交换机和备份交换机创建消费者:
@Slf4j @Component public class BusinessMsgConsumer { @RabbitListener(queues = BUSINESS_QUEUE_NAME) public void receiveMsg(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("收到业务消息:{}", msg); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } } @Slf4j @Component public class BusinessWaringConsumer { @RabbitListener(queues = BUSINESS_BACKUP_WARNING_QUEUE_NAME) public void receiveMsg(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.error("发现不可路由消息:{}", msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }接下来我们分别发送一条可路由消息和不可路由消息:
@Slf4j @Component public class BusinessMsgProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendCustomMsg(String exchange, String msg) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); log.info("消息id:{}, msg:{}", correlationData.getId(), msg); rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData); correlationData = new CorrelationData(UUID.randomUUID().toString()); log.info("消息id:{}, msg:{}", correlationData.getId(), msg); rabbitTemplate.convertAndSend(exchange, "key2", msg, correlationData); } }消息如下:
消息id:5c3a33c9-0764-4d1f-bf6a-a00d771dccb4, msg:1 消息id:42ac8c35-1d0a-4413-a1df-c26a85435354, msg:1 收到业务消息:1 发现不可路由消息:1这里仅仅使用 error 日志配合日志系统进行报警,如果是敏感数据,可以使用邮件、钉钉、短信、电话等报警方式来提高时效性。
那么问题来了,mandatory 参数与备份交换机可以一起使用吗?设置 mandatory 参数会让交换机将不可路由消息退回给生产者,而备份交换机会让交换机将不可路由消息转发给它,那么如果两者同时开启,消息究竟何去何从??
emmm,想这么多干嘛,试试不就知道了。
修改一下生产者即可:
@Slf4j @Component public class BusinessMsgProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{ @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct private void init() { // rabbitTemplate.setChannelTransacted(true); rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(this); } public void sendCustomMsg(String exchange, String msg) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); log.info("消息id:{}, msg:{}", correlationData.getId(), msg); rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData); correlationData = new CorrelationData(UUID.randomUUID().toString()); log.info("消息id:{}, msg:{}", correlationData.getId(), msg); rabbitTemplate.convertAndSend(exchange, "key2", msg, correlationData); } @Override public void confirm(CorrelationData correlationData, boolean b, String s) { String id = correlationData != null ? correlationData.getId() : ""; if (b) { log.info("消息确认成功, id:{}", id); } else { log.error("消息未成功投递, id:{}, cause:{}", id, s); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息被服务器退回。msg:{}, replyCode:{}. replyText:{}, exchange:{}, routingKey :{}", new String(message.getBody()), replyCode, replyText, exchange, routingKey); } }