AY C# RabbitMQ 2019 微笔记 (12)

Maximum priority(x-max-priority):优先级队列,声明队列时先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费,


Lazy mode(x-queue-mode=lazy): Lazy Queues: 先将消息保存到磁盘上,不放在内存中,当消费者开始消费的时候才加载到内存中

Master locator(x-queue-master-locator) 将队列设置为主位置模式,确定在节点集群上声明时队列主机所在的规则。

(设置“x-queue-master-locator”参数。)


整理下:  1个 Exchange 可以多个 Queue   1个Queue可以多个Message  Message可以设置优先级,Queue可以设置最多多少个Message,多大字节的存储,也可以设置优先级。


上次持久化的问题理解:

设置消息持久化必须先设置队列持久化,要不然队列不持久化消息持久化队列都不存在了,消息存在还有什么意义。消息持久化需要将交换机持久化、队列持久化、消息持久化,才能最终达到持久化的目的。




上面代码,我们看到了,消息过期了,就会被扔到一个叫exchange-direct的 邮箱,真好,那么等他过期了,那么消费者 监听这个邮箱,是不是8秒过后就收到了消息,达到了 延迟的效果。跟RPC的设计异曲同工之妙。如果没有指定邮箱,8秒后消息就会被删除了。 10秒后 队列就会被删除。

image.png


接下来,

消费者,就是创建个 被转发的 exchange的监听,

using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace AyTestMQ2 { class Program { static void Main(string[] args) { Console.Title = "AY 2019 2018-12-5"; var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ay", Password = "123456", Port = 5672 }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "exchange-direct", type: "direct"); string name = channel.QueueDeclare().QueueName; //string name = "deadQueue"; channel.QueueBind(queue: name, exchange: "exchange-direct", routingKey: "routing-delay"); //回调,当consumer收到消息后会执行该函数 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(ea.RoutingKey); Console.WriteLine(" [x] Received {0}", message); }; //Console.WriteLine("name:" + name); //消费队列"hello"中的消息 channel.BasicConsume(queue: name, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } Console.ReadKey(); } } }

打开生产端,按下回车,进入循环,然后打开消费端,8秒后消费者就收到了消息。

如果8秒后,消息都被转发了,消费者打开,是收不到消息的。

 


心跳检测,客户端服务端是否断开了。

默认情况下,在3.5.5版本之前,rabbitmq设置的默认与客户端心跳时间为580秒,之后为60秒(如果时间间隔配置为0,则表示不启用heartbeat检测),两者时间会每隔timeout / 2 进行一次心跳互通。

=ERROR REPORT==== 01-Dec-2018::12:38:00 ===
closing AMQP connection <0.909.1> (125.120.18.131:5060 -> 120.27.140.42:5672):
Missed heartbeats from client, timeout: 10s


启用心跳检测后,rabbitmq会为每个tcp连接创建两个进程用于心跳检测(这可以通过rabbitmq.log看到每个客户端确实有两个连接,关闭的时候也是成对的方式),一个进程定时检测tcp连接上是否有数据发送(这里的发送是指rabbitmq发送数据给客户端),如果一段时间内没有数据发送给客户端,则发送一个心跳包给客户端,然后循环进行下一次检测;另一个进程定时检测tcp连接上是否有数据的接收,如果一段时间内没有收到任何数据,则判定为心跳超时,最终会关闭tcp连接。另外,rabbitmq的流量控制机制可能会暂停heartbeat检测。


如果超过2次心跳无响应,则会认为对方不可到达并关闭连接。此时,客户端通常需要重新连接。具体视客户端的不同而不同。

具体可见,小坦克。

在我们的环境中,应该来说负载并不是特别的高,ping 1k的延时基本上都在20ms以内,照理设置10ms的心跳间隔足以,为什么还是会出现,还需要trace_on看下详细的网络包情况。


内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zygdwx.html