[译]RabbitMQ教程C#版 - 工作队列 (3)

您可能已经注意到调度仍然无法完全按照我们期望的方式工作。例如,在有两个Worker的情况下,假设所有奇数消息都很庞大、偶数消息都很轻量,那么一个Worker将会一直忙碌,而另一个Worker几乎不做任何工作。是的,RabbitMQ并不知道存在这种情况,它仍然会平均地分发消息。

发生这种情况是因为RabbitMQ只是在消息进入队列后就将其分发。它不会去检查每个消费者所拥有的未确认消息的数量。它只是盲目地将第n条消息分发给第n位消费者。

公平调度

为了改变上述这种行为,我们可以使用参数设置prefetchCount = 1的basicQos方法。

这就告诉RabbitMQ同一时间不要给一个Worker发送多条消息。或者换句话说,不要向一个Worker发送新的消息,直到它处理并确认了前一个消息。
相反,它会这个消息调度给下一个不忙碌的Worker。

channel.BasicQos(0, 1, false);

关于队列大小的说明
如果所有的Worker都很忙,您的队列可能会被填满。请留意这一点,可以尝试添加更多的Worker,或者使用其他策略。

7.组合在一起

我们NewTask.cs类的最终代码:

using System; using RabbitMQ.Client; using System.Text; class NewTask { public static void Main(string[] args) { // 实例化连接工厂。 var factory = new ConnectionFactory() { HostName = "localhost" }; // 创建连接、信道。 using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { // 声明队列,标记为持久性。 channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); // 获取发送消息。 var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); // 将消息标记为持久性。 var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 发送数据包。 channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); } }

(NewTask.cs源码)

还有我们的Worker.cs:

using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; using System.Threading; class Worker { public static void Main() { // 实例化连接工厂。 var factory = new ConnectionFactory() { HostName = "localhost" }; // 创建连接、信道。 using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { // 声明队列,标记为持久性。 channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); // 告知RabbitMQ,在未收到当前Worker的消息确认信号时,不再分发给消息,确保公平调度。 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); Console.WriteLine(" [*] Waiting for messages."); // 构建消费者实例。 var consumer = new EventingBasicConsumer(channel); // 绑定消息接收事件。 consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); // 模拟耗时操作。 int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); // 手动发送消息确认信号。 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }

(Worker.cs源码)

使用消息确认机制和BasicQ您可以创建一个工作队列。即使RabbitMQ重新启动,通过持久性选项也可让任务继续存在。

有关IModel方法和IBasicProperties的更多信息,您可以在线浏览RabbitMQ .NET客户端API参考。

现在,我们可以继续阅读教程[3],学习如何向多个消费者发送相同的消息。

8.写在最后

本文翻译自RabbitMQ官方教程C#版本。本文介绍如与官方有所出入,请以官方最新内容为准。

水平有限,翻译的不好请见谅,如有翻译错误还请指正。

原文链接:RabbitMQ tutorial - Work Queues

实验环境:RabbitMQ 3.7.4 、.NET Core 2.1.3、Visual Studio Code

最后更新:2018-04-03

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpwypw.html