消息生产者
for (int i = 0; i < 3; i++) { // 发送TextMessage消息体 TextMessage textMessage = session.createTextMessage("topic " + i); producer.send(textMessage); // 发送MapMessage 消息体。set方法添加,get方式获取 MapMessage mapMessage = session.createMapMessage(); mapMessage.setString("name","rose" + i); mapMessage.setInt("age", 18 + i); producer.send(mapMessage); }消息消费者
public void onMessage(Message message) { if(null != message && message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { System.out.println("====> 消费者接受到text消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } if(null != message && message instanceof MapMessage) { MapMessage mapMessage = (MapMessage) message; try { System.out.println("====> 消费者接受到map消息:" + mapMessage.getString("name")); System.out.println("====> 消费者接受到map消息:" + mapMessage.getString("age")); } catch (JMSException e) { e.printStackTrace(); } } }Message - 消息属性
如果需要除消息头字段之外的值那么可以使用消息属性。它是 识别 / 去重 / 重点标注 等操作非常有用的方法。
它们是以属性名和属性值对的形式制定的。可以将属性是为消息头得扩展,属性指定一些消息头没有包括的附加信息,比如可以在属性里指定消息选择器。消息的属性就像可以分配给一条消息的附加消息头一样。它们允许开发者添加有关消息的不透明附加信息。它们还用于暴露消息选择器在消息过滤时使用的数据。
下图是设置消息属性的API:
生产者
for (int i = 0; i < 3; i++) { TextMessage textMessage = session.createTextMessage("topic " + i); // 调用Message的set*Property()方法就能设置消息属性 // 根据value的数据类型的不同,有相应的API textMessage.setStringProperty("From","rose@qq.com"); textMessage.setByteProperty("Spec", (byte) 1); textMessage.setBooleanProperty("Invalide",true); producer.send(textMessage); }消费者
public void onMessage(Message message) { if(null != message && message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { System.out.println("消息体:" + textMessage.getText()); System.out.println("消息属性:" + textMessage.getStringProperty("From")); System.out.println("消息属性:" + textMessage.getByteProperty("Spec")); System.out.println("消息属性:" + textMessage.getBooleanProperty("Invalide")); } catch (JMSException e) { e.printStackTrace(); } } } 3.5 JMS的可靠性RERSISTENT - 持久性
什么是持久化消息 => 保证消息只被传送一次和成功使用一次。在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者,虽然这样增加了消息传送的开销但却增加了可靠性。
我的理解:在消息生产者将消息成功发送给MQ消息中间件之后。无论是出现任何问题如:MQ服务器宕机、消费者掉线等。都保证(topic要之前注册过,queue不用)消息消费者能够成功消费消息。如果消息生产者发送消息就失败了,那么消费者也不会消费到该消息。
Queue消息非持久和持久
Queue非持久,当服务器宕机消息不存在(消息丢失了)。