批量确认模式:发送一批消息,然后同时确认。批量发送有一个缺点就是同一批消息一旦有一条消息发送失败,就会收到失败的通知,需要将这一批消息全部重发。
异步确认模式:一边发送一边确认,消息可能被单条确认也可能会被批量确认。
Java API 实现确认模式单条消息确认模式
channel.confirmSelect();//开启确认模式 channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); if (channel.waitForConfirms()){//wait.ForConfirms(long time)方法可以指定等待时间 System.out.println("消息确认发送成功"); }批量确认模式
channel.confirmSelect();//开启确认模式 //批量发送 for (int i=0;i<10;i++){ channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); } try{ channel.waitForConfirmsOrDie(); }catch (IOException e){//只要有1条消息未被确认,就会抛出异常 System.out.println("有消息发送失败了"); }异步确认模式
channel.addConfirmListener(new ConfirmListener() { /** * 已确认消息,即发送成功后回调 * @param deliveryTag -唯一标识id(即发送消息时获取到的nextPublishSeqNo) * @param multiple - 是否批量确认,当multiple=true,表示<=deliveryTag的消息被批量确认,multiple=false,表示只确认了单条 */ @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException {//成功回调 System.out.println("收到确认消息了"); //TODO 可以做一些想做的事 } /** * 发送失败消息后回调 * @param deliveryTag -唯一标识id(即发送消息时获取到的nextPublishSeqNo) * @param multiple - 是否批量确认,当multiple=true,表示<=deliveryTag的消息被批量确认,multiple=false,表示只确认了单条 */ @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException {//失败回调 if (multiple) {//批量确认,<deliveryTag的消息都发送失败 //TODO 消息重发? } else {//非批量,=deliveryTag的消息发送失败 //TODO 消息重发? } } }); channel.confirmSelect();//开启确认模式 for (int i=0;i<10;i++){//批量发送 long nextSeqNo = channel.getNextPublishSeqNo();//获取发送消息的唯一标识(从1开始递增) //TODO 可以考虑把消息id存起来 channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); } SpringBoot 实现确认模式通过配置文件 spring.rabbitmq.publisher-confirm-type 参数进行配置确认(旧版本是 spring.rabbitmq.publisher-confirms 参数)。
1、新增配置文件属性配置
spring: rabbitmq: publisher-confirm-type: correlated # none-表示禁用回调(默认) simple- 参考RabbitExchangeController#sendWithSimpleConfirm()方法2、RabbitConfig 配置文件中修改如下:
@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); // rabbitTemplate.setChannelTransacted(true);//开启事务 //消息是否成功发送到Exchange rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack){//消息发送失败 System.out.println("消息发送失败,原因为:" + cause); return; } //消息发送成功 System.out.println("消息发送成功"); } }); return rabbitTemplate; }这样当我们发送消息成功之后,就会收到回调。
3、当上面的参数配置修改为 simple,则需要在发送消息的时候使用 invoke 调用 waitForConfirms 或者 waitForConfirmsOrDie 方法来确认是否发送成功:
@GetMapping(value="/send/confirm") public String sendWithSimpleConfirm(String routingKey,@RequestParam(value = "msg",defaultValue = "no direct message") String msg){ //使用waitForConfirms方法确认 boolean sendFlag = rabbitTemplate.invoke(operations -> { rabbitTemplate.convertAndSend( "LONGLY_WOLF_DIRECT_EXCHANGE", "routingKey", msg ); return rabbitTemplate.waitForConfirms(5000); }); //也可以使用waitForConfirmsOrDie方法确认 boolean sendFlag2 = rabbitTemplate.invoke(operations -> { rabbitTemplate.convertAndSend( "LONGLY_WOLF_DIRECT_EXCHANGE", "routingKey", msg ); try { rabbitTemplate.waitForConfirmsOrDie(5000); }catch (Exception e){ return false; } return true; }); System.out.println(sendFlag); System.out.println(sendFlag2); return "succ"; } 消息无法从交换机路由到正确的队列怎么办