上面两篇聊了Kafka概况和Kafka生产者,包含了Kafka的基本概念、设计原理、设计核心以及生产者的核心原理。本篇单独聊聊Kafka的消费者,包括如下内容:
消费者和消费者组
如何创建消费者
如何消费消息
消费者配置
提交和偏移量
再均衡
结束消费
消费者和消费者组 概念Kafka消费者对象订阅主题并接收Kafka的消息,然后验证消息并保存结果。
Kafka消费者是消费者组的一部分。一个消费者组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。
消费者组的设计是对消费者进行的一个横向伸缩,用于解决消费者消费数据的速度跟不上生产者生产数据的速度的问题,通过增加消费者,让它们分担负载,分别处理部分分区的消息。
在一个消费者组中的消费者消费的是一个主题的部分分区的消息,而一个主题中包含若干个分区,一个消费者组中也包含着若干个消费者。当二者的数量关系处于不同的大小关系时,Kafka消费者的工作状态也是不同的。看以下三种情况:
消费者数目<分区数目:此时不同分区的消息会被均衡地分配到这些消费者;
消费者数目=分区数目:每个消费者会负责一个分区的消息进行消费;
消费者数目>分区数目:此时会有多余的消费者处于空闲状态,其他的消费者与分区一对一地进行消费。
分区再均衡
当消费者数目与分区数目在以上三种关系间变化时,比如有新的消费者加入、或者有一个消费者发生崩溃时,会发生分区再均衡。
分区再均衡是指分区的所有权从一个消费者转移到另一个消费者。再均衡为消费者组带来了高可用性和伸缩性。但是同时,也会发生如下问题:
在再均衡发生的时候,消费者无法读取消息,会造成整个消费者组有一小段时间的不可用;
当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能需要去刷新缓存,在它重新恢复状态之前会拖慢应用。
因此也要尽量避免不必要的再均衡。
那么消费者组是怎么知道一个消费者可不可用呢?
消费者通过向被指派为群组协调器的Broker发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。
还有一点需要注意的是,当发生再均衡时,需要做一些清理工作,具体的操作方法可以通过在调用subscribe()方法时传入一个ConsumerRebalanceListener实例即可。
如何创建消费者
创建Kafka的消费者对象的过程与创建生产者的过程是类似的,需要传入必要的属性。在创建消费者的时候以下以下三个选项是必选的:
bootstrap.servers :指定 broker 的地址清单,清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找 broker 的信息。不过建议至少要提供两个 broker 的信息作为容错;
key.deserializer :指定键的反序列化器;
value.deserializer :指定值的反序列化器。
后两个序列化器的说明与生产者的是一样的。
一个简单的创建消费者的代码样例如下:
1
2
3
4
5
6
7
8
9
String topic = "Hello";
String group = "group1";
Properties props = new Properties();
props.put("bootstrap.servers", "server:9091");
/*指定分组 ID*/
props.put("group.id", group);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
创建了Kafka消费者之后,接着就可以订阅主题了。订阅主题可以使用如下两个 API :
consumer.subscribe(Collection topics) :指明需要订阅的主题的集合;
consumer.subscribe(Pattern pattern) :使用正则来匹配需要订阅的集合。
代码样例:
1
consumer.subscribe(Collections.singletonList(topic));