RabbitMQ集群跨网段消息迁移

将阿里云同一个VPC下的RabbitMQ集群的消息从一个网段集群迁移到另一个网段集群。消息中间件的消息是即时消费,为何还有历史消息,因为是历史遗留问题。故要迁移

整个网络拓扑图如下

注意:

若对于跨VPC网络

1. 确保各主机网络互通

2. 配置好各主机名

两边安全组出方向开发:15672、25672、5672、4369端口

否在在加入集群会出现问题

RabbitMQ集群跨网段消息迁移

资源清单

主机名

 

IP地址

 

角色

 

备注

 

node171

 

172.20.0.171

 

老的MQ集群_1

     

node172

 

172.20.0.172

 

老的MQ集群_2

     

node173

 

192.168.0.173

 

MQ集群_1

     

node174

 

192.168.0.174

 

新的MQ集群_2

     

基础软件及环境信息

操作系统:CentOS Linux release 7.3.1611

Erlang:Erlang/OTP 20 [erts-9.3.3.3]

RabbitMQ:rabbitmq_server-3.7.8

集群的部署

node171、node172组成集群A

node173、node174组成集群B

这里的环境部署略

创建测试账户

在【node171上进行操作】

rabbitmqctl add_user root root123

rabbitmqctl add_vhost kcvhost

rabbitmqctl set_permissions -p kcvhost root  ".*" ".*" ".*"

rabbitmqctl add_user admin admin123

rabbitmqctl set_permissions -p kcvhost admin  ".*" ".*" ".*"

rabbitmqctl set_user_tags admin administrator

rabbitmq-plugins enable rabbitmq_management

rabbitmqctl stop_app

rabbitmqctl start_app

生成测试数据

消息生产者代码:

package com.zjkj.rabbitmq.demo;
 
import Java.io.IOException;
import java.util.concurrent.TimeoutException;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
 
/**
 * 消息的生产者
 * @author zjkj
 *
 */
public class Rabbitmq_Producer {

private static final String EXCHANGE_NAME = "exchange_test_3";
private static final String ROUTING_KEY = "routingkey_demo";
private static final String QUEUE_NAME = "queue_test_3";
private static final String IP_ADDRESS = "172.20.0.171";
private static final int PORT = 5672; //RabbitMQ服务默认端口号为5672


public static void main(String[] args) throws IOException,TimeoutException,InterruptedException{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername("root");
factory.setPassword("root123");
Connection connection = factory.newConnection(); //创建连接
Channel channel = connection.createChannel(); // 创建信道
 //创建一个type="direct"、持久化、非自动删除的交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct",true, false, null);
// 创建一个持久化、非排他的、非自动删除的队列
channel.queueDeclare(QUEUE_NAME, true, false, false,null);
// 将交换器与队列通过路由键绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// 发送一条持久化的消息:hello world!
for(int i=1;i<=100000;i++){
String msg = "交换器_1与队列1绑定:Message_"+i;
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
}
// 关闭资源
channel.close();
connection.close();
 
}
 
}


消费者代码

package com.zjkj.rabbitmq.demo;
 
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
 
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
 
/**
 * 消息的消费者
 * @author zjkj
 *
 */
public class Rabbitmq_Consumer {
 
private static final String QUEUE_NAME = "queue_test_3";
private static final String IP_ADDRESS = "192.168.6.171";
private static final  int PORT = 5672;

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/20365ead785bd60d6f932dda8aecc2b8.html