第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被 “踢出”Group 而引发的。这种情况下我们可以设置 session.timeout.ms 和 heartbeat.interval.ms 的值,来尽量避免rebalance的出现。(以下的配置是在网上找到的最佳实践,暂时还没测试过)
设置 session.timeout.ms = 6s。
设置 heartbeat.interval.ms = 2s。
要保证 Consumer 实例在被判定为 “dead” 之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。
将 session.timeout.ms 设置成 6s 主要是为了让 Coordinator 能够更快地定位已经挂掉的 Consumer,早日把它们踢出 Group。
第二类非必要 Rebalance 是 Consumer 消费时间过长导致的。此时,max.poll.interval.ms 参数值的设置显得尤为关键。如果要避免非预期的 Rebalance,最好将该参数值设置得大一点,比下游最大处理时间稍长一点。
总之,要为业务处理逻辑留下充足的时间。这样,Consumer 就不会因为处理这些消息的时间太长而引发 Rebalance 。
拉取偏移量与提交偏移量kafka的偏移量(offset)是由消费者进行管理的,偏移量有两种,拉取偏移量(position)与提交偏移量(committed)。拉取偏移量代表当前消费者分区消费进度。每次消息消费后,需要提交偏移量。在提交偏移量时,kafka会使用拉取偏移量的值作为分区的提交偏移量发送给协调者。
如果没有提交偏移量,下一次消费者重新与broker连接后,会从当前消费者group已提交到broker的偏移量处开始消费。
所以,问题就在这里,当我们处理消息时间太长时,已经被broker剔除,提交偏移量又会报错。所以拉取偏移量没有提交到broker,分区又rebalance。下一次重新分配分区时,消费者会从最新的已提交偏移量处开始消费。这里就出现了重复消费的问题。
异常日志提示的方案其实,说了这么多,Kafka消费者输出的异常日志中也给出了相应的解决方案。
接下来,我们说说Kafka中的拉取偏移量和提交偏移量。
其实,从输出的日志信息中,也大概给出了解决问题的方式,简单点来说,就是可以通过增加 max.poll.interval.ms 时长和 session.timeout.ms 时长,减少 max.poll.records的配置值,并且消费端在处理完消息时要及时提交偏移量。
问题解决通过之前的分析,我们应该知道如何解决这个问题了。这里需要说一下的是,我在集成Kafka的时候,使用的是SpringBoot和Kafka消费监听器,消费端的主要代码结构如下所示。
@KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0" }) }, groupId = "kafka-consumer", containerFactory = "kafkaListenerContainerFactory") public void consumerReceive (ConsumerRecord<?, ?> record, Acknowledgment ack){ logger.info("topic is {}, offset is {}, value is {} n", record.topic(), record.offset(), record.value()); try { Object value = record.value(); logger.info(value.toString()); ack.acknowledge(); } catch (Exception e) { logger.error("日志消费端异常: {}", e); } }上述代码逻辑比较简单,就是获取到Kafka中的消息后直接打印输出到日志文件中。
尝试解决这里,我先根据异常日志的提示信息进行配置,所以,我在SpringBoot的application.yml文件中新增了如下配置信息。
spring: kafka: consumer: properties: max.poll.interval.ms: 3600000 max.poll.records: 50 session.timeout.ms: 60000 heartbeat.interval.ms: 3000配置完成后,再次测试消费者逻辑,发现还是抛出Rebalance异常。
最终解决我们从另一个角度来看下Kafka消费者所产生的问题:一个Consumer在生产消息,另一个Consumer在消费它的消息,它们不能在同一个groupId 下面,更改其中一个的groupId 即可。
这里,我们的业务项目是分模块和子系统进行开发的,例如模块A在生产消息,模块B消费模块A生产的消息。此时,修改配置参数,例如 session.timeout.ms: 60000,根本不起作用,还是抛出Rebalance 异常。
此时,我尝试修改下消费者分组的groupId,将下面的代码
@KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0" }) }, groupId = "kafka-consumer", containerFactory = "kafkaListenerContainerFactory") public void consumerReceive (ConsumerRecord<?, ?> record, Acknowledgment ack){