发布者2
using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace MQ.Product2 { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: true, arguments: null); var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body); //channel.QueueDeclare(queue: "hello", // durable: true, // exclusive: false, // autoDelete: true, // arguments: null); //string message = "Hello World!"; //var body = Encoding.UTF8.GetBytes(message); //channel.BasicPublish(exchange: "", // routingKey: "hello", // basicProperties: null, // body: body); //Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); } } }这里基于上一个DEMO改的,这里我们设置了一个properties了。
运行项目。
然后消费者修改代码(基于DEMO1的消费者 代码)
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace AyTestMQ2 { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { //channel.QueueDeclare(queue: "task_queue", // durable: true, // exclusive: false, // autoDelete: true, // arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); }; channel.BasicConsume(queue: "task_queue", autoAck: true, consumer: consumer); //var consumer = new EventingBasicConsumer(channel); //consumer.Received += (model, ea) => //{ // var body = ea.Body; // var message = Encoding.UTF8.GetString(body); // Console.WriteLine(" [x] Received {0}", message); //}; //channel.BasicConsume(queue: "hello", // autoAck: true, // consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } Console.ReadKey(); } } }主要接收消息,处理,模拟耗时工作。
发的消息一个 点号 停顿1秒
生产端消息改下
private static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello.World.AY.2019");
}
消费端改改
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
var _3 = message.Split('.');
//int dots = message.Split('.').Length - 1;
foreach (var item in _3)
{
Console.WriteLine(item);
Thread.Sleep(1000);
}
运行生产端,然后消费端效果如下
测试2,
开启生产者,然后开启消费者,如上所示,不要关闭,关掉生产者在打开,消费者那段又收到消息了。
同样的,如果有2个消费者, rabbitmq会发给下一个消费者,这种分发消息叫做 round-robin(循环调度)
一个消息只给一个消费者处理。