生产者:
public class Procuder { public static void main(String[] args) throws Exception { //1 创建一个ConnectionFactory, 并进行配置 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.11.76"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("http://www.likecs.com/"); //2 通过连接工厂创建连接 Connection connection = connectionFactory.newConnection(); //3 通过connection创建一个Channel Channel channel = connection.createChannel(); Map<String, Object> headers = new HashMap<>(); headers.put("my1", "111"); headers.put("my2", "222"); AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) .contentEncoding("UTF-8") .expiration("10000") .headers(headers) .build(); //4 通过Channel发送数据 for(int i=0; i < 5; i++){ String msg = "Hello RabbitMQ!"; //1 exchange 2 routingKey channel.basicPublish("", "test001", properties, msg.getBytes()); } //5 记得要关闭相关的连接 channel.close(); connection.close(); } } Exchange(交换机)属性
Name:交换机名称
Type:交换机类型
Durability:是否需要持久化
Auto Delete:当最后一个绑定在该Exchange上的队列删除后,自动删除该Exchange
Intenal:当前Exchange是否用于RabbitMQ内部使用,默认false
Arguments:扩展参数,用于扩展AMQP协议定制化使用
Direct Exchange所有发送到Direct Exchange的消息会被转发到RoutingKey指定的Queue
Direct模式可以使用RabbitMQ自带的Exchange:Default Exchange,所以不需要将Exchange进行任何绑定操作,消息传递时,RoutingKey必须完全匹配才能被队列接收,否则会被抛弃。
消费者:
public class Consumer4DirectExchange { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory() ; connectionFactory.setHost("192.168.11.76"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("http://www.likecs.com/"); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(3000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_direct_exchange"; String exchangeType = "direct"; String queueName = "test_direct_queue"; String routingKey = "test.direct"; //表示声明了一个交换机 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); //表示声明了一个队列 channel.queueDeclare(queueName, false, false, false, null); //建立一个绑定关系: channel.queueBind(queueName, exchangeName, routingKey); //创建消费者 QueueingConsumer consumer = new QueueingConsumer(channel); //参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, true, consumer); //循环获取消息 while(true){ //获取消息,如果没有消息,这一步将会一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }生产者:
public class Producer4DirectExchange { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.11.76"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("http://www.likecs.com/"); //2 创建Connection Connection connection = connectionFactory.newConnection(); //3 创建Channel Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_direct_exchange"; String routingKey = "test.direct"; //5 发送 String msg = "Hello World RabbitMQ 4 Direct Exchange Message"; channel.basicPublish(exchangeName, routingKey , null , msg.getBytes()); } } Topic Exchange