directQueueTest2:
6. 主题模式(Topics模式 Tpoic通配符) 6.1 编写路由配置类 package com.gmtgo.demo.topic; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author 大帅 */ @Configuration public class TopicQueueConfig { /** * 声明队列名. */ private final String topic1 = "topic_queue_1"; private final String topic2 = "topic_queue_2"; /** * 声明交换机的名字. */ private final String topicExchange = "topicExchange"; /** * 声明队列. * * @return */ @Bean public Queue topicQueue1() { return new Queue(topic1); } @Bean public Queue topicQueue2() { return new Queue(topic2); } /** * 声明路由交换机. * * @return */ @Bean public TopicExchange topicExchange() { return new TopicExchange(topicExchange); } /** * 队列绑定交换机,指定routingKey,也可在可视化工具中进行绑定. * * @return */ @Bean Binding bindingTopicExchange1(Queue topicQueue1, TopicExchange exchange) { return BindingBuilder.bind(topicQueue1).to(exchange).with("topic.keyA"); } /** * 队列绑定交换机,指定routingKey,也可在可视化工具中进行绑定. * 绑定的routing key 也可以使用通配符: * *:匹配不多不少一个词 * #:匹配一个或多个词 * * @return */ @Bean Binding bindingTopicExchange2(Queue topicQueue2, TopicExchange exchange) { return BindingBuilder.bind(topicQueue2).to(exchange).with("topic.#"); } } 6.2 编写生产者 package com.gmtgo.demo.topic; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author 大帅 */ @Slf4j @Component public class TopicProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessageA() { for (int i = 0; i < 5; i++) { String message = "通配符模式--routingKey=topic.keyA消息" + i; log.info("我是生产信息:{}", message); rabbitTemplate.convertAndSend("topicExchange", "topic.keyA", message); } } public void sendMessageB() { for (int i = 0; i < 5; i++) { String message = "通配符模式--routingKey=topic.#消息" + i; log.info("我是生产信息:{}", message); rabbitTemplate.convertAndSend("topicExchange", "topic.keyD.keyE", message); } } } 6.3 编写消费者1 package com.gmtgo.demo.topic; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * @author 大帅 */ @Slf4j @Component public class TopicConsumers1 { @RabbitListener(queues = "topic_queue_1") public void readMessage(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("我是消费信息1:{}",new String(message.getBody())); } } 6.4 编写消费者2 package com.gmtgo.demo.topic; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * @author 大帅 */ @Slf4j @Component public class TopicConsumers2 { @RabbitListener(queues = "topic_queue_2") public void readMessage(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("我是消费信息2:{}",new String(message.getBody())); } } 6.5 编写访问类 package com.gmtgo.demo.topic; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author 大帅 */ @RestController @RequestMapping(value = "rabbitMq") public class TopicRabbitMqController { @Autowired private TopicProducer topicProducer; @RequestMapping(value = "topicQueueTest1") public String topicQueueTest1() { topicProducer.sendMessageA(); return "success"; } @RequestMapping(value = "topicQueueTest2") public String topicQueueTest2() { topicProducer.sendMessageB(); return "success"; } } 6.6 测试启动项目访问topicQueueTest1 , topicQueueTest2访问地址:8801/rabbitMq/topicQueueTest1
访问地址:8801/rabbitMq/topicQueueTest2
结果:
topicQueueTest1,两个消费者都能消费