@SneakyThrows
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = RabbitMQConstant.CONFIRM_EXCHANGE, type = ExchangeTypes.TOPIC,
durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT),
value = @Queue(value = RabbitMQConstant.CONFIRM_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT,
autoDelete = RabbitMQConstant.true_CONSTANT),
key = RabbitMQConstant.CONFIRM_KEY),
containerFactory = "containerWithConfirm")
public void process(ExampleEvent msg, Channel channel, @Header(name = "amqp_deliveryTag") long deliveryTag,
@Header("amqp_redelivered") boolean redelivered, @Headers Map<String, String> head) {
try {
log.info("ConsumerWithConfirm receive message:{},header:{}", msg, head);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("consume confirm error!", e);
//这一步千万不要忘记,不会会导致消息未确认,消息到达连接的qos之后便不能再接收新消息
//一般重试肯定的有次数,这里简单的根据是否已经重发过来来决定重发。第二个参数表示是否重新分发
channel.basicReject(deliveryTag, !redelivered);
//这个方法我知道的是比上面多一个批量确认的参数
// channel.basicNack(deliveryTag, false,!redelivered);
}
}
关于spring的AcknowledgeMode需要说明,他一共有三种模式:NONE,MANUAL,AUTO,默认是AUTO模式。这比RabbitMq原生多了一种。这一点很容易混淆,这里的NONE对应其实就是RabbitMq的自动确认,MANUAL是手动。而AUTO其实也是手动模式,只不过是Spring的一层封装,他根据你方法执行的结果自动帮你发送ack和nack。如果方法未抛出异常,则发送ack。如果方法抛出异常,并且不是AmqpRejectAndDontRequeueException则发送nack,并且重新入队列。如果抛出异常时AmqpRejectAndDontRequeueException则发送nack不会重新入队列。我有一个例子专门测试NONE,见CunsumerWithNoneTest。
还有一点需要注意的是消费者有一个参数prefetch,它表示的是一个Channel(也就是SimpleMessageListenerContainer的一个线程)预取的消息数量,这个参数只会在手动确认的消费者才生效。可以客户端利用这个参数来提高性能和做流量控制。如果prefetch设置的是10,当这个Channel上unacked的消息数量到达10条时,RabbitMq便不会在向你发送消息,客户端如果处理的慢,便可以延迟确认在方法消息的接收。至于提高性能就非常容易理解,因为这个是批量获取消息,如果客户端处理的很快便不用一个一个去等着去新的消息。SpringAMQP2.0开始默认是250,这个参数应该已经足够了。注意之前的版本默认值是1所以有必要重新设置一下值。当然这个值也不能设置的太大,RabbitMq是通过round robin这个策略来做负载均衡的,如果设置的太大会导致消息不多时一下子积压到一台消费者,不能很好的均衡负载。另外如果消息数据量很大也应该适当减小这个值,这个值过大会导致客户端内存占用问题。如果你用到了事务的话也需要考虑这个值的影响,因为事务的用处不大,所以我也没做过多的深究。
关于发送者确认模式
考虑这样一个场景:你发送了一个消息给RabbitMq,RabbitMq接收了但是存入磁盘之前服务器就挂了,消息也就丢了。为了保证消息的投递有两种解决方案,最保险的就是事务(和DB的事务没有太大的可比性), 但是因为事务会极大的降低性能,会导致生产者和RabbitMq之间产生同步(等待确认),这也违背了我们使用RabbitMq的初衷。所以一般很少采用,这就引入第二种方案:发送者确认模式。
发送者确认模式是指发送方发送的消息都带有一个id,RabbitMq会将消息持久化到磁盘之后通知生产者消息已经成功投递,如果因为RabbitMq内部的错误会发送ack。注意这里的发送者和RabbitMq之间是异步的,所以相较于事务机制性能大大提高。其实很多操作都是不能保证绝对的百分之一百的成功,哪怕采用了事务也是如此,可靠性和性能很多时候需要做一些取舍,想很多互联网公司吹嘘的5个9,6个9也是一样的道理。如果不是重要的消息性能计数器,完全可以不采用发送者确认模式。