消费者1:
public class Consumer1 { public static final String EXCHANGE_NAME="byte004"; public static void main(String[] args) throws Exception { // 1. 创建出链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("http://www.likecs.com/"); // 2. 通过链接工厂创建链接对象 Connection connection = factory.newConnection(); // 3. 通过链接对象创建出channel Channel channel = connection.createChannel(); String queueName = "queue005"; channel.queueDeclare(queueName,false,false,false,null); // 绑定队列到交换机 /** * 参数3是routingkey,只有和它一样的routingkey的消息才会被当前消费者收到 */ channel.queueBind(queueName,EXCHANGE_NAME,"key1"); // 如果要接收多个routingkey的消息,在执行一次上面的代码即可,如下 channel.queueBind(queueName,EXCHANGE_NAME,"key2"); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { /** * consumerTag 用来标识.可以再监听队列时候设置 * envelope 信封,通过envelope可以通过这个获取到很多东西 * properties 额外的消息属性 * body:消息体 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body, "UTF-8"); System.out.println("消费者1:"+s); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(queueName,false,consumer); } }消费者2:
public class Consumer2 { public static final String EXCHANGE_NAME="byte004"; public static void main(String[] args) throws Exception { // 1. 创建出链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("http://www.likecs.com/"); // 2. 通过链接工厂创建链接对象 Connection connection = factory.newConnection(); // 3. 通过链接对象创建出channel Channel channel = connection.createChannel(); String queueName = "queue006"; channel.queueDeclare(queueName,false,false,false,null); // 绑定队列到交换机 channel.queueBind(queueName,EXCHANGE_NAME,"key2"); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { /** * consumerTag 用来标识.可以再监听队列时候设置 * envelope 信封,通过envelope可以通过这个获取到很多东西 * properties 额外的消息属性 * body:消息体 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body, "UTF-8"); System.out.println("消费者2:"+s); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(queueName,false,consumer); } } 五、主题模式原理:路由模式的一种,路由功能添加了模糊匹配。星号(*)代表1个单词,#号(#)代表一个或多个单词。具体可参考路由模式。
场景:还是一样,有一个商城,新添加了一个商品,实时性不是很高,只需要添加到数据库即可,数据库包含了主数据库mysql1和从数据库mysql2的内容,不用刷新缓存。
实现:
生产者:
public class Producter { // 定义交换机的名字 public static final String EXCHANGE_NAME="byte004"; public static void main(String[] args) throws Exception { // 1. 创建出链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("http://www.likecs.com/"); // 2. 通过链接工厂创建链接对象 Connection connection = factory.newConnection(); // 3. 通过链接对象创建出channel Channel channel = connection.createChannel(); // 定义一个交换机,类型是topic channel.exchangeDeclare(EXCHANGE_NAME,"topic"); // 因为消息先发到交换机,交换机没有保存功能,所以如果没有消费者,消息会丢失 channel.basicPublish(EXCHANGE_NAME,"key.1.2",null,"发布路由模式的消息".getBytes()); channel.close(); connection.close(); } }消费者1:
public class Consumer1 { public static final String EXCHANGE_NAME="byte004"; public static void main(String[] args) throws Exception { // 1. 创建出链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("http://www.likecs.com/"); // 2. 通过链接工厂创建链接对象 Connection connection = factory.newConnection(); // 3. 通过链接对象创建出channel Channel channel = connection.createChannel(); String queueName = "queue005"; channel.queueDeclare(queueName,false,false,false,null); // 绑定队列到交换机 /** * 参数3是routingkey,只有和它一样的routingkey的消息才会被当前消费者收到 */ channel.queueBind(queueName,EXCHANGE_NAME,"key.*"); // 如果要接收多个routingkey的消息,在执行一次上面的代码即可,如下 channel.queueBind(queueName,EXCHANGE_NAME,"abc.#"); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { /** * consumerTag 用来标识.可以再监听队列时候设置 * envelope 信封,通过envelope可以通过这个获取到很多东西 * properties 额外的消息属性 * body:消息体 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body, "UTF-8"); System.out.println("消费者1:"+s); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(queueName,false,consumer); } }