【RabbitMQ】如何进行消息可靠投递【下篇】

上一篇文章里,我们了解了如何保证消息被可靠投递到RabbitMQ的交换机中,但还有一些不完美的地方,试想一下,如果向RabbitMQ服务器发送一条消息,服务器确实也接收到了这条消息,于是给你返回了ACK确认消息,但服务器拿到这条消息一看,找不到路由它的队列,于是就把它丢进了垃圾桶,emmm,我猜应该属于可回收垃圾。

1

如何让消息可靠投递到队列

如果你对上面的描述还不是很清楚,那我再用代码来说明一次。

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时,生产者是不知道消息被丢弃这个事件的。

我们将上一篇中的交换机类型改为DirectExchange,这样就只有当消息的 RoutingKey 和队列绑定时设置的 Bindingkey (这里即“key”)一致时,才会真正将该消息进行路由。

public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.tx.demo.simple.business.exchange"; public static final String BUSINESS_QUEUEA_NAME = "rabbitmq.tx.demo.simple.business.queue"; // 声明业务 Exchange @Bean("businessExchange") public DirectExchange businessExchange(){ return new DirectExchange(BUSINESS_EXCHANGE_NAME); } // 声明业务队列 @Bean("businessQueue") public Queue businessQueue(){ return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).build(); } // 声明业务队列绑定关系 @Bean public Binding businessBinding(@Qualifier("businessQueue") Queue queue, @Qualifier("businessExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("key"); }

对消息生产者也稍作修改:

@Autowired private RabbitTemplate rabbitTemplate; @PostConstruct private void init() { // rabbitTemplate.setChannelTransacted(true); rabbitTemplate.setConfirmCallback(this); } public void sendCustomMsg(String exchange, String msg) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); log.info("消息id:{}, msg:{}", correlationData.getId(), msg); rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData); correlationData = new CorrelationData(UUID.randomUUID().toString()); log.info("消息id:{}, msg:{}", correlationData.getId(), msg); rabbitTemplate.convertAndSend(exchange, "key2", msg, correlationData); } @Override public void confirm(CorrelationData correlationData, boolean b, String s) { String id = correlationData != null ? correlationData.getId() : ""; if (b) { log.info("消息确认成功, id:{}", id); } else { log.error("消息未成功投递, id:{}, cause:{}", id, s); } }

然后我们调用该方法,发送两条消息测试一下:

消息id:ba6bf502-9381-4220-8dc9-313d6a289a4e, msg:1 消息id:f0040a41-dc02-4e45-b8af-e3cfa8a118b2, msg:1 消息确认成功, id:ba6bf502-9381-4220-8dc9-313d6a289a4e 消息确认成功, id:f0040a41-dc02-4e45-b8af-e3cfa8a118b2 收到业务消息:1

可以看到,发送了两条消息,第一条消息的 RoutingKey 为 “key”,第二条消息的 RoutingKey 为 “key2”,两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了。

那么,如何让消息被路由到队列后再返回ACK呢?或者无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。

别慌别慌,RabbitMQ里有两个机制刚好可以解决我们上面的疑问:

1、mandatory 参数 2、备份交换机

mandatory 参数

设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

当把 mandotory 参数设置为 true 时,如果交换机无法将消息进行路由时,会将该消息返回给生产者,而如果该参数设置为false,如果发现消息无法进行路由,则直接丢弃。

2.png

那么如何设置这个参数呢?在发送消息的时候,只需要在初始化方法添加一行代码即可:

rabbitTemplate.setMandatory(true);

开启之后我们再重新运行前面的代码:

消息id:19729f33-15c4-4c1b-8d48-044c301e2a8e, msg:1 消息id:4aea5c57-3e71-4a7b-8a00-1595d2b568eb, msg:1 消息确认成功, id:19729f33-15c4-4c1b-8d48-044c301e2a8e Returned message but no callback available 消息确认成功, id:4aea5c57-3e71-4a7b-8a00-1595d2b568eb 收到业务消息:1

我们看到中间多了一行提示 Returned message but no callback available 这是什么意思呢?

我们上面提到,设置 mandatory 参数后,如果消息无法被路由,则会返回给生产者,是通过回调的方式进行的,所以,生产者需要设置相应的回调函数才能接受该消息。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwzgfy.html