对于更新操作,天然具有幂等性。
对于新增操作,可以给每条消息一个唯一的id,处理前判断是否被处理过。这个id可以存储在 Redis 中,如果是写数据库可以用主键约束。
根据kafka架构,有三个地方可能丢失消息:Consumer,Producer和 Server
消费端弄丢了数据
当 server.properties/enable.auto.commit 设置为 true 的时候,kafka 会先 commit offset 再处理消息,如果这时候出现异常,这条消息就丢失了。
因此可以关闭自动提交 offset,在处理完成后手动提交 offset,这样可以保证消息不丢失;但是如果提交 offset 失败,可能导致重复消费的问题, 这时保证幂等性即可。
Kafka弄丢了消息
如果某个 broker 不小心挂了,此时若 replica 只有一个,broker 上的消息就丢失了;若 replica > 1 ,给 leader 重新选一个 follower 作为新的 leader, 如果 follower 还有些消息没有同步,这部分消息便丢失了。
可以进行如下配置,避免上面的问题:
给 topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。
在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 吧。
在 producer 端设置 acks=all:这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了。
在 producer 端设置 retries=MAX(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。
Producer弄丢了消息
在 producer 端设置 acks=all,保证所有的ISR都同步了消息才认为写入成功。
如何保证消息的顺序性?kafka 中 partition 上的消息是顺序的,可以将需要顺序消费的消息发送到同一个 partition 上,用单个 consumer 消费。
上面是学习kafka时总结的,如有错误或不合理的地方,欢迎指正!
参考:
1: kafka学习笔记:知识点整理
2: advanced-java
3: Kafka的Log存储解析
4: kafka生产者Producer参数设置及参数调优建议-商业环境实战系列
5: 震惊了!原来这才是kafka!
:
7: kafka 2.3.0 API
8: kafka consumer 配置详解和提交方式