A Detailed Kafka Tutorial (Part 3)


Each consumer group keeps track of its own offset for every partition, and within a consumer group each partition can be consumed by only one consumer.

As shown in the figure, a topic has 4 partitions distributed across two brokers.

Consumer group A has two consumers, so each of its consumers consumes 2 partitions. Consumer group B has four consumers, so each of its consumers consumes 1 partition.
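As a rough illustration only (not the broker's actual assignment logic), the range-style division of a topic's partitions among the members of a group can be sketched like this; `assign` is a hypothetical helper written for this example:

```java
import java.util.*;

public class AssignmentSketch {
    // Hypothetical helper: spread numPartitions over the consumers of one group,
    // giving earlier consumers one extra partition when the division is uneven.
    static Map<String, List<Integer>> assign(int numPartitions, List<String> consumers) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int per = numPartitions / consumers.size();
        int extra = numPartitions % consumers.size();
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = per + (i < extra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                parts.add(next++);
            }
            result.put(consumers.get(i), parts);
        }
        return result;
    }

    public static void main(String[] args) {
        // Group A: 2 consumers over 4 partitions -> 2 partitions each
        System.out.println(assign(4, Arrays.asList("A-1", "A-2")));
        // Group B: 4 consumers over 4 partitions -> 1 partition each
        System.out.println(assign(4, Arrays.asList("B-1", "B-2", "B-3", "B-4")));
    }
}
```

Kafka's real assignors (range, round-robin, sticky) are configurable on the consumer, but the proportions above match the figure's scenario.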


Consumer: synchronous offset commit

```java
void commitSyncReceive() throws InterruptedException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "49.234.77.60:9092");
    props.put("group.id", "group_id");
    // Disable auto-commit so offsets are committed manually
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("max.poll.records", 1000);
    props.put("auto.offset.reset", "earliest");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(TOPIC));
    while (true) {
        ConsumerRecords<String, String> msgList = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : msgList) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
        }
        // Synchronous commit: the current thread blocks until the offset commit succeeds
        consumer.commitSync();
    }
}
```

Consumer: asynchronous offset commit

```java
void commitAsyncReceive() throws InterruptedException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "49.234.77.60:9092");
    props.put("group.id", "group_id");
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("max.poll.records", 1000);
    props.put("auto.offset.reset", "earliest");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(TOPIC));
    while (true) {
        ConsumerRecords<String, String> msgList = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : msgList) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
        }
        // Asynchronous commit: returns immediately; failures are reported to the callback
        consumer.commitAsync(new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                if (e != null) {
                    System.err.println("commit failed for " + map);
                }
            }
        });
    }
}
```

Consumer: custom offset storage

```java
void commitCustomSaveOffset() throws InterruptedException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "49.234.77.60:9092");
    props.put("group.id", "group_id");
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("max.poll.records", 1000);
    props.put("auto.offset.reset", "earliest");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(TOPIC), new ConsumerRebalanceListener() {
        // Called after the consumer stops fetching and before the rebalance begins;
        // this is where offsets can be committed manually (e.g. to an external store)
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        }

        // Called after the rebalance and before the consumer resumes fetching;
        // offsets can be adjusted here (e.g. with consumer.seek)
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        }
    });
    while (true) {
        ConsumerRecords<String, String> msgList = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : msgList) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
        }
        consumer.commitAsync(new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                if (e != null) {
                    System.err.println("commit failed for " + map);
                }
            }
        });
    }
}
```

IV. Integrating Kafka with Spring Boot

Add the dependency

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```
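With the dependency in place, a minimal listener can be sketched as follows. This is a wiring sketch, not a full application: the topic name `demo_topic` is a placeholder, and spring-kafka is assumed to auto-configure the consumer from `spring.kafka.*` properties (bootstrap servers, group id, and so on) in `application.properties`:

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class DemoConsumer {

    // "demo_topic" and "group_id" are placeholders for illustration;
    // offsets are committed automatically by the listener container by default
    @KafkaListener(topics = "demo_topic", groupId = "group_id")
    public void onMessage(String message) {
        System.out.println("received: " + message);
    }
}
```

Compared with the raw `KafkaConsumer` loops above, the listener container manages polling, threading, and offset commits for you.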

Source: https://www.heiqu.com/zzwjzs.html