Kafka部署与代码实例(3)

1) consumer.properties:文件位于/resources目录下

## 此值可以配置,也可以通过spring注入
##zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
##,127.0.0.1:2182,127.0.0.1:2183
# timeout in ms for connecting to zookeeper
zookeeper.connectiontimeout.ms=1000000
#consumer group id
group.id=test-group
#consumer timeout
#consumer.timeout.ms=5000
auto.commit.enable=true
auto.commit.interval.ms=60000

2) KafkaConsumerClient.Java代码样例

package com.test.kafka;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;

/**
 * User: guanqing-liu
 */
public class KafkaConsumerClient {

private String groupid; //can be setting by spring
 private String zkConnect;//can be setting by spring
 private String location = "kafka-consumer.properties";//配置文件位置
 private String topic;
 private int partitionsNum = 1;
 private MessageExecutor executor; //message listener
 private ExecutorService threadPool;
 
 private ConsumerConnector connector;
 
 private Charset charset = Charset.forName("utf8");

public void setGroupid(String groupid) {
  this.groupid = groupid;
 }

public void setZkConnect(String zkConnect) {
  this.zkConnect = zkConnect;
 }

public void setLocation(String location) {
  this.location = location;
 }

public void setTopic(String topic) {
  this.topic = topic;
 }

public void setPartitionsNum(int partitionsNum) {
  this.partitionsNum = partitionsNum;
 }

public void setExecutor(MessageExecutor executor) {
  this.executor = executor;
 }

public KafkaConsumerClient() {}

//init consumer,and start connection and listener
 public void init() throws Exception {
  if(executor == null){
   throw new RuntimeException("KafkaConsumer,exectuor cant be null!");
  }
  Properties properties = new Properties();
  properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));
 
  if(groupid != null){
   properties.put("groupid", groupid);
  }
  if(zkConnect != null){
   properties.put("zookeeper.connect", zkConnect);
  }
  ConsumerConfig config = new ConsumerConfig(properties);

connector = Consumer.createJavaConsumerConnector(config);
  Map<String, Integer> topics = new HashMap<String, Integer>();
  topics.put(topic, partitionsNum);
  Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);
  List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);
  threadPool = Executors.newFixedThreadPool(partitionsNum * 2);
 
  //start
  for (KafkaStream<byte[], byte[]> partition : partitions) {
   threadPool.execute(new MessageRunner(partition));
  }
 }

public void close() {
  try {
   threadPool.shutdownNow();
  } catch (Exception e) {
   //
  } finally {
   connector.shutdown();
  }

}

class MessageRunner implements Runnable {
  private KafkaStream<byte[], byte[]> partition;

MessageRunner(KafkaStream<byte[], byte[]> partition) {
   this.partition = partition;
  }

public void run() {
   ConsumerIterator<byte[], byte[]> it = partition.iterator();
   while (it.hasNext()) {
    // connector.commitOffsets();手动提交offset,当autocommit.enable=false时使用
    MessageAndMetadata<byte[], byte[]> item = it.next();
    try{
     executor.execute(new String(item.message(),charset));// UTF-8,注意异常
    }catch(Exception e){
     //
    }
   }
  }
 
  public String getContent(Message message){
            ByteBuffer buffer = message.payload();
            if (buffer.remaining() == 0) {
                return null;
            }
            CharBuffer charBuffer = charset.decode(buffer);
            return charBuffer.toString();
  }
 }

public static interface MessageExecutor {

public void execute(String message);
 }

/**
  * @param args
  */
 public static void main(String[] args) {
  KafkaConsumerClient consumer = null;
  try {
   MessageExecutor executor = new MessageExecutor() {

public void execute(String message) {
     System.out.println(message);
    }
   };
   consumer = new KafkaConsumerClient();
   
   consumer.setTopic("test-topic");
   consumer.setPartitionsNum(2);
   consumer.setExecutor(executor);
   consumer.init();
  } catch (Exception e) {
   e.printStackTrace();
  } finally {
    if(consumer != null){
    consumer.close();
    }
  }

}

}

3) spring配置(略)

需要提醒的是,上述LogConsumer类中,没有太多的关注异常情况,必须在MessageExecutor.execute()方法中抛出异常时的情况.

在测试时,建议优先启动consumer,然后再启动producer,这样可以实时的观测到最新的消息。

Kafka 的详细介绍请点这里
Kafka 的下载地址请点这里

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/8c7324087f12d2b47b50b7bc109c7ff5.html