分布式系统消息中间件——RabbitMQ的使用进阶篇 (2)

    目前有两种方法可以设置消息的TTL。第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息本身进行单独设置,每条消息的TTL可以不同。如果两种方法一起使用,则消息的TTL 以两者之间较小的那个数值为准。消息在队列中的生存时间一旦超过设置的TTL值时,就会变成"死信" (Dead Message) ,消费者将无法再收到该消息。(有关死信队列请往下看)

    通过队列属性设置消息TTL的方法是在channel.QueueDeclare方法中加入x-message-ttl参数实现的,这个参数的单位是毫秒。示例代码下:

IDictionary<string, object> args = new Dictionary<string, object>(); args.Add("x-message-ttl", 6000); channel.QueueDeclare("ttlQueue", true, false, false, args);

    如果不设置TTL.则表示此消息不会过期;如果将TTL设置为0 ,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃(或由死信队列来处理)。

    针对每条消息设置TTL的方法是在channel.BasicPublish方法中加入Expiration的属性参数,单位为毫秒。关键代码如下:

BasicProperties properties = new BasicProperties() { Expiration = "20000",//设置TTL为20000毫秒 }; var message = Encoding.UTF8.GetBytes("TestMsg"); channel.BasicPublish("normalExchange", "NormalRoutingKey", true, properties, message);

注意:对于第一种设置队列TTL属性的方法,一旦消息过期,就会从队列中抹去,而在第二种方法中,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费者之前判定的。Why?在第一种方法里,队列中己过期的消息肯定在队列头部, RabbitMQ 只要定期从队头开始扫描是否有过期的消息即可。而第二种方法里,每条消息的过期时间不同,如果要删除所有过期消息势必要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期再进行删除即可。

3.2 设置队列的TTL

    注意,这里和上述通过队列设置消息的TTL不同。上面删除的是消息,而这里删除的是队列。通过channel.QueueDeclare 方法中的x-expires参数可以控制队列被自动删除前处于未使用状态的时间。这个未使用的意思是队列上没有任何的消费者,队列也没有被重新声明,并且在过期时间段内也未调用过channel.BasicGet命令。

    设置队列里的TTL可以应用于类似RPC方式的回复队列,在RPC中,许多队列会被创建出来,但是却是未被使用的(有关RabbitMQ实现RPC请往下看)。RabbitMQ会确保在过期时间到达后将队列删除,但是不保障删除的动作有多及时。在RabbitMQ 重启后, 持久化的队列的过期时间会被重新计算。用于表示过期时间的x-expires参数以毫秒为单位, 井且服从和x-message-ttl一样的约束条件,不同的是它不能设置为0(会报错)。
示例代码如下:

IDictionary<string, object> args = new Dictionary<string, object>(); args.Add("x-expires", 6000); channel.QueueDeclare("ttlQueue", false, false, false, args); 四 死信队列

    DLX(Dead-Letter-Exchange)死信交换器,当消息在一个队列中变成死信之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX ,绑定DLX的队列就称之为死信队列。
消息变成死信主要有以下几种情况:

消息被拒绝(BasicReject/BasicNack) ,井且设置requeue 参数为false;(消费者确认机制将会在下一篇文章中涉及)

消息过期;

队列达到最大长度。

    DLX也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMQ 就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。可以监听这个队列中的消息、以进行相应的处理。

    通过在channel.QueueDeclare 方法中设置x-dead-letter-exchange参数来为这个队列添加DLX。其示例代码如下:

channel.ExchangeDeclare("exchange.dlx", "direct", true);//定义死信交换器 channel.ExchangeDeclare("exchange.normal", "direct", true);//定义普通交换器 IDictionary<String, Object> args = new Dictionary<String, Object>(); args.Add("x-message-ttl",10000);//定义消息过期时间为10000毫秒 args.Add("x-dead-letter-exchange", "exchange.dlx");//定义exchange.dlx为死信交换器 args.Add("x-dead-letter-routing-key", "routingkey");//定义死信交换器的绑定key,这里也可以不指定,则默认使用原队列的路由key channel.QueueDeclare("queue.normal", true, false, false, args);//定义普通队列 channel.QueueBind("queue.normal", "exchange.normal", "normalKey");//普通队列交换器绑定 channel.QueueDeclare("queue.dlx", true, false, false, null);//定义死信队列 channel.QueueBind("queue.dlx", "exchange.dlx", "routingkey");//死信队列交换器绑定,若上方为制定死信队列路由key则这里需要使用原队列的路由key //发布消息 var message = Encoding.UTF8.GetBytes("TestMsg"); channel.BasicPublish("exchange.normal", "normalKey", null, message) ;

    以下为死信队列的运转流程:

死信队列

五 延迟队列

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyjxjy.html