简单说一下虚拟主机和隔离吧,简单来说想MySQL 你可以创建很多个库,里面有很多个表。然后呢 rabbitMQ可以创建很多个虚拟主机,虚拟主机里面有很多个交换器,交换器里面有很多个队列,解释得完美。默认会提供一个 默认虚拟主机 vhost : “/”。后面找时间再说这个 vhost 吧。
重头大戏上DEMO,由于方便我阅读回忆,所以我忽略的封装性,一切以容易快速看懂和回忆为目标。别喷~
生产者:
package com.maxfunner; import com.maxfunner.mq.EndPoint; import com.rabbitmq.client.*; import org.apache.commons.lang.SerializationUtils; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * Producer */ public class Producer { private Connection connection; private Channel channel; private Map<Long, String> messageMap = new HashMap<Long, String>(); private int maxID = 0; private static final String EXCHANGE_NAME = "MY_EXCHANGE"; public void createConnectionAndChannel() throws IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); //服务器地址 factory.setUsername("guest"); //默认用户名 factory.setPassword("guest"); //默认密码 factory.setPort(5672); //默认端口,对就是这么屌全部默认的 this.connection = factory.newConnection(); //创建链接 this.channel = this.connection.createChannel(); } public void initChannelAndCreateExchange() throws IOException { this.channel.confirmSelect(); //启用消息确认已经投递成功的回调 /** * 创建了一个交换器,类型为 direct 非持久化 自动删除 没有额外参数 */ this.channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, true, null); this.channel.addConfirmListener(new ConfirmListener() { /** * 成功的时候回调【这个是当消息到达交换器的时候回调】 * @param deliveryTag 每一条消息都有一个唯一ID【只是同一个channel唯一】,每次发出消息递增1 因为同一个channel所有也保证了消息的流水性。 * @param multiple * @throws IOException */ public void handleAck(long deliveryTag, boolean multiple) throws IOException { String message = messageMap.get(deliveryTag); System.out.println("message : " + message + " ! 发送成功"); messageMap.remove(message); //最后一个消息都搞掂之后 关闭所有东西 if (deliveryTag >= maxID) { closeAnything(); } } /** * 失败的时候回调 * @param deliveryTag 每一条消息都有一个唯一ID【只是同一个channel唯一】,每次发出消息递增1 因为同一个channel所有也保证了消息的流水性。 * @param multiple * @throws IOException */ public void handleNack(long deliveryTag, boolean multiple) throws IOException { String message = messageMap.get(deliveryTag); System.out.println("message : " + message + " ! 发送失败"); messageMap.remove(message); //发送失败就不重发了,发脾气 //最后一个消息都搞掂之后 关闭所有东西 if (deliveryTag >= maxID) { closeAnything(); } } }); } public void sendMessage(String message) throws IOException { AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .contentType("text/plain") //指定是一个文本 .build(); // 发送一个消息 到 EXCHANGE_NAME 的交换器中 路由键为 KEY_A 发送 message 之前序列化一下 具体用什么包上面import自己看 this.channel.basicPublish(EXCHANGE_NAME, "KEY_A", properties, SerializationUtils.serialize(message)); } public void closeAnything() throws IOException { this.channel.close(); //跪安吧 小channel this.connection.close(); //你也滚吧 connection } public static void main(String[] args) throws IOException { Producer producer = new Producer(); producer.createConnectionAndChannel(); producer.initChannelAndCreateExchange(); List<String> messageList = new ArrayList<String>(); messageList.add("message_A"); messageList.add("message_B"); messageList.add("message_C"); messageList.add("message_D"); messageList.add("message_E"); messageList.add("message_F"); producer.maxID = messageList.size(); //记录最后一个ID 当最后一个消息发送成功后关闭连接 //注意:因为channel产生的ID 是从1开始的 for (int i = 1; i <= messageList.size(); i++) { producer.messageMap.put(new Long(i), messageList.get(i - 1)); //这里看懂了吗?没看懂也没有办法了,这里我真不知道怎么解释 producer.sendMessage(messageList.get(i - 1)); } } }