Java SpringBoot集成RabbitMQ实战和总结(5)

其实也就只是在声明的时候多加了两个参数x-dead-letter-exchange和x-dead-letter-routing-key。这里一开始踩了一个坑,因为@QueueBinding注解中也有arguments属性,我一开始将参数声明到@QueueBinding中,导致一直没绑定成功。如果绑定成功可以在控制台看到queue的Featrues有DLX(死信队列交换器)和DLK(死信队列绑定)。如下:

关于消息进入死信的规则:

消息被拒绝(basic.reject/basic.nack)并且requeue=false
消息TTL过期
队列达到最大长度
我们用到的就是第一种。

RPC模式的消息(不常用)
本来生产者和消费者是没有耦合的,但是可以通过一些属性产生耦合。在早期版本中,如果一个生产者想要收到消费者的回复,实现方案是生产者在消息头中加入reply-to属性也就是队列(一般是私有,排他,用完即销毁)的名字,然后在这个队列上进行监听,消费者将回复发送到这个队列中。RabbitMq3.3之后有了改进,就是不用没有都去创建一个临时队列,这样很耗费性能,可以采用drect-to模式,省去了每次创建队列的性能损耗,但是还是要创建一次队列。现在Spring默认的就是这个模式。RabbitTemplate中有一系列的sendAndReceiveXX方法。默认等待5秒,超时返回null。用
法和不带返回的差不多。

消费者的方法通过返回值直接返回消息(下面的方法是有返回值的):

public String receive(@Headers Map<String, Object> headers, @Payload ExampleEvent msg) {
log.info("reply to consumer param:{headers = [" + headers + "], msg = [" + msg + "]} info:");
return REPLY;
}
这里的提一下最后一个注解@SendTo,用在消费方法上,指明返回值的目的地,默认不用的话就是返回给发送者,可以通过这个注解改变这种行为。如下代码:

@RabbitListener(
bindings = @QueueBinding(
exchange = @Exchange(value = RabbitMQConstant.REPLY_EXCHANGE, type = ExchangeTypes.TOPIC,
durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT),
value = @Queue(value = RabbitMQConstant.REPLY_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT,
autoDelete = RabbitMQConstant.true_CONSTANT),
key = RabbitMQConstant.REPLY_KEYbr/>)
)
@SendTo("queue.reply.s")
public ExampleEvent log(ExampleEvent event) {
log.info("log receive message:O{}", event);
return new ExampleEvent(1, "log result");
}
上面的代码就是会将消息直接发送到默认交换器,并且以queue.reply.s作为路由键。@SendTo的格式为exchange/routingKey用法如下:

foo/bar: 指定的交换器和key
foo/: 指定的交换器,key为空
bar或者/bar: 到空交换器
/或者空:空的交换器和空的key
这里还需要提一下,因为默认所有的队列都会绑定到空交换器,并且以队列名字作为Routekey, 所以SendTo里面可以直接填写队列名字机会发送到相应的队列.如日志队列。因为RPC模式不常用,专业的东西做专业的事,就像我们一般不用Redis来做消息队列一样(虽然他也可以实现),一般公司都有特定的技术栈,肯定有更合适的RPC通信框架。当然如果要跨语言的集成这个方案也是一种不错的方案,可以继续考虑采用异步发送AsyncRabbitTemplate来降低延迟等优化方案!

关于消费模型
RabbitMQ底层的消费模型有两种Push和Pull。我在网上查阅资料的时候发现有很多教程采用了pull这种模式。RabbitMq实战和
RabbitMQ之Consumer消费模式(Push & Pull)都指出这种模式性能低,会影响消息的吞吐量,增加不必要的IO,所以除非有特殊的业务需求,不要采用这种方案。Spring的封装就是采用了push的方案。

关于RabbitMq客户端的线程模型
这里讲的是消费者的,生产者没什么好讲的。先看消息流转图:

图中椭圆表示线程,矩形是队列。消息到达AMQP的连接线程,然后分发到client线程池,随后分发到监听器。注意除了监听器的线程,其他都是在com.rabbitmq.client.impl.AMQConnection中创建的线程,我们对线程池做一些修改。连接线程名字不能修改就是AMQP Connection打头。心跳线程可以设置setConnectionThreadFactory来设置名字。如下:

connectionFactory.setConnectionThreadFactory(new ThreadFactory() {
        public final AtomicInteger id = new AtomicInteger();

@Override
        public Thread newThread(Runnable r) {
            return new Thread(r, MessageFormat.format("amqp-heart-{0}", id.getAndIncrement()));
        }
    });

client线程池见:com.rabbitmq.client.impl.ConsumerWorkService构造方法。Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory)。

final ExecutorService executorService = Executors.newFixedThreadPool(5, new ThreadFactory() {
public final AtomicInteger id = new AtomicInteger();

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/c569ff20be4daff390a9dcda8df89c4b.html