Kafka Producer TimeoutException

程序读取HDFS上的日志发送至Kafka集群
由于日志量较大 每小时约7亿条+ 采用多线程 多producer实例发送
TPS 可达到120W+

修改前Producer配置 val props = new Properties() props.put("bootstrap.servers", Config.kafka_server) props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("acks", "1") props.put("retries", "3")

send()采用异步发送的方式 并传入自己的Callback函数(用于处理异常逻辑)程序运行一段时间后经过callback函数统计发现会有不少消息出现TimeoutException并且这些消息并不会重试。以为是retries参数设置未生效,于是去查阅资料等,最后没有找到还是原因。

org.apache.kafka.common.errors.TimeoutException Expiring 190 record(s) for feedback-0: 60261 ms has passed since last append

最后不得不从源码入手
发现错误信息是从这里产生的

boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) { if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime)) expiryErrorMessage = (now - this.lastAppendTime) + " ms has passed since last append"; else if (!this.inRetry() && requestTimeoutMs < (createdTimeMs(now) - lingerMs)) expiryErrorMessage = (createdTimeMs(now) - lingerMs) + " ms has passed since batch creation plus linger time"; else if (this.inRetry() && requestTimeoutMs < (waitedTimeMs(now) - retryBackoffMs)) expiryErrorMessage = (waitedTimeMs(now) - retryBackoffMs) + " ms has passed since last attempt plus backoff time"; boolean expired = expiryErrorMessage != null; if (expired) abortRecordAppends(); return expired; }

后找到这个方法的调用
发现是由于kafka把这些消息标记为expired(过期)
当每一批消息满了(batch.size)且 requestTimeoutMs < (now - this.lastAppendTime)) 这一批消息就会被标记为过期且不会放到RecordAccumulator中(不会再次重试发送)

解决方法

调大batch.size 参数和request.timeout.ms 参数
batch.size 可根据发送数据量的大小来调整

修改后Producer配置 val props = new Properties() props.put("bootstrap.servers", Config.feedback_log_kafka_server) props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("retries", "3") props.put("request.timeout.ms", "120000") props.put("acks", "1") props.put("batch.size", "32768")

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpxwyf.html