后端多环境治理的实践(二) (2)

然后手动注册消费者

@Slf4j @Configuration public class RabbitmqListerConfig implements InitializingBean{ @Autowired private List<RabbitmqLister> rabbitmqListerList; @Autowired private ConnectionFactory connectionFactory; @Autowired private Environment environment; @Autowired private ApplicationContext applicationContext; @Override public void afterPropertiesSet() throws Exception { rabbitmqListerList.forEach(r->{ try { String queueName = r.getQueueName(); Connection connection = connectionFactory.createConnection(); Channel channel = connection.createChannel(false); RabbitAdmin rabbitAdmin = applicationContext.getBean(RabbitAdmin.class); String version = environment.getProperty(Constont.VERSION); if (!StringUtils.equals(version,Constont.DEFAULT_VERSION)){ queueName = queueName +"-"+version; Queue queue = new Queue(queueName, true, false, true); rabbitAdmin.declareQueue(queue); } Consumer consumerB = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { Map<String, Object> headers = properties.getHeaders(); String msgVersion = headers.get(Constont.VERSION)==null?Constont.DEFAULT_VERSION:headers.get(Constont.VERSION).toString(); String version = environment.getProperty(Constont.VERSION); ObjectMapper objectMapper = new ObjectMapper(); String o = objectMapper.readValue(body, String.class); if(StringUtils.equals(version,msgVersion)){ r.handler(o,properties); channel.basicAck(envelope.getDeliveryTag(),false); return; } String queueName = Constont.MY_QUEUE + "-" + msgVersion; QueueInformation queueInfo = rabbitAdmin.getQueueInfo(queueName); if (queueInfo == null || queueInfo.getConsumerCount() ==0) { r.handler(o,properties); channel.basicAck(envelope.getDeliveryTag(),false); return; } channel.basicPublish("",queueName,properties,body); channel.basicAck(envelope.getDeliveryTag(),false); }catch (Exception e){ log.error("",e); } } }; channel.basicConsume(queueName,consumerB); } catch (IOException e) { log.error("",e); } }); } }

4、消费代理者执行如下逻辑:“如果消息的版本与本地版本相同,则本地消费,如果消息的版本与本地版本不相同,则判断是否存在对应版本的消费者,如果存在则转发给对应版本的消费者,如果不存在则本地消费”。

通过判断是有个featrue-queue来判断是否有fetrue-cosumer,如果有则把消费投递到featrue-queue。

图片

具体代码实现细节在3里面。

image.gif

四、验证

在order服务,我们编程一个接口用来发送消息。

@GetMapping("/sendMq")

1、打包

mvn clean install -DskipTests

2、使用docker启动rabbitmq

docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management

3、启动eureka、gateway和3个版本的order服务

nohup java -jar -Dserver.port=8761 eureka/target/eureka-0.0.1-SNAPSHOT.jar >null 2>&1 & nohup java -jar -Dserver.port=5000 gateway/target/gateway-0.0.1-SNAPSHOT.jar >null 2>&1 & nohup java -jar -Dserver.port=8001 order/target/order-0.0.1-SNAPSHOT.jar >null 2>&1 & nohup java -jar -Dversion=v1 -Dserver.port=8002 order/target/order-0.0.1-SNAPSHOT.jar >null 2>&1 & nohup java -jar -Dversion=v2 -Dserver.port=8003 order/target/order-0.0.1-SNAPSHOT.jar >null 2>&1 &

4、开另外两个终端,去启动user服务,这里直接启动用来看日志打印。

java -jar -Dserver.port=9001 user/target/user-0.0.1-SNAPSHOT.jar java -jar -Dversion=v1 -Dserver.port=9002 user/target/user-0.0.1-SNAPSHOT.jar

5、发送请求验证

curl --location --request GET 'localhost:5000/order/sendMq?msg=djb'

然后查看default的user服务

curl --location --request GET 'localhost:5000/order/sendMq?msg=djbv1' \

然后查看v1的user服务

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyxdwj.html