生产者向队列中发送一条延时10s的消息再发一条延时5秒的消息,但消费者却先拿到延时10s的,再拿到延时5秒的,我想要的结果是先拿到延时5s的再拿到延时10s的,是什么原因呢。
原因是:队列是先进先出的,而RabbitMQ只会对首位第一条消息做检测,第一条没过期,那么后面的消息就会阻塞住等待前面的过期。
解决办法:增加一个消费者对延时队列消费,不ack,把第一条消息放到队列尾部。一直让消息在流动,这样就能检测到了。
2)新增消费者代码:
public static void SendMessage() { //死信交换机 string dlxexChange = "dlx.exchange"; //死信队列 string dlxQueueName = "dlx.queue"; //消息交换机 string exchange = "direct-exchange"; //消息队列 string queueName = "delay_queue"; using (var connection = RabbitMQHelper.GetConnection()) { using (var channel = connection.CreateModel()) { //创建死信交换机 channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false); //创建死信队列 channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false); //死信队列绑定死信交换机 channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName); // 创建消息交换机 channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false); //创建消息队列,并指定死信队列,和设置这个队列的消息过期时间为10s channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object> { { "x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX(死信交换机) { "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列 //{ "x-message-ttl",10000} //设置队列的消息过期时间 }); //消息队列绑定消息交换机 channel.QueueBind(queueName, exchange, routingKey: queueName); string message = "hello rabbitmq message 10s后处理"; var properties = channel.CreateBasicProperties(); properties.Persistent = true; properties.Expiration = "10000";//消息的有效期10s //发布消息,延时10s channel.BasicPublish(exchange: exchange, routingKey: queueName, basicProperties: properties, body: Encoding.UTF8.GetBytes(message)); Console.WriteLine($"{DateTime.Now},向队列:{queueName}发送消息:{message},延时:10s"); string message2 = "hello rabbitmq message 5s后处理"; var properties2 = channel.CreateBasicProperties(); properties2.Persistent = true; properties2.Expiration = "5000";//消息有效期5s //发布消息,延时5s channel.BasicPublish(exchange: exchange, routingKey: queueName, basicProperties: properties2, body: Encoding.UTF8.GetBytes(message2)); Console.WriteLine($"{DateTime.Now},向队列:{queueName}发送消息:{message2},延时:5s"); } } }
执行效果:
这会得到了想要的效果。
RabbitMQ管理界面:
四、延时消息用延时插件的方式实现相比上面第三的延时消息,这里的插件方式会显的更加简单,也推荐用这种。
因为这里只需要一个交换机和一个对队列,生产者向队列发送消息,会直接是延时才会到队列。
安装插件:
地址:https://www.rabbitmq.com/community-plugins.html
找到和自己RabbitMQ一样的版本,下载下来上传到Linux,或F12查看这个文件的地址,直接Linux上下载(这里用这种)
Linux下载插件:
#下载插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez
已经下载到Linux上
#把文件复制到rabbitmq docker容器下的plugins文件夹
docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins
#进入rabbitmq docker容器
docker exec -it rabbitmq bash
#开启插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
做完上面这些在RabbitMQ管理界面可以看到多了一个延时消息的交换机。
插件装好了,生产者代码: