生产者代码如下
using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace MQ3 { class Program { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "logs", type: "fanout"); var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "info: Hello.World!"); } } }====================www.ayjs.net 杨洋 wpfui.com ayui ay aaronyang=======请不要转载谢谢了。=========
运行,
命令行列出rabbitmqctl list_exchanges
amq.*开头的 默认邮箱,安装好RabbitMQ就有的,暂时用不到。
在以前的代码,我们没有用到exchange,但是仍然可以发消息到队列。那是因为我们发到默认邮箱去了,我们给exchange赋值是空白
var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);现在我们代码 声明一个 fanout的 邮箱(交换器),取个名字叫logs
channel.ExchangeDeclare(exchange: "logs", type: "fanout");
临时队列
前面的代码,我们给队列取名字 hello,task_queue还记得吗?命名后,消费者通过名字拿到队列然后 处理消息的。
在这个例子对于我们的logger不重要,我们只关注log内容,
我们可以通过下面,随机取名字。然后一旦消费者断开连接,消息要能删除。
默认,创建的是非 持久化,exclusive,自动删除的队列
ar queueName = channel.QueueDeclare().QueueName;exclusive queue独占队列,相当于给队列lock了,别人不能拿到。 一般消费者死了,独占队列会被删除。因此用于特定的某些场景。
你占用了,当别人尝试访问,就会报 RESOURCE_LOCKED的错误异常。表示无法获得对 锁定队列的 独占访问权限。
Binding
队列要放入邮箱才好,放进去叫绑定
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");上面代码我没给队列名字,因为我不关心,你也可以写个 产品约定好的名字 ,或者调用临时队列的知识,随机名字。
列出绑定(命令行)
rabbitmqctl list_bindings发布消息
channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);上面的exchange的邮箱一定要存在,不然发布失败。
如果队列没有绑定邮箱,消息将会丢失,但这对我们没有问题; 因为如果没有消费者在监听,我们可以安全地丢弃该消息。
消费者:
using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; class Program { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "logs", type: "fanout"); var queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queue: queueName, exchange: "logs", routingKey: ""); Console.WriteLine(" [*] Waiting for logs."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] {0}", message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }消费者先打开,然后打开生产者
AY 过程理解:
测试1:
把邮箱都删掉,保持干净环境
我们运行消费者:此时通过web管理,看到多了个logs
对内多了一个amq.genXXXX的名字的queue,
关闭消费者==》》 临时队列删除了,exchange依旧存在。
然后运行生产者,不退出,队列没变化
打开消费者,也无法获得消息。
我们生产个消息,消费者不关闭,立即就可以收到消息了。
总结: 消费者连接了在线才可以收到消息。
(由于我们的代码,释放连接,就关闭消费者程序就行了,意味着 消费者死亡,那么临时队列也会被删除了。 满足条件)
接下来运行多个消费者,然后打开生产者,每个消费者都能正确拿到消息了。
运行多个,会产生多个临时队列,理解OK。
fanout 就是根据exchange名字来拿消息,没啥过滤在里面,但是你也可以 设计名字,来分配业务。
下面会将 可以过滤的topic,header
关于我以前写的fanout教程: 小坦克
Routing 路由 集中处理 数据 然后 按照 约定/规则 正确的 广播到 消费者