先决条件
本教程假定RabbitMQ已经安装,并运行在localhost标准端口(5672)。如果你使用不同的主机、端口或证书,则需要调整连接设置。
从哪里获得帮助
如果您在阅读本教程时遇到困难,可以通过邮件列表。
(使用.NET客户端)
在第一篇教程中,我们编写了两个程序,用于从一个指定的队列发送和接收消息。在本文中,我们将创建一个工作队列,用于在多个工作线程间分发耗时的任务。
工作队列(又名:任务队列)背后的主要想法是避免立即执行资源密集型、且必须等待其完成的任务。相反的,我们把这些任务安排在稍后完成。我们可以将任务封装为消息并把它发送到队列中,在后台运行的工作进程将从队列中取出任务并最终执行。当您运行多个工作线程,这些任务将在这些工作线程之间共享。
这个概念在Web应用程序中特别有用,因为在一个HTTP请求窗口中无法处理复杂的任务。
2.准备我们将略微修改上一个示例中的Send程序,以其可以在命令行发送任意消息。
这个程序将调度任务到我们的工作队列中,所以让我们把它命名为NewTask:
像教程[1],我们需要生成两个项目:
dotnet new console --name NewTask mv NewTask/Program.cs NewTask/NewTask.cs dotnet new console --name Worker mv Worker/Program.cs Worker/Worker.cs cd NewTask dotnet add package RabbitMQ.Client dotnet restore cd ../Worker dotnet add package RabbitMQ.Client dotnet restore var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body);从命令行参数获取消息的帮助方法:
private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); }我们旧的Receive.cs脚本也需要进行一些更改:它需要为消息体中的每个点模拟一秒种的时间消耗。它将处理由RabbitMQ发布的消息,并执行任务,因此我们把它复制到Worker项目并修改:
// 构建消费者实例。 var consumer = new EventingBasicConsumer(channel); // 绑定消息接收事件。 consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); // 模拟耗时操作。 int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); }; channel.BasicConsume(queue: "task_queue", autoAck: true, consumer: consumer);模拟虚拟任务的执行时间:
int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); 3.循环调度使用任务队列的优点之一是能够轻松地并行工作。如果我们正在积累积压的工作,我们仅要增加更多的工作者,并以此方式可以轻松扩展。
首先,我们尝试同时运行两个Worker实例。他们都会从队列中获取消息,但究竟如何?让我们来看看。
您需要打开三个控制台,两个运行Worker程序,这些控制台作为我们的两个消费者 - C1和C2。
# shell 1 cd Worker dotnet run # => [*] Waiting for messages. To exit press CTRL+C # shell 2 cd Worker dotnet run # => [*] Waiting for messages. To exit press CTRL+C在第三个控制台中,我们将发布一些新的任务。一旦你已经运行了消费者,你可以尝试发布几条消息:
# shell 3 cd NewTask dotnet run "First message." dotnet run "Second message.." dotnet run "Third message..." dotnet run "Fourth message...." dotnet run "Fifth message....."让我们看看有什么发送到了我们的Worker程序:
# shell 1 # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....' # shell 2 # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....'默认情况下,RabbitMQ会按顺序将每条消息发送给下一个消费者。消费者数量平均的情况下,每个消费者将会获得相同数量的消息。这种分配消息的方式称为循环(Round-Robin)。请尝试开启三个或更多的Worker程序来验证。
4.消息确认处理一项任务可能会需要几秒钟的时间。如果其中一个消费者开启了一项长期的任务并且只完成了部分就挂掉了,您可能想知道会发生什么?在我们当前的代码中,一旦RabbitMQ把消息分发给了消费者,它会立即将这条消息标记为删除。在这种情况下,如果您停掉某一个Worker,我们将会丢失这条正在处理的消息,也将丢失所有分发到该Worker但尚未处理的消息。
但是我们不想丢失任何一个任务。如果一个Worker挂掉了,我们希望这个任务能被重新分发给其他Worker。