rabbitmq系列(二)几种常见模式的应用场景及实现 (2)

消费者1:

public class Consumer { public static final String QUEUE_NAME = "byte002"; public static void main(String[] args) throws Exception { // 1. 创建出链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("http://www.likecs.com/"); // 2. 通过链接工厂创建链接对象 Connection connection = factory.newConnection(); // 3. 通过链接对象创建出channel Channel channel = connection.createChannel(); // 4. 创建出消息队列 /** * 第一个参数是消息队列的名称 * 第二个参数表示消息是否持久化 * 第三个参数标识消息队列是否被channel独占 * 第四个参数标识是否自动删除消息队列,当消息队列没有绑定交换机后是否自动删除 * 第五个是消息队列扩展参数 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); // 告诉服务器,在我们没有确认当前消息时不要给我们发送新的消息 // 5. 创建消费者,对消息进行处理 DefaultConsumer consumer = new DefaultConsumer(channel) { /** * consumerTag 用来标识.可以再监听队列时候设置 * envelope 信封,通过envelope可以通过这个获取到很多东西 * properties 额外的消息属性 * body:消息体 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body, "UTF-8"); System.out.println("消费者1收到的内容:"+s); try { Thread.sleep(10); // 模拟消费耗时 } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(),false); // 参数2false为确认收到消息,true为拒绝收到消息 } }; // 6. 通过channel消费者和消息队列关联 /** * 第一个参数是消息队列的名字 * 第二个参数是否自动签收(即消费消息后告知服务器已被消费) * 第三个参数是消费者 */ channel.basicConsume(QUEUE_NAME, false, consumer); } }

消费者2:

public class Consumer2 { public static final String QUEUE_NAME = "byte002"; public static void main(String[] args) throws Exception { // 1. 创建出链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("http://www.likecs.com/"); // 2. 通过链接工厂创建链接对象 Connection connection = factory.newConnection(); // 3. 通过链接对象创建出channel Channel channel = connection.createChannel(); // 4. 创建出消息队列 /** * 第一个参数是消息队列的名称 * 第二个参数表示消息是否持久化 * 第三个参数标识消息队列是否被channel独占 * 第四个参数标识是否自动删除消息队列,当消息队列没有绑定交换机后是否自动删除 * 第五个是消息队列扩展参数 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); // 告诉服务器,在我们没有确认当前消息时不要给我们发送新的消息 // 5. 创建消费者,对消息进行处理 DefaultConsumer consumer = new DefaultConsumer(channel) { /** * consumerTag 用来标识.可以再监听队列时候设置 * envelope 信封,通过envelope可以通过这个获取到很多东西 * properties 额外的消息属性 * body:消息体 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body, "UTF-8"); System.out.println("消费者2收到的内容:"+s); try { Thread.sleep(500); // 模拟消费耗时 } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(),false); // 参数2false为确认收到消息,true为拒绝收到消息 } }; // 6. 通过channel消费者和消息队列关联 /** * 第一个参数是消息队列的名字 * 第二个参数是否自动签收(即消费消息后告知服务器已被消费) * 第三个参数是消费者 */ channel.basicConsume(QUEUE_NAME, false, consumer); } }

保证资源竞争的代码就是这一行channel.basicQos(1);如果不加这一行,我们会发现两个消费者是轮询消费消息的。

三、发布订阅模式

原理:生产者将消息扔给交换机,交换机类型是fanout,不同的队列注册到交换机上,不同的消费注册在不同的队列上。所有消费者都会收到消息。

发布订阅模式.png

场景:有一个商城,我们新添加一个商品后,可能同时需要去更新缓存和数据库。

实现:

生产者:

public class Producter { // 定义交换机的名字 public static final String EXCHANGE_NAME="byte003"; public static void main(String[] args) throws Exception { // 1. 创建出链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("http://www.likecs.com/"); // 2. 通过链接工厂创建链接对象 Connection connection = factory.newConnection(); // 3. 通过链接对象创建出channel Channel channel = connection.createChannel(); // 定义一个交换机,类型是fanout channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); // 因为消息先发到交换机,交换机没有保存功能,所以如果没有消费者,消息会丢失 channel.basicPublish(EXCHANGE_NAME,"",null,"发布订阅模式的消息".getBytes()); channel.close(); connection.close(); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyppdj.html