没错,这就是我们想要达到的效果!基本可以宣布Direct模式验证成功。服务端生成exchange,客户端去生成队列绑定的方式在direct模式下完全可行。为了保险起见,再验证一下生成多个消费者绑定到同一个队列是否可行。
DirectConsumerTwo代码如下:
@Component @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "Direct_test_queue_2", durable = "true"), exchange = @Exchange(value = "directTest"), key = "direct_routing_key" )) public class DirectConsumerTwo { @RabbitHandler private void onMessage(String message){ System.out.println("监听队列Direct_test_queue_2接到消息" + message); } }DirectConsumerThree代码如下:
@Component @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "Direct_test_queue_3", durable = "true"), exchange = @Exchange(value = "directTest"), key = "direct_routing_key" )) public class DirectConsumerThree { @RabbitHandler private void onMessage(String message){ System.out.println("监听队列Direct_test_queue_3接到消息" + message); } }启动测试类,我们去看两个地方:
rabbitmq是否创建了客户端绑定的三个队列Direct_test_queue_1、Direct_test_queue_2、Direct_test_queue_3
消费者应该各自收到2条消息(Test中发送了两条,参看上面 RabbitmqApplicationTests 的代码)。那3个队列,控制台中应该打印了6条消息。
hohohoho!创建成功,并且绑定关系我看了也全都正确。我们去看控制台
6条!没有任何毛病,至此,可以宣布Direct模式下,完全支持我们最初的想法:服务端生成exchange,客户端去生成队列绑定的方式在direct模式下完全可行。
fanout模式验证接下来我们验证一下fanout的方式,基本操作流程和Direct模式一致。代码的结构也差不多:
针对生产者,需要RabbitmqConfig,直接在Direct模式下的rabbitmqConfig里直接添加Fanout的交换器配置
针对生产者,需要FanoutRabbitSender,用来实现Fanout模式的消息发送
针对消费者,需要FanoutConsumerOne,来测试第一个队列Fanout_test_queue_1生成和消息接收
针对消费者,需要FanoutConsumerTwo,来测试第二个队列Fanout_test_queue_2生成和消息接收
针对消费者,需要FanoutConsumerThree,来测试第三个队列Fanout_test_queue_3生成和消息接收
测试类RabbitmqApplicationTests也直接复用Direact模式下测试的类
我就不多BB,直接上代码了。
RabbitmqConfig代码如下
@Configuration public class RabbitmqConfig { @Bean DirectExchange directExchange(){ return new DirectExchange("directTest", true, false); } @Bean FanoutExchange fanoutExchange(){ return new FanoutExchange("fanoutTest", true, false); } }FanoutRabbitSender的代码如下,此处和direct模式的区别是Fanout中没有routingkey,所以代码里也没定义routingkey:
@Component public class FanoutRabbitSender{ @Autowired private RabbitTemplate rabbitTemplate; private final String EXCHANGE_NAME = "fanoutTest"; public void send(Object message) { rabbitTemplate.convertAndSend(EXCHANGE_NAME, null, message); } }我们到这里先启动程序试试,看看fanoutTest的交换器在没有绑定队列的情况下是否生成了。
棒棒棒!和我们想的一样,那接下来去写完所有的消费者,这里和Direct模式最重要的区别是@Exchange中必须要指定type为fanout。direct模式的代码里没指定是因为@Exchange的type默认值就是direct。我直接上代码了:
/** * 监听器主动去声明queue=fanout_test_queue_1,并绑定到fanoutTest交换器 */ @Component @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "fanout_test_queue_1", durable = "true"), exchange = @Exchange(value = "fanoutTest", type = ExchangeTypes.FANOUT) )) public class FanoutConsumerOne { @RabbitHandler private void onMessage(String message){ System.out.println("监听队列fanout_test_queue_1接到消息" + message); } } @Component @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "fanout_test_queue_2", durable = "true"), exchange = @Exchange(value = "fanoutTest", type = ExchangeTypes.FANOUT) )) public class FanoutConsumerTwo { @RabbitHandler private void onMessage(String message){ System.out.println("监听队列fanout_test_queue_2接到消息" + message); } } @Component @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "fanout_test_queue_3", durable = "true"), exchange = @Exchange(value = "fanoutTest", type = ExchangeTypes.FANOUT) )) public class FanoutConsumerThree { @RabbitHandler private void onMessage(String message){ System.out.println("监听队列fanout_test_queue_3接到消息" + message); } }