1、在上面定义的 RabbitConfig 类中,定义一个死信交换机,并将之前的 ttl 队列新增一个属性 x-dead-letter-exchange,最后再将死信队列和死信交换机进行绑定:
//直连死信交换机(也可以用topic或者fanout类型交换机) @Bean("deatLetterExchange") public DirectExchange deatLetterExchange(){ return new DirectExchange("LONGLY_WOLF_DEAD_LETTER_DIRECT_EXCHANGE"); } @Bean("ttlQueue") public Queue ttlQueue(){ Map<String, Object> map = new HashMap<String, Object>(); map.put("x-message-ttl", 5000);//队列中所有消息5秒后过期 map.put("x-dead-letter-exchange", "LONGLY_WOLF_DEAD_LETTER_DIRECT_EXCHANGE");//已死消息会进入死信交换机 return new Queue("LONGLY_WOLF_TTL_QUEUE",false,false,false,map); } //死信队列 @Bean("deadLetterQueue") public Queue deadLetterQueue(){ return new Queue("LONGLY_WOLF_DEAD_LETTER_QUEUE"); }2、在 ExchangeConsumer 消费者类上将监听 TTL 队列的监听取消,注释掉监听:
/** * 监听ttl消息队列 */ @RabbitHandler // @RabbitListener(queues = "LONGLY_WOLF_TTL_QUEUE") public void ttlConsumer(Message message){ System.out.println("ttl队列收到消息:" + new String(message.getBody())); System.out.println("ttl队列收到消息:" + JSONObject.toJSONString(message.getMessageProperties())); }3、此时 TTL 队列无消费者,并且设置了消息的 TTL 为 5 秒,所以 5 秒之后就会进入死信队列。
5、访问接口::8080/exchange/send/ttl?routingKey=test&msg=测试死信队列,发送消息之后,等待 5 秒就查看消息,进入死信队列:
消息真的发送成功了吗了解了消息的基本发送功能之后,就可以高枕无忧了吗?消息发出去之后,消费者真的收到消息了吗?消息发送之后如何知道消息发送成功了?假如发送消息路由错了导致无法路由到队列怎么办?大家是不是都有这些疑问呢?别着急,接下来就让我们来一一来分析一下。
一条消息从生产者开始发送消息到消费者消费完消息主要可以分为以下 4 个阶段:
1、生产者将消息发送到 Broker (即:RabbitMQ 的交换机)。
2、交换机将消息路由到队列。
3、队列收到消息后存储消息。
4、消费者从队列获取消息进行消费。
接下来我们就从这 4 个步骤上来逐步分析 RabbitMQ 如何保证消息发送的可靠性。
消息真的到达交换机了吗当我们发送一条消息之后,如何知道对方收到消息了?这就和我们写信一样,写一封信出去,如何知道对方收到我们寄出去的信?最简单的方式就是对方也给我们回一封信,我们收到对方的回信之后就可以知道自己的信已经成功寄达。
在 RabbitMQ 中服务端也提供了 2 种方式来告诉客户端(生产者)是否收到消息:Transaction(事务)模式和 Confirm(确认)模式。
Transaction(事务) 模式在 Java API 编程中开启事务只需要增加以下代码即可:
try { channel.txSelect();//开启事务 channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); channel.txCommit();//提交事务 }catch (Exception e){ channel.txRollback();//消息回滚 }在 Spring Boot 中需要对 RabbitTemplate 进行事务设置:
@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); rabbitTemplate.setChannelTransacted(true);//开启事务 return rabbitTemplate; }为了了解 RabbitMQ 当中事务机制的原理,我们在 Wireshark 中输入 ip.addr==192.168.1.1 对本地 ip 进行抓包,发送一条消息之后,抓到如下数据包:
通过数据包,可以得出开启事务之后,除了原本的发送消息之外,多出了开启事务和事务提交的通信:
开启事务之后,有一个致命的缺点就是发送消息流程会被阻塞。也就是说必须一条消息发送成功之后,才会允许发送另一条消息。正因为事务模式有这个缺点,所以一般情况下并不建议在生产环境开启事务,那么有没有更好的方式来实现消息的送达确认呢?那么就让我们再看看Confirm(确认)模式。
Confirm(确认)模式消息确认模式又可以分为三种(事务模式和确认模式无法同时开启):
单条确认模式:发送一条消息,确认一条消息。此种确认模式的效率也不高。