Kafka Knowledge Series 51: Consumer Programming in Practice (2)

Setting enable.auto.commit=true and auto.commit.interval.ms=1000 makes the consumer commit offsets automatically, roughly once per second. This is the simplest and most convenient approach, but it has a flaw: if an exception occurs while processing records after they have been fetched, their offsets may already have been committed, so those records are skipped on restart and are effectively lost without ever being processed correctly.
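A minimal sketch of such an auto-commit consumer (the broker address, group id and topic are taken from the examples later in this article; imports match the ManualOffsetControlDemo class below):

Properties props = new Properties();
props.put("bootstrap.servers", "LAPTOP-2CBRDCI0:9092");
props.put("group.id", "mygroup");
props.put("enable.auto.commit", "true");       // commit offsets automatically...
props.put("auto.commit.interval.ms", "1000");  // ...about once per second
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("TEST"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        // If processing throws here, the offset may already have been
        // committed in the background, and this record is lost to the group.
        System.out.printf("offset = %d, key = %s, value = %s%n",
                record.offset(), record.key(), record.value());
    }
}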

Committing offsets manually
To address this drawback of automatic commit, the Kafka consumer lets us control manually when a record is considered consumed and when its offset is committed, which decouples fetching messages from processing them.

package com.molyeo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class ManualOffsetControlDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "LAPTOP-2CBRDCI0:9092");
        props.put("group.id", "mygroup");
        // Disable auto commit: offsets are committed explicitly below.
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("TEST"));
        final int minBatchSize = 20;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {
                handle(buffer);
                // Commit only after the whole batch has been processed.
                consumer.commitSync();
                buffer.clear();
            }
        }
    }

    public static void handle(List<ConsumerRecord<String, String>> recordList) {
        // do something
    }
}

In the code above, fetched records are placed into a buffer; once enough have accumulated, handle is called to process them, and the offset is committed only after processing completes. If the process fails after handle has run but before the offset is committed, the stored offset is still the old one, so on restart the same batch is consumed again. This yields at-least-once semantics: nothing is lost, but duplicates are possible, so the processing logic should tolerate or deduplicate repeated records.
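commitSync blocks until the broker acknowledges the commit. When commit latency matters more than handling individual commit failures, the consumer API also offers a non-blocking commitAsync that reports the result to a callback. A minimal sketch of the same kind of loop using it (running and the record handling are placeholders, as in the snippet further below):

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        // process the record (application-specific)
    }
    // Non-blocking commit; a failure (e.g. during a rebalance) is reported
    // to the callback. Retrying there is usually avoided, since a retried
    // commit could overwrite a newer offset committed in the meantime.
    consumer.commitAsync((offsets, exception) -> {
        if (exception != null) {
            System.err.println("Commit failed for " + offsets + ": " + exception);
        }
    });
}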

The code above marks everything returned by the last poll as consumed when it commits. For finer control, we can also specify offsets explicitly. In the following example we process all records of a partition and then commit on a per-partition basis. Note that the committed offset must be the offset of the next record the application will read, which is why lastOffset + 1 is committed.

// Additionally requires: java.util.Collections, org.apache.kafka.common.TopicPartition,
// org.apache.kafka.clients.consumer.OffsetAndMetadata
try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : partitionRecords) {
                System.out.println(record.offset() + ": " + record.value());
            }
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            // Commit lastOffset + 1: the committed offset is the position of
            // the next record the application will read from this partition.
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
        }
    }
} finally {
    consumer.close();
}

Multi-threaded concurrent consumption
When consuming concurrently with multiple threads, be aware that KafkaConsumer is not thread-safe, so a consumer instance must not be shared across threads. The common pattern is to give each thread its own KafkaConsumer instance; each thread then manages only its own consumer's offsets and does not need to coordinate with the other threads.

package com.molyeo.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Created by zhangkh on 2018/7/9.
 * auto commit offset
 * multi thread
 */
public class MultiKafkaConsumerDemo {
    static final Logger logger = LoggerFactory.getLogger(MultiKafkaConsumerDemo.class.getName());

    public static void main(String[] args) {
        String topic = "TEST";
        String groupId = "mygroup";
        int nThreads = 3;
        ConsumerGroup consumerGroup = new ConsumerGroup(topic, groupId, nThreads);
        consumerGroup.run();
    }
}

class ConsumerGroup {
    Logger logger = LoggerFactory.getLogger(ConsumerGroup.class.getName());
    private ExecutorService executorService;
    private List<ConsumerTask> consumerTaskList;

    ConsumerGroup(String topic, String groupId, int nThreads) {
        executorService = Executors.newFixedThreadPool(nThreads);
        consumerTaskList = new ArrayList<>(nThreads);
        for (int i = 0; i < nThreads; i++) {
            consumerTaskList.add(new ConsumerTask(topic, groupId, i));
        }
    }

    public void run() {
        for (ConsumerTask task : consumerTaskList) {
            executorService.submit(task);
        }
    }

    public void shutdown() {
        // Ask every task to stop and wake its blocked poll() first; otherwise
        // the pool threads would loop forever and never terminate.
        for (ConsumerTask task : consumerTaskList) {
            task.shutdown();
        }
        if (executorService != null) {
            executorService.shutdown();
        }
        try {
            if (!executorService.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                logger.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            logger.error("Interrupted during shutdown, exiting uncleanly");
            e.printStackTrace();
        }
    }
}

class ConsumerTask<K, V> implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    Logger logger = LoggerFactory.getLogger(ConsumerTask.class.getName());
    private final KafkaConsumer<K, V> consumer;
    private String topic;
    private String groupId;
    private int threadNo;

    public ConsumerTask(String topic, String groupId, int threadNo) {
        this.topic = topic;
        this.groupId = groupId;
        this.threadNo = threadNo;
        Properties props = getConsumerConfig(groupId);
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
    }

    public Properties getConsumerConfig(String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaCommonConfig.BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaCommonConfig.KEY_DESERIALIZER_CLASS);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaCommonConfig.VALUE_DESERIALIZER_CLASS);
        return props;
    }

    @Override
    public void run() {
        try {
            while (!closed.get()) {
                ConsumerRecords<K, V> records = consumer.poll(100);
                for (ConsumerRecord<K, V> record : records) {
                    // TODO do something
                    logger.info("{},threadNo={},topic={},groupId={},offset = {}, key = {}, value = {}",
                            Thread.currentThread().getName(), threadNo, record.topic(), groupId,
                            record.offset(), record.key(), record.value());
                }
            }
        } catch (WakeupException e) {
            // Ignore the wakeup if we are shutting down; rethrow otherwise.
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
        }
    }

    public void shutdown() {
        closed.set(true);
        // wakeup() is the one KafkaConsumer method that is safe to call from
        // another thread; it aborts a blocked poll() with a WakeupException.
        consumer.wakeup();
    }
}
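The demo above references a KafkaCommonConfig class that the article does not show. A minimal hypothetical sketch, with values assumed to match the single-threaded examples earlier:

package com.molyeo.kafka;

// Hypothetical configuration holder referenced by MultiKafkaConsumerDemo;
// the concrete values are assumptions matching the earlier examples.
public class KafkaCommonConfig {
    public static final String BOOTSTRAP_SERVERS = "LAPTOP-2CBRDCI0:9092";
    public static final String KEY_DESERIALIZER_CLASS =
            "org.apache.kafka.common.serialization.StringDeserializer";
    public static final String VALUE_DESERIALIZER_CLASS =
            "org.apache.kafka.common.serialization.StringDeserializer";
}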
