AY C# RabbitMQ 2019 微笔记 (5)

场景:其实我们可以做  用户的请求,每个请求放入消息队列,然后让消息队列给空闲的 消费者去消费处理。1个消费者不够处理,可以运行多个来吃完任务。



任务会耗时间的。

您可能想知道如果其中一个消费者开始执行长任务并且仅在部分完成时死亡会发生什么。

上面的代码,一旦RabbitMQ向客户发送消息,它立即将其标记为删除。 

在这种情况下,如果当前的消费者挂了,我们将丢失它刚刚处理的消息。 

我还将丢失分发给这个消费者的 还未处理的所有消息。


但是我不想丢失任何的消息(1个消息一个任务),如果消费者处理挂了,我当然更想把消息给其他的消费者处理。

为了确保消息永不丢失,RabbitMQ 提供了一个 ack机制, 手动应答,处理完了,告诉兔子,我处理完了,等兔子空闲时候就删除该消息了。

一些文章


定义 消费者死了,就是 channel关闭,connection关闭,tcp断开了,没网络了。

当消费者还没发送 ack,兔子那边就会认为 消息没有被处理,又会恢复回去了。如果同一时间,还有其他消费者在线,兔子会把这烫手山芋给其他的消费者。

恩,所以啊,你的程序没死,他的消息一直存在兔子那的,除非你手动应答。如果你挂了没应答,会看有没有其他的消费者处理。


接下来模拟这个场景

生产者生产个消息,然后修改消费者代码

var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { //channel.QueueDeclare(queue: "task_queue", // durable: true, // exclusive: false, // autoDelete: true, // arguments: null); var consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); throw new NotImplementedException(); Console.WriteLine(" [x] Received {0}", message); var _3 = message.Split('.'); foreach (var item in _3) { Console.WriteLine(item); Thread.Sleep(2000); } Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); };

设置了autoack:false了

然后received里面设置了 ea.DeliveryTag


测试1

这里处理消息,停留了2秒1个字段,我们再BasicAck应答之前关闭程序,看消息会不会被删除了。

image.png

由于抛出异常Unacked 为1了。

把程序关了

消息还是删除了。。

我怀疑服务端设置了 自动删除导致的。我改为false测试,这样生产了1个不会自动删除的消息。

image.png

测试2

运行修改后的生产者

image.png

消费者代码不改,让抛出异常

然后关闭程序,过一会,消息恢复正常了。这次就对了。也就是生产者自动删除我觉得大部分都是关闭的。

image.png

测试3

正确处理,看消息会不会删除,移除抛弃异常的代码

ready终于是0了

image.png

然后关闭客户端,断开连接(执行完using,释放连接),队列被处理了,没删除哦

image.png

那如果想要删除呢,暂时先这样吧,因为用的最多的还是Exchange的Topic


注意:

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zygdwx.html