Kafka从入门到放弃(二) —— 详说生产者
消费者与消费者组在Kafka中消费者是消费消息的对象。假设目前有一个消费者正在消费消息,但生产数据的速度突然上升,这时候消费者会有点力不从心,跟不上消息生产的速度,这时候咋办呢?
我们对消费者进行横向扩展,加几个消费者,达到负载均衡的作用。但是要做点限制吧,不然几个消费者消费同一个分区的消息,不仅没办法提高消费能力,还会造成重复消费。因此让他们分别消费不同的分区。
在Kafka中的消费者组就是如此,一个消费者组内的消费者订阅同一个Topic的数据,但消费不同分区的数据,提高了消费能力。
但是消费者组里的消费者数量建议不要超过分区数量,不然就浪费资源。
LEO & HWKafka中的分区是可以有多个副本的,我们把每个副本中待写入的那个offset称为LEO(Log End Offset),把最少消息的那个副本的LEO称为HW(High Watermark)
对于消费者而言,消费者所能消费的区间就是小于HW那部分,即图中 0-3 部分。这样消费者不管是哪个副本,订阅到的消息都是一致的,即使换了leader也能接着消费。
提交偏移量假如一个消费者退出,另一个消费者接替它的任务,这时候就需要知道上一个消费者消费到了哪条数据,因此消费者需要追踪偏移量。
在Kafka中,有一个名为_consumer_offset的主题,消费者会往里面发送消息,提交偏移量,这个时候消费者也是生产者。
当消费者挂了或者有新的消费者假如消费者组,就会触发在均衡操作,即为消费者重新分配分区。
为了能够继续之前的操作,消费者需要获取每个分区最后一次提交的偏移量。
如果提交的偏移量小于处理的最后一个消息的偏移量,会造成重复消费。比如消费者提交了 6 的offset,此时又拉取了2条数据,还没等提交,消费者就挂掉了,然后就发生了再均衡。新的消费者获取到 6 的偏移量,接着处理,这就造成了重复消费。
如果提交的偏移量大于处理的最后一个消息的偏移量,会造成数据丢失。比如消费者一次性拉取了 88 条数据,并且提交了偏移量,还没处理完就宕机了,新的消费者获取 88 的偏移量,继续消费,就造成了数据丢失。
因此,如何提交偏移量对客户端影响很大,稍有不慎就会造成不好的影响。
在Kafka中,有几种提交偏移量的方式。
自动提交这种提交方式有两个很重要的参数:
enable.auto.commit=true(是否开启自动提交,true or false)
auto.commit.interval.ms=5000(提交偏移量的时间间隔,默认5000ms)
这种方式最容易造成数据丢失以及重复消费。
通过CommitSync()方法手动提交当前偏移量在处理完所有消息后提交,前提要把enable.auto.commit设置为false。
while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for(ConsumerRecords<String, String> record: records){ System.out.println("topic=%s, offset=%s,partition=%s", record.topic(), record.offset(),record.partition()); } try{ consumer.commitSync(); } catch(Exception e){ log.error(e); } }消费者通过poll方***询获取消息,poll里的参数是一个超时时间,用于控制阻塞的时间,如果没有数据则会阻塞这么久,如果设置为0则会立即放回。
使用这种方法一定要在处理完所有记录后调用CommitSync()方法,避免数据丢失。如果发生错误,会进行重试。
异步提交CommitSync() 提交偏移量的方式会造成阻塞,即需要等客户端处理完所有消息后才提交偏移量,限制了吞吐量。因此可以使用异步提交的方式,通过调用commitAsync()方法实现。
while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for(ConsumerRecords<String, String> record: records){ System.out.println("topic=%s, offset=%s,partition=%s", record.topic(), record.offset(),record.partition()); } consumer.commitAsync(); }提交偏移量后就可以去做其他事了。CommitSync()方式发生错误会重试,但CommitAsync()不会。
之所以不重试,是因为有可能在收到broker响应前有其它偏移量提交了。