建立topic的时候,可以通过指定参数 --replication-factor 设置备份数量。但是,一旦完成建立topic,则无法通过kafka-topic.sh 或者 命令修改replica数量。
二、解决办法 实际上,我们可以考虑一种 “另类” 的办法:可以利用 kafka-reassign-partitions.sh 命令对所有分区进行重新分布,在做分区重新分布的时候,通过增加每个分区的replica备份数量来达到目的。
本文将介绍如何利用 kafka-reassign-partitions.sh 命令增加topic的备份数量。
注意:以下命令使用到的topic名称、zookeeper的ip和port,需要读者替换成为实际集群的参数。
(假设kafka集群有4个broker,id分别为:1001,1002,1003,1004)
2.1、获取当前topic的所有分区分布在broker的情况 [root@tbds bin]# ./kafka-topics.sh --zookeeper 172.16.32.13:2181 --topic ranger_audits --describe Topic:ranger_audits PartitionCount:10 ReplicationFactor:1 Configs: Topic: ranger_audits Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001 Topic: ranger_audits Partition: 1 Leader: 1002 Replicas: 1002 Isr: 1002 Topic: ranger_audits Partition: 2 Leader: 1001 Replicas: 1001 Isr: 1001 Topic: ranger_audits Partition: 3 Leader: 1002 Replicas: 1002 Isr: 1002 Topic: ranger_audits Partition: 4 Leader: 1001 Replicas: 1001 Isr: 1001 Topic: ranger_audits Partition: 5 Leader: 1002 Replicas: 1002 Isr: 1002 Topic: ranger_audits Partition: 6 Leader: 1001 Replicas: 1001 Isr: 1001 Topic: ranger_audits Partition: 7 Leader: 1002 Replicas: 1002 Isr: 1002 Topic: ranger_audits Partition: 8 Leader: 1001 Replicas: 1001 Isr: 1001 Topic: ranger_audits Partition: 9 Leader: 1002 Replicas: 1002 Isr: 1002可以看出,ranger_audits 这个topic有10个分区,每个分区只有一个feplica备份,分布在1001和1002两台broker上面。
下面我们需要将ranger_audits 的每个分区数据都增加到2个replica备份,且分布到4个broker上面。
2.2、创建增加replica备份数量的配置文件(注意:尽量保持topic的原有每个分区的主备份不变化。因此,配置文件的每个分区的第一个broker保持不变。)
[root@tbds bin]# vim ../config/increase-replication-factor.json {"version":1, "partitions":[ {"topic":"ranger_audits","partition":0,"replicas":[1001,1003]}, {"topic":"ranger_audits","partition":1,"replicas":[1002,1004]}, {"topic":"ranger_audits","partition":2,"replicas":[1001,1003]}, {"topic":"ranger_audits","partition":3,"replicas":[1002,1004]}, {"topic":"ranger_audits","partition":4,"replicas":[1001,1003]}, {"topic":"ranger_audits","partition":5,"replicas":[1002,1004]}, {"topic":"ranger_audits","partition":6,"replicas":[1001,1003]}, {"topic":"ranger_audits","partition":7,"replicas":[1002,1004]}, {"topic":"ranger_audits","partition":8,"replicas":[1001,1003]}, {"topic":"ranger_audits","partition":9,"replicas":[1002,1004]} ]}上面的配置文件说明,我们将topic的每个分区都增加了一个replica,且保持每个分区原有的主备份所在broker不变化,将每个分区新增的replica备份数据放到到1003和1004两个broker上面。
2.3、开始执行增加分区 [root@tbds bin]# ./kafka-reassign-partitions.sh -zookeeper 172.16.32.13:2181 --reassignment-json-file ../config/increase-replication-factor.json --execute Current partition replica assignment {"version":1,"partitions":[{"topic":"ranger_audits","partition":3,"replicas":[1002]},{"topic":"ranger_audits","partition":9,"replicas":[1002]},{"topic":"ranger_audits","partition":8,"replicas":[1001]},{"topic":"ranger_audits","partition":1,"replicas":[1002]},{"topic":"ranger_audits","partition":4,"replicas":[1001]},{"topic":"ranger_audits","partition":2,"replicas":[1001]},{"topic":"ranger_audits","partition":5,"replicas":[1002]},{"topic":"ranger_audits","partition":0,"replicas":[1001]},{"topic":"ranger_audits","partition":6,"replicas":[1001]},{"topic":"ranger_audits","partition":7,"replicas":[1002]}]} Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions {"version":1,"partitions":[{"topic":"ranger_audits","partition":0,"replicas":[1001,1003]},{"topic":"ranger_audits","partition":8,"replicas":[1001,1003]},{"topic":"ranger_audits","partition":5,"replicas":[1002,1004]},{"topic":"ranger_audits","partition":2,"replicas":[1001,1003]},{"topic":"ranger_audits","partition":9,"replicas":[1002,1004]},{"topic":"ranger_audits","partition":1,"replicas":[1002,1004]},{"topic":"ranger_audits","partition":3,"replicas":[1002,1004]},{"topic":"ranger_audits","partition":4,"replicas":[1001,1003]},{"topic":"ranger_audits","partition":7,"replicas":[1002,1004]},{"topic":"ranger_audits","partition":6,"replicas":[1001,1003]}]} 2.4、查看执行进度 [root@tbds bin]# ./kafka-reassign-partitions.sh -zookeeper 172.16.32.13:2181 --reassignment-json-file ../config/increase-replication-factor.json --verify Status of partition reassignment: Reassignment of partition [ranger_audits,0] completed successfully Reassignment of partition [ranger_audits,8] completed successfully Reassignment of partition [ranger_audits,5] completed successfully Reassignment of partition [ranger_audits,2] completed successfully Reassignment of partition [ranger_audits,9] completed successfully Reassignment of partition [ranger_audits,1] completed successfully Reassignment of partition [ranger_audits,3] completed successfully Reassignment of partition [ranger_audits,4] completed successfully Reassignment of partition [ranger_audits,7] completed successfully Reassignment of partition [ranger_audits,6] completed successfully