注意:只要服务器没有宕机,即便是非持久,消费者不在线的话消息也不会丢失,等待消费者在线还是能够收到消息的。
//非持久化的消费者和之前的代码一样。下面演示非持久化的生产者。 // 非持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);Queue持久化,当服务器宕机消息依然存在。Queue消息默认是持久化的。
持久化消息,保证这些消息只被传送一次和成功使用一次。对于这些消息可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。
//持久化的消费者和之前的代码一样。下面演示持久化的生产者。 //持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT);Topic消息非持久和持久
Topic非持久,Topic默认就是非持久化的,因为生产者生产消息时消费者也要在线,这样消费者才能消费到消息。
Topic消息持久化,只要消费者向MQ服务器注册过,所有生产者发布成功的消息该消费者都能收到,不管是MQ服务器宕机还是消费者不在线。
//持久化topic生产者代码 // 设置持久化topic producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 设置持久化topic之后再启动连接 conn.start(); //持久化topic消费者代码 public static void main(String[] args) throws Exception{ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection conn = activeMQConnectionFactory.createConnection(); // 设置客户端ID,向MQ服务器注册自己的名称 conn.setClientID("marrry"); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(TOPIC_NAME); // 创建一个topic订阅者对象。一参是topic,二参是订阅者名称 TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark..."); // 之后再开启连接 connection.start(); //之前是消息的消费者,这里就改为主题的订阅者 Message message = topicSubscriber.receive(); while (null != message){ TextMessage textMessage = (TextMessage)message; System.out.println(" 收到的持久化 topic:" + textMessage.getText()); message = topicSubscriber.receive(2000L);//继续监听2s,从激活到离线 //经测试:离线再激活后仍然能收到之前的消息 } session.close(); conn.close(); }注意:
一定要先运行一次消费者,等于向MQ注册,类似我订阅了这个主题。
然后再运行生产者发送消息。
之后无论消费者是否在线都会收到消息。如果不在线的话,下次连接的时候会把没有收过的消息都接收过来。
Transaction - 事务
生产者开启事务后,执行commit方法这批消息才真正的被提交。不执行commit方法这批消息不会提交。执行rollback方法之前的消息会回滚掉。生产者的事务机制要高于签收机制,当生产者开启事务后签收机制不再重要。
消费者开启事务后,执行commit方法这批消息才算真正的被消费。不执行commit方法这些消息不会标记已消费,下次还会被消费。执行rollback方法不能回滚之前执行过的业务逻辑,但是能够回滚之前的消息,回滚后的消息下次还会被消费。消费者利用commit和rollback方法,甚至能够违反一个消费者只能消费一次消息的原理。
注意:消费者和生产者需要同时操作事务才行吗? => 消费者和生产者的事务完全没有关联,各自是各自的事务。
生产者
public class JmsProduce { public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616"; public static final String TOPIC_NAME = "topic_01"; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection conn = factory.createConnection(); conn.start(); //1.创建会话session,两个参数transacted=事务,acknowledgeMode=确认模式(签收) //设置为开启事务 Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(TOPIC_NAME); MessageProducer producer = session.createProducer(topic); try { for (int i = 0; i < 3; i++) { TextMessage textMessage = session.createTextMessage("topic " + i); producer.send(textMessage); // if(i == 2) { // throw new RuntimeException("=====> GG"); // } } // 2. 开启事务后,使用commit提交事务,这样这批消息才能真正的被提交。 session.commit(); System.out.println("====> 消息发布到MQ完成"); } catch (JMSException e) { System.out.println("出现异常,消息回滚"); // 3. 工作中一般当代码出错我们在catch代码块中回滚。这样这批发送的消息就能回滚。 session.rollback(); } finally { producer.close(); session.close(); conn.close(); } } } //如果有一条抛出异常,则回滚 //Exception in thread "main" java.lang.RuntimeException: =====> GG