最近有同学咨询Kafka的消费和心跳机制,今天笔者将通过这篇博客来逐一介绍这些内容。
2.内容 2.1 Kafka消费首先,我们来看看消费。Kafka提供了非常简单的消费API,使用者只需初始化Kafka的Broker Server地址,然后实例化KafkaConsumer类即可拿到Topic中的数据。一个简单的Kafka消费实例代码如下所示:
public class JConsumerSubscribe extends Thread { public static void main(String[] args) { JConsumerSubscribe jconsumer = new JConsumerSubscribe(); jconsumer.start(); } /** 初始化Kafka集群信息. */ private Properties configure() { Properties props = new Properties(); props.put("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092");// 指定Kafka集群地址 props.put("group.id", "ke");// 指定消费者组 props.put("enable.auto.commit", "true");// 开启自动提交 props.put("auto.commit.interval.ms", "1000");// 自动提交的时间间隔 // 反序列化消息主键 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 反序列化消费记录 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); return props; } /** 实现一个单线程消费者. */ @Override public void run() { // 创建一个消费者实例对象 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configure()); // 订阅消费主题集合 consumer.subscribe(Arrays.asList("test_kafka_topic")); // 实时消费标识 boolean flag = true; while (flag) { // 获取主题消息数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) // 循环打印消息记录 System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } // 出现异常关闭消费者对象 consumer.close(); } }