Java SpringBoot集成RabbitMQ实战和总结(4)

这里有一点我当时纠结了很久,我一直以为发送者确认模式的回调是客户端的ack触发的,这里是大大的误解!发送者确认模式和消费者没有一点关系,消费者确认也和发送者没有一点关系,两者都是在和RabbitMq打交道,发送者不会管消费者有没有收到,只要消息到了RabbitMq并且已经持久化便会通知生产者,这个ack是RabbitMq本身发出的,和消费者无关

发送者确认模式需要将Channel设置成Confirm模式,这样才会收到通知。Spring中需要将连接设置成Confirm模式:

connectionFactory.setPublisherConfirms(isConfirm);
然后在RabbitTemplate中设置确认的回调,correlationData是消息的id,如下(只是简单打印下):

// 设置RabbitTemplate每次发送消息都会回调这个方法
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause)
        -> log.info("confirm callback id:{},ack:{},cause:{}", correlationData, ack, cause));

发送时需要给出唯一的标识(CorrelationData):

rabbitTemplateWithConfirm.convertAndSend(RabbitMQConstant.DEFAULT_EXCHANGE, RabbitMQConstant.DEFAULT_KEY,
            new ExampleEvent(i, "confirm message id:" + i),
            new CorrelationData(Integer.toString(i)));

还有一个参数需要说下:mandatory。这个参数为true表示如果发送消息到了RabbitMq,没有对应该消息的队列。那么会将消息返回给生产者,此时仍然会发送ack确认消息。

设置RabbitTemplate的回调如下:

rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)
-> log.info("return callback message:{},code:{},text:{}", message, replyCode, replyText));
另外如果是RabbitMq内部的错误,不会调用该方法。所以如果消息特别重要,对于未确认的消息,生产者应该在内存用保存着,在确认时候根据返回的id删除该消息。如果是nack可以将该消息记录专门的日志或者转发到相应处理的逻辑进行后续补偿。RabbitTemplate也可以配置RetryTemplate,发送失败时直接进行重试,具体还是要结合业务。

最后关于发送者确认需要提的是spring,因为spring默认的Bean是单例的,所以针对不同的确认方案(其实有不同的确认方案是比较合理的,很多消息不需要确认,有些需要确认)需要配置不同的bean.

消费消息、死信队列和RetryTemplate
上面也提到了如果消费者抛出异常时默认的处理逻辑。另外我们还可以给消费者配置RetryTemplate,如果是采用SpringBoot的话,可以在application.yml配置中配置如下:

spring:
rabbitmq:
listener:
retry:

重试次数

max-attempts: 3
    #  开启重试机制
      enabled: true

如上,如果消费者失败的话会进行重试,默认是3次。注意这里的重试机制RabbitMq是为感知的!到达3次之后会抛出异常调用MessageRecoverer。默认的实现为RejectAndDontRequeueRecoverer,也就是打印异常,发送nack,不会重新入队列。
我想既然配置了重试机制消息肯定是很重要的,消息肯定不能丢,仅仅是日志可能会因为日志滚动丢失而且信息不明显,所以我们要讲消息保存下来。可以有如下这些方案:

使用RepublishMessageRecoverer这个MessageRecoverer会发送发送消息到指定队列
给队列绑定死信队列,因为默认的RepublishMessageRecoverer会发送nack并且requeue为false。这样抛出一场是这种方式和上面的结果一样都是转发到了另外一个队列。详见DeadLetterConsumer
注册自己实现的MessageRecoverer
给MessageListenerContainer设置RecoveryCallback
对于方法手动捕获异常,进行处理
我比较推荐前两种。这里说下死信队列,死信队列其实就是普通的队列,只不过一个队列声明的时候指定的属性,会将死信转发到该交换器中。声明死信队列方法如下:

@RabbitListener(
    bindings = @QueueBinding(
        exchange = @Exchange(value = RabbitMQConstant.DEFAULT_EXCHANGE, type = ExchangeTypes.TOPIC,
            durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT),
        value = @Queue(value = RabbitMQConstant.DEFAULT_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT,
            autoDelete = RabbitMQConstant.true_CONSTANT, arguments = {
            @Argument(name = RabbitMQConstant.DEAD_LETTER_EXCHANGE, value = RabbitMQConstant.DEAD_EXCHANGE),
            @Argument(name = RabbitMQConstant.DEAD_LETTER_KEY, value = RabbitMQConstant.DEAD_KEY)
        }),
        key = RabbitMQConstant.DEFAULT_KEY
    ))

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/c569ff20be4daff390a9dcda8df89c4b.html