
directQueueTest2:

6. 主题模式(Topics模式 Tpoic通配符)
6.1 编写路由配置类
package com.gmtgo.demo.topic;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 大帅
*/
@Configuration
public class TopicQueueConfig {
/**
* 声明队列名.
*/
private final String topic1 = "topic_queue_1";
private final String topic2 = "topic_queue_2";
/**
* 声明交换机的名字.
*/
private final String topicExchange = "topicExchange";
/**
* 声明队列.
*
* @return
*/
@Bean
public Queue topicQueue1() {
return new Queue(topic1);
}
@Bean
public Queue topicQueue2() {
return new Queue(topic2);
}
/**
* 声明路由交换机.
*
* @return
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(topicExchange);
}
/**
* 队列绑定交换机,指定routingKey,也可在可视化工具中进行绑定.
*
* @return
*/
@Bean
Binding bindingTopicExchange1(Queue topicQueue1, TopicExchange exchange) {
return BindingBuilder.bind(topicQueue1).to(exchange).with("topic.keyA");
}
/**
* 队列绑定交换机,指定routingKey,也可在可视化工具中进行绑定.
* 绑定的routing key 也可以使用通配符:
* *:匹配不多不少一个词
* #:匹配一个或多个词
*
* @return
*/
@Bean
Binding bindingTopicExchange2(Queue topicQueue2, TopicExchange exchange) {
return BindingBuilder.bind(topicQueue2).to(exchange).with("topic.#");
}
}
6.2 编写生产者
package com.gmtgo.demo.topic;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author 大帅
*/
@Slf4j
@Component
public class TopicProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessageA() {
for (int i = 0; i < 5; i++) {
String message = "通配符模式--routingKey=topic.keyA消息" + i;
log.info("我是生产信息:{}", message);
rabbitTemplate.convertAndSend("topicExchange", "topic.keyA", message);
}
}
public void sendMessageB() {
for (int i = 0; i < 5; i++) {
String message = "通配符模式--routingKey=topic.#消息" + i;
log.info("我是生产信息:{}", message);
rabbitTemplate.convertAndSend("topicExchange", "topic.keyD.keyE", message);
}
}
}
6.3 编写消费者1
package com.gmtgo.demo.topic;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 大帅
*/
@Slf4j
@Component
public class TopicConsumers1 {
@RabbitListener(queues = "topic_queue_1")
public void readMessage(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("我是消费信息1:{}",new String(message.getBody()));
}
}
6.4 编写消费者2
package com.gmtgo.demo.topic;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 大帅
*/
@Slf4j
@Component
public class TopicConsumers2 {
@RabbitListener(queues = "topic_queue_2")
public void readMessage(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("我是消费信息2:{}",new String(message.getBody()));
}
}
6.5 编写访问类
package com.gmtgo.demo.topic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author 大帅
*/
@RestController
@RequestMapping(value = "rabbitMq")
public class TopicRabbitMqController {
@Autowired
private TopicProducer topicProducer;
@RequestMapping(value = "topicQueueTest1")
public String topicQueueTest1() {
topicProducer.sendMessageA();
return "success";
}
@RequestMapping(value = "topicQueueTest2")
public String topicQueueTest2() {
topicProducer.sendMessageB();
return "success";
}
}
6.6 测试启动项目访问topicQueueTest1 , topicQueueTest2
访问地址:8801/rabbitMq/topicQueueTest1
访问地址:8801/rabbitMq/topicQueueTest2
结果:
topicQueueTest1,两个消费者都能消费