抽一根烟的时间学会.NET Core 操作RabbitMQ (2)

启动成功后,consumer的Received方法,会收到一条来自MQ的消息,

抽一根烟的时间学会.NET Core 操作RabbitMQ

如果处理完成后,不调用chennel的BasicAck方法,那么这条消息依然会存在,下次有消费者出现,会再次推送给消费者。

简单的RabbitMQ Hello World到这里就算完成了。接下来就是稍微高级一点的应用

RabbitMQ的工作模式 Work Queue 工作队列模式

抽一根烟的时间学会.NET Core 操作RabbitMQ

工作队列模式的意思就是一个生产者对应多个消费者。RabbitMQ会使用轮询去给每个消费者发送消息。

publish/subscribe

发布订阅模式是属于比较用多的一种。

抽一根烟的时间学会.NET Core 操作RabbitMQ

发布订阅,是由交换机发布消息给多个队列。多个队列再对应多个消费者。

发布订阅模式对应的交换机类型的fanout。

消费者

A

const string QUEUENAME = "HELLO_MQ_B"; const string TESTEXCHANGE = "TESTEXCHANGE"; var factory = new ConnectionFactory() { UserName = "guest", Password = "guest", HostName = "localhost", Port = 5672, }; var conn = factory.CreateConnection(); var channel = conn.CreateModel(); //定义队列 channel.QueueDeclare(QUEUENAME, true, false, false); //定义交换机 channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Fanout, true, false); //绑定队列到交换机 channel.QueueBind(QUEUENAME, TESTEXCHANGE, ""); var consumer = new EventingBasicConsumer(channel); consumer.Received += (a, e) => { Console.WriteLine($"{DateTime.Now.ToString()}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray())); channel.BasicAck(e.DeliveryTag, true); //收到回复后,RabbitMQ会直接在队列中删除这条消息 }; channel.BasicConsume(QUEUENAME, false, consumer); Console.WriteLine("启动成功"); Console.ReadLine();

B

const string QUEUENAME = "HELLO_MQ"; const string TESTEXCHANGE = "TESTEXCHANGE"; var factory = new ConnectionFactory() { UserName = "guest", Password = "guest", HostName = "localhost", Port = 5672, }; var conn = factory.CreateConnection(); var channel = conn.CreateModel(); //定义队列 channel.QueueDeclare(QUEUENAME, true, false, false); //定义交换机 channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Fanout, true, false); //绑定队列到交换机 channel.QueueBind(QUEUENAME, TESTEXCHANGE, ""); var consumer = new EventingBasicConsumer(channel); consumer.Received += (a, e) => { Console.WriteLine($"{DateTime.Now.ToString()}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray())); channel.BasicAck(e.DeliveryTag, true); //收到回复后,RabbitMQ会直接在队列中删除这条消息 }; channel.BasicConsume(QUEUENAME, false, consumer); Console.WriteLine("启动成功"); Console.ReadLine(); 生产者 const string QUEUENAME = "HELLO_MQ"; const string QUEUENAME_B = "HELLO_MQ_B"; const string TESTEXCHANGE = "TESTEXCHANGE"; //创建连接对象工厂 var factory = new ConnectionFactory() { UserName = "guest", Password = "guest", HostName = "localhost", Port = 5672, //RabbitMQ默认的端口 }; using var conn = factory.CreateConnection(); while (true) { var channel = conn.CreateModel(); //定义交换机 channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Fanout, true, false); Console.WriteLine("输入生产内容:"); var input = Console.ReadLine(); channel.BasicPublish(TESTEXCHANGE,"", null, Encoding.Default.GetBytes("hello rabbitmq:" + input)); }

在生产者运行成功后,RabbitMQ后台会出现一个交换机,点击交换机会看到交换机下绑定了两个队列

抽一根烟的时间学会.NET Core 操作RabbitMQ


抽一根烟的时间学会.NET Core 操作RabbitMQ

从生产者发送消息到队列,两个消费者会同时收到消息

抽一根烟的时间学会.NET Core 操作RabbitMQ

routing模式

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wssgzf.html