JMS之ActiveMQ Linux下安装与应用实例(5)

// Credentials used to open the broker connection; both default to the constants
// supplied by ActiveMQConnection.
protected String username = ActiveMQConnection.DEFAULT_USER;
 protected String password = ActiveMQConnection.DEFAULT_PASSWORD;
 // Alternative broker address on the local machine (currently disabled):
 //protected String brokerURL = "tcp://127.0.0.1:61616";
 // Broker address used when no URL is passed to a constructor.
 // NOTE(review): hard-coded remote IP address; presumably a demo server, confirm before reuse.
 protected String brokerURL = "tcp://120.24.85.167:61616";

// Static, so shared by all instances; it is reassigned every time a constructor runs.
protected static transient ConnectionFactory factory;
 // Connection created and started by the constructors and closed by close().
 protected transient Connection connection;

public static void main(String[] args) {
  Subscriber consumer = new Subscriber();
  consumer.receiveMessage();
 }

/**
 * Creates a subscriber from the default {@code username}, {@code password}
 * and {@code brokerURL} field values, then opens and starts the connection.
 *
 * <p>A {@link JMSException} raised while connecting is not propagated: it is
 * reported on standard error and any connection obtained so far is closed.
 * In that case {@code connection} may be {@code null} or already closed.
 */
public Subscriber() {
  try {
   factory = new ActiveMQConnectionFactory(username, password, brokerURL);
   connection = factory.createConnection();
   connection.start();
  } catch (JMSException jmse) {
   // Report the failure; otherwise the only symptom is a later, unrelated
   // error when receiveMessage() tries to use the connection.
   jmse.printStackTrace();
   close();
  }
 }

/**
 * Creates a subscriber for the given broker, then opens and starts the
 * connection.
 *
 * @param username  user name passed to the connection factory
 * @param password  password passed to the connection factory
 * @param brokerURL broker address passed to the connection factory
 * @throws JMSException if the connection cannot be created or started; when
 *         starting fails the connection is closed first, and a failure of
 *         that close is attached as a suppressed exception
 */
public Subscriber(String username, String password, String brokerURL)
   throws JMSException {
  this.username = username;
  this.password = password;
  this.brokerURL = brokerURL;

  factory = new ActiveMQConnectionFactory(username, password, brokerURL);
  connection = factory.createConnection();
  try {
   connection.start();
  } catch (JMSException jmse) {
   // Closing can fail as well; that must not replace the start failure,
   // which is the error the caller actually needs to see.
   try {
    connection.close();
   } catch (JMSException closeFailure) {
    jmse.addSuppressed(closeFailure);
   }
   throw jmse;
  }
 }

public void close() {
  try {
   if (connection != null) {
    connection.close();
   }
  } catch (JMSException e) {
   e.printStackTrace();
  }
 }

protected void receiveMessage() {
  Session session = null;
  try {

session = connection.createSession(Boolean.FALSE,
     Session.AUTO_ACKNOWLEDGE);
   Topic topic = session.createTopic("MessageTopic");
   MessageConsumer consumer = session.createConsumer(topic);
   
   consumer.setMessageListener(new MessageListener() {
   
    @Override
    public void onMessage(Message message) {

if (message instanceof ObjectMessage) {
      System.out.println("deal ObjectMessage....");
      dealObjectMessage((ObjectMessage) message);
     } else if (message instanceof MapMessage) {
      System.out.println("deal MapMessage....");
      dealMapMessage((MapMessage) message);
     } else if (message instanceof TextMessage) {
      System.out.println("deal TextMessage....");
      dealTextMessage((TextMessage) message);
     }
     
    }
   }) ;

} catch (Exception e) {
   e.printStackTrace();
  } finally {
   /*if (session != null) {
    try {
     session.commit();
    } catch (JMSException e) {
     e.printStackTrace();
    }
   }*/

}

}

/**
  *
  * 处理 TextMessage消息
  *
  * @throws JMSException
  */
 private void dealTextMessage(TextMessage message) {
  try {
   String text = message.getText();
   System.out.println("text = " + text);
  } catch (JMSException e) {
   e.printStackTrace();
  }

}

/**
  *
  * 处理 MapMessage消息
  *
  * @throws JMSException
  */
 private void dealMapMessage(MapMessage message){
  try {
   String stack = message.getString("stock");
   Double price = message.getDouble("price");
   System.out.println("stock = " + stack + " , price =" + price);
  } catch (JMSException e) {
   e.printStackTrace();
  }
 }

/**
  * 处理ObjectMessage消息
  */
 private void dealObjectMessage(ObjectMessage message){

try {
   User user = (User) message.getObject();
   System.out.println(user.toString());
  } catch (JMSException e) {
   e.printStackTrace();
  }

}

}

推荐阅读:

Linux系统下ActiveMQ 安装

Ubuntu下的ACTIVEMQ服务器

CentOS 6.5启动ActiveMQ报错解决

Spring+JMS+ActiveMQ+Tomcat实现消息服务

Linux环境下面ActiveMQ端口号设置和WEB端口号设置

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/16021.html