AY C# RabbitMQ 2019 微笔记 (6)

 忘记写 BasicAck这行代码, 这是一个简单的错误,但后果是严重的。 当您的客户端退出时,消息将被重新传递(这可能看起来像随机重新传递),但RabbitMQ将会占用越来越多的内存,因为它无法释放任何未经处理的消息。

假如忘了unack


测试4

注释掉代码,然后生产个消息,然后运行消费者

再运行消费者,当然 连接不要释放,不然任务客户端死了,又恢复回去了

image.png

这里我们打开命令行

rabbitmqctl list_queues name messages_ready messages_unacknowledged

image.png

貌似超时了  这里就列出名字了。算了,遇到再看。




=============================================================

持久性,如果兔子挂了,消息还是会丢丢失了。

hannel.QueueDeclare(queue: "hello",

                     durable: true,

                     exclusive: false,

                     autoDelete: false,

                     arguments: null);


设置持久化,就会不丢失了。但是兔子不允许你重新定义一个已经存在的队列,然后更改属性

你可以换个名字重新定义一个。


对了,如果服务器重启,我们在上篇博客说到 消息恢复了,但是不可再被消费了,但是如果生产消息时候,加上下面代码就好了,终于解决了 durable=true也无效的问题了。

     var properties = channel.CreateBasicProperties();

                properties.Persistent = true;

将消息标记为持久性并不能完全保证消息不会丢失。 虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接s收消息并且尚未保存消息时,仍然有一个短时间窗口。 此外,RabbitMQ不会为每条消息执行fsync(2) - 它可能只是保存到缓存而不是真正写入磁盘。 持久性保证不强,但对于我们简单的任务队列来说已经足够了。 如果您需要更强的保证,那么您可以使用发布者确认(publisher confirms)。



公平调度 Fair Dispatch

2个消费者,一个很忙,一个几乎不做事,兔子不知道谁忙谁不忙的,还是均匀的发消息的。

发生这种情况是因为RabbitMQ只是在消息进入队列时调度消息。

 它不会查看消费者未确认消息的数量。

 它只是盲目地向第n个消费者发送每个第n个消息

image.png

为了改变这种行为,我们可以使用BasicQos方法,shezhi PrefetchCount=1

这会告诉兔子,不要同一时间给超过一个消息以上给一个消费者,因为它很忙,可能还没处理完,你又来了。

换句话说,在处理并确认前一个消息之前,不要向该工作程序发送新消息。 相反,它会将它发送给下一个不忙的 消费者。

channel.BasicQos(0, 1, false);


这里注意队列的 size

如果所有的 消费者都很忙,并且你的queue填满了。你就要考虑是否添加更多的消费者,或者换个思路去解决问题。


消费者修改后的代码如下:

using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace AyTestMQ2 { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); var _3 = message.Split('.'); foreach (var item in _3) { Console.WriteLine(item); } Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } Console.ReadKey(); } } }

====================www.ayjs.net       杨洋    wpfui.com        ayui      ay  aaronyang=======请不要转载谢谢了。=========


关于IModel内的方法和IBasicProperties你想了解的,可以查看 RabbitMQ .NET client API reference online

特别推荐以下指南

particularly recommend the following guides: Publisher Confirms and Consumer AcknowledgementsProduction Checklist and Monitoring.



 

Exchange 发布订阅 1个生产者对多个消费者 

队列是一个存储消息的buffer

image.png

对的,会爬要走路了。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zygdwx.html