acks=all 只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应
这种模式是最安全的,就算有服务器发生崩溃,数据也不会丢失。
不过,它的延迟比 acks=1 时更高,因为我们要等待不只一个服务器节点接收消息。
该参数决定了生产者可以重发消息的次数(每次重试之间等待 retry.backoff.ms)。
服务器返回临时性的错误(比如:分区找不到 leader)时,生产者会自动重试,没必要在代码逻辑里处理可重试的错误。
作为开发者,只需要处理那些不可重试的错误(比如:消息字节数超过单个发送批次上限)或重试次数超出上限的情况即可。
该参数指定生产者,最多可以发送未响应在途消息批次数量。
在途消息批次越多,会占用更多的内存,不过也会提升吞吐量。
当retries > 0且max.in.flight.requests.per.connection > 1时,可能出现消息乱序。
如果第一个批次消息写入失败,而第二个批次写入成功,broker 会重试写入第一个批次。
如果此时第一个批次也写入成功,那么两个批次的顺序就反过来了。
一般不建议设置retries=0,而是令max.in.flight.requests.per.connection = 1来保证消息顺序。
在生产者尝试发送第一批消息时,就不会有其他的消息发送给 broker,即使发生重试消息也不会乱序。
不过这样会严重影响生产者的吞吐量,所以只有在对消息的顺序有严格要求的情况下才能这么做。
当 broker 失效时生产者可能会自动重试,导致一条消息被重复写入多次。
为了避免这种情况,Kafka 在生产者端提供来幂等保证:同一条消息被生产者发送多次,但在 broker端这条消息只会被写入日志一次。
在发送端设置 enable.idempotence = true 可以开启幂等性,此时配置同时满足以下条件:
max.in.flight.requests.per.connection ≤ 5
retries > 0
acks = all
其工作机制如下:
producer 在初始化时必须分配一个 PIDproducer id该过程对用户来说是完全透明的)
发送到 broker 端的每批消息都会被赋予一个单调递增的 SNsequence number用于消息去重(每个分区都有独立的序列号)
接收到消息的 broker 会将批次的(PID, SN)信息一同持久化到对应的分区日志中(保证 leader 切换后去重仍然生效)
若重试导致 broker 接收到小于或等于已知最大序列号的消息,broker 会拒绝写入这些消息,从而保证每条消息也只会被保存在日志中一次。
由于每个 producer 实例都会被分配不同的 PID,该机制只能保证单个 producer 实例的幂等性,无法实现协同多个 producer 实现幂等。
Kafka 事务可以实现 producer 对多个主题和分区的原子写入,并且保证 consumer 不会读取到未提交的数据。
Kafka 要求应用程序必须提供一个全局唯一的 TIDtransactional id:
初始化时,producer 首先要向 broker 集群注册其 TID,broker 会根据给定的 TID 检查是否存在未完成的事务。
如果某个 producer 实例失效,该机制能够保证下一个拥有相同 TID 的实例首先完成之前未完成的事务。