ActiceMQ详解 (4)

主题消息生产者

public class JmsProduce { public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616"; public static final String TOPIC_NAME = "topic_01"; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection conn = factory.createConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); //只有这一步和Queue有区别 Topic topic = session.createTopic(TOPIC_NAME); MessageProducer producer = session.createProducer(topic); for (int i = 0; i < 3; i++) { TextMessage textMessage = session.createTextMessage("msg -> " + i); producer.send(textMessage); } producer.close(); session.close(); conn.close(); System.out.println("====> 消息发布到MQ完成"); } }

主题消息消费者

存在多个消费者,每个消费者都能收到自从自己启动后所有生产的消息。

public class JmsConsumer { public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616"; public static final String TOPIC_NAME = "topic_01"; public static void main(String[] args) throws Exception { System.out.println("=====> 1号消费者");//多加几个消费者做实验 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection conn = factory.createConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); //只有这一步和Queue有区别 Topic topic = session.createTopic(TOPIC_NAME); MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if(null != message && message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { System.out.println("====> 消费者接受到消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }); System.in.read(); consumer.close(); session.close(); conn.close(); } }

image-20210115214856154

2.5 Topic和Queue对比

image-20210115215225937

3. JMS (Java消息服务) 详解 3.1 Java消息服务是什么

Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序开发。在JavaEE中当两个应用程序使用JMS进行通信时,它们之间不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。

image-20210115221749323

3.2 JMS四大组成元素

image-20210115221836939

Message - 消息头

JMS的消息头有哪些属性:

JMSDestination:消息目的地。主要是指Queue和Topic。

JMSDeliveryMode:消息持久化模式。分为持久模式和非持久模式,一条持久性的消息应该被传送“一次仅仅一次”,这就意味着如果JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递。一条非持久的消息最多会传递一次,这意味着服务器出现故障,该消息将会永远丢失。

JMSExpiration:消息过期时间。可以设置消息在一定时间后过期,默认是永不过期消息过期时间,等于Destination的send方法中的timeToLive值加上发送时刻的GMT时间值。如果timeToLive值等于0,则JMSExpiration被设为0,表示该消息永不过期。如果发送后在消息过期时间之后还没有被发送到目的地,则该消息被清除。

JMSPriority:消息的优先级。消息优先级从0-9十个级别,0-4是普通消息,5-9是加急消息。 JMS不要求MQ严格按照这十个优先级发送消息但必须保证加急消息要先于普通消息到达。默认是4级。

JMSMessageID:消息的唯一标识符。唯一标识每个消息的标识由MQ产生,也可以自己指定但是每个消息的标识要求唯一。

说明:消息的生产者可以set这些属性,消息的消费者可以get这些属性。这些属性在send方法里面也可以设置。

public class JmsProduce { public static final String ACTIVEMQ_URL = "tcp://mpolaris.top:61616"; public static final String TOPIC_NAME = "topic_01"; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection conn = factory.createConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(TOPIC_NAME); MessageProducer producer = session.createProducer(topic); for (int i = 0; i < 3; i++) { TextMessage textMessage = session.createTextMessage("msg -> " + i); //这里可以指定每个消息的目的地 textMessage.setJMSDestination(topic); //消息的模式,持久模式/非持久模式 textMessage.setJMSDeliveryMode(0); //消息的过期时间 textMessage.setJMSExpiration(1000); //消息的优先级 textMessage.setJMSPriority(10); //指定每个消息的标识。MQ会给我们默认生成一个,我们也可以自己指定。 textMessage.setJMSMessageID("ABCD"); //上面的属性也可以通过send重载方法进行设置 producer.send(textMessage); } producer.close(); session.close(); conn.close(); System.out.println("====> 消息发布到MQ完成"); } }

Message - 消息体

理解:封装具体的消息数据

五种消息格式

image-20210115232313263

注意:发送和接收的消息体类型必须一致对应

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyyysx.html