[root@JD redis_install]# python3 create_redis_cluster.py
############################ execute reshard #########################################
redis-cli -a ****** --cluster reshard 127.0.0.1:10001 --cluster-from 3645e00a8ec3a902bd6effb4fc20c56a00f2c982 --cluster-to 6164025849a8ff9297664fc835bc851af5004f61 --cluster-slots 1365 --cluster-yes --cluster-timeout 50000 --cluster-pipeline 10000 --cluster-replace >/dev/null 2>&1
############################ execute reshard #########################################
redis-cli -a ****** --cluster reshard 127.0.0.1:10002 --cluster-from 3645e00a8ec3a902bd6effb4fc20c56a00f2c982 --cluster-to 64e634307bdc339b503574f5a77f1b156c021358 --cluster-slots 1365 --cluster-yes --cluster-timeout 50000 --cluster-pipeline 10000 --cluster-replace >/dev/null 2>&1
############################ execute reshard #########################################
redis-cli -a ****** --cluster reshard 127.0.0.1:10003 --cluster-from 3645e00a8ec3a902bd6effb4fc20c56a00f2c982 --cluster-to 8b75325c59a7242344d0ebe5ee1e0068c66ffa2a --cluster-slots 1365 --cluster-yes --cluster-timeout 50000 --cluster-pipeline 10000 --cluster-replace >/dev/null 2>&1
{'host': '127.0.0.1', 'port': 10001, 'password': '******'}--->cluster forget 3645e00a8ec3a902bd6effb4fc20c56a00f2c982
{'host': '127.0.0.1', 'port': 10001, 'password': '******'}--->cluster forget 4854375c501c3dbfb4e2d94d50e62a47520c4f12
{'host': '127.0.0.1', 'port': 10002, 'password': '******'}--->cluster forget 3645e00a8ec3a902bd6effb4fc20c56a00f2c982
{'host': '127.0.0.1', 'port': 10002, 'password': '******'}--->cluster forget 4854375c501c3dbfb4e2d94d50e62a47520c4f12
{'host': '127.0.0.1', 'port': 10003, 'password': '******'}--->cluster forget 3645e00a8ec3a902bd6effb4fc20c56a00f2c982
{'host': '127.0.0.1', 'port': 10003, 'password': '******'}--->cluster forget 4854375c501c3dbfb4e2d94d50e62a47520c4f12
{'host': '127.0.0.1', 'port': 10004, 'password': '******'}--->cluster forget 3645e00a8ec3a902bd6effb4fc20c56a00f2c982
{'host': '127.0.0.1', 'port': 10004, 'password': '******'}--->cluster forget 4854375c501c3dbfb4e2d94d50e62a47520c4f12
{'host': '127.0.0.1', 'port': 10005, 'password': '******'}--->cluster forget 3645e00a8ec3a902bd6effb4fc20c56a00f2c982
{'host': '127.0.0.1', 'port': 10005, 'password': '******'}--->cluster forget 4854375c501c3dbfb4e2d94d50e62a47520c4f12
{'host': '127.0.0.1', 'port': 10006, 'password': '******'}--->cluster forget 3645e00a8ec3a902bd6effb4fc20c56a00f2c982
{'host': '127.0.0.1', 'port': 10006, 'password': '******'}--->cluster forget 4854375c501c3dbfb4e2d94d50e62a47520c4f12
################# cluster nodes info: #################
23e1871c4e1dc1047ce567326e74a6194589146c 127.0.0.1:10005@20005 slave 64e634307bdc339b503574f5a77f1b156c021358 0 1575968426000 76 connected
026f0179631f50ca858d46c2b2829b3af71af2c8 127.0.0.1:10004@20004 slave 6164025849a8ff9297664fc835bc851af5004f61 0 1575968422619 75 connected
6164025849a8ff9297664fc835bc851af5004f61 127.0.0.1:10001@20001 myself,master - 0 1575968426000 75 connected 0-5460
9f265545ebb799d2773cfc20c71705cff9d733ae 127.0.0.1:10006@20006 slave 8b75325c59a7242344d0ebe5ee1e0068c66ffa2a 0 1575968425000 77 connected
8b75325c59a7242344d0ebe5ee1e0068c66ffa2a 127.0.0.1:10003@20003 master - 0 1575968427626 77 connected 10923-16383
64e634307bdc339b503574f5a77f1b156c021358 127.0.0.1:10002@20002 master - 0 1575968426000 76 connected 5461-10922
[root@JD redis_install]#
其实到这里并没有结束,这里要求缩容之后集群中的所有节点都要成功地执行cluster forget master_node_id(和slave_node_id)
否则其他节点仍然保留着10007节点的心跳信息,超过1分钟(cluster forget的禁止重新加入时间)之后,已经被踢出集群的10007节点(以及它的从节点10008)仍旧会被重新添加回来
这也是一开始就遇到的一个奇葩问题:因为没有在缩容后的集群的slave节点上执行cluster forget,被移除的节点会不断地被添加回来……。
参考这里:
完整的代码实现如下
import os
import subprocess
import time
import redis
from time import ctime, sleep


def _connect(node):
    """Open a client connection to the node described by a host/port/password dict."""
    return redis.StrictRedis(host=node["host"], port=node["port"],
                             password=node["password"], decode_responses=True)


def _address(node):
    """Return the 'host:port' address string of a node dict."""
    return node["host"] + ':' + str(node["port"])


def _run_redis_cli(arguments, password, quiet=False):
    """Run `redis-cli -a <password> <arguments...>` and return its exit status.

    The command is executed without a shell, so the password and the other
    arguments are passed verbatim (no globbing or word splitting).  The command
    line is echoed with the password masked.  With quiet=True the output of
    redis-cli is discarded, which the echoed line shows as '>/dev/null 2>&1'.
    A redis-cli binary that cannot be started is reported and yields 127.
    """
    arguments = [str(argument) for argument in arguments]
    echoed = ' '.join(["redis-cli", "-a", "******"] + arguments)
    if quiet:
        echoed = echoed + ' >/dev/null 2>&1'
    print(echoed)
    sink = subprocess.DEVNULL if quiet else None
    try:
        return subprocess.call(["redis-cli", "-a", str(password)] + arguments,
                               stdout=sink, stderr=sink)
    except OSError as e:
        print('error: can not execute redis-cli,the reason is:')
        print(e)
        return 127


def _find_cluster_node(dict_node_info, address):
    """Return the `cluster nodes` entry whose address equals 'host:port', or None.

    dict_node_info is the dict returned by conn.cluster('nodes'); its keys look
    like '127.0.0.1:10007@20007' and each value looks like
    {'node_id': '307f58...', 'flags': 'master', 'master_id': '-',
     'slots': [['0', '5460']], 'connected': True, ...}.
    Only the part before '@' is compared, and it is compared for equality, so
    '127.0.0.1:1000' does not match '127.0.0.1:10001@20001'.
    """
    for key_node, node_info in dict_node_info.items():
        if key_node.split('@')[0] == address:
            return node_info
    return None


def _count_slots(slot_ranges):
    """Count the slots described by a 'slots' list such as [['0', '5460'], ['7000']].

    Entries that are not plain numeric ranges are skipped.
    """
    total = 0
    for slot_range in slot_ranges:
        try:
            first = int(slot_range[0])
            last = int(slot_range[-1])
        except (ValueError, IndexError, TypeError):
            continue
        total += last - first + 1
    return total


def _forget_nodes(list_node, removed_master_id, removed_slave_id):
    """Execute `cluster forget` for both node ids on every node of list_node."""
    for node in list_node:
        node_conn = _connect(node)
        foget_master_command = 'cluster forget {0}'.format(removed_master_id)
        foget_slave_command = 'cluster forget {0}'.format(removed_slave_id)
        print(str(node) + '--->' + foget_master_command)
        print(str(node) + '--->' + foget_slave_command)
        node_conn.execute_command(foget_master_command)
        node_conn.execute_command(foget_slave_command)


def create_redis_cluster(list_master_node, list_slave_node):
    """Create a cluster from the master nodes and attach one slave to each master.

    list_master_node / list_slave_node are lists of
    {'host': ..., 'port': ..., 'password': ...} dicts; the i-th slave becomes
    the replica of the i-th master.  Every master is flushed and reset, every
    slave is reset, before the cluster is created.  Nothing is touched when the
    master list is empty or when there are fewer slaves than masters.
    """
    if not list_master_node:
        print('error: no master node given,execute break......')
        return
    if len(list_slave_node) < len(list_master_node):
        print('error: every master node needs a slave node,execute break......')
        return

    print('################# flush master/slave slots #################')
    for node in list_master_node:
        currenrt_conn = _connect(node)
        currenrt_conn.execute_command('flushall')
        currenrt_conn.execute_command('cluster reset')
    for node in list_slave_node:
        # Slaves are only reset, not flushed.
        currenrt_conn = _connect(node)
        currenrt_conn.execute_command('cluster reset')

    print('################# create cluster #################')
    create_arguments = ['--cluster', 'create']
    create_arguments += [_address(node) for node in list_master_node]
    create_arguments.append('--cluster-yes')
    msg = _run_redis_cli(create_arguments, list_master_node[0]["password"])
    print(msg)
    if msg != 0:
        print('error: create cluster failed,execute break......')
        return
    time.sleep(5)

    print('################# add slave nodes #################')
    for node, slave_node in zip(list_master_node, list_slave_node):
        currenrt_conn = _connect(node)
        myid = currenrt_conn.cluster('myid')
        # add-node takes the node to be added first, then a node already in the cluster.
        msg = _run_redis_cli(['--cluster', 'add-node', _address(slave_node), _address(node),
                              '--cluster-slave', '--cluster-master-id', myid],
                             node["password"])
        print(msg)
        if msg != 0:
            print('error: add slave node ' + _address(slave_node) + ' failed')

    # show cluster nodes info (asked from the last master node)
    time.sleep(10)
    print("################# cluster nodes info: #################")
    cluster_nodes = currenrt_conn.execute_command('cluster nodes')
    print(cluster_nodes)


def get_migrated_slot(list_master_node, n):
    """Return how many slots each existing master must give away when n masters are added.

    Example: 3 masters growing to 4 -> 16384//3 - 16384//4 = 5461 - 4096 = 1365.
    """
    migrated_slot_count = 16384 // len(list_master_node) - 16384 // (len(list_master_node) + n)
    return migrated_slot_count


def redis_cluster_expansion(list_master_node, dict_master_node, dict_slave_node):
    """Add one master (dict_master_node) and its slave (dict_slave_node) to the cluster.

    Steps: clean up the two new instances, make the cluster forget them if it
    still lists them without slots, add the master and then the slave with
    redis-cli, and finally move get_migrated_slot() slots from every existing
    master to the new master.  The function stops with an error message when a
    new node already owns slots in the cluster or when adding the master fails;
    a failed reshard stops the remaining reshards.
    """
    new_master_node = _address(dict_master_node)
    new_slave_node = _address(dict_slave_node)

    print("#########################cleanup instance#################################")
    new_master_conn = _connect(dict_master_node)
    new_master_conn.execute_command('flushall')
    new_master_conn.execute_command('cluster reset')
    new_master_id = new_master_conn.cluster('myid')
    new_slave_conn = _connect(dict_slave_node)
    new_slave_conn.execute_command('cluster reset')
    new_slave_id = new_slave_conn.cluster('myid')

    # Ask any node of the cluster (the first master) whether the new nodes are
    # already listed.  Listed with slots -> stop; listed without slots -> forget.
    cluster_node_conn = _connect(list_master_node[0])
    dict_node_info = cluster_node_conn.cluster('nodes')
    master_info = _find_cluster_node(dict_node_info, new_master_node)
    slave_info = _find_cluster_node(dict_node_info, new_slave_node)
    if master_info is not None and len(master_info['slots']) > 0:
        print('error: ' + new_master_node + ' already existing in cluster and alloted slots,execute break......')
        return
    if slave_info is not None and len(slave_info['slots']) > 0:
        print('error: ' + new_slave_node + ' already existing in cluster and alloted slots,execute break......')
        return

    if master_info is not None:
        for master_node in list_master_node:
            key_node_conn = _connect(master_node)
            print('waring: ' + new_master_node + ' already existing in cluster,cluster forget it......')
            forget_command = 'cluster forget {0}'.format(new_master_id)
            key_node_conn.execute_command(forget_command)
    if slave_info is not None:
        for master_node in list_master_node:
            key_node_conn = _connect(master_node)
            print('waring: ' + new_slave_node + ' already existing in cluster,forget it......')
            forget_command = 'cluster forget {0}'.format(new_slave_id)
            key_node_conn.execute_command(forget_command)

    print("#########################add node into cluster#################################")
    cluster_node = _address(list_master_node[0])
    cluster_password = list_master_node[0]["password"]
    # The node to be added comes first, the second address is any node of the cluster.
    status = _run_redis_cli(['--cluster', 'add-node', new_master_node, cluster_node],
                            cluster_password)
    print(status)
    if status != 0:
        # Without the new master in the cluster there is nothing to reshard to.
        print('add new node error,the reason is:')
        print('redis-cli exit status ' + str(status))
        return
    time.sleep(20)
    # Slave node first, then its master.
    status = _run_redis_cli(['--cluster', 'add-node', new_slave_node, new_master_node,
                             '--cluster-slave', '--cluster-master-id', new_master_id],
                            dict_master_node["password"])
    print(status)
    if status != 0:
        print('add new node error,the reason is:')
        print('redis-cli exit status ' + str(status))
    time.sleep(20)

    print("#########################reshard slots#################################")
    # Example: 3 masters expanded to 4 -> every master gives away 1365 slots.
    migrated_slot_count = get_migrated_slot(list_master_node, 1)
    for node in list_master_node:
        current_master_conn = _connect(node)
        current_master_node = _address(node)
        current_master_node_id = current_master_conn.cluster('myid')
        print('############################ execute reshard #########################################')
        msg = _run_redis_cli(['--cluster', 'reshard', current_master_node,
                              '--cluster-from', current_master_node_id,
                              '--cluster-to', new_master_id,
                              '--cluster-slots', migrated_slot_count,
                              '--cluster-yes', '--cluster-timeout', '50000',
                              '--cluster-pipeline', '10000', '--cluster-replace'],
                             node["password"], quiet=True)
        if msg != 0:
            print('reshard slots error,the reason is:')
            print('redis-cli exit status ' + str(msg))
            break
        time.sleep(20)

    print("################# cluster nodes info: #################")
    cluster_nodes = new_master_conn.execute_command('cluster nodes')
    print(cluster_nodes)


def redis_cluster_shrinkage(list_master_node, list_slave_node, dict_master_node, dict_slave_node):
    """Remove one master (dict_master_node) and its slave (dict_slave_node) from the cluster.

    list_master_node / list_slave_node are the nodes that stay in the cluster.
    The slots owned by the removed master are handed back to the remaining
    masters as evenly as possible, then the two removed nodes are reset and
    every remaining master AND slave executes `cluster forget` for both of
    them; a node that skips the forget keeps gossiping about the removed nodes
    and they get added back.  If a reshard fails the function stops before the
    reset, so the removed master keeps the slots that were not migrated.
    """
    # Stop unless both nodes to be removed are currently part of the cluster.
    cluster_node_conn = _connect(list_master_node[0])
    dict_node_info = cluster_node_conn.cluster('nodes')
    removed_master_node = _address(dict_master_node)
    removed_slave_node = _address(dict_slave_node)
    removed_master_info = _find_cluster_node(dict_node_info, removed_master_node)
    if removed_master_info is None:
        print('Error:' + str(removed_master_node) + ' not in cluster,exiting')
        return
    if _find_cluster_node(dict_node_info, removed_slave_node) is None:
        print('Error:' + str(removed_slave_node) + ' not in cluster,exiting')
        return

    removed_master_conn = _connect(dict_master_node)
    removed_master_id = removed_master_conn.cluster('myid')
    removed_slave_conn = _connect(dict_slave_node)
    removed_slave_id = removed_slave_conn.cluster('myid')

    # Example: 4 masters shrunk to 3 with 4095 slots on the removed master
    # -> 1365 slots go back to each remaining master.  A remainder is spread
    # one slot at a time over the first masters.
    slots_to_return = _count_slots(removed_master_info['slots'])
    slots_per_master, extra_slots = divmod(slots_to_return, len(list_master_node))
    for index, node in enumerate(list_master_node):
        slot_count = slots_per_master + (1 if index < extra_slots else 0)
        if slot_count == 0:
            continue
        current_master_conn = _connect(node)
        current_master_node = _address(node)
        current_master_node_id = current_master_conn.cluster('myid')
        print('############################ execute reshard #########################################')
        msg = _run_redis_cli(['--cluster', 'reshard', current_master_node,
                              '--cluster-from', removed_master_id,
                              '--cluster-to', current_master_node_id,
                              '--cluster-slots', slot_count,
                              '--cluster-yes', '--cluster-timeout', '50000',
                              '--cluster-pipeline', '10000', '--cluster-replace'],
                             node["password"], quiet=True)
        if msg != 0:
            print('reshard slots error,the reason is:')
            print('redis-cli exit status ' + str(msg))
            return
        time.sleep(10)

    removed_master_conn.execute_command('cluster reset')
    removed_slave_conn.execute_command('cluster reset')

    _forget_nodes(list_master_node, removed_master_id, removed_slave_id)
    _forget_nodes(list_slave_node, removed_master_id, removed_slave_id)

    print("################# cluster nodes info: #################")
    cluster_nodes = cluster_node_conn.execute_command('cluster nodes')
    print(cluster_nodes)


if __name__ == '__main__':
    # master
    node_1 = {'host': '127.0.0.1', 'port': 10001, 'password': '******'}
    node_2 = {'host': '127.0.0.1', 'port': 10002, 'password': '******'}
    node_3 = {'host': '127.0.0.1', 'port': 10003, 'password': '******'}
    # slave
    node_4 = {'host': '127.0.0.1', 'port': 10004, 'password': '******'}
    node_5 = {'host': '127.0.0.1', 'port': 10005, 'password': '******'}
    node_6 = {'host': '127.0.0.1', 'port': 10006, 'password': '******'}

    # The number of master nodes and slave nodes must be the same.
    list_master_node = [node_1, node_2, node_3]
    list_slave_node = [node_4, node_5, node_6]

    # Automated cluster creation
    #create_redis_cluster(list_master_node,list_slave_node)

    # Automated expansion: the master and the slave that join (or leave) the cluster
    new_master_node = {'host': '127.0.0.1', 'port': 10007, 'password': '******'}
    new_slave_node = {'host': '127.0.0.1', 'port': 10008, 'password': '******'}
    redis_cluster_expansion(list_master_node, new_master_node, new_slave_node)

    # Automated shrinkage
    #redis_cluster_shrinkage(list_master_node,list_slave_node,new_master_node,new_slave_node)