上面的提交方式都是提交当前最大的偏移量,但如果需要提交的是特定的一个偏移量呢?
只需要在重载的提交方法中传入偏移量参数即可。代码样例如下:
2
3
4
// 同步提交特定偏移量
commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
// 异步提交特定偏移量
commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
结束消费
上面的消费过程都是以无限循环的方式来演示的,那么如何来优雅地停止消费者的轮询呢。
Kafka 提供了 consumer.wakeup() 方法用于退出轮询。
如果确定要退出循环,需要通过另一个线程调用consumer.wakeup()方法;如果循环运行在主线程里,可以在ShutdownHook里调用该方法。
它通过抛出 WakeupException 异常来跳出循环。需要注意的是,在退出线程时最好显示的调用 consumer.close() , 此时消费者会提交任何还没有提交的东西,并向群组协调器发送消息,告知自己要离开群组,接下来就会触发再均衡 ,而不需要等待会话超时。
下面的示例代码为监听控制台输出,当输入 exit 时结束轮询,关闭消费者并退出程序:
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 调用wakeup优雅的退出轮询
final Thread mainThread = Thread.currentThread();
new Thread(() -> {
Scanner sc = new Scanner(System.in);
while (sc.hasNext()) {
if ("exit".equals(sc.next())) {
consumer.wakeup();
try {
// 等待主线程完成提交偏移量、关闭消费者等操作
mainThread.join();
break;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> rd : records) {
System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n", rd.topic(), rd.partition(), rd.key(), rd.value(), rd.offset());
}
}
} catch (WakeupException e) {
// 无需处理此异常
} finally {
consumer.close();
}
关注我的公众号,获取更多关于面试、技术的文章及福利资源。
【参考资料】
《Kafka 权威指南》