Spring下ActiveMQ实战

MessageQueue是分布式的系统里经常要用到的组件,一般来说,当需要把消息跨网段、跨集群的分发出去,就可以用这个。一些典型的示例就是:

1、集群A中的消息需要发送给多个机器共享;

2、集群A中消息需要主动推送,但彼此的网络不是互通的(如集群A只有过HA才能被外界访问);

Spring下ActiveMQ实战

当然上面的几个点,除了用MQ还有其它实现方式,但是MQ无疑是非常适合用来做这些事的。众多MQ中,ActiveMQ是比较有名气也很稳定的,它发送消息的成本非常廉价,支持Queue与Topic两种消息机制。本文主要就是讲如何在Spring环境下配置此MQ:

1、场景假设

现有机器两台Server、Worker需要进行异步通信,另有一台ActiveMQ机器,关于MQ的配置信息存放在Zookeeper中,Zookeeper的节点有:

- /mq/activemq/ip:mq的机器ip

-/mq/activemq/port:这是mq的机器端口

2、Server的Spring XML配置

Server主要的工作就是接受Worker消息,并发送消息给Worker。主要是定义了连接MQ的连接池,接受Worker消息的队列worker,发送消息给Worker的队列server:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation="
        http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
        http://www.springframework.org/schema/jms/spring-jms-4.1.xsd">

<!-- ActiveMQ连接池 -->
    <bean>
        <property>
            <bean>
                <property>
                    <bean factory-method="getUrl" />
                </property>
                <property value="60000" />
                <!-- <property value="admin" /> -->
                <!-- <property value="admin" /> -->
                <!-- <property value="true" /> -->
                <property value="10000" />
            </bean>
        </property>
    </bean>


    <!-- Worker任务消息 -->
    <bean>
        <constructor-arg value="worker_topic" />
    </bean>
    <!-- 任务监听容器 -->
    <bean>
        <property ref="conFactory" />
        <property ref="taskWorkerTopic" />
        <property>
            <bean />
        </property>
        <property value="true" />
    </bean>


    <!-- Server任务消息 -->
    <bean>
        <constructor-arg value="server_topic" />
    </bean>   
    <!-- 任务消息发送模板 -->
    <bean p:connectionFactory-ref="conFactory" p:defaultDestination-ref="taskServerTopic" />

</beans>

一段一段地分析,ActiveMQ连接池这里,定义了连接的bean为“conFactory”,其中broberURL属性是通过后台Java代码的静态方法来设置的,方便线上环境通过Java代码动态地切换,稍后会介绍这块代码,你现在需要知道的是,它实际上返回的就是一个字符串,格式像:tcp://xxx.xxx.xxx.xxx:port,如果不要用后台来管理连接信息,直接改成“<property value="tcp://xxx.xxx.xxx.xxx:port">”也是OK的。

接下来,便是Worker消息队列的定义,这里定义为“taskWorkerTopic”,类型是org.apache.activemq.command.ActiveMQTopic,(订阅模式)它表示一个消息可以被多个机器收到并处理,其它的还有org.apache.activemq.command.ActiveMQQueue,(点对点模式)表示一个消息只能被一台机器收到,当收到后消息就出队列了,其它机器无法处理。它们都有一个构造参数constructor-arg,指定了消息队列的名称,一个MQ中一个消息队列的名字是唯一的。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/b5e0f7a8c63abe8220b01c1e0b04ca01.html