前言 本篇博客已被收录GitHub:https://zhouwenxing.github.io/ 文中所涉及的源码也已被收录GitHub:https://github.com/zhouwenxing/lonely-wolf-note (message-queue模块)
使用消息队列必须要保证生产者发送的消息能被消费者所接收,那么生产者如何接收消息呢?下图是 RabbitMQ 的工作模型:
上图中生产者会将消息发送到交换机 Exchange 上,再由 Exchange 发送给不同的 Queue ,而 Queue 是用来存储消息队列,那么假如有多个生产者,那么消息发送到交换机 Exchange 之后,应该如何和 Queue 之间建立绑定关系呢?
如何使用 RabbitMQ 发送消息RabbitMQ 中提供了3种发送消息的路由方式。
直连 Direct 模式通过指定一个精确的绑定键来实现 Exchange(交换机) 和 Queue(消息队列) 之间的绑定,也就是说,当创建了一个直连类型的交换机时,生产者在发送消息时携带的路由键(routing key),必须与某个绑定键(binding key)完全匹配时,这条消息才会从交换机路由到满足路由关系消息队列上,然后消费者根据各自监听的队列就可以获取到消息(如下如吐所示,Queue1 绑定了 order ,那么这时候发送消息的路由键必须为 order 才能分配到 Queue1 上):
主题 Topic 模式Direct 模式会存在一定的局限性,有时候我们需要按类型划分,比如订单类路由到一个队列,产品类路由到另一个队列,所以在 RabbitMQ 中,提供了主题模式来实现模糊匹配。使用主题类型连接方式支持两种通配符:
直连方式只能精确匹配,有时候我们需要实现模糊匹配,那么这时候就需要主题类型的连接方式,在 RabbitMQ 中,使用主题类型连接方式支持两种通配符:
:表示 0 个或者多个单词
*:表示 1 个单词
PS:使用通配符时,单词指的是用英文符号的小数点 . 隔开的字符,如:abc.def 就表示有 abc 和 def 两个单词。
下图所示中,因为 Queue1 绑定了 order.#,所以当发送消息的路由键为 order 或者 order.xxx时都可以使得消息分配到 Queue1 上:
广播 Fanout 模式当我们定义了一个广播类型的交换机时就不需要指定绑定键,而且生产者发送消息到交换机上时,也不需要携带路由键,此时当消息到达交换机时,所有与其绑定的队列都会收到消息,这种模式的消息发送适用于消息通知类需求。
如下如所示,Queue1,Queue2,Queue3 三个队列都绑定到了一个 Fanout 交换机上,那么当 Fanout Exchange 收到消息时,会同时将消息发送给三个队列:
在 RabbitMQ 提供的后台管理系统中也能查询到创建的交换机和队列等信息,并且可以通过管理后台直接创建队列和交换机:
消息发送实战下面通过一个 SpringBoot 例子来体会一下三种发送消息的方式。
1、application.yml 文件中添加如下配置:
spring: rabbitmq: host: ip port: 5672 username: admin password: 1234562、新增一个 RabbitConfig 配置类(为了节省篇幅省略了包名和导入 ),此类中声明了三个交换机和三个队列,并分别进行绑定:
@Configuration public class RabbitConfig { //直连交换机 @Bean("directExchange") public DirectExchange directExchange(){ return new DirectExchange("LONGLY_WOLF_DIRECT_EXCHANGE"); } //主题交换机 @Bean("topicExchange") public TopicExchange topicExchange(){ return new TopicExchange("LONGLY_WOLF_TOPIC_EXCHANGE"); } //广播交换机 @Bean("fanoutExchange") public FanoutExchange fanoutExchange(){ return new FanoutExchange("LONGLY_WOLF_FANOUT_EXCHANGE"); } @Bean("orderQueue") public Queue orderQueue(){ return new Queue("LONGLY_WOLF_ORDER_QUEUE"); } @Bean("userQueue") public Queue userQueue(){ return new Queue("LONGLY_WOLF_USER_QUEUE"); } @Bean("productQueue") public Queue productQueue(){ return new Queue("LONGLY_WOLF_PRODUCT_QUEUE"); } //Direct交换机和orderQueue绑定,绑定键为:order.detail @Bean public Binding bindDirectExchange(@Qualifier("orderQueue") Queue queue, @Qualifier("directExchange") DirectExchange directExchange){ return BindingBuilder.bind(queue).to(directExchange).with("order.detail"); } //Topic交换机和userQueue绑定,绑定键为:user.# @Bean public Binding bindTopicExchange(@Qualifier("userQueue") Queue queue, @Qualifier("topicExchange") TopicExchange topicExchange){ return BindingBuilder.bind(queue).to(topicExchange).with("user.#"); } //Fanout交换机和productQueue绑定 @Bean public Binding bindFanoutExchange(@Qualifier("productQueue") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange){ return BindingBuilder.bind(queue).to(fanoutExchange); } }