SpringBoot集成rabbitmq(二)

         在使用rabbitmq时,我们可以通过消息持久化来解决服务器因异常崩溃而造成的消息丢失。除此之外,我们还会遇到一个问题,当消息生产者发消息发送出去后,消息到底有没有正确到达服务器呢?如果不进行特殊配置,默认情况下发送的消息是不会给生产者返回任何响应的,也就是默认情况下生产者并不知道消息是否正常到达了服务器。对于数据必达的需求,你肯定对消息的来龙去脉都有个了接,这种情况下就需要用到rabbitmq消息确认。

 

消息确认

      rabbitmq消息确认分为生产者确认和消费者确认。

      生产者消费确认提供了两种机制:

      通过事务机制实现

      通过confirm机制实现

     事务机制则用到channel.txSelect、channel.txCommit、channel.txRollback。可以参考下面AMQP协议流转过程(参考Rabbitmq实战指南)

SpringBoot集成rabbitmq(二)

     事务机制在一条消息发送之后会阻塞发送端,以等待rabbitmq回应,之后才继续发送下一条消息。所以相对来说事务机制的性能要差一些。事务机制会降低rabbitmq的吞吐量,所以又引入了另一种轻量级的方式:confirm机制。

     生产者通过调用channel.confirmSelect将信道设置为confirm模式,之后Rabbitmq会返回Confirm.Select-Ok命令表示同意生产者将当前信道设置为confirm模式。所有被发送的后续消息都被ack或nack一次。类似如下代码:

     channel.confirmSelect()

     channel.basicPublish("exchange","routingkey",null,"test".getBytes())

     confirm机制流转过程参考下图(参考Rabbitmq实战指南)

SpringBoot集成rabbitmq(二)

   消费者确认

        消费者在订阅消息队列时指定autoAck参数。当参数设置为false时rabbitmq会等待消费者显式回复确认信号才会从内存或者磁盘种删除这条消息。参数默认为true。当autoAck设置为false时,对于rabbitmq服务器而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息、一部分是已经投递给消费者的消息但是还没有收到确认信号的消息。可通过RabbitMQ Web平台查看队列中Ready和UnAck对应的数量。

       消费者消息确认涉及到3个方法:channel.basicAck、channel.basicNack、channel.basicReject

             

SpringBoot集成rabbitmq下实现消息确认

       springboot集成rabbitmq实现消息确认主要涉及两个回调方法(ReturnCallback、ConfirmCallback)。这里消费者部分我用两种方式来实现。一种是基于SimpleMessageListenerContainer。 另一种就是用RabbitListener注解实现。

1、application.yml

spring: rabbitmq: host: 192.168.80.128 port: 5672 username: admin password: admin virtual-host: / publisher-confirms: true publisher-returns: true listener: simple: acknowledge-mode: manual concurrency: 1 max-concurrency: 10 retry: enabled: true

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpxgsx.html