With enable.auto.commit=true and auto.commit.interval.ms=1000 set in the consumer configuration, offsets are committed automatically once per second. This is the simplest and most convenient approach. The drawback is that if an exception is thrown while processing messages we have already polled, and the offsets for those messages have already been committed, the messages are effectively lost without ever having been processed correctly.
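For reference, a minimal sketch of such an auto-commit consumer could look as follows; the broker address, group id and topic name are placeholders and error handling is omitted:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
props.put("group.id", "mygroup");
props.put("enable.auto.commit", "true");      // commit offsets automatically
props.put("auto.commit.interval.ms", "1000"); // once per second
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("TEST"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    // offsets are committed in the background by the consumer, even if the processing above fails
}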
Manually committing offsets
To address this drawback of automatic offset committing, the Kafka consumer lets us control manually when a record is considered consumed and when its offset is committed, which decouples polling messages from processing them.
package com.molyeo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class ManualOffsetControlDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "LAPTOP-2CBRDCI0:9092");
        props.put("group.id", "mygroup");
        // disable auto commit so that offsets are only committed after processing succeeds
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("TEST"));
        final int minBatchSize = 20;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            // process and commit only once enough records have accumulated
            if (buffer.size() >= minBatchSize) {
                handle(buffer);
                consumer.commitSync();
                buffer.clear();
            }
        }
    }

    public static void handle(List<ConsumerRecord<String, String>> recordList) {
        // do something with the batch
    }
}
In the code above we poll messages and add them to a buffer. Once enough records have accumulated, we call the handle method to process them and only then commit the offsets. If the process fails after handle has completed but before the offsets are committed, the committed offsets are still the old ones on the next startup, so the same records are simply consumed and processed one more time rather than lost.
The code above marks all received messages as committed in one go. We can also specify offsets explicitly to control more precisely which records are treated as committed. In the example below, after the messages of a partition have been received and processed, we commit the offset partition by partition.
try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : partitionRecords) {
                System.out.println(record.offset() + ": " + record.value());
            }
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            // the committed offset should be the offset of the next message to read, hence lastOffset + 1
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
        }
    }
} finally {
    consumer.close();
}
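Besides the imports used in the previous example, this fragment also relies on the following classes from java.util and the Kafka client library:
import java.util.Collections;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;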
Multi-threaded concurrent consumption
When consuming with multiple threads, note in particular that KafkaConsumer is not thread safe, so threads must not share a KafkaConsumer instance. A common approach is to give each thread its own KafkaConsumer instance; offset management then only concerns the current thread and does not have to take other threads into account.
package com.molyeo.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Created by zhangkh on 2018/7/9.
 * auto commit offset
 * multi thread
 */
public class MultiKafkaConsumerDemo {
    Logger logger = LoggerFactory.getLogger(MultiKafkaConsumerDemo.class.getName());

    public static void main(String[] args) {
        String topic = "TEST";
        String groupId = "mygroup";
        int nThreads = 3;
        ConsumerGroup consumerGroup = new ConsumerGroup(topic, groupId, nThreads);
        consumerGroup.run();
    }
}
class ConsumerGroup {
    Logger logger = LoggerFactory.getLogger(ConsumerGroup.class.getName());
    private ExecutorService executorService;
    private List<ConsumerTask> consumerTaskList;

    ConsumerGroup(String topic, String groupId, int nThreads) {
        executorService = Executors.newFixedThreadPool(nThreads);
        consumerTaskList = new ArrayList<>(nThreads);
        for (int i = 0; i < nThreads; i++) {
            consumerTaskList.add(new ConsumerTask(topic, groupId, i));
        }
    }

    public void run() {
        for (ConsumerTask task : consumerTaskList) {
            executorService.submit(task);
        }
    }

    public void shutdown() {
        // ask every consumer thread to stop, then wait for the executor to terminate
        for (ConsumerTask task : consumerTaskList) {
            task.shutdown();
        }
        if (executorService != null) {
            executorService.shutdown();
            try {
                if (!executorService.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                    logger.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
                }
            } catch (InterruptedException e) {
                logger.error("Interrupted during shutdown, exiting uncleanly", e);
            }
        }
    }
}
class ConsumerTask<K, V> implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    Logger logger = LoggerFactory.getLogger(ConsumerTask.class.getName());
    private final KafkaConsumer<K, V> consumer;
    private String topic;
    private String groupId;
    private int threadNo;

    public ConsumerTask(String topic, String groupId, int threadNo) {
        this.topic = topic;
        this.groupId = groupId;
        this.threadNo = threadNo;
        Properties props = getConsumerConfig(groupId);
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
    }

    public Properties getConsumerConfig(String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaCommonConfig.BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaCommonConfig.KEY_DESERIALIZER_CLASS);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaCommonConfig.VALUE_DESERIALIZER_CLASS);
        return props;
    }

    @Override
    public void run() {
        try {
            while (!closed.get()) {
                ConsumerRecords<K, V> records = consumer.poll(100);
                for (ConsumerRecord<K, V> record : records) {
                    // TODO do something with the record
                    logger.info("{},threadNo={},topic={},groupId={},offset = {}, key = {}, value = {}",
                            Thread.currentThread().getName(), threadNo, record.topic(), groupId,
                            record.offset(), record.key(), record.value());
                }
            }
        } catch (WakeupException e) {
            // wakeup() is used to interrupt poll() during shutdown; only rethrow if we are not closing
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
        }
    }

    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}
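The example above never actually calls shutdown(). One way to wire it up, sketched here rather than taken from the original program, is to register a JVM shutdown hook in main right after consumerGroup.run():
// hypothetical addition to main(): stop the consumer threads when the JVM exits
Runtime.getRuntime().addShutdownHook(new Thread(() -> consumerGroup.shutdown()));
The hook calls ConsumerGroup.shutdown(), which flags each ConsumerTask as closed, interrupts its blocked poll() via consumer.wakeup(), and then waits for the executor to terminate.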