Kafka从入门到放弃(三)—— 详说消费者 (2)

试想一下,如果会重试的话,当提交 66 的偏移量时发生网络问题,与此同时提交了 88 的偏移量,这时候刚好网络又通了,然后 88 的偏移量就提交成功了,然后 66 就重试,成功后又变成 66 了,就有可能造成重复消费。

之所以说这个问题,是因为异步提交支持在broker响应时回调,常被用于记录错误或生成度量指标。如果用他重试的话一定要注意提交的顺序。

while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for(ConsumerRecords<String, String> record: records){ System.out.println("topic=%s, offset=%s,partition=%s", record.topic(), record.offset(),record.partition()); } consumer.commitAsync(new OffsetCommitCallback() { public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e){ if(e != null){ log.error("Error"); } } }); } 异步与同步组合提交

如果发生在关闭消费者或者再均衡前的最后一次提交,就需要确保其成功。

因此在消费者关闭前一般会通过组合使用的方式确保其提交成功。

try{ while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for(ConsumerRecords<String, String> record: records){ System.out.println("topic=%s, offset=%s,partition=%s", record.topic(),record.offset(),record.partition()); } consumer.commitAsync(); } }catch(Exception e){ log.error(e); }finally { try { consumer.commitSync(); } finally{ consumer.close(); } } 提交特定偏移量

commitSync() 和 commitAsync() 方法一般是在处理完一个批次后提交偏移量。如果需要更频繁的提交偏移量,需要在处理的过程中间提交的话,消费者 API 允许在调用 commitSync()和 commitAsync () 方法时传进去希望提交的分区和偏移量的 map

Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<TopicPartition, OffsetAndMetadata>(); int count = 0; try { while(true){ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); if (records.isEmpty()){ continue; } for (ConsumerRecord<String, String> record : records){ System.out.println("topic=%s, offset=%s,partition=%s", record.topic(),record.offset(),record.partition()); currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset(), "no metadata")); // 每处理完1000条消息后就提交偏移量 if (count%1000==0) { consumer.commitAsync(currentOffsets, null); } count++; } } } finally { try{ consumer.commitSync(); } finally{ consumer.close(); } } 消费者分区分配策略

分区会被分配给消费者组里的消费者进行消费,在Kafka种可以通过配置参数partition.assignment.strategy选择分区分配策略。

Range 范围分区

假设现在有10个分区,消费者组里有3个消费者。

分区数量 10 除以消费者数量 3 取整(10/3)得 3,设为 x;分区数量 10 模 消费者数量 3(10%3)得 1,设为 y

则前 y 个消费者分得 x+1 个分区;其余消费者分得 x 个分区。

Kafka从入门到放弃(三)—— 详说消费者

RoundRobin 轮询分区

假设有10个分区,3个消费者,第一个分区给第一个消费者,第二个给第二个消费者,第三个分区给第三个消费者,第四个给第一个消费者... 以此类推

Kafka从入门到放弃(三)—— 详说消费者

到这,消费者的点就讲得差不多了,可能有些细节没写或者没讲明白。后面如果发现了,我另写一篇补上。如果觉得写得还行得的话,麻烦点个小赞,谢谢!

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zgjzfg.html