如何用RabbitMQ实现延迟队列

在 jdk 的 juc 工具包中,提供了一种延迟队列 DelayQueue。延迟队列用处非常广泛,比如我们最常见的场景就是在网购或者外卖平台中发起一个订单,如果不付款,一般 15 分钟后就会被关闭,这个直接用定时任务是不好实现的,因为每个用户下单的时间并不确定,所以这时候就需要用到延迟队列。

什么是延迟队列

延迟队列本身也是队列,只不过这个队列是延迟的,意思就是说当我们把一条消息放入延迟队列,消息并不会立刻出队,而是会在到达指定时间之后(或者说过了指定时间)才会出队,从而被消费者消费。

利用死信队列实现延迟队列

RabbitMQ 中的死信队列就是用来存储特定条件下的消息,那么假如我们把这个条件设定为指定时间过期(设定带TTL 的消息或者队列),就可以用来实现延迟队列的功能。

新建一个 TtlDelayRabbitConfig 配置类(省略了包名和导入),消息最开始发送至 ttl 消息队列,这个队列中所有的消息在 5 秒后过期,后期后会进入死信队列:

@Configuration public class TtlDelayRabbitConfig { //路由ttl消息交换机 @Bean("ttlDelayFanoutExchange") public FanoutExchange fanoutExchange(){ return new FanoutExchange("TTL_DELAY_FANOUT_EXCHANGE"); } //ttl消息队列 @Bean("ttlDelayQueue") public Queue ttlQueue(){ Map<String, Object> map = new HashMap<String, Object>(); map.put("x-message-ttl", 5000);//队列中所有消息5秒后过期 map.put("x-dead-letter-exchange", "TTL_DELAY_DEAD_LETTER_FANOUT_EXCHANGE");//过期后进入死信队列 return new Queue("TTL_QUEUE",false,false,false,map); } //Fanout交换机和productQueue绑定 @Bean public Binding bindTtlFanoutExchange(@Qualifier("ttlDelayQueue") Queue queue, @Qualifier("ttlDelayFanoutExchange") FanoutExchange fanoutExchange){ return BindingBuilder.bind(queue).to(fanoutExchange); } //fanout死信交换机 @Bean("ttlDelayDeadLetterExchange") public FanoutExchange deadLetterExchange(){ return new FanoutExchange("TTL_DELAY_DEAD_LETTER_FANOUT_EXCHANGE"); } //死信队列 @Bean("ttlDelayDeadLetterQueue") public Queue ttlDelayDeadLetterQueue(){ return new Queue("TTL_DELAY_DEAD_LETTER_FANOUT_QUEUE"); } //死信队列和死信交换机绑定 @Bean public Binding deadLetterQueueBindExchange(@Qualifier("ttlDelayDeadLetterQueue") Queue queue, @Qualifier("ttlDelayDeadLetterExchange") FanoutExchange fanoutExchange){ return BindingBuilder.bind(queue).to(fanoutExchange); } }

新建一个消费者 TtlDelayConsumer 类,监听死信队列,这里收到的消息都是生产者生产消息之后的 5 秒,也就是延迟了 5 秒的消息:

@Component public class TtlDelayConsumer { @RabbitHandler @RabbitListener(queues = "TTL_DELAY_DEAD_LETTER_FANOUT_QUEUE") public void fanoutConsumer(String msg){ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("【延迟队列】【" + sdf.format(new Date()) + "】收到死信队列消息:" + msg); } }

新建一个 DelayQueueController 类做生产者来发送消息:

@RestController @RequestMapping("/delay") public class DelayQueueController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping(value="/ttl/send") public String clearVipInfo(@RequestParam(value = "msg",defaultValue = "no message") String msg){ rabbitTemplate.convertAndSend("TTL_DELAY_FANOUT_EXCHANGE","",msg); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("消息发送成功【" + sdf.format(new Date()) + "】"); return "succ"; } }

最后我们在浏览器输入地址 :8080/delay/ttl/send?msg=测试ttl延迟队列 进行测试,可以看到每条消息都是在发送 5 秒之后才能收到消息:

如何用RabbitMQ实现延迟队列

TTL 延迟队列的问题

假如我们实际中,有的消息是 10 分钟过期,有的是 20 分钟过期,这时候我们就需要建立多个队列,一旦时间维度非常庞大,那么就需要维护非常多的队列。说到这里,可能很多人会有疑问,我们可以针对单条信息设置过期时间,大可不必去定义多个队列?

然而事实真的是如此吗?接下来我们通过一个例子来验证下。

把上面示例中 TtlDelayRabbitConfig 类中的队列定义函数 x-message-ttl 属性去掉,不过需要注意的是我们需要先把这个队列后台删除掉,否则同名队列重复创建无效:

@Bean("ttlDelayQueue") public Queue ttlQueue(){ Map<String, Object> map = new HashMap<String, Object>(); // map.put("x-message-ttl", 5000);//注释掉这个属性,队列不设置过期时间 map.put("x-dead-letter-exchange", "TTL_DELAY_DEAD_LETTER_FANOUT_EXCHANGE");//过期后进入死信队列 return new Queue("TTL_QUEUE",false,false,false,map); }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpjgww.html