消费者1:
public class Consumer1 { public static final String EXCHANGE_NAME="byte003"; public static void main(String[] args) throws Exception { // 1. 创建出链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("http://www.likecs.com/"); // 2. 通过链接工厂创建链接对象 Connection connection = factory.newConnection(); // 3. 通过链接对象创建出channel Channel channel = connection.createChannel(); String queueName = "queue003"; channel.queueDeclare(queueName,false,false,false,null); // 绑定队列到交换机 channel.queueBind(queueName,EXCHANGE_NAME,""); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { /** * consumerTag 用来标识.可以再监听队列时候设置 * envelope 信封,通过envelope可以通过这个获取到很多东西 * properties 额外的消息属性 * body:消息体 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body, "UTF-8"); System.out.println("消费者1:"+s); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(queueName,false,consumer); } }消费者2:
public class Consumer2 { public static final String EXCHANGE_NAME="byte003"; public static void main(String[] args) throws Exception { // 1. 创建出链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("http://www.likecs.com/"); // 2. 通过链接工厂创建链接对象 Connection connection = factory.newConnection(); // 3. 通过链接对象创建出channel Channel channel = connection.createChannel(); String queueName = "queue004"; channel.queueDeclare(queueName,false,false,false,null); // 绑定队列到交换机 channel.queueBind(queueName,EXCHANGE_NAME,""); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { /** * consumerTag 用来标识.可以再监听队列时候设置 * envelope 信封,通过envelope可以通过这个获取到很多东西 * properties 额外的消息属性 * body:消息体 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body, "UTF-8"); System.out.println("消费者2:"+s); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(queueName,false,consumer); } }需要注意的一点就是交换机没有保存功能,如果没有消费者,则消息会丢失。
四、路由模式原理:生产者将消息发送给交换机,消息携带具体的routingkey。交换机类型是direct,接收到消息中的routingkey,比对与之绑定的队列的routingkey,分发到不同的队列上。
场景:还是一样,有一个商城,新添加了一个商品,实时性不是很高,只需要添加到数据库即可,不用刷新缓存。
实现:
生产者:
public class Producter { // 定义交换机的名字 public static final String EXCHANGE_NAME="byte004"; public static void main(String[] args) throws Exception { // 1. 创建出链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("http://www.likecs.com/"); // 2. 通过链接工厂创建链接对象 Connection connection = factory.newConnection(); // 3. 通过链接对象创建出channel Channel channel = connection.createChannel(); // 定义一个交换机,类型是direct channel.exchangeDeclare(EXCHANGE_NAME,"direct"); // 因为消息先发到交换机,交换机没有保存功能,所以如果没有消费者,消息会丢失 channel.basicPublish(EXCHANGE_NAME,"key1",null,"发布路由模式的消息".getBytes()); channel.close(); connection.close(); } }