如果某个提交失败,同步提交还会进行重试,这可以保证数据能够最大限度提交成功,但是同时也会降低程序的吞吐量。
异步提交
为了解决同步提交降低程序吞吐量的问题,又有了异步提交的方案。
异步提交可以提高程序的吞吐量,因为此时你可以尽管请求数据,而不用等待 Broker 的响应。代码样例如下:
2
3
4
5
6
7
8
9
10
11
12
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
// 异步提交并定义回调
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
offsets.forEach((x, y) -> System.out.printf("topic = %s,partition = %d, offset = %s \n", x.topic(), x.partition(), y.offset()));
}
}
});
}
异步提交如果失败,错误信息和偏移量都会被记录下来。
尽管如此,异步提交存在的问题是,如果提交失败不能重试,因为重试可能会出现小偏移量覆盖大偏移量的问题。
虽然程序不能在失败时候进行自动重试,但是我们是可以手动进行重试。可以通过一个 Map<TopicPartition, Integer> offsets 来维护你提交的每个分区的偏移量,也就是异步提交的顺序,在每次提交偏移量之后或在回调里提交偏移量时递增序列号。然后当失败时候,你可以判断失败的偏移量是否小于你维护的同主题同分区的最后提交的偏移量,如果小于则代表你已经提交了更大的偏移量请求,此时不需要重试,否则就可以进行手动重试。
同步和异步组合提交:
当发生关闭消费者或者再均衡时,一定要确保能够提交成功,为了保证性能和可靠性,又有了同步和异步组合提交的方式。也就是在消费者关闭前组合使用commitAsync()方法和commitSync()方法。代码样例如下:
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
}
// 异步提交
consumer.commitAsync();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 因为即将要关闭消费者,所以要用同步提交保证提交成功
consumer.commitSync();
} finally {
consumer.close();
}
}
提交特定的偏移量