AY C# RabbitMQ 2019 微笔记 (4)

发布者2

using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace MQ.Product2 { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: true, arguments: null); var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body); //channel.QueueDeclare(queue: "hello", // durable: true, // exclusive: false, // autoDelete: true, // arguments: null); //string message = "Hello World!"; //var body = Encoding.UTF8.GetBytes(message); //channel.BasicPublish(exchange: "", // routingKey: "hello", // basicProperties: null, // body: body); //Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); } } }

这里基于上一个DEMO改的,这里我们设置了一个properties了。

运行项目。


然后消费者修改代码(基于DEMO1的消费者 代码)

using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace AyTestMQ2 { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { //channel.QueueDeclare(queue: "task_queue", // durable: true, // exclusive: false, // autoDelete: true, // arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); }; channel.BasicConsume(queue: "task_queue", autoAck: true, consumer: consumer); //var consumer = new EventingBasicConsumer(channel); //consumer.Received += (model, ea) => //{ // var body = ea.Body; // var message = Encoding.UTF8.GetString(body); // Console.WriteLine(" [x] Received {0}", message); //}; //channel.BasicConsume(queue: "hello", // autoAck: true, // consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } Console.ReadKey(); } } }

主要接收消息,处理,模拟耗时工作。

发的消息一个 点号 停顿1秒


生产端消息改下

  private static string GetMessage(string[] args)

        {

            return ((args.Length > 0) ? string.Join(" ", args) : "Hello.World.AY.2019");

        }

消费端改改

      consumer.Received += (model, ea) =>

                {

                    var body = ea.Body;

                    var message = Encoding.UTF8.GetString(body);

                    Console.WriteLine(" [x] Received {0}", message);

                    var _3 = message.Split('.');

                    //int dots = message.Split('.').Length - 1;

                    foreach (var item in _3)

                    {

                        Console.WriteLine(item);

                        Thread.Sleep(1000);

                    }

运行生产端,然后消费端效果如下

6F.gif

测试2,

开启生产者,然后开启消费者,如上所示,不要关闭,关掉生产者在打开,消费者那段又收到消息了。

image.png

同样的,如果有2个消费者, rabbitmq会发给下一个消费者,这种分发消息叫做 round-robin(循环调度)

一个消息只给一个消费者处理。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zygdwx.html