初始 Kafka Consumer 消费者

温馨提示:整个 Kafka 专栏基于 kafka-2.2.1 版本。

1、KafkaConsumer 概述

根据 KafkaConsumer 类上的注释上来看 KafkaConsumer 具有如下特征:

在 Kafka 中 KafkaConsumer 是线程不安全的。

2.2.1 版本的KafkaConsumer 兼容 kafka 0.10.0 和 0.11.0 等低版本。

消息偏移量与消费偏移量(消息消费进度)
Kafka 为分区中的每一条消息维护一个偏移量,即消息偏移量。这个偏移量充当该分区内记录的唯一标识符。消费偏移量(消息消费进度)存储的是消费组当前的处理进度。消息消费进度的提交在 kafka 中可以定时自动提交也可以手动提交。手动提交可以调用 ommitSync() 或 commitAsync 方法。

消费组 与 订阅关系
多个消费这可以同属于一个消费组,消费组内的所有消费者共同消费主题下的所有消息。一个消费组可以订阅多个主题。

队列负载机制
既然同一个消费组内的消费者共同承担主题下所有队列的消费,那他们如何进行分工呢?默认情况下采取平均分配,例如一个消费组有两个消费者c1、c2,一个 topic 的分区数为6,那 c1 会负责3个分区的消费,同样 c2 会负责另外3个分区的分配。

那如果其中一个消费者宕机或新增一个消费者,那队列能动态调整吗?

答案是会重新再次平衡,例如如果新增一个消费者 c3,则c1,c2,c3都会负责2个分区的消息消费,分区重平衡会在后续文章中重点介绍。消费者也可以通过 assign 方法手动指定分区,此时会禁用默认的自动分配机制。

消费者故障检测机制
当通过 subscribe 方法订阅某些主题时,此时该消费者还未真正加入到订阅组,只有当 consumeer#poll 方法被调用后,并且会向 broker 定时发送心跳包,如果 broker 在 session.timeout.ms 时间内未收到心跳包,则 broker 会任务该消费者已宕机,会将其剔除,并触发消费端的分区重平衡。

消费者也有可能遇到“活体锁”的情况,即它继续发送心跳,但没有任何进展。在这种情况下,为了防止消费者无限期地占用它的分区,可以使用max.poll.interval.ms 设置提供了一个活性检测机制。基本上,如果您调用轮询的频率低于配置的最大间隔,那么客户机将主动离开组,以便另一个消费者可以接管它的分区。当这种情况发生时,您可能会看到一个偏移提交失败(由调用{@link #commitSync()}抛出的{@link CommitFailedException}表示)。

kafka 对 poll loop 行为的控制参数
Kafka 提供了如下两个参数来控制 poll 的行为:

max.poll.interval.ms
允许 两次调用 poll 方法的最大间隔,即设置每一批任务最大的处理时间。

max.poll.records
每一次 poll 最大拉取的消息条数。

对于消息处理时间不可预测的情况下上述两个参数可能不够用,那将如何是好呢?

通常的建议将消息拉取与消息消费分开,一个线程负责 poll 消息,处理这些消息使用另外的线程,这里就需要手动提交消费进度。为了控制消息拉起的过快,您可能会需要用到 Consumer#pause(Collection) 方法,暂时停止向该分区拉起消息。RocketMQ 的推模式就是采用了这种策略。如果大家有兴趣的话,可以从笔者所著的《RocketMQ技术内幕》一书中详细了解。

2、KafkaConsume 使用示例 2.1 自动提交消费进度 public static void testConsumer1() { Properties props = new Properties(); props.setProperty("bootstrap.servers", "localhost:9092,localhost:9082,localhost:9072"); props.setProperty("group.id", "C_ODS_ORDERCONSUME_01"); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("TOPIC_ORDER")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.println("消息消费中"); System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } 2.2 手动提交消费进度 public static void testConsumer2() { Properties props = new Properties(); props.setProperty("bootstrap.servers", "localhost:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "false"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); final int minBatchSize = 200; List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { // insertIntoDb(buffer); // 省略处理逻辑 consumer.commitSync(); buffer.clear(); } } } 3、认识 Consumer 接口

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zywdff.html