AY C# RabbitMQ 2019 微笔记 (7)

生产者代码如下

using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace MQ3 { class Program { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "logs", type: "fanout"); var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "info: Hello.World!"); } } }

====================www.ayjs.net       杨洋    wpfui.com        ayui      ay  aaronyang=======请不要转载谢谢了。=========

运行,

image.png

命令行列出rabbitmqctl list_exchanges

image.png

amq.*开头的 默认邮箱,安装好RabbitMQ就有的,暂时用不到。

在以前的代码,我们没有用到exchange,但是仍然可以发消息到队列。那是因为我们发到默认邮箱去了,我们给exchange赋值是空白

var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);


现在我们代码 声明一个 fanout的 邮箱(交换器),取个名字叫logs

channel.ExchangeDeclare(exchange: "logs", type: "fanout");


临时队列

前面的代码,我们给队列取名字 hello,task_queue还记得吗?命名后,消费者通过名字拿到队列然后 处理消息的。

在这个例子对于我们的logger不重要,我们只关注log内容,

我们可以通过下面,随机取名字。然后一旦消费者断开连接,消息要能删除。

默认,创建的是非 持久化,exclusive,自动删除的队列

ar queueName = channel.QueueDeclare().QueueName;

exclusive queue独占队列,相当于给队列lock了,别人不能拿到。 一般消费者死了,独占队列会被删除。因此用于特定的某些场景。

你占用了,当别人尝试访问,就会报 RESOURCE_LOCKED的错误异常。表示无法获得对 锁定队列的 独占访问权限。


Binding

队列要放入邮箱才好,放进去叫绑定

channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");

上面代码我没给队列名字,因为我不关心,你也可以写个 产品约定好的名字 ,或者调用临时队列的知识,随机名字。

列出绑定(命令行)

rabbitmqctl list_bindings


发布消息

channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);

上面的exchange的邮箱一定要存在,不然发布失败。

如果队列没有绑定邮箱,消息将会丢失,但这对我们没有问题; 因为如果没有消费者在监听,我们可以安全地丢弃该消息。



消费者:

using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; class Program { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "logs", type: "fanout"); var queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queue: queueName, exchange: "logs", routingKey: ""); Console.WriteLine(" [*] Waiting for logs."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] {0}", message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }

消费者先打开,然后打开生产者

image.png


AY 过程理解:

测试1:

把邮箱都删掉,保持干净环境

我们运行消费者:此时通过web管理,看到多了个logs

image.png

对内多了一个amq.genXXXX的名字的queue,

image.png


关闭消费者==》》 临时队列删除了,exchange依旧存在。

image.png


然后运行生产者,不退出,队列没变化

打开消费者,也无法获得消息。

我们生产个消息,消费者不关闭,立即就可以收到消息了。

总结: 消费者连接了在线才可以收到消息。

(由于我们的代码,释放连接,就关闭消费者程序就行了,意味着 消费者死亡,那么临时队列也会被删除了。 满足条件)


接下来运行多个消费者,然后打开生产者,每个消费者都能正确拿到消息了。

image.png

image.png

运行多个,会产生多个临时队列,理解OK。


fanout 就是根据exchange名字来拿消息,没啥过滤在里面,但是你也可以 设计名字,来分配业务。


下面会将 可以过滤的topic,header


关于我以前写的fanout教程: 小坦克


 


Routing 路由  集中处理 数据  然后 按照 约定/规则 正确的 广播到 消费者

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zygdwx.html