1、在上面定义的 RabbitConfig 类中,再新增一个 TTL 队列并将其绑定到 direct 交换机上:
@Bean("ttlQueue") public Queue ttlQueue(){ Map<String, Object> map = new HashMap<String, Object>(); map.put("x-message-ttl", 5000);//队列中所有消息5秒后过期 map.put("x-expires", 100000);//队列闲置10秒后被删除 //参数1-name:队列名称 //参数2-durable:是否持久化 //参数3-exclusive:是否排他。设置为true时,则该队列只对声明当前队列的连接(Connection)可用,一旦连接断开,队列自动被删除 //参数4-autoDelete:是否自动删除。前提是必须要至少有一个消费者先连上当前队列,然后当所有消费者都断开连接之后,队列自动被删除 return new Queue("LONGLY_WOLF_TTL_QUEUE",false,false,false,map); } //ttl队列绑定到direct交换机(交换机和队列可以多对多) @Bean public Binding ttlBindFanoutExchange(@Qualifier("ttlQueue") Queue queue, @Qualifier("directExchange") DirectExchange directExchange){ return BindingBuilder.bind(queue).to(directExchange).with("test.ttl"); }2、在 ExchangeConsumer 消费者类上监听 TTL 队列(和其他消费者不同的时候,这里为了打印出队列属性,改成了通过 Message 对象来接收消息 ):
/** * 监听ttl消息队列 */ @RabbitHandler @RabbitListener(queues = "LONGLY_WOLF_TTL_QUEUE") public void ttlConsumer(Message message){ System.out.println("ttl队列收到消息:" + new String(message.getBody())); System.out.println("ttl队列收到消息:" + JSONObject.toJSONString(message.getMessageProperties())); }3、在生产者类 RabbitExchangeController 上新增一个接口用来测试发送过期消息,这里通过 MessageProperties 设置的 expiration 属性就相当于是给单条消息设置了一个 TTL :
@GetMapping(value="/send/ttl") public String sendTtl(String routingKey,@RequestParam(value = "msg",defaultValue = "no ttl message") String msg){ MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration("5000");//5秒后被删除,即TTL属性(针对单条消息) Message message = new Message(msg.getBytes(), messageProperties); rabbitTemplate.convertAndSend("LONGLY_WOLF_DIRECT_EXCHANGE",routingKey,message); return "succ"; }4、此时如果我们把消费者的监听去掉之后再发送消息,在管理后台就可以看到 5 秒之后消息会被删除,10 秒之后队列会被删除。
PS:如果同时给队列和单条消息都设置了 TTL,则会以时间短的为主。
其他属性队列中还有其他一些属性可以设置,在这里我们就不一一举例了:
x-message-ttl:队列中消息的存活时间(毫秒),达到TTL的消息可能会被删除。
x-expires:队列在多长时间(毫秒)没有被访问以后会被删除。
x-max-length:队列中的最大消息数。
x-max-length-bytes:队列的最大容量(bytes)。
overflow:队列溢出之后的策略。主要可以配置如下参数:reject-publish - 直接丢弃最近发布的消息,如若启用了 publisher confirm(发布者确认),发布者将通过发送 basic.nack 消息通知拒绝,如果当前队列绑定有多个消费者,则消息在收到 basic.nack 拒绝通知后,仍然会被发布到其他队列;drop-head - 丢弃队列头部消息(集群模式下只支持这种策略) reject-publish-dlx - 最近发布的消息会进入死信队列。
x-dead-letter-exchange:队列的死信交换机。
x-dead-letter-routing-key:死信交换机的路由键。
x-single-active-consumer:true/false。表示是否最多只允许一个消费者消费,如果有多个消费者同时绑定,则只会激活第一个,除非第一个消费者被取消或者死亡,才会自动转到下一个消费者。
x-max-priority:队列中消息的最大优先级, 消息的优先级不能超过它。
x-queue-mode:3.6.0 版本引入的,主要是为了实现惰性加载。队列将收到的消息尽可能快的进行持久化操作到磁盘上,然后只有在用户请求的时候才会加载到 RAM 内存。这个参数支持两个值:default 和 lazy。当不进行设置的时候,就是默认为 default,不做任何改变;当设置为 lazy 就会进行懒加载。
x-queue-master-locator:为了保证消息的 FIFO,所以在高可用集群模式下需要选择一个节点作为主节点。这个参数主要有三种模式:min-masters- 托管最小数量的绑定主机的节点;client-local- 选择声明的队列已经连接到客户端的节点;random- 随机选择一个节点。
神奇的死信队列(Dead Letter)上面的参数介绍中,提到了死信队列,这又是什么新鲜的东西呢?其实从名字上来看很好理解,就是指的已死的消息,或者说无家可归的消息。一个消息进入死信队列,主要有以下三种条件:
1、消息被消费者拒绝并且未设置重回队列。
2、消息过期(即设置了 TTL)。
3、队列达到最大长度,超过了 Max length 或 Max length bytes,则队列头部的消息会被发送到死信队列。
死信队列实战