上篇文章已经简单介绍过上两项配置的含义,这里不再重复,重点说一下第三项配置。对于任意(Broker, Leader)元组,都会有replication.factor-1个Broker作为Replica,在Replica上会启动若干Fetch线程把对应的数据同步到本地,而num.replica.fetchers这个参数是用来控制Fetch线程的数量。
一般来说如果发现Partition的ISR当中只有自己一个Partition,且长时间没有新的Replica增加进来时,就可以考虑适当的增大这个参数加快复制进度。其内部实现上,每个Fetch就对应了一个SimpleConsumer,对于任意一台其他机器上需要Catch-up的Leader,会创建num.replica.fetchers个SimpleConsumer来拉取Log。
当初刚知道这块设计的时候还蛮疑惑的,在Kafka文档开篇的时候就郑重介绍过,同一个ConsumerGroup内的Consumer和Partition在同一时间内必须保证是一对一的消费关系,而这么简单地增加SimpleConsumer就可以提高效率又是什么原因呢?
查看源码,在AbstractFetcherThread.scala里可以看到,Fetch启动的多线程其实就是一个个的SimpleConsumer。
首先,getFetcherId()利用numFetcher来控制FetchId的范围,进而控制Consumer数量。partitionsPerFetcher结构则是一个从Partition到Partition上启动的Fetchers的Mapping。
上面为每个Partition启动的多个Fetcher(也就是SimpleConsumer)之间通过partitionMap: mutable.HashMap[TopicAndPartition, Long]来共享offset,达到并行Fetch数据的目的。因此,通过共享offset既保证了同一时间内Consumer和Partition之间的一对一关系,又允许我们通过增多Fetch线程来提高效率。
default.replication.factor:1
这个参数指新创建一个topic时,默认的Replica数量。当Producer中的 acks!=0 && acks!=1时,Replica的大小可能会导致在Produce数据时的性能表现有很大不同。Replica过少会影响数据的可用性,太多则会白白浪费存储资源,一般建议在2~3为宜。
fetch.purgatory.purge.interval.requests:1000producer.purgatory.purge.interval.requests:1000