如何用RabbitMQ实现延迟队列 (2)

然后将 DelayQueueController 类中的发送消息方法修改一下,对每条信息设置过期时间:

@GetMapping(value="/ttl/send") public String ttlMsgSend(@RequestParam(value = "msg",defaultValue = "no message") String msg, @RequestParam(value = "time") String millTimes){ MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration(millTimes);//单条消息设置过期时间,单位:毫秒 Message message = new Message(msg.getBytes(), messageProperties); rabbitTemplate.convertAndSend("TTL_DELAY_FANOUT_EXCHANGE","",message); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("消息发送成功【" + sdf.format(new Date()) + "】"); return "succ"; }

然后执行 2 条消息发送,一条 10 秒过期,一条 5 秒过期,先发送 10 秒的:

:8080/delay/ttl/send?msg=10秒过期消息&time=10000 :8080/delay/ttl/send?msg=5秒过期消息&time=5000

执行之后得到如下结果:

如何用RabbitMQ实现延迟队列

我们看到,两条消息都是 10 秒后过期,这是巧合吗?并不是,这是因为 RabbitMQ 中的机制就是如果前一条消息没有出队,那么即使后一条消息已经失效,也必须要等前一条消息出队之后才能出队,所以这就是为什么一般都尽量避免同一个队列单条消息设置不同过期时间的做法。

死信队列实现的延迟队列缺点

通过以上两个例子,使用死信队列来实现延迟队列,我们可以得到几个很明显的缺点:

如果有非常多的时间点(比如有的 10 分钟过期,有的 20 分钟过期等),则需要创建不同的交换机和队列来实现消息的路由。

单独设置消息的 TTL 时可能会造成消息的阻塞。因为当前一条消息没有出队,后一条消息即使到期了也不能出队。

消息可能会有一定的延迟(上面的示例中就可以看到有一点延迟)。

为了避免 TTL 和死信队列可能造成的问题,所以就非常有必要用一种新的更好的方案来替代实现延迟队列,这就是延时队列插件。

利用插件实现延迟队列

在 RabbitMQ 的 3.5.7 版本之后,提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列 ,同时需保证 Erlang/OPT 版本为 18.0 之后。

安装延迟队列插件

RabbitMQ版本在 3.5.7-3.7.x 的可以执行以下命令进行下载(也可以直接通过浏览器下载):

wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez

如果 RabbitMQ 是 3.8 之后的版本,可以点击这里,找到延迟队列对应版本的插件,然后下载。

下载好之后,将插件上传到 plugins 目录下,执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 命令启动插件。如果要禁止该插件,则可以执行命令 rabbitmq-plugins disable rabbitmq_delayed_message_exchange(启用插件后需要重启 RabbitMQ 才会生效)。

延迟队列插件示例

新建一个 PluginDelayRabbitConfig 配置类:

@Configuration public class PluginDelayRabbitConfig { @Bean("pluginDelayExchange") public CustomExchange pluginDelayExchange() { Map<String, Object> argMap = new HashMap<>(); argMap.put("x-delayed-type", "direct");//必须要配置这个类型,可以是direct,topic和fanout //第二个参数必须为x-delayed-message return new CustomExchange("PLUGIN_DELAY_EXCHANGE","x-delayed-message",false, false, argMap); } @Bean("pluginDelayQueue") public Queue pluginDelayQueue(){ return new Queue("PLUGIN_DELAY_QUEUE"); } @Bean public Binding pluginDelayBinding(@Qualifier("pluginDelayQueue") Queue queue,@Qualifier("pluginDelayExchange") CustomExchange customExchange){ return BindingBuilder.bind(queue).to(customExchange).with("delay").noargs(); } }

新建一个消费者类 PluginDelayConsumer:

@Component public class PluginDelayConsumer { @RabbitHandler @RabbitListener(queues = "PLUGIN_DELAY_QUEUE")//监听延时队列 public void fanoutConsumer(String msg){ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("【插件延迟队列】【" + sdf.format(new Date()) + "】收到消息:" + msg); } }

在上面示例中的 DelayQueueController 类,新增一个方法:

@GetMapping(value="/plugin/send") public String pluginMsgSend(@RequestParam(value = "msg",defaultValue = "no message") String msg){ MessageProperties messageProperties = new MessageProperties(); messageProperties.setHeader("x-delay",5000);//延迟5秒被删除 Message message = new Message(msg.getBytes(), messageProperties); amqpTemplate.convertAndSend("PLUGIN_DELAY_EXCHANGE","delay",message);//交换机和路由键必须和配置文件类中保持一致 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("消息发送成功【" + sdf.format(new Date()) + "】"); return "succ"; }

接下来就可以访问地址 :8080/delay/plugin/send?msg=插件延迟队列消息 进行测试,可以看到,消息在延时 5 秒之后被消费:

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpjgww.html