Java SpringBoot集成RabbitMQ实战和总结

交换器、队列、绑定的声明
关于消息序列化
同一个队列多消费类型
注解将消息和消息头注入消费者方法
关于消费者确认
关于发送者确认模式
消费消息、死信队列和RetryTemplate
RPC模式的消息(不常用)
关于消费模型
关于RabbitMq客户端的线程模型

在公司里一直在用RabbitMQ,由于api已经封装的很简单,关于RabbitMQ本身还有封装的实现没有了解,最近在看RabbitMQ实战这本书,结合网上的一些例子和spring文档,实现了RabbitMQ和spring的集成,对着自己平时的疑惑做了一些总结。

交换器、队列、绑定的声明
SpringAMQP项目对RabbitMQ做了很好的封装,可以很方便的手动声明队列,交换器,绑定。如下:

/**
 * 队列
 * @return
 */
@Bean
@Qualifier(RabbitMQConstant.PROGRAMMATICALLY_QUEUE)
Queue queue() {
    return new Queue(RabbitMQConstant.PROGRAMMATICALLY_QUEUE, false, false, true);
}

/**
 * 交换器
 * @return
 */
@Bean
@Qualifier(RabbitMQConstant.PROGRAMMATICALLY_EXCHANGE)
TopicExchange exchange() {
    return new TopicExchange(RabbitMQConstant.PROGRAMMATICALLY_EXCHANGE, false, true);
}
/**
 * 声明绑定关系
 * @return
 */
@Bean
Binding binding(@Qualifier(RabbitMQConstant.PROGRAMMATICALLY_EXCHANGE) TopicExchange exchange,
                @Qualifier(RabbitMQConstant.PROGRAMMATICALLY_QUEUE) Queue queue) {
    return BindingBuilder.bind(queue).to(exchange).with(RabbitMQConstant.PROGRAMMATICALLY_KEY);
}

/**
 * 声明简单的消费者,接收到的都是原始的{@link Message}
 *
 * @param connectionFactory
 *
 * @return
 */
@Bean
SimpleMessageListenerContainer simpleContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setMessageListener(message -> log.info("simple receiver,message:{}", message));
    container.setQueueNames(RabbitMQConstant.PROGRAMMATICALLY_QUEUE);
    return container;
}

消费者和生产者都可以声明,交换器这种一般经常创建,可以手动创建。需要注意对于没有路由到队列的消息会被丢弃。

如果是Spring的话还需要声明连接:

@Bean
ConnectionFactory connectionFactory(@Value("${spring.rabbitmq.port}") int port,
                                    @Value("${spring.rabbitmq.host}") String host,
                                    @Value("${spring.rabbitmq.username}") String userName,
                                    @Value("${spring.rabbitmq.password}") String password,
                                    @Value("${spring.rabbitmq.publisher-confirms}") boolean isConfirm,
                                    @Value("${spring.rabbitmq.virtual-host}") String vhost) {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost(host);
    connectionFactory.setVirtualHost(vhost);
    connectionFactory.setPort(port);
    connectionFactory.setUsername(userName);
    connectionFactory.setPassword(password);
    connectionFactory.setPublisherConfirms(isConfirm);
}

在配置类使用@EnableRabbit的情况下,也可以基于注解进行声明,在Bean的方法上加上@RabbitListener,如下:

/**
 * 可以直接通过注解声明交换器、绑定、队列。但是如果声明的和rabbitMq中已经存在的不一致的话
 * 会报错便于测试,我这里都是不使用持久化,没有消费者之后自动删除
 * {@link RabbitListener}是可以重复的。并且声明队列绑定的key也可以有多个.
 *
 * @param headers
 * @param msg
 */
@RabbitListener(
    bindings = @QueueBinding(
        exchange = @Exchange(value = RabbitMQConstant.DEFAULT_EXCHANGE, type = ExchangeTypes.TOPIC,
            durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT),
        value = @Queue(value = RabbitMQConstant.DEFAULT_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT,
            autoDelete = RabbitMQConstant.true_CONSTANT),
        key = DKEY
    ),
    //手动指明消费者的监听容器,默认Spring为自动生成一个SimpleMessageListenerContainer
    containerFactory = "container",
    //指定消费者的线程数量,一个线程会打开一个Channel,一个队列上的消息只会被消费一次(不考虑消息重新入队列的情况),下面的表示至少开启5个线程,最多10个。线程的数目需要根据你的任务来决定,如果是计算密集型,线程的数目就应该少一些
    concurrency = "5-10"
)
public void process(@Headers Map<String, Object> headers, @Payload ExampleEvent msg) {
    log.info("basic consumer receive message:{headers = [" + headers + "], msg = [" + msg + "]}");
}

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/c569ff20be4daff390a9dcda8df89c4b.html