试想一下,如果会重试的话,当提交 66 的偏移量时发生网络问题,与此同时提交了 88 的偏移量,这时候刚好网络又通了,然后 88 的偏移量就提交成功了,然后 66 就重试,成功后又变成 66 了,就有可能造成重复消费。
之所以说这个问题,是因为异步提交支持在broker响应时回调,常被用于记录错误或生成度量指标。如果用他重试的话一定要注意提交的顺序。
while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for(ConsumerRecords<String, String> record: records){ System.out.println("topic=%s, offset=%s,partition=%s", record.topic(), record.offset(),record.partition()); } consumer.commitAsync(new OffsetCommitCallback() { public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e){ if(e != null){ log.error("Error"); } } }); } 异步与同步组合提交如果发生在关闭消费者或者再均衡前的最后一次提交,就需要确保其成功。
因此在消费者关闭前一般会通过组合使用的方式确保其提交成功。
try{ while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for(ConsumerRecords<String, String> record: records){ System.out.println("topic=%s, offset=%s,partition=%s", record.topic(),record.offset(),record.partition()); } consumer.commitAsync(); } }catch(Exception e){ log.error(e); }finally { try { consumer.commitSync(); } finally{ consumer.close(); } } 提交特定偏移量commitSync() 和 commitAsync() 方法一般是在处理完一个批次后提交偏移量。如果需要更频繁的提交偏移量,需要在处理的过程中间提交的话,消费者 API 允许在调用 commitSync()和 commitAsync () 方法时传进去希望提交的分区和偏移量的 map
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<TopicPartition, OffsetAndMetadata>(); int count = 0; try { while(true){ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); if (records.isEmpty()){ continue; } for (ConsumerRecord<String, String> record : records){ System.out.println("topic=%s, offset=%s,partition=%s", record.topic(),record.offset(),record.partition()); currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset(), "no metadata")); // 每处理完1000条消息后就提交偏移量 if (count%1000==0) { consumer.commitAsync(currentOffsets, null); } count++; } } } finally { try{ consumer.commitSync(); } finally{ consumer.close(); } } 消费者分区分配策略分区会被分配给消费者组里的消费者进行消费,在Kafka种可以通过配置参数partition.assignment.strategy选择分区分配策略。
Range 范围分区
假设现在有10个分区,消费者组里有3个消费者。
分区数量 10 除以消费者数量 3 取整(10/3)得 3,设为 x;分区数量 10 模 消费者数量 3(10%3)得 1,设为 y
则前 y 个消费者分得 x+1 个分区;其余消费者分得 x 个分区。
RoundRobin 轮询分区
假设有10个分区,3个消费者,第一个分区给第一个消费者,第二个给第二个消费者,第三个分区给第三个消费者,第四个给第一个消费者... 以此类推
到这,消费者的点就讲得差不多了,可能有些细节没写或者没讲明白。后面如果发现了,我另写一篇补上。如果觉得写得还行得的话,麻烦点个小赞,谢谢!