在很多的流处理框架的介绍中,都会说kafka是一个可靠的数据源,并且推荐使用Kafka当作数据源来进行使用。这是因为与其他消息引擎系统相比,kafka提供了可靠的数据保存及备份机制。并且通过消费者位移这一概念,可以让消费者在因某些原因宕机而重启后,可以轻易得回到宕机前的位置。
但其实kafka的可靠性也只能说是相对的,在整条数据链条中,总有可以让数据出现丢失的情况,今天就来讨论如何避免kafka数据丢失,以及实现精确一致处理的语义。
kafka无消息丢失处理在讨论如何实现kafka无消息丢失的时候,首先要先清楚大部分情况下消息丢失是在什么情况下发生的。为什么是大部分,因为总有一些非常特殊的情况会被人忽略,而我们只需要关注普遍的情况就足够了。接下来我们来讨论如何较为普遍的数据丢失情况。
1.1 生产者丢失前面介绍Kafka分区和副本的时候,有提到过一个producer客户端有一个acks的配置,这个配置为0的时候,producer是发送之后不管的,这个时候就很有可能因为网络等原因造成数据丢失,所以应该尽量避免。但是将ack设置为1就没问题了吗,那也不一定,因为有可能在leader副本接收到数据,但还没同步给其他副本的时候就挂掉了,这时候数据也是丢失了。并且这种时候是客户端以为消息发送成功,但kafka丢失了数据。
同时还需要使用带有回调的producer api,来发送数据。注意这里讨论的都是异步发送消息,同步发送不在讨论范围。
public class send{ ...... public static void main(){ ... /* * 第一个参数是 ProducerRecord 类型的对象,封装了目标 Topic,消息的 kv * 第二个参数是一个 CallBack 对象,当生产者接收到 Kafka 发来的 ACK 确认消息的时候, * 会调用此 CallBack 对象的 onCompletion() 方法,实现回调功能 */ producer.send(new ProducerRecord<>(topic, messageNo, messageStr), new DemoCallBack(startTime, messageNo, messageStr)); ... } ...... } class DemoCallBack implements Callback { /* 开始发送消息的时间戳 */ private final long startTime; private final int key; private final String message; public DemoCallBack(long startTime, int key, String message) { this.startTime = startTime; this.key = key; this.message = message; } /** * 生产者成功发送消息,收到 Kafka 服务端发来的 ACK 确认消息后,会调用此回调函数 * @param metadata 生产者发送的消息的元数据,如果发送过程中出现异常,此参数为 null * @param exception 发送过程中出现的异常,如果发送成功为 null */ @Override public void onCompletion(RecordMetadata metadata, Exception exception) { long elapsedTime = System.currentTimeMillis() - startTime; if (metadata != null) { System.out.printf("message: (%d, %s) send to partition %d, offset: %d, in %d\n", key, message, metadata.partition(), metadata.offset(), elapsedTime); } else { exception.printStackTrace(); } } }更详细的代码可以参考这里:Kafka生产者分析——KafkaProducer。
我们之前提到过,producer发送到kafka broker的时候,是有多种可能会失败的,而回调函数能准确告诉你是否确认发送成功,当然这依托于acks和min.insync.replicas的配置。而当数据发送丢失的时候,就可以进行手动重发或其他操作,从而确保生产者发送成功。
1.2 kafka内部丢失有些时候,kafka内部因为一些不大好的配置,可能会出现一些极为隐蔽的数据丢失情况,那么我们分别讨论下大致都有哪几种情况。
首先是replication.factor配置参数,这个配置决定了副本的数量,默认是1。注意这个参数不能超过broker的数量。说这个参数其实是因为如果使用默认的1,或者不在创建topic的时候指定副本数量(也就是副本数为1),那么当一台机器出现磁盘损坏等情况,那么数据也就从kafka里面丢失了。所以replication.factor这个参数最好是配置大于1,比如说3。
接下来要说的还是和副本相关的,也是上一篇副本中提到的unclean.leader.election.enable 参数,这个参数是在主副本挂掉,然后在ISR集合中没有副本可以成为leader的时候,要不要让进度比较慢的副本成为leader的。不用多说,让进度比较慢的副本成为leader,肯定是要丢数据的。虽然可能会提高一些可用性,但如果你的业务场景丢失数据更加不能忍受,那还是将unclean.leader.election.enable设置为false吧。
1.3 消费者丢失