忘记写 BasicAck这行代码, 这是一个简单的错误,但后果是严重的。 当您的客户端退出时,消息将被重新传递(这可能看起来像随机重新传递),但RabbitMQ将会占用越来越多的内存,因为它无法释放任何未经处理的消息。
假如忘了unack
测试4
注释掉代码,然后生产个消息,然后运行消费者
再运行消费者,当然 连接不要释放,不然任务客户端死了,又恢复回去了
这里我们打开命令行
rabbitmqctl list_queues name messages_ready messages_unacknowledged
貌似超时了 这里就列出名字了。算了,遇到再看。
=============================================================
持久性,如果兔子挂了,消息还是会丢丢失了。
hannel.QueueDeclare(queue: "hello",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
设置持久化,就会不丢失了。但是兔子不允许你重新定义一个已经存在的队列,然后更改属性
你可以换个名字重新定义一个。
对了,如果服务器重启,我们在上篇博客说到 消息恢复了,但是不可再被消费了,但是如果生产消息时候,加上下面代码就好了,终于解决了 durable=true也无效的问题了。
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
将消息标记为持久性并不能完全保证消息不会丢失。 虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接s收消息并且尚未保存消息时,仍然有一个短时间窗口。 此外,RabbitMQ不会为每条消息执行fsync(2) - 它可能只是保存到缓存而不是真正写入磁盘。 持久性保证不强,但对于我们简单的任务队列来说已经足够了。 如果您需要更强的保证,那么您可以使用发布者确认(publisher confirms)。
公平调度 Fair Dispatch
2个消费者,一个很忙,一个几乎不做事,兔子不知道谁忙谁不忙的,还是均匀的发消息的。
发生这种情况是因为RabbitMQ只是在消息进入队列时调度消息。
它不会查看消费者未确认消息的数量。
它只是盲目地向第n个消费者发送每个第n个消息
为了改变这种行为,我们可以使用BasicQos方法,shezhi PrefetchCount=1
这会告诉兔子,不要同一时间给超过一个消息以上给一个消费者,因为它很忙,可能还没处理完,你又来了。
换句话说,在处理并确认前一个消息之前,不要向该工作程序发送新消息。 相反,它会将它发送给下一个不忙的 消费者。
channel.BasicQos(0, 1, false);
这里注意队列的 size
如果所有的 消费者都很忙,并且你的queue填满了。你就要考虑是否添加更多的消费者,或者换个思路去解决问题。
消费者修改后的代码如下:
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace AyTestMQ2 { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); var _3 = message.Split('.'); foreach (var item in _3) { Console.WriteLine(item); } Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } Console.ReadKey(); } } }====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========
关于IModel内的方法和IBasicProperties你想了解的,可以查看 RabbitMQ .NET client API reference online
特别推荐以下指南
particularly recommend the following guides: Publisher Confirms and Consumer Acknowledgements, Production Checklist and Monitoring.
Exchange 发布订阅 1个生产者对多个消费者
队列是一个存储消息的buffer
对的,会爬要走路了。