运用.NetCore实例讲解RabbitMQ死信队列,延时队列

运用.NetCore实例讲解RabbitMQ死信队列,延时队列

描述:Q1队列绑定了x-dead-letter-exchange(死信交换机)为X2,x-dead-letter-routing-key(死信路由key)指向Q2(队列2)

P(生产者)发送消息经X1(交换机1)路由到Q1(队列1),Q1的消息触发特定情况,自动把消息经X2(交换机2)路由到Q2(队列2),C(消费者)直接消息Q2的消息。

特定情况有哪些呢:

1.消息被拒(basic.reject or basic.nack)并且没有重新入队(requeue=false);

2.当前队列中的消息数量已经超过最大长度(创建队列时指定" x-max-length参数设置队列最大消息数量)。

3.消息在队列中过期,即当前消息在队列中的存活时间已经超过了预先设置的TTL(Time To Live)时间;

这里演示情况1:

假如场景:Q1中队列数据不完整,就算从新处理也会报错,那就可以不ack,把这个消息转到死信队列另外处理。

生产者

public static void SendMessage() { //死信交换机 string dlxexChange = "dlx.exchange"; //死信队列 string dlxQueueName = "dlx.queue"; //消息交换机 string exchange = "direct-exchange"; //消息队列 string queueName = "queue_a"; using (var connection = RabbitMQHelper.GetConnection()) { using (var channel = connection.CreateModel()) { //创建死信交换机 channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false); //创建死信队列 channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false); //死信队列绑定死信交换机 channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName); // 创建消息交换机 channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false); //创建消息队列,并指定死信队列 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object> { { "x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX(死信交换机) { "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列 }); //消息队列绑定消息交换机 channel.QueueBind(queueName, exchange, routingKey: queueName); string message = "hello rabbitmq message"; var properties = channel.CreateBasicProperties(); properties.Persistent = true; //发布消息 channel.BasicPublish(exchange: exchange, routingKey: queueName, basicProperties: properties, body: Encoding.UTF8.GetBytes(message)); Console.WriteLine($"向队列:{queueName}发送消息:{message}"); } } }

消费者

public static void Consumer() { //死信交换机 string dlxexChange = "dlx.exchange"; //死信队列 string dlxQueueName = "dlx.queue"; //消息交换机 string exchange = "direct-exchange"; //消息队列 string queueName = "queue_a"; var connection = RabbitMQHelper.GetConnection(); { //创建信道 var channel = connection.CreateModel(); { //创建死信交换机 channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false); //创建死信队列 channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false); //死信队列绑定死信交换机 channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName); // 创建消息交换机 channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false); //创建消息队列,并指定死信队列 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object> { { "x-dead-letter-exchange",dlxexChange}, //设置当前队列的DLX { "x-dead-letter-routing-key",dlxQueueName}, //设置DLX的路由key,DLX会根据该值去找到死信消息存放的队列 }); //消息队列绑定消息交换机 channel.QueueBind(queueName, exchange, routingKey: queueName); var consumer = new EventingBasicConsumer(channel); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true); consumer.Received += (model, ea) => { //处理业务 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"队列{queueName}消费消息:{message},不做ack确认"); //channel.BasicAck(ea.DeliveryTag, false); //不ack(BasicNack),且不把消息放回队列(requeue:false) channel.BasicNack(ea.DeliveryTag, false, requeue: false); }; channel.BasicConsume(queueName, autoAck: false, consumer); } } }

消费者加上channel.BasickNack()模拟消息处理不了,不ack确认。

执行结果:

运用.NetCore实例讲解RabbitMQ死信队列,延时队列

RabbitMQ管理界面:

运用.NetCore实例讲解RabbitMQ死信队列,延时队列

看到消息队列为queue_a,特性有DLX(死信交换机),DLK(死信路由)。因为消费端不nack,触发了死信,被转发到了死信队列dlx.queue。

二、延时队列

延时队列其实也是配合死信队列一起用,其实就是上面死信队列的第二中情况。给队列添加消息过时时间(TTL),变成延时队列。

运用.NetCore实例讲解RabbitMQ死信队列,延时队列

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zysfds.html