protected String username = ActiveMQConnection.DEFAULT_USER;
protected String password = ActiveMQConnection.DEFAULT_PASSWORD;
//protected String brokerURL = "tcp://127.0.0.1:61616";
protected String brokerURL = "tcp://120.24.85.167:61616";
protected static transient ConnectionFactory factory;
protected transient Connection connection;
public static void main(String[] args) {
try {
new Publish().sendObjectMessage(new User("wzh","q123456"));
new Publish().sendMapMessage();
new Publish().sendTextMessage("海,你好");
} catch (Exception e) {
e.printStackTrace();
}
}
public Publish() {
try {
factory = new ActiveMQConnectionFactory(username, password,
brokerURL);
connection = factory.createConnection();
connection.start();
} catch (JMSException jmse) {
close();
}
}
public Publish(String username, String password, String brokerURL)
throws JMSException {
this.username = username;
this.password = password;
this.brokerURL = brokerURL;
factory = new ActiveMQConnectionFactory(username, password, brokerURL);
connection = factory.createConnection();
try {
connection.start();
} catch (JMSException jmse) {
connection.close();
throw jmse;
}
}
public void close() {
try {
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
protected void sendObjectMessage(Serializable serializable) throws JMSException {
Session session = null;
try {
session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("MessageTopic");
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
Message message = session.createObjectMessage(serializable);
producer.send(message);
session.commit();
} catch (JMSException e) {
try {
session.rollback() ;
} catch (JMSException e1) {
e1.printStackTrace();
}
throw e ;
} finally {
close();
}
}
protected void sendTextMessage(String text) throws JMSException {
Session session = null;
try {
session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("MessageTopic");
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
Message message = session.createTextMessage(text);
producer.send(message);
session.commit();
} catch (JMSException e) {
try {
session.rollback() ;
} catch (JMSException e1) {
e1.printStackTrace();
}
throw e ;
} finally {
close();
}
}
protected void sendMapMessage() throws JMSException {
Session session = null;
try {
session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("MessageTopic");
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
MapMessage message = session.createMapMessage();
message.setString("stock", "string");
message.setDouble("price", 11.14);
producer.send(message);
session.commit();
} catch (JMSException e) {
try {
session.rollback() ;
} catch (JMSException e1) {
e1.printStackTrace();
}
throw e ;
} finally {
close();
}
}
}
消息订阅者:
package com.wzh.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Subscriber {