@Override
public Thread newThread(Runnable r) {
return new Thread(r, MessageFormat.format("amqp-client-{0}", id.getAndIncrement()));
}
});
listener的线程设置如下:
simpleRabbitListenerContainerFactory.setTaskExecutor(new SimpleAsyncTaskExecutor"amqp-consumer-"));
注意:SimpleAsyncTaskExecutor每次执行一个任务都会新建一个线程,对于生命周期很短的任务不要使用这个线程池(如client线程池的任务), 这里的消费者线程生命周期直到SimpleMessageListenerContainer停止所以没有适合这个场景
修改过之后的线程如下:
消息投递过程如下:
在AMQConnection中开启连接线程,该线程用于处理和RabbitMq的通信:
public void startMainLoop() {
MainLoop loop = new MainLoop();
final String name = "AMQP Connection " + getHostAddress() + ":" + getPort();
mainLoopThread = Environment.newThread(threadFactory, loop, name);
mainLoopThread.start();
}
AMQConnection.heartbeatExecutor是心跳线程。
AMQConnection.consumerWorkServiceExecutor则是用来处理事件的线程池,AMQConnection线程收到消息投递到这里。
分发逻辑详见com.rabbitmq.client.impl.ChannelN#processAsync->com.rabbitmq.client.impl.ConsumerDispatcher#handleDelivery->投递到线程池.
线程池中继续将消息投递到org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#queue中
consumer线程进行最终消息
上面的是默认的消费者监听器。SpringAMQP 2.0引入了一个新的监听器实现DirectMessageListenerContainer。这个实现最大的变化在于消费者的处理逻辑不是在自己的线程池中执行而是直接在client线程池中处理,这样最明显的是省去了线程的上下文切换的开销,而且设计上也变得更为直观。所以如果采用这个监听器需要覆盖默认的线程池加大Connection的线程池。采用这个监听器只需要设置@RabbitListener的containerFactory属性。声明方法如下:
@Bean
DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
final DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory = new DirectRabbitListenerContainerFactory();
directRabbitListenerContainerFactory.setConsumersPerQueue(Runtime.getRuntime().availableProcessors());
directRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
directRabbitListenerContainerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
directRabbitListenerContainerFactory.setConsumersPerQueue(10);
return directRabbitListenerContainerFactory;
}
这时的消息流转图如下:
还有一些关于监听器的例子和Springboot配置我放在了源码里,这里不再讲述。
Linux公社的RSS地址:https://www.linuxidc.com/rssFeed.aspx