消息中间件 - RabbitMQ (4)

在真正消费之前,消费者客户端需要向Broker 发送Basic.Consume 命令(即调用channel.basicConsume 方法〉将Channel 置为接收模式,之后Broker 回执Basic . Consume - Ok 以告诉消费者客户端准备好消费消息。

Broker 向消费者客户端推送(Push) 消息,即Basic.Deliver 命令,这个命令和Basic.Publish 命令一样会携带Content Header 和Content Body。

消费者接收到消息并正确消费之后,向Broker 发送确认,即Basic.Ack 命令。

客户端发送完消息需要关闭资源时,涉及到Channel.Close和Channl.Close-Ok 与Connetion.Close和Connection.Close-Ok的命令交互。

消息中间件 - RabbitMQ

5. RabbitMQ工作模式 4.1. Work queues工作队列模式 4.1.1. 模式说明

消息中间件 - RabbitMQ

Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。

应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

4.1.2. 代码

Work Queues与入门程序的简单模式的代码是几乎一样的;可以完全复制,并复制多一个消费者进行多个消费者同时消费消息的测试。

1)生产者 package com.baidu.rabbitmq.work; import com.baidu.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { //创建连接 Connection connection = ConnectionUtil.getConnection(); // 创建频道 Channel channel = connection.createChannel(); // 声明(创建)队列 /** * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); for (int i = 1; i <= 30; i++) { // 发送信息 String message = "你好;小兔子!work模式--" + i; /** * 参数1:交换机名称,如果没有指定则使用默认Default Exchage * 参数2:路由key,简单模式可以传递队列名称 * 参数3:消息其它属性 * 参数4:消息内容 */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("已发送消息:" + message); } // 关闭资源 channel.close(); connection.close(); } } 2)消费者1 package com.baidu.rabbitmq.work; import com.baidu.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); // 创建频道 Channel channel = connection.createChannel(); // 声明(创建)队列 /** * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 */ channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null); //一次只能接收并处理一个消息 channel.basicQos(1); //创建消费者;并设置消息处理 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override /** * consumerTag 消息者标签,在channel.basicConsume时候可以指定 * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * properties 属性信息 * body 消息 */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { //路由key System.out.println("路由key为:" + envelope.getRoutingKey()); //交换机 System.out.println("交换机为:" + envelope.getExchange()); //消息id System.out.println("消息id为:" + envelope.getDeliveryTag()); //收到的消息 System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8")); Thread.sleep(1000); //确认消息 channel.basicAck(envelope.getDeliveryTag(), false); } catch (InterruptedException e) { e.printStackTrace(); } } }; //监听消息 /** * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 * 参数3:消息接收到后回调 */ channel.basicConsume(Producer.QUEUE_NAME, false, consumer); } } 3)消费者2 package com.baidu.rabbitmq.work; import com.baidu.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); // 创建频道 Channel channel = connection.createChannel(); // 声明(创建)队列 /** * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 */ channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null); //一次只能接收并处理一个消息 channel.basicQos(1); //创建消费者;并设置消息处理 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override /** * consumerTag 消息者标签,在channel.basicConsume时候可以指定 * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * properties 属性信息 * body 消息 */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { //路由key System.out.println("路由key为:" + envelope.getRoutingKey()); //交换机 System.out.println("交换机为:" + envelope.getExchange()); //消息id System.out.println("消息id为:" + envelope.getDeliveryTag()); //收到的消息 System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8")); Thread.sleep(1000); //确认消息 channel.basicAck(envelope.getDeliveryTag(), false); } catch (InterruptedException e) { e.printStackTrace(); } } }; //监听消息 /** * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 * 参数3:消息接收到后回调 */ channel.basicConsume(Producer.QUEUE_NAME, false, consumer); } } 4.1.3. 测试

启动两个消费者,然后再启动生产者发送消息;到IDEA的两个消费者对应的控制台查看是否竞争性的接收到消息。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzdpfy.html