交换器、队列、绑定的声明
关于消息序列化
同一个队列多消费类型
注解将消息和消息头注入消费者方法
关于消费者确认
关于发送者确认模式
消费消息、死信队列和RetryTemplate
RPC模式的消息(不常用)
关于消费模型
关于RabbitMq客户端的线程模型
在公司里一直在用RabbitMQ,由于api已经封装的很简单,关于RabbitMQ本身还有封装的实现没有了解,最近在看RabbitMQ实战这本书,结合网上的一些例子和spring文档,实现了RabbitMQ和spring的集成,对着自己平时的疑惑做了一些总结。
交换器、队列、绑定的声明
SpringAMQP项目对RabbitMQ做了很好的封装,可以很方便的手动声明队列,交换器,绑定。如下:
/**
* 队列
* @return
*/
@Bean
@Qualifier(RabbitMQConstant.PROGRAMMATICALLY_QUEUE)
Queue queue() {
return new Queue(RabbitMQConstant.PROGRAMMATICALLY_QUEUE, false, false, true);
}
/**
* 交换器
* @return
*/
@Bean
@Qualifier(RabbitMQConstant.PROGRAMMATICALLY_EXCHANGE)
TopicExchange exchange() {
return new TopicExchange(RabbitMQConstant.PROGRAMMATICALLY_EXCHANGE, false, true);
}
/**
* 声明绑定关系
* @return
*/
@Bean
Binding binding(@Qualifier(RabbitMQConstant.PROGRAMMATICALLY_EXCHANGE) TopicExchange exchange,
@Qualifier(RabbitMQConstant.PROGRAMMATICALLY_QUEUE) Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(RabbitMQConstant.PROGRAMMATICALLY_KEY);
}
/**
* 声明简单的消费者,接收到的都是原始的{@link Message}
*
* @param connectionFactory
*
* @return
*/
@Bean
SimpleMessageListenerContainer simpleContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setMessageListener(message -> log.info("simple receiver,message:{}", message));
container.setQueueNames(RabbitMQConstant.PROGRAMMATICALLY_QUEUE);
return container;
}
消费者和生产者都可以声明,交换器这种一般经常创建,可以手动创建。需要注意对于没有路由到队列的消息会被丢弃。
如果是Spring的话还需要声明连接:
@Bean
ConnectionFactory connectionFactory(@Value("${spring.rabbitmq.port}") int port,
@Value("${spring.rabbitmq.host}") String host,
@Value("${spring.rabbitmq.username}") String userName,
@Value("${spring.rabbitmq.password}") String password,
@Value("${spring.rabbitmq.publisher-confirms}") boolean isConfirm,
@Value("${spring.rabbitmq.virtual-host}") String vhost) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setVirtualHost(vhost);
connectionFactory.setPort(port);
connectionFactory.setUsername(userName);
connectionFactory.setPassword(password);
connectionFactory.setPublisherConfirms(isConfirm);
}
在配置类使用@EnableRabbit的情况下,也可以基于注解进行声明,在Bean的方法上加上@RabbitListener,如下:
/**
* 可以直接通过注解声明交换器、绑定、队列。但是如果声明的和rabbitMq中已经存在的不一致的话
* 会报错便于测试,我这里都是不使用持久化,没有消费者之后自动删除
* {@link RabbitListener}是可以重复的。并且声明队列绑定的key也可以有多个.
*
* @param headers
* @param msg
*/
@RabbitListener(
bindings = @QueueBinding(
exchange = @Exchange(value = RabbitMQConstant.DEFAULT_EXCHANGE, type = ExchangeTypes.TOPIC,
durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT),
value = @Queue(value = RabbitMQConstant.DEFAULT_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT,
autoDelete = RabbitMQConstant.true_CONSTANT),
key = DKEY
),
//手动指明消费者的监听容器,默认Spring为自动生成一个SimpleMessageListenerContainer
containerFactory = "container",
//指定消费者的线程数量,一个线程会打开一个Channel,一个队列上的消息只会被消费一次(不考虑消息重新入队列的情况),下面的表示至少开启5个线程,最多10个。线程的数目需要根据你的任务来决定,如果是计算密集型,线程的数目就应该少一些
concurrency = "5-10"
)
public void process(@Headers Map<String, Object> headers, @Payload ExampleEvent msg) {
log.info("basic consumer receive message:{headers = [" + headers + "], msg = [" + msg + "]}");
}