估计运维年前没有祭拜服务器,Nginx的问题修复了,Kafka又不行了。今天,本来想再睡会,结果,电话又响了。还是运营,“喂,冰河,到公司了吗?赶紧看看服务器吧,又出问题了“。“在路上了,运维那哥们儿还没上班吗”? “还在休假。。。”, 我:“。。。”。哎,这哥们儿是跑路了吗?先不管他,问题还是要解决。
问题重现到公司后,放下我专用的双肩包,拿出我的利器——笔记本电脑,打开后迅速登录监控系统,发现主要业务系统没啥问题。一个非核心服务发出了告警,并且监控系统中显示这个服务频繁的抛出如下异常。
2021-02-28 22:03:05 131 pool-7-thread-3 ERROR [] - commit failed org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713) ~[MsgAgent-jar-with-dependencies.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596) ~[MsgAgent-jar-with-dependencies.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218) ~[MsgAgent-jar-with-dependencies.jar:na] at com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121) ~[MsgAgent-jar-with-dependencies.jar:na] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]从上面输出的异常信息,大概可以判断出系统出现的问题:Kafka消费者在处理完一批poll消息后,在同步提交偏移量给broker时报错了。大概就是因为当前消费者线程的分区被broker给回收了,因为Kafka认为这个消费者挂掉了,我们可以从下面的输出信息中可以看出这一点。
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.Kafka内部触发了Rebalance机制,明确了问题,接下来,我们就开始分析问题了。
分析问题既然Kafka触发了Rebalance机制,那我就来说说Kafka触发Rebalance的时机。
什么是Rebalance举个具体点的例子,比如某个分组下有10个Consumer实例,这个分组订阅了一个50个分区的主题。正常情况下,Kafka会为每个消费者分配5个分区。这个分配的过程就是Rebalance。
触发Rebalance的时机当Kafka中满足如下条件时,会触发Rebalance:
组内成员的个数发生了变化,比如有新的消费者加入消费组,或者离开消费组。组成员离开消费组包含组成员崩溃或者主动离开消费组。
订阅的主题个数发生了变化。
订阅的主题分区数发生了变化。
后面两种情况我们可以人为的避免,在实际工作过程中,对于Kafka发生Rebalance最常见的原因是消费组成员的变化。
消费者成员正常的添加和停掉导致Rebalance,这种情况无法避免,但是时在某些情况下,Consumer 实例会被 Coordinator 错误地认为 “已停止” 从而被“踢出”Group,导致Rebalance。
当 Consumer Group 完成 Rebalance 之后,每个 Consumer 实例都会定期地向 Coordinator 发送心跳请求,表明它还存活着。如果某个 Consumer 实例不能及时地发送这些心跳请求,Coordinator 就会认为该 Consumer 已经 “死” 了,从而将其从 Group 中移除,然后开启新一轮 Rebalance。这个时间可以通过Consumer 端的参数 session.timeout.ms 进行配置。默认值是 10 秒。
除了这个参数,Consumer 还提供了一个控制发送心跳请求频率的参数,就是 heartbeat.interval.ms。这个值设置得越小,Consumer 实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启 Rebalance,因为,目前 Coordinator 通知各个 Consumer 实例开启 Rebalance 的方法,就是将 REBALANCE_NEEDED 标志封装进心跳请求的响应体中。
除了以上两个参数,Consumer 端还有一个参数,用于控制 Consumer 实际消费能力对 Rebalance 的影响,即 max.poll.interval.ms 参数。它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。它的默认值是 5 分钟,表示 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起 “离开组” 的请求,Coordinator 也会开启新一轮 Rebalance。
通过上面的分析,我们可以看一下那些rebalance是可以避免的: