原理:生产者将消息交给默认的交换机,交换机获取消息后交给绑定这个生产者的队列(投递规则为队列名称和routing key 相同的队列),监听当前队列的消费者获取信息并执行消费逻辑。
场景:有一个oa系统,用户通过接收手机验证码进行注册,页面上点击获取验证码后,将验证码放到消息队列,然后短信服务从队列中获取到验证码,并发送给用户。
实现:
生产者:
public class Producter { public static void main(String[] args) throws Exception { // 1. 创建出链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("http://www.likecs.com/"); // 2. 通过链接工厂创建链接对象 Connection connection = factory.newConnection(); // 3. 通过链接对象创建出channel Channel channel = connection.createChannel(); // 4. 通过channel发布消息 /** * 四个参数: * 第一个参数是交换机的名称 * 第二个参数是路由键 * 第三个参数标识消息的一些额外的属性 * 第四个是消息的具体的内容 */ String message = "字节"; for(int i = 0;i < 5;i ++){ channel.basicPublish("","byte001",null,message.getBytes()); } // 5. 释放资源,释放channel 和 链接对象 channel.close(); connection.close(); } }消费者:
public class Consumer { public static void main(String[] args) throws Exception { // 1. 创建出链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("http://www.likecs.com/"); // 2. 通过链接工厂创建链接对象 Connection connection = factory.newConnection(); // 3. 通过链接对象创建出channel Channel channel = connection.createChannel(); // 4. 创建出消息队列 /** * 第一个参数是消息队列的名称 * 第二个参数表示消息是否持久化 * 第三个参数标识消息队列是否被channel独占 * 第四个参数标识是否自动删除消息队列,当消息队列没有绑定交换机后是否自动删除 * 第五个是消息队列扩展参数 */ String queueName = "byte001"; channel.queueDeclare(queueName, true, false, false, null); // 5. 创建消费者,对消息进行处理 DefaultConsumer consumer = new DefaultConsumer(channel) { /** * consumerTag 用来标识.可以再监听队列时候设置 * envelope 信封,通过envelope可以通过这个获取到很多东西 * properties 额外的消息属性 * body:消息体 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body, "UTF-8"); System.out.println("获取到的消息:"+s); } }; // 6. 通过channel消费者和消息队列关联 /** * 第一个参数是消息队列的名字 * 第二个参数是否自动签收(即消费消息后告知服务器已被消费) * 第三个参数是消费者 */ channel.basicConsume(queueName, true, consumer); } } 二、工作模式原理:生产者将消息交给交换机,交换机交给绑定的队列,队列有多个消费者监听,一条消息只能由一个消费者消费,这样就形成了资源竞争,谁的资源空闲大,争抢到的可能性就大。
场景:有一个电商平台,有两个订单服务,用户下单的时候,任意一个订单服务消费用户的下单请求生成订单即可。不用两个订单服务同时消费用户的下单请求。
实现:
生产者:
public class Producter { public static final String QUEUE_NAME = "byte002"; public static void main(String[] args) throws Exception { // 1. 创建出链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("http://www.likecs.com/"); // 2. 通过链接工厂创建链接对象 Connection connection = factory.newConnection(); // 3. 通过链接对象创建出channel Channel channel = connection.createChannel(); // 申明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); // 4. 通过channel发布消息 /** * 四个参数: * 第一个参数是交换机的名称 * 第二个参数是路由键 * 第三个参数标识消息的一些额外的属性 * 第四个是消息的具体的内容 */ String message = "字节"; for(int i = 0;i < 100;i ++){ channel.basicPublish("",QUEUE_NAME,null,(message+i).getBytes()); } // 5. 释放资源,释放channel 和 链接对象 channel.close(); connection.close(); } }