Worker的消息队列定义好了之后,就是接受Worker的里消息了,这里定义了“taskWorkerContainer”,其属性分别定义了连接池、目标队列、消息处理器(我们自己的Java类,后面再讲),参数pubSubDomain用于指定是使用订阅模式还是使用点对点模式,如果是ActiveMQTopic则要设置为true,默认是false。
好了,Server现在已经可以通过自己定义的“lekko.mq.task.TaskWorkerListener”类接受并处理taskWorkerTopic的消息了。
如法炮制,定义一个专门用于往Worker里发消息的队列“taskServerTopic”,并定义发送消息的模板“taskServerTemplate”备用。
3、Server端的接收类与发送类
lekko.mq.task.TaskWorkerListener便是一个接收类示例:
package lekko.mq.task;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
import lekko.mq.model.MessageModel;
/**
* Task消息监听类
* @author lekko
*/
@Service
public class TaskWorkerListener implements MessageListener {
private Logger _logger = Logger.getLogger(TaskWorkerListener.class);
@Override
public void onMessage(Message message) {
if (message instanceof ActiveMQObjectMessage) {
ActiveMQObjectMessage aMsg = (ActiveMQObjectMessage) message;
try {
onMessage((MessageModel) aMsg.getObject());
} catch (Exception e) {
_logger.warn("Message:${} is not a instance of MessageModel.", e);
}
} else {
_logger.warn("Message:${} is not a instance of ActiveMQObjectMessage.");
}
}
/**
* 处理消息
* @param message 自定义消息实体
*/
public void onMessage(MessageModel message) { ... }
}
这里给大家演示的并不是最基础的知识,处理的消息是一个自定义的类“lekko.mq.model.MessageModel”,这个类怎么写可以随便整,反正就是一些你要传递的数据字段,但是记得要实现Serializable接口。如果你需要传递的仅仅是纯字符串,那么直接在代码的23行片,把message.toString()即可。这个类通过前面XML配置会处理来自“worker_topic”队列中的消息。
再就是发送类,实际上就是把前面的taskServiceTemplate拿来用就行了:
package lekko.mq.task;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
import lekko.mq.model.MessageModel;
/**
* 服务器任务消息分发
* @author lekko
*/
@Service
public class TaskServerSender {
@Autowired
@Qualifier("taskServerTemplate")
private JmsTemplate jmsTemplate;
/**
* 发送消息
*/
public void sendMessage(MessageModel msg) {
jmsTemplate.convertAndSend(msg);
}
}
把这个类TaskServerSender注入到任意需要用到的地方,调用sendMessage方法即可。它会往前面定义的“server_topic”中塞消息,等Worker来取。
4、关于Zookeeper配置MQ连接信息
Worker端的配置我这里不再阐述,因为它跟在Server端的配置太相像,区别就在于Server端是从worker_topic中取消息,往server_topic中写消息;而Worker端的代码则是反过来,往worker_topic中写消息,从server_topic中取消息。
那么如何使用Java代码来控制ActiveMQ的配置消息呢:
package lekko.mq.util;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
* 获取MQ配置
* @author lekkoli
*/
public class MQPropertiesFactory {
private static boolean isLoaded = false;
private static String ZOOKEEPER_CLUST = "xxx.xxx.xxx.xxx:2181";
private static ZooKeeper _zk;
private static String _ip;
private static String _port;
private static String getProperty(String path) throws Exception {
if (_zk == null) {
if (ZOOKEEPER_CLUST == null) {
throw new Exception("Zookeeper, Host \"" + ZOOKEEPER_CLUST + "\" is null!");
}
_zk = new ZooKeeper(ZOOKEEPER_CLUST, 90000, null);
}
Stat s = _zk.exists(path, false);
if (s != null)
return new String(_zk.getData(path, false, s));
throw new Exception("Zookeeper, Path \"" + path + "\" is not exist!");
}