RabbitMQ不讲武德,发个消息也这么多花招 (6)

上面通过事务或者确认机制确保了消息成功发送到交换机,那么接下来交换机会负责将消息路由到队列,这时候假如队列不存在或者路由错误就会导致消息路由失败,这又该如何保证呢?

同样的,RabbitMQ 中也提供了 2 种方式来确保消息可以正确路由到队列:开启监听模式或者通过新增备份交换机模式来备份数据。

监听回调

上面介绍的是消息是否发送到交换机的回调,而从交换机路由到队列,同样可以开启确认模式。

Java API 方式开启监听模式

下面就是开启监听主要代码,为了节省篇幅,省略了其余不相干代码(完成代码已上传至 GitHub)

channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("收到未路由到队列的回调消息:" + new String(body)); } }); //注意这里的第三个参数,mandatory需要设置为true(发送一个错误的路由,即可收到回调) channel.basicPublish(EXCHANGE_NAME,"ERROR_ROUTING_KEY",true,null,msg.getBytes()); Spring Boot 开启监听模式

在 RabitConfig 类中添加如下配置:

@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); rabbitTemplate.setMandatory(true);//开启监听回调 //消息是否成功被路由到队列,没有路由到队列时会收到回调(原setReturnCallback在2.0版本已过期) rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returnedMessage) { System.out.println("收到未路由到队列的回调消息:" + new String(returnedMessage.getMessage().getBody())); } }); return rabbitTemplate; } 备份交换机

除了开启监听的方式,还可以通过定义备份交换机的方式来实现,当原交换机无法正确路由到队列时,则会进入备份交换机,再由备份交换机路由到正确队列(要注意区分备份交换机和死信交换机的区别)。

Java API 实现备份交换机

下面就是一个实现备份交换机的例子,因为这里备份交换机定义的是 Topic 类型,所有路由必须满足定义好的路由,实际使用中一般会设置会 Fanout,因为无法预测错误的路由到底是多少:

//声明交换机且指定备份交换机 Map<String,Object> argMap = new HashMap<String,Object>(); argMap.put("alternate-exchange","TEST_ALTERNATE_EXCHANGE"); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,false,false,argMap); //队列和交换机进行绑定 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTEING_KEY); //声明备份交换机和备份队列,并绑定(为了防止收不到消息,备份交换机一般建议设置为Fanout类型) channel.queueDeclare("BAK_QUEUE", false, false, false, null); channel.exchangeDeclare("TEST_ALTERNATE_EXCHANGE", BuiltinExchangeType.TOPIC); channel.queueBind("BAK_QUEUE","TEST_ALTERNATE_EXCHANGE","ERROR.#"); String msg = "I'm a bak exchange msg"; channel.basicPublish(EXCHANGE_NAME,"ERROR.ROUTING_KEY",null,msg.getBytes()); Spring Boot 实现备份交换机

Spring Boot 实现备份交换机原理和 Java API 实现相同:

1、首先在 RabbiConfig 中新增两个交换机,一个是原始交换机,一个是备份交换机,同时新增一个备份队列和备份交换机进行绑定,这里的备份交换机是一个 Fanout 类型,注意因为这里主要是演示备份交换机,所以这里的原始交换机没有和任何队列绑定,也就无法路由到队列,从而使得消息进入备份交换机:

//用于测试备份交换机的原直连交换机 @Bean("bakDirectEchange") public DirectExchange bakDirectEchange(){ Map argMap = new HashMap<>(); argMap.put("alternate-exchange", "LONGLY_WOLF_BAK_FANOUT_EXCHANGE"); return new DirectExchange("LONGLY_WOLF_BAK_ORIGIN_DIRECT_EXCHANGE",false,false,argMap); } //备份广播交换机 @Bean("bakFanoutExchange") public FanoutExchange bakFanoutExchange(){ return new FanoutExchange("LONGLY_WOLF_BAK_FANOUT_EXCHANGE"); } //备份队列 @Bean("bakQueue") public Queue bakQueue(){ return new Queue("LONELY_WOLF_BAK_QUEUE"); } //备份交换机和备份队列进行绑定 @Bean public Binding BindExchange(@Qualifier("bakQueue") Queue queue, @Qualifier("bakFanoutExchange") FanoutExchange fanoutExchange){ return BindingBuilder.bind(queue).to(fanoutExchange); }

2、在消费者类 ExchangeConsumer 中监听备份队列:

/** * 监听备份消息队列 */ @RabbitHandler @RabbitListener(queues = "LONELY_WOLF_BAK_QUEUE") public void bakQueueConsumer(Message message){ System.out.println("备份队列收到消息:" + new String(message.getBody())); }

3、最后在生产者类 RabbitExchangeController 中新增一个消息发送的方法进行消息发送:

@GetMapping(value="/send/bak") public String sendBak(String routingKey,@RequestParam(value = "msg",defaultValue = "no bak message") String msg){ rabbitTemplate.convertAndSend("LONGLY_WOLF_BAK_ORIGIN_DIRECT_EXCHANGE",routingKey,msg); return "succ"; }

调用之后可以看到,备份队列会收到消息,从而说明了消息在无法路由到队列时会进入到备份队列。

队列存储消息后发生异常怎么办

在保证了前面两个阶段的可靠性之后,消息终于安全抵达了队列,那么这时候就绝对安全了吗?

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpsxyy.html