一般在稍微大一点的项目中,需要配置多个数据库数据源,最简单的方式是用 Spring 来实现,只需要继承 AbstractRoutingDataSource 类,实现 determineCurrentLookupKey 方法,再配合使用 ThreadLocal 就可以实现。
但是如何实现 MQ 的多数据源呢?假设有部署在不同服务器上的两个消息队列,或者是同一服务器,不同 vhost 的消息队列,在一个项目中,我如何自由地选择从哪个队列收发消息呢?下面说说用 Spring AMQP + Rabbit 的实现过程及踩过的坑。
最开始的单数据源的实现很简单,网上有好多博文可以参考,官网也有介绍。主要就是创建一个 xml 的配置文件,添加各种必要的配置,声明 connection-factory、rabbitListenerContainerFactory、rabbitTemplate、queue、exchange、binding 等等。然后用 RabbitTemplate 来发消息,用 @RabbitListener 注解来监听,用 queue 指定队列来收消息,这里就不赘述了。主要说一下,在现有的基础上实现多数据源的收发。
先说配置方面,为了对比,下面先给出单数据源配置:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans "> <rabbit:connection-factory id="rabbitConnectionFactory" host="${rabbit.host}" port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}" requested-heartbeat="30" virtual-host="${rabbit.vhost}" channel-cache-size="50"/> <bean id="rabbitListenerContainerFactory" class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory"> <property name="connectionFactory" ref="rabbitConnectionFactory"/> <property name="concurrentConsumers" value="16"/> <property name="maxConcurrentConsumers" value="50"/> </bean> <rabbit:admin id="rabbitAdmin" connection-factory="rabbitConnectionFactory"/> <!-- queue declare --> <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="queue.test"/> <!-- bind queue to exchange --> <rabbit:direct-exchange name="exchange" auto-delete="false" durable="true"> <rabbit:bindings> <rabbit:binding queue="queue.test" key="rkey.test"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> <rabbit:template id="rabbitTemplate" connection-factory="rabbitConnectionFactory" retry-template="retryTemplate" reply-timeout="60000"/> <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"> <property name="backOffPolicy"> <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> <property name="initialInterval" value="500"/> <property name="multiplier" value="10.0"/> <property name="maxInterval" value="10000"/> </bean> </property> </bean> </beans>