prefetchCount:1,作用限流,告诉RabbitMQ不要同时给一个消费者推送多于N个消息,消费者会把N条消息缓存到本地一条条消费,如果不设,RabbitMQ会进可能快的把消息推到客户端,导致客户端内存升高。设置合理可以不用频繁从RabbitMQ 获取能提升消费速度和性能,设的太多的话则会增大本地内存,需要根据机器性能合理设置,官方建议设为30。
global:是否为全局设置。
这些限流设置针对消费者autoAck:false时才有效,如果是自动Ack的,限流不生效。
执行两个消费者,效果:
可以看到消费者号的标识,8,2,8,2是平均的,一个消费者5个,RabbitMQ上也能看到有2个消费者,Unacked数是2,因为每个客户端的限流数是1。
工作队列模式也是很常用的队列模式。
(3)发布订阅模式Pulish/Subscribe,无选择接收消息,一个消息生产者,一个交换机(交换机类型为fanout),多个消息队列,多个消费者。称为发布/订阅模式
在应用中,只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。
生产者P只需把消息发送到交换机X,绑定这个交换机的队列都会获得一份一样的数据。
应用场景:适合于用同一份数据源做不同的业务。
生产者代码:
/// <summary> /// 发布订阅, 扇形队列 /// </summary> public static void SendMessageFanout() { //创建连接 using (var connection = RabbitMQHelper.GetConnection()) { //创建信道 using (var channel = connection.CreateModel()) { string exchangeName = "fanout_exchange"; //创建交换机,fanout类型 channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout); string queueName1 = "fanout_queue1"; string queueName2 = "fanout_queue2"; string queueName3 = "fanout_queue3"; //创建队列 channel.QueueDeclare(queueName1, false, false, false); channel.QueueDeclare(queueName2, false, false, false); channel.QueueDeclare(queueName3, false, false, false); //把创建的队列绑定交换机,routingKey不用给值,给了也没意义的 channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: ""); channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: ""); channel.QueueBind(queue: queueName3, exchange: exchangeName, routingKey: ""); var properties = channel.CreateBasicProperties(); properties.Persistent = true; //消息持久化 //向交换机写10条消息 for (int i = 0; i < 10; i++) { string message = $"RabbitMQ Fanout {i + 1} Message"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchangeName, routingKey: "", null, body); Console.WriteLine($"发送Fanout消息:{message}"); } } } }
执行代码:
向交换机发送10条消息,则绑定这个交换机的3个队列都会有10条消息。
消费端的代码和工作队列的一样,只需知道队列名即可消费,声明时要和生产者的声明一样。
(4)路由模式(推荐使用)在发布/订阅模式的基础上,有选择的接收消息,也就是通过 routing 路由进行匹配条件是否满足接收消息。
上图是一个结合日志消费级别的配图,在路由模式它会把消息路由到那些 binding key 与 routing key 完全匹配的 Queue 中,此模式也就是 Exchange 模式中的direct模式。
生产者P发送数据是要指定交换机(X)和routing发送消息 ,指定的routingKey=error,则队列Q1和队列Q2都会有一份数据,如果指定routingKey=into,或=warning,交换机(X)只会把消息发到Q2队列。
生产者代码:
/// <summary> /// 路由模式,点到点直连队列 /// </summary> public static void SendMessageDirect() { //创建连接 using (var connection = RabbitMQHelper.GetConnection()) { //创建信道 using (var channel = connection.CreateModel()) { //声明交换机对象,fanout类型 string exchangeName = "direct_exchange"; channel.ExchangeDeclare(exchangeName, ExchangeType.Direct); //创建队列 string queueName1 = "direct_errorlog"; string queueName2 = "direct_alllog"; channel.QueueDeclare(queueName1, true, false, false); channel.QueueDeclare(queueName2, true, false, false); //把创建的队列绑定交换机,direct_errorlog队列只绑定routingKey:error channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: "error"); //direct_alllog队列绑定routingKey:error,info channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "info"); channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "error"); var properties = channel.CreateBasicProperties(); properties.Persistent = true; //消息持久化 //向交换机写10条错误日志和10条Info日志 for (int i = 0; i < 10; i++) { string message = $"RabbitMQ Direct {i + 1} error Message"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchangeName, routingKey: "error", properties, body); Console.WriteLine($"发送Direct消息error:{message}"); string message2 = $"RabbitMQ Direct {i + 1} info Message"; var body2 = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchangeName, routingKey: "info", properties, body2); Console.WriteLine($"info:{message2}"); } } } }