可以使用通配符进行模糊匹配
#:匹配一个或多个词
*:匹配一个词
消费者:
public class Consumer4TopicExchange { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory() ; connectionFactory.setHost("192.168.11.76"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("http://www.likecs.com/"); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(3000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_topic_exchange"; String exchangeType = "topic"; String queueName = "test_topic_queue"; //String routingKey = "user.*"; String routingKey = "user.*"; // 1 声明交换机 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); // 2 声明队列 channel.queueDeclare(queueName, false, false, false, null); // 3 建立交换机和队列的绑定关系: channel.queueBind(queueName, exchangeName, routingKey); //创建消费者 QueueingConsumer consumer = new QueueingConsumer(channel); //参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, true, consumer); //循环获取消息 while(true){ //获取消息,如果没有消息,这一步将会一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }生产者:
public class Producer4TopicExchange { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.11.76"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("http://www.likecs.com/"); //2 创建Connection Connection connection = connectionFactory.newConnection(); //3 创建Channel Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_topic_exchange"; String routingKey1 = "user.save"; String routingKey2 = "user.update"; String routingKey3 = "user.delete.abc"; //5 发送 String msg = "Hello World RabbitMQ 4 Topic Exchange Message ..."; channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes()); channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); channel.close(); connection.close(); } } Fanout Exchange不处理路由键,发送到交换机的消息会被转发到与该交换机绑定的所有队列上,这种方式转发消息的速度最快
消费者:
public class Consumer4FanoutExchange { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory() ; connectionFactory.setHost("192.168.11.76"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("http://www.likecs.com/"); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(3000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_fanout_exchange"; String exchangeType = "fanout"; String queueName = "test_fanout_queue"; String routingKey = ""; //不设置路由键 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, true, consumer); //循环获取消息 while(true){ //获取消息,如果没有消息,这一步将会一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }生产者:
public class Producer4FanoutExchange { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.11.76"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("http://www.likecs.com/"); //2 创建Connection Connection connection = connectionFactory.newConnection(); //3 创建Channel Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_fanout_exchange"; //5 发送 for(int i = 0; i < 10; i ++) { String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ..."; channel.basicPublish(exchangeName, "", null , msg.getBytes()); } channel.close(); connection.close(); } }