每个消费者组都记录了一个patition的offset,一个partition只能被一个消费者组里的一个消费者去消费。
如图,一个Topic有4个partition,分别在两个broker上。
对于消费者组A来说,他有两个消费者,所以他里面一个消费者消费2个partition。而对于消费者组B,他有4个消费者,所以一个消费者消费1个partition.
消费者-offset同步提交
void commitSyncReceive() throws InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", "49.234.77.60:9092"); props.put("group.id", "group_id"); //关闭自动提交 props.put("enable.auto.commit", "false"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("max.poll.records", 1000); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList(TOPIC)); while (true){ ConsumerRecords<String, String> msgList=consumer.poll(1000); for (ConsumerRecord<String,String> record:msgList){ System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } //同步提交,当前线程会阻塞直到 offset 提交成功 consumer.commitSync(); } }消费者-异步提交
void commitAsyncReceive() throws InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", "49.234.77.60:9092"); props.put("group.id", "group_id"); props.put("enable.auto.commit", "false"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("max.poll.records", 1000); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList(TOPIC)); while (true){ ConsumerRecords<String, String> msgList=consumer.poll(1000); for (ConsumerRecord<String,String> record:msgList){ System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } //异步提交 consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { if(e!=null){ System.err.println("commit failed for "+map); } } }); } }消费者-自定义保存offset
void commitCustomSaveOffest() throws InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", "49.234.77.60:9092"); props.put("group.id", "group_id"); props.put("enable.auto.commit", "false"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("max.poll.records", 1000); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList(TOPIC), new ConsumerRebalanceListener() { //调用时机是Consumer停止拉取数据后,Rebalance开始之前,我们可以手动提交offset @Override public void onPartitionsRevoked(Collection<TopicPartition> collection) { } //调用时机是Rebalance之后,Consumer开始拉取数据之前,我们可以在此方法调整offset @Override public void onPartitionsAssigned(Collection<TopicPartition> collection) { } }); while (true){ ConsumerRecords<String, String> msgList=consumer.poll(1000); for (ConsumerRecord<String,String> record:msgList){ System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { if(e!=null){ System.err.println("commit failed for "+map); } } }); } } 四、SpringBoot整合Kafka引入依赖
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>