[译]RabbitMQ教程C#版 - 发布订阅

先决条件
本教程假定RabbitMQ已经安装,并运行在localhost标准端口(5672)。如果你使用不同的主机、端口或证书,则需要调整连接设置。

从哪里获得帮助
如果您在阅读本教程时遇到困难,可以通过邮件列表。

1.发布/订阅

(使用.NET客户端)

在教程[2]中,我们创建了一个工作队列,假设在工作队列中的每一个任务都只被分发给一个Worker。那么在这一章节,我们要做与之完全不同的事,那就是我们将要把一条消息分发给多个消费者。这种模式被称为“发布/订阅”。

为了说明、体现这种模式,我们将会建一个简单的日志系统。它将会包含两个程序 - 第一个用来发送日志消息,第二个用来接收并打印它们。

在我们建立的日志系统中,每个接收程序的运行副本都会收到消息。这样我们就可以运行一个接收程序接收消息并将日志写入磁盘;同时运行另外一个接收程序接收消息并将日志打印到屏幕上。

实质上,发布的日志消息将会被广播给所有的接收者。

2.交换器

在教程的前几部分,我们是发送消息到队列并从队列中接收消息。现在是时候介绍Rabbit中完整的消息传递模型了。

让我们快速回顾一下前面教程中的内容:

生产者是发送消息的用户应用程序。

队列是存储消息的缓冲区。

消费者是接收消息的用户应用程序。

在RabbitMQ中,消息传递模型的核心理念是生产者从来不会把任何消息直接发送到队列,其实,通常生产者甚至不知道消息是否会被分发到任何队列中。

然而,生产者只能把消息发送给交换器。交换器非常简单,一方面它接收来自生产者的消息,另一方面又会把接收的消息推送到队列中。交换器必须明确知道该如何处理收到的消息,应该追加到一个特定队列中?还是应该追加到多个队列中?或者应该把它丢弃?这些规则都被定义在交换器类型中。

Exchanges

目前有这几种的交换器类型可用:direct,topic,headers和fanout。我们重点关注最后一个 -- fanout,让我们来创建一个这种类型的交换器,将其命名为logs:

channel.ExchangeDeclare("logs", "fanout");

fanout类型交换器非常简单。正如您可能从名字中猜出的那样,它会把收到的所有消息广播到它已知的所有队列中。这恰巧是我们的日志系统所需要的。

列举交换器
要列举出服务器上的交换器,您可以使用非常有用的rabbitmqctl命令行工具:

sudo rabbitmqctl list_exchanges

执行上述命令后,出现的列表中将会有一些amq.*交换器和默认(未命名)交换器。这些是默认创建的,不过目前您可能用不到它们。

默认交换器
在教程的前些部分,我们对交换器这一概念还一无所知,但仍然可以把消息发送到队列。之所以这样,是因为我们使用了一个用空字符串("")标识的默认交换器。

回顾一下我们之前如何发布消息:

var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);

第一个参数就是交换器的名称,空字符串表示默认或匿名交换器:将消息路由到routingKey指定的队列(如果存在)中。

现在,我们可以把消息发布到我们指定的交换器:

var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body); 3.临时队列

您是否还记得之前我们使用过的队列,它们都有一个特定的名称(记得应该是hello和task_queue吧)。给队列命名对我们来说是至关重要的 -- 因为我们可能需要多个Worker指向同一个队列;当您想要在生产者和消费者之间共享队列时,给队列一个名称也是非常重要的。

但是,我们创建的日志系统并不希望如此。我们希望监听所有的日志消息,而不仅仅是其中一部分。我们也只对目前流动的消息感兴趣,而不是旧消息。为解决这个问题,我们需要做好两件事。

首先,我们无论何时连接Rabbit,都需要一个新的、空的队列。要做到这一点,我们可以使用随机名称来创建队列,或许,甚至更好的方案是让服务器为我们选择一个随机队列名称。

其次,一旦我们与消费者断开连接,与之相关的队列应该被自动删除。

在.NET客户端中,如果不向QueueDeclare()方法提供任何参数,实际上就是创建了一个非持久化、独占、且自动删除的随机命名队列:

var queueName = channel.QueueDeclare().QueueName;

您可以在队列指南中了解更多关于exclusive参数和其他队列属性的信息。

此时,queueName包含一个随机队列名称。例如,它看起来可能像amq.gen-JzTY20BRgKO-HjmUJj0wLg。

4.绑定

Bindings

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpjszs.html