JMS之ActiveMQ Linux下安装与应用实例(4)

protected String username = ActiveMQConnection.DEFAULT_USER;
 protected String password = ActiveMQConnection.DEFAULT_PASSWORD;
 //protected String brokerURL = "tcp://127.0.0.1:61616";
 protected String brokerURL = "tcp://120.24.85.167:61616";

protected static transient ConnectionFactory factory;
 protected transient Connection connection;

public static void main(String[] args) {
  try {
   new Publish().sendObjectMessage(new User("wzh","q123456"));
   new Publish().sendMapMessage();
   new Publish().sendTextMessage("海,你好");
  } catch (Exception e) {
   e.printStackTrace();
  }
 }

public Publish() {

try {
   factory = new ActiveMQConnectionFactory(username, password,
     brokerURL);
   connection = factory.createConnection();
   connection.start();
  } catch (JMSException jmse) {
   close();
  }
 }

public Publish(String username, String password, String brokerURL)
   throws JMSException {
  this.username = username;
  this.password = password;
  this.brokerURL = brokerURL;

factory = new ActiveMQConnectionFactory(username, password, brokerURL);
  connection = factory.createConnection();
  try {
   connection.start();
  } catch (JMSException jmse) {
   connection.close();
   throw jmse;
  }
 }

public void close() {
  try {
   if (connection != null) {
    connection.close();
   }
  } catch (JMSException e) {
   e.printStackTrace();
  }
 }

protected void sendObjectMessage(Serializable serializable) throws JMSException {
  Session session = null;
  try {

session = connection.createSession(Boolean.TRUE,
     Session.AUTO_ACKNOWLEDGE);
   Topic topic = session.createTopic("MessageTopic");
   MessageProducer producer = session.createProducer(topic);
   producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

Message message = session.createObjectMessage(serializable);

producer.send(message);

session.commit();

} catch (JMSException e) {
   try {
    session.rollback() ;
   } catch (JMSException e1) {
    e1.printStackTrace();
   }
   throw e ;
  } finally {
   close();
  }

}


 protected void sendTextMessage(String text) throws JMSException {
  Session session = null;
  try {

session = connection.createSession(Boolean.TRUE,
     Session.AUTO_ACKNOWLEDGE);
   Topic topic = session.createTopic("MessageTopic");
   MessageProducer producer = session.createProducer(topic);
   producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

Message message = session.createTextMessage(text);

producer.send(message);
   session.commit();

} catch (JMSException e) {
   try {
    session.rollback() ;
   } catch (JMSException e1) {
    e1.printStackTrace();
   }
   throw e ;
  } finally {
   close();
  }

}
 
 protected void sendMapMessage() throws JMSException {
  Session session = null;
  try {

session = connection.createSession(Boolean.TRUE,
     Session.AUTO_ACKNOWLEDGE);
   Topic topic = session.createTopic("MessageTopic");
   MessageProducer producer = session.createProducer(topic);
   producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

MapMessage message = session.createMapMessage();
   message.setString("stock", "string");
   message.setDouble("price", 11.14);
   producer.send(message);

session.commit();

} catch (JMSException e) {
   try {
    session.rollback() ;
   } catch (JMSException e1) {
    e1.printStackTrace();
   }
   throw e ;
  } finally {
   close();
  }

}
}

消息订阅者:

package com.wzh.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Subscriber {

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/16021.html