/**
* {@link Queue#ignoreDeclarationExceptions}声明队列会忽略错误不声明队列,这个消费者仍然是可用的
*
* @param headers
* @param msg
*/
@RabbitListener(queuesToDeclare = @Queue(value = RabbitMQConstant.DEFAULT_QUEUE, ignoreDeclarationExceptions = RabbitMQConstant.true_CONSTANT))
public void process2(@Headers Map<String, Object> headers, @Payload ExampleEvent msg) {
log.info("basic2 consumer receive message:{headers = [" + headers + "], msg = [" + msg + "]}");
}
关于消息序列化
这个比较简单,默认采用了Java序列化,我们一般使用的Json格式,所以配置了Jackson,根据自己的情况来,直接贴代码:
@Bean
MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
同一个队列多消费类型
如果是同一个队列多个消费类型那么就需要针对每种类型提供一个消费方法,否则找不到匹配的方法会报错,如下:
@Componentbr/>@Slf4j
@RabbitListener(
bindings = @QueueBinding(
exchange = @Exchange(value = RabbitMQConstant.MULTIPART_HANDLE_EXCHANGE, type = ExchangeTypes.TOPIC,
durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT),
value = @Queue(value = RabbitMQConstant.MULTIPART_HANDLE_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT,
autoDelete = RabbitMQConstant.true_CONSTANT),
key = RabbitMQConstant.MULTIPART_HANDLE_KEYbr/>)
)
@Profile(SpringConstant.MULTIPART_PROFILE)
public class MultipartConsumer {
/**
* RabbitHandler用于有多个方法时但是参数类型不能一样,否则会报错
*
* @param msg
*/
@RabbitHandler
public void process(ExampleEvent msg) {
log.info("param:{msg = [" + msg + "]} info:");
}
@RabbitHandler
public void processMessage2(ExampleEvent2 msg) {
log.info("param:{msg2 = [" + msg + "]} info:");
}
/**
* 下面的多个消费者,消费的类型不一样没事,不会被调用,但是如果缺了相应消息的处理Handler则会报错
*
* @param msg
*/
@RabbitHandler
public void processMessage3(ExampleEvent3 msg) {
log.info("param:{msg3 = [" + msg + "]} info:");
}
}
注解将消息和消息头注入消费者方法
在上面也看到了@Payload等注解用于注入消息。这些注解有:
@Header 注入消息头的单个属性
@Payload 注入消息体到一个JavaBean中
@Headers 注入所有消息头到一个Map中
这里有一点主要注意,如果是com.rabbitmq.client.Channel,org.springframework.amqp.core.Message和org.springframework.messaging.Message这些类型,可以不加注解,直接可以注入。
如果不是这些类型,那么不加注解的参数将会被当做消息体。不能多于一个消息体。如下方法ExampleEvent就是默认的消息体:
public void process2(@Headers Map<String, Object> headers,ExampleEvent msg);
关于消费者确认
RabbitMq消费者可以选择手动和自动确认两种模式,如果是自动,消息已到达队列,RabbitMq对无脑的将消息抛给消费者,一旦发送成功,他会认为消费者已经成功接收,在RabbitMq内部就把消息给删除了。另外一种就是手动模式,手动模式需要消费者对每条消息进行确认(也可以批量确认),RabbitMq发送完消息之后,会进入到一个待确认(unacked)的队列,如下图红框部分:
如果消费者发送了ack,RabbitMq将会把这条消息从待确认中删除。如果是nack并且指明不要重新入队列,那么该消息也会删除。但是如果是nack且指明了重新入队列那么这条消息将会入队列,然后重新发送给消费者,被重新投递的消息消息头amqp_redelivered属性会被设置成true,客户端可以依靠这点来判断消息是否被确认,可以好好利用这一点,如果每次都重新回队列会导致同一消息不停的被发送和拒绝。消费者在确认消息之前和RabbitMq失去了连接那么消息也会被重新投递。所以手动确认模式很大程度上提高可靠性。自动模式的消息可以提高吞吐量。
spring手动确认消息需要将SimpleRabbitListenerContainerFactory设置为手动模式:
simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
手动确认的消费者代码如下: