消息中间件 - RabbitMQ (5)

消息中间件 - RabbitMQ

消息中间件 - RabbitMQ

4.1.4. 小结

在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。

4.2. 订阅模式类型

订阅模式示例图:

消息中间件 - RabbitMQ

前面2个案例中,只有3个角色:

P:生产者,也就是要发送消息的程序

C:消费者:消息的接受者,会一直等待消息到来。

queue:消息队列,图中红色部分

而在订阅模型中,多了一个exchange角色,而且过程略有变化:

P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)

C:消费者,消息的接受者,会一直等待消息到来。

Queue:消息队列,接收消息、缓存消息。

Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

Fanout:广播,将消息交给所有绑定到交换机的队列

Direct:定向,把消息交给符合指定routing key 的队列

Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

4.3. Publish/Subscribe发布与订阅模式 4.3.1. 模式说明

消息中间件 - RabbitMQ

发布订阅模式:
1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收
到消息

4.3.2. 代码 1)生产者 package com.baidu.rabbitmq.ps; import com.baidu.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * 发布与订阅使用的交换机类型为:fanout */ public class Producer { //交换机名称 static final String FANOUT_EXCHAGE = "fanout_exchange"; //队列名称 static final String FANOUT_QUEUE_1 = "fanout_queue_1"; //队列名称 static final String FANOUT_QUEUE_2 = "fanout_queue_2"; public static void main(String[] args) throws Exception { //创建连接 Connection connection = ConnectionUtil.getConnection(); // 创建频道 Channel channel = connection.createChannel(); /** * 声明交换机 * 参数1:交换机名称 * 参数2:交换机类型,fanout、topic、direct、headers */ channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT); // 声明(创建)队列 /** * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 */ channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null); channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null); //队列绑定交换机 channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHAGE, ""); channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHAGE, ""); for (int i = 1; i <= 10; i++) { // 发送信息 String message = "你好;小兔子!发布订阅模式--" + i; /** * 参数1:交换机名称,如果没有指定则使用默认Default Exchage * 参数2:路由key,简单模式可以传递队列名称 * 参数3:消息其它属性 * 参数4:消息内容 */ channel.basicPublish(FANOUT_EXCHAGE, "", null, message.getBytes()); System.out.println("已发送消息:" + message); } // 关闭资源 channel.close(); connection.close(); } } 2)消费者1 package com.baidu.rabbitmq.ps; import com.baidu.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); // 创建频道 Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(Producer.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT); // 声明(创建)队列 /** * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 */ channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null); //队列绑定交换机 channel.queueBind(Producer.FANOUT_QUEUE_1, Producer.FANOUT_EXCHAGE, ""); //创建消费者;并设置消息处理 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override /** * consumerTag 消息者标签,在channel.basicConsume时候可以指定 * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * properties 属性信息 * body 消息 */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由key System.out.println("路由key为:" + envelope.getRoutingKey()); //交换机 System.out.println("交换机为:" + envelope.getExchange()); //消息id System.out.println("消息id为:" + envelope.getDeliveryTag()); //收到的消息 System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8")); } }; //监听消息 /** * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 * 参数3:消息接收到后回调 */ channel.basicConsume(Producer.FANOUT_QUEUE_1, true, consumer); } } 3)消费者2 package com.baidu.rabbitmq.ps; import com.baidu.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); // 创建频道 Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(Producer.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT); // 声明(创建)队列 /** * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 */ channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null); //队列绑定交换机 channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHAGE, ""); //创建消费者;并设置消息处理 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override /** * consumerTag 消息者标签,在channel.basicConsume时候可以指定 * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * properties 属性信息 * body 消息 */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由key System.out.println("路由key为:" + envelope.getRoutingKey()); //交换机 System.out.println("交换机为:" + envelope.getExchange()); //消息id System.out.println("消息id为:" + envelope.getDeliveryTag()); //收到的消息 System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8")); } }; //监听消息 /** * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 * 参数3:消息接收到后回调 */ channel.basicConsume(Producer.FANOUT_QUEUE_2, true, consumer); } } 4.3.3. 测试

启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达广播的效果。

在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges选项卡,点击 fanout_exchange 的交换机,可以查看到如下的绑定:

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzdpfy.html