ActiceMQ详解 (3)

队列消息消费者 - 同步阻塞式 receive

public class JmsConsumer { public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616"; public static final String QUEUE_NAME = "queue_01"; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection conn = factory.createConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(QUEUE_NAME); // 创建消息的消费者 MessageConsumer consumer = session.createConsumer(queue); while (true) { // reveive():一直等待接收消息,在能够接收到消息之前将一直阻塞。 是同步阻塞方式,和socket的accept方法类似的。 // reveive(Long time):等待n毫秒之后还没有收到消息就结束阻塞。 // 因为消息发送者是 TextMessage,所以消息接受者也要是TextMessage TextMessage message = (TextMessage) consumer.receive(4000L); if (null != message) { System.out.println("====> 消费者的消息:" + message.getText()); } else { break; } } consumer.close(); session.close(); conn.close(); } }

队列消息消费者 - 异步非阻塞监听式 MessageListener

public class JmsConsumer { public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616"; public static final String QUEUE_NAME = "queue_01"; public static void main(String[] args) throws Exception { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection conn = factory.createConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(QUEUE_NAME); MessageConsumer consumer = session.createConsumer(queue); // 监听器 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if(null != message && message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { System.out.println("====> 消费者接受到消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }); System.in.read(); //保证控制台不停 consumer.close(); session.close(); conn.close(); } }

消费者三种情况

先生产,只启动一个消费者 ① => ①消费者会消费掉全部消息

先生产,然后先启动消费者①,再启动消费者② => ①消费者会消费掉全部消息,②消费者不能消费消息

先启动消费者①和②,再生产 => ①和②轮询消费,各自消费一半消息

2.4 主题消息(Topic)

主题消息特点

在发布订阅消息传递域中,目的地被称为主题(topic)。

发布/订阅消息传递域的特点如下:

生产者将消息发布到topic中,每个消息可以有多个消费者,属于1:N的关系。

生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费 自它订阅之后发布的消息。

生产者生产时,topic 不保存消息,它是 无状态的 不落地的,假如无人订阅就去生产那就是一条废消息,所以一般先启动消费者再启动生产者。

默认情况下如上所述,但是JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。一句话,好比我们的微信公众号订阅。

image-20210115212649958

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyyysx.html