消费者
public class JmsConsumer { public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616"; public static final String TOPIC_NAME = "topic_01"; public static void main(String[] args) throws Exception { System.out.println("=====> 1号消费者"); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection conn = factory.createConnection(); conn.start(); // 创建会话session,两个参数transacted=事务,acknowledgeMode=确认模式(签收) // 消费者开启了事务就必须手动提交,不然会重复消费消息 final Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(TOPIC_NAME); MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(new MessageListener() { int a = 0; @Override public void onMessage(Message message) { if(null != message && message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { System.out.println("消息体:" + textMessage.getText()); if(a == 0){ System.out.println("commit"); session.commit(); } if (a == 2) { System.out.println("rollback"); session.rollback(); } a++; } catch (JMSException e) { System.out.println("出现异常,消费失败,放弃消费"); try { session.rollback(); } catch (JMSException ex) { ex.printStackTrace(); } } } } }); System.in.read(); consumer.close(); session.close(); conn.close(); } } // 不执行commit方法的1和2消息不会标记已消费,下次还会被消费 // 执行rollback方法不能回滚之前执行过的业务逻辑,但是能够回滚之前的消息,回滚后的消息下次还会被消费 // =====> 1号消费者 // 消息体:topic 0 // commit // 消息体:topic 1 // 消息体:topic 2 // rollback // 消息体:topic 1 // 消息体:topic 2Acknowledge - 签收
签收的几种方式
自动签收(Session.AUTO_ACKNOWLEDGE):该方式是默认的,该种方式无需我们程序做任何操作,框架会帮我们自动签收收到的消息。
手动签收(Session.CLIENT_ACKNOWLEDGE):手动签收,该种方式需要我们手动调用Message.acknowledge()来签收消息。如果不签收消息该消息会被我们反复消费直到被签收。
允许重复消息(Session.DUPS_OK_ACKNOWLEDGE):多线程或多个消费者同时消费到一个消息,因为线程不安全可能会重复消费。该种方式很少使用到。
事务下的签收(Session.SESSION_TRANSACTED):开启事务的情况下可以使用该方式,该种方式很少使用到。
事务和签收的关系
在事务性会话中,当一个事务被成功提交则消息被自动签收。如果事务回滚则消息会被再次传送。事务优先于签收,开始事务后签收机制不再起任何作用。
非事务性会话中,消息何时被确认取决于创建会话时的应答模式。
生产者事务开启,只有commit后才能将全部消息变为已消费。
事务偏向生产者,签收偏向消费者。也就是说生产者使用事务更好点,消费者使用签收机制更好点。
非事务下的消费者如何使用手动签收的方式
非事务下的生产者跟之前的代码一样
public class JmsProduce { public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616"; public static final String QUEUE_NAME = "queue_01"; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection conn = factory.createConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(QUEUE_NAME); MessageProducer producer = session.createProducer(queue); for (int i = 0; i < 3; i++) { TextMessage textMessage = session.createTextMessage("msg -> " + i); producer.send(textMessage); } producer.close(); session.close(); conn.close(); System.out.println("====> 消息发布到MQ完成"); } }非事务下的消费者如何手动签收
public class JmsConsumer { public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616"; public static final String QUEUE_NAME = "queue_01"; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection conn = factory.createConnection(); conn.start(); //这里改为Session.CLIENT_ACKNOWLEDGE Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); Queue queue = session.createQueue(QUEUE_NAME); MessageConsumer consumer = session.createConsumer(queue); while (true) { TextMessage message = (TextMessage) consumer.receive(4000L); if (null != message) { System.out.println("====> 消费者的消息:" + message.getText()); //设置为Session.CLIENT_ACKNOWLEDGE后,要调用该方法,标志着该消息已被签收(消费)。 //如果不调用该方法,该消息的标志还是未消费,下次启动消费者或其他消费者还会收到改消息。 message.acknowledge(); } else { break; } } consumer.close(); session.close(); conn.close(); } }注意:JMS保证可靠有四种方式,除了上面讲到的持久性,事务,签收,还可以通过多节点集群的方式来保证可靠性。
3.6 JMS的点对点总结