for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
String topic = topicEntry.getKey(); // 主题
List<String> consumersForTopic = topicEntry.getValue(); // 消费者列表
// partitionsPerTopic表示主题和分区数的映射
// 获取主题下有多少个分区
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
if (numPartitionsForTopic == null)
continue;
// 消费者按字典序排序
Collections.sort(consumersForTopic);
// 分区数量除以消费者数量
int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
// 取模,余数就是额外的分区
int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
// 分配分区
assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
}
}
return assignment;
}
4.1.2. roundrobin(轮询)
roundronbin分配策略的具体实现是org.apache.kafka.clients.consumer.RoundRobinAssignor
/**
* The round robin assignor lays out all the available partitions and all the available consumers. It
* then proceeds to do a round robin assignment from partition to consumer. If the subscriptions of all consumer
* instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts
* will be within a delta of exactly one across all consumers.)
*
* For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
* resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
*
* The assignment will be:
* C0: [t0p0, t0p2, t1p1]
* C1: [t0p1, t1p0, t1p2]
*
* When subscriptions differ across consumer instances, the assignment process still considers each
* consumer instance in round robin fashion but skips over an instance if it is not subscribed to
* the topic. Unlike the case when subscriptions are identical, this can result in imbalanced
* assignments. For example, we have three consumers C0, C1, C2, and three topics t0, t1, t2,
* with 1, 2, and 3 partitions, respectively. Therefore, the partitions are t0p0, t1p0, t1p1, t2p0,
* t2p1, t2p2. C0 is subscribed to t0; C1 is subscribed to t0, t1; and C2 is subscribed to t0, t1, t2.
*
* Tha assignment will be:
* C0: [t0p0]
* C1: [t1p0]
* C2: [t1p1, t2p0, t2p1, t2p2]
*/
轮询分配策略是基于所有可用的消费者和所有可用的分区的
与前面的range策略最大的不同就是它不再局限于某个主题
如果所有的消费者实例的订阅都是相同的,那么这样最好了,可用统一分配,均衡分配
例如,假设有两个消费者C0和C1,两个主题t0和t1,每个主题有3个分区,分别是t0p0,t0p1,t0p2,t1p0,t1p1,t1p2
那么,最终分配的结果是这样的:
C0: [t0p0, t0p2, t1p1]
C1: [t0p1, t1p0, t1p2]
用图形表示大概是这样的:
假设,组中每个消费者订阅的主题不一样,分配过程仍然以轮询的方式考虑每个消费者实例,但是如果没有订阅主题,则跳过实例。当然,这样的话分配肯定不均衡。
什么意思呢?也就是说,消费者组是一个逻辑概念,同组意味着同一时刻分区只能被一个消费者实例消费,换句话说,同组意味着一个分区只能分配给组中的一个消费者。事实上,同组也可以不同订阅,这就是说虽然属于同一个组,但是它们订阅的主题可以是不一样的。
例如,假设有3个主题t0,t1,t2;其中,t0有1个分区p0,t1有2个分区p0和p1,t2有3个分区p0,p1和p2;有3个消费者C0,C1和C2;C0订阅t0,C1订阅t0和t1,C2订阅t0,t1和t2。那么,按照轮询分配的话,C0应该负责