背景
某线上日志收集服务报警,打开域名报502错误码。 收集服务由2台netty HA服务器组成,netty服务器将客户端投递来的protobuf日志解析并发送到kafka,打开其中一个应用的日志,发现如下报错:
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s)在排除了netty服务的错误之后,去查看kafka的日志。 发现报错,排查过程如下;
配置信息 系统 kafka版本 broker数量CentOS7.4 2.1.0 3
线上有三台Kafka Broker,id分别为0、1、2,服务器只部署了Kafka服务。
问题 线程是否存活首先jps查看Kafka线程是否存活,三台机器都没问题,kafka依然在运行。
GC问题查看kafkaServer-gc.log.1.current的日志,gc日志没发现异常。
Broker 0/server.log [2019-08-02 15:17:03,699] WARN Attempting to send response via channel for which there is no open connection, connection id 172.21.3.14:9092-172.21.3.11:54311-107706 (kafka.network.Processor) [2019-08-02 15:19:12,490] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager) [2019-08-02 15:26:54,405] INFO [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error sending fetch request (sessionId=1112819217, epoch=1897450) to node 1: java.io.IOException: Connection to 1 was disconnected before the response was read. (org.apache.kafka.clients.FetchSessionHandler) [2019-08-02 15:26:54,411] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={galaxy_client-7=(offset=15680912, logStartOffset=14755985, maxBytes=1048576, currentLeaderEpoch=Optional[9])}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1112819217, epoch=1897450)) (kafka.server.ReplicaFetcherThread) java.io.IOException: Connection to 1 was disconnected before the response was read at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97) at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97) at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190) at kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241) at kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130) at kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129) at scala.Option.foreach(Option.scala:257) at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) [2019-08-02 15:27:26,433] INFO [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error sending fetch request (sessionId=1112819217, epoch=INITIAL) to node 1: java.io.IOException: Connection to 1 was disconnected before the response was read. (org.apache.kafka.clients.FetchSessionHandler) Broker 1/server.log [2019-08-02 15:26:53,751] WARN [GroupCoordinator 1]: Failed to write empty metadata for group StoreToHiveV2: The group is rebalancing, so a rejoin is needed. (kafka.coordinator.group.GroupCoordinator) [2019-08-02 15:26:55,515] WARN [GroupCoordinator 1]: Failed to write empty metadata for group LPmkt: The group is rebalancing, so a rejoin is needed. (kafka.coordinator.group.GroupCoordinator) [2019-08-02 15:26:56,124] WARN [GroupCoordinator 1]: Failed to write empty metadata for group Store2CarbonClientServer: The group is rebalancing, so a rejoin is needed. (kafka.coordinator.group.GroupCoordinator) [2019-08-02 15:26:56,575] WARN [GroupCoordinator 1]: Failed to write empty metadata for group StoreToHive: The group is rebalancing, so a rejoin is needed. (kafka.coordinator.group.GroupCoordinator) Broker 2/server.log [2019-08-02 15:26:54,514] WARN [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={galaxy_client-7=(offset=15680912, logStartOffset=14755985, maxBytes=1048576, currentLeaderEpoch=Optional[9])}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1550207796, epoch=1890003)) (kafka.server.ReplicaFetcherThread) java.io.IOException: Connection to 1 was disconnected before the response was read at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97) at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97) at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190) at kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241) at kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130) at kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129) at scala.Option.foreach(Option.scala:257) at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) [2019-08-02 15:27:26,569] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error sending fetch request (sessionId=1550207796, epoch=INITIAL) to node 1: java.io.IOException: Connection to 1 was disconnected before the response was read. (org.apache.kafka.clients.FetchSessionHandler) 问题排查由于是线上应用,基本没时间排查问题,所以立即重启了三台Kafka,重启后得到缓解。
在服务正常之后,在网上搜了很多答案,有的人说是网络问题,close wait过多导致broker网络不通,从而Kafka产生脑裂问题。 在各个服务器上运行命令:
netstat -n | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}'结果
ESTABLISHED 172 TIME_WAIT 32也没发现异常,等下次再出现问题时,要运行这个命令看看服务器的状态。
社区bug?https://issues.apache.org/jira/browse/KAFKA-6582
下方有人评论:We are running 2.1.1 in production (and four other environments) since March without this issue showing again.