public static void SimpleConsumer() { string queueName = "simple_order"; var connection = RabbitMQHelper.GetConnection(); { //创建信道 var channel = connection.CreateModel(); { //创建队列 channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); int i = 0; consumer.Received += (model, ea) => { //消费者业务处理 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"{i},队列{queueName}消费消息长度:{message.Length}"); i++; }; channel.BasicConsume(queueName, true, consumer); } } }
消费者只需要知道队列名就可以消费了,不需要Exchange和routingKey。
注:消费者这里有一个创建队列,它本身不需要,是预防消费端程序先执行,没有队列会报错。
执行效果:
消息已经被消费完。
(2)工作队列模式一个消息生产者,一个交换器,一个消息队列,多个消费者。同样也称为点对点模式
生产者P发送消息到队列,多个消费者C消费队列的数据。
工作队列也称为公平性队列模式,循环分发,RabbitMQ将按顺序将每条消息发送给下一个消费者,每个消费者将获得相同数量的消息。
生产者:
Send.cs代码:
/// <summary> /// 工作队列模式 /// </summary> public static void WorkerSendMsg() { string queueName = "worker_order";//队列名 //创建连接 using (var connection = RabbitMQHelper.GetConnection()) { //创建信道 using (var channel = connection.CreateModel()) { //创建队列 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); var properties = channel.CreateBasicProperties(); properties.Persistent = true; //消息持久化 for ( var i=0;i<10;i++) { string message = $"Hello RabbitMQ MessageHello,{i+1}"; var body = Encoding.UTF8.GetBytes(message); //发送消息到rabbitmq channel.BasicPublish(exchange: "", routingKey: queueName, mandatory: false, basicProperties: properties, body); Console.WriteLine($"发送消息到队列:{queueName},内容:{message}"); } } } }
参数durable:true,需要持久化,实际项目中肯定需要持久化的,不然重启RabbitMQ数据就会丢失了。
执行效果:
写入10条数据,有持久化标识D
消费端:
Recevie代码:
public static void WorkerConsumer() { string queueName = "worker_order"; var connection = RabbitMQHelper.GetConnection(); { //创建信道 var channel = connection.CreateModel(); { //创建队列 channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); //prefetchCount:1来告知RabbitMQ,不要同时给一个消费者推送多于 N 个消息,也确保了消费速度和性能 channel.BasicQos(prefetchSize: 0, prefetchCount:1, global: false); int i = 1; int index = new Random().Next(10); consumer.Received += (model, ea) => { //处理业务 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"{i},消费者:{index},队列{queueName}消费消息长度:{message.Length}"); channel.BasicAck(ea.DeliveryTag, false); //消息ack确认,告诉mq这条队列处理完,可以从mq删除了 Thread.Sleep(1000); i++; }; channel.BasicConsume(queueName,autoAck:false, consumer); } } }
BasicQos参数解析:
prefetchSize:每条消息大小,一般设为0,表示不限制。