分布式系统消息中间件——RabbitMQ的使用进阶篇

分布式系统消息中间件——RabbitMQ的使用进阶篇 前言

    上一篇文章(https://www.cnblogs.com/hunternet/p/9668851.html)简单总结了分布式系统中的消息中间件以及RabbitMQ的基本使用,这篇文章主要总结一下RabbitMQ在日常项目开发中比较常用的几个特性。

一 mandatory 参数

    上一篇文章中我们知道,生产者将消息发送到RabbitMQ的交换器中通过RoutingKey与BindingKey的匹配将之路由到具体的队列中以供消费者消费。那么当我们通过匹配规则找不到队列的时候,消息将何去何从呢?Rabbit给我们提供了两种方式。mandatory与备份交换器。

    mandatory参数是channel.BasicPublish方法中的参数。其主要功能是消息传递过程中不可达目的地时将消息返回给生产者。当mandatory 参数设为true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ 会调用BasicReturn 命令将消息返回给生产者。当mandatory 参数设置为false 时。则消息直接被丢弃。其运转流程与实现代码如下(以C# RabbitMQ.Client 3.6.9为例):

mandatory 参数

//连接与创建信道--后续的示例代码我们会省略掉这部分代码和释放连接 ConnectionFactory factory = new ConnectionFactory(); factory.UserName = "admin"; factory.Password = "admin"; factory.HostName = "192.168.121.205"; IConnection conn = factory.CreateConnection();//连接Rabbit IModel channel = conn.CreateModel();//创建信道 channel.ExchangeDeclare("exchangeName", "direct", true);//定义交换器 String queueName = channel.QueueDeclare("TestQueue", true, false, false, null).QueueName;//定义 队列 队列名TestQueue,持久化的,非排它的,非自动删除的。 channel.QueueBind(queueName, "exchangeName", "routingKey");//队列绑定交换器 var message = Encoding.UTF8.GetBytes("TestMsg"); channel.BasicPublish("exchangeName", "routingKey", true, null, message);//发布一个可以路由到队列的消息,mandatory参数设置为true var message1 = Encoding.UTF8.GetBytes("TestMsg1"); channel.BasicPublish("exchangeName", "routingKey1", true, null, message);//发布一个不可以路由到队列的消息,mandatory参数设置为true //生产者回调函数 channel.BasicReturn += (model, ea) => { //do something... 消息若不能路由到队列则会调用此回调函数。 }; //关闭信道与连接 channel.close(); conn.close() ; 二 备份交换器

    当消息不能路由到队列时,通过mandatory设置参数,我们可以将消息返回给生产者处理。但这样会有一个问题,就是生产者需要开一个回调的函数来处理不能路由到的消息,这无疑会增加生产者的处理逻辑。备份交换器(Altemate Exchange)则提供了另一种方式来处理不能路由的消息。备份交换器可以将未被路由的消息存储在RabbitMQ中,在需要的时候去处理这些消息。其主要实现代码如下:

IDictionary<string, object> args = new Dictionary<string, object>(); args.Add("alternate-exchange", "altExchange"); channel.ExchangeDeclare("normalExchange", "direct", true, false, args);//定义普通交换器并添加备份交换器参数 channel.ExchangeDeclare("altExchange", "fanout", true, false, null); //定义备份交换器,并声明为扇形交换器 channel.QueueDeclare("normalQueue", true, false, false, null);//定义普通队列 channel.QueueBind("normalQueue", "normalExchange", "NormalRoutingKey1");//普通队列队列绑定普通交换器 channel.QueueDeclare("altQueue", true, false, false, null);//定义备份队列 channel.QueueBind("altQueue", "altExchange", "");//绑定备份队列与交换器 var msg1 = Encoding.UTF8.GetBytes("TestMsg"); channel.BasicPublish("normalExchange", "NormalRoutingKey1", false, null, msg1);//发布一个可以路由到队列的消息,消息最终会路由到normalQueue var msg2 = Encoding.UTF8.GetBytes("TestMsg1"); channel.BasicPublish("normalExchange", "NormalRoutingKey2", false, null, msg2);//发布一个不可以被路由的消息,消息最终会进入altQueue

备份交换器

备份交换器其实和普通的交换器没有太大的区别,为了方便使用,建议设置为fanout类型,若设置为direct 或者topic的类型。需要注意的是,消息被重新发送到备份交换器时的路由键和从生产者发出的路由键是一样的。考虑这样一种情况,如果备份交换器的类型是direct,并且有一个与其绑定的队列,假设绑定的路由键是key1,当某条携带路由键为key2 的消息被转发到这个备份交换器的时候,备份交换器没有匹配到合适的队列,则消息丢失。如果消息携带的路由键为keyl,则可以存储到队列中。
对于备份交换器,有以下几种特殊情况:

如果设置的备份交换器不存在,客户端和RabbitMQ 服务端都不会有异常出现,此时消息会丢失。

如果备份交换器没有绑定任何队列,客户端和RabbitMQ 服务端都不会有异常出现,此时消息会丢失。

如果备份交换器没有任何匹配的队列,客户端和RabbitMQ 服务端都不会有异常出现,此时消息会丢失。

如果备份交换器和mandatory参数一起使用,那么mandatory参数无效。

三 过期时间(TTL) 3.1 设置消息的TTL

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyjxjy.html