Kafka部署与代码实例(2)

项目基于maven构建。不得不说,kafka的Java客户端实在是太糟糕了,构建环境时会遇到很多麻烦。建议参考如下pom.xml,其中各个依赖包的版本必须协调一致。如果kafka client的版本和kafka server的版本不一致,将会出现很多异常,比如"broker id not exists"等;因为kafka从0.7升级到0.8之后(发行包名为kafka_2.8.0-0.8.0,其中2.8.0是Scala版本号),client与server通讯的protocol已经改变。

<dependencies>
 <!-- Dependency set for the Kafka 0.8.0 Java client example; keep the versions below in step with each other. -->
 <!-- log4j is declared explicitly here and excluded from the kafka artifact below,
      so kafka's transitive log4j is not pulled in alongside this one. -->
 <dependency>
  <groupId>log4j</groupId>
  <artifactId>log4j</artifactId>
  <version>1.2.14</version>
 </dependency>
 <!-- Kafka 0.8.0; the "_2.8.2" suffix of the artifactId is the Scala version it was built for. -->
 <dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.8.2</artifactId>
  <version>0.8.0</version>
  <exclusions>
   <exclusion>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
   </exclusion>
  </exclusions>
 </dependency>
 <!-- Scala runtime; the version matches the Scala suffix of the kafka artifactId above. -->
 <dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.8.2</version>
 </dependency>
 <!-- NOTE(review): presumably a runtime dependency of the kafka artifact, pinned explicitly; confirm against kafka's POM. -->
 <dependency>
  <groupId>com.yammer.metrics</groupId>
  <artifactId>metrics-core</artifactId>
  <version>2.2.0</version>
 </dependency>
 <!-- NOTE(review): presumably the ZooKeeper client used by kafka, pinned explicitly; confirm against kafka's POM. -->
 <dependency>
  <groupId>com.101tec</groupId>
  <artifactId>zkclient</artifactId>
  <version>0.3</version>
 </dependency>
</dependencies>

四.Producer端代码

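1) producer.properties文件:此文件放在/resources目录下(即classpath根路径)。注意:下文KafkaProducerClient默认加载的文件名为kafka-producer.properties,请将文件命名为该名称,或通过setLocation指定实际文件名。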

#partitioner.class=
##broker列表可以为kafka server的子集,因为producer需要从broker中获取metadata
##尽管每个broker都可以提供metadata,此处还是建议,将所有broker都列举出来
##此值,我们可以在spring中注入过来
##metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093
##,127.0.0.1:9093
##sync为同步发送,async为异步发送;建议使用async
producer.type=sync
compression.codec=0
serializer.class=kafka.serializer.StringEncoder
##在producer.type=async时有效
#batch.num.messages=100

2) KafkaProducerClient.java代码样例

import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * User: guanqing-liu
 */
public class KafkaProducerClient {

private Producer<String, String> inner;
 
 private String brokerList;//for metadata discovery,spring setter
 private String location = "kafka-producer.properties";//spring setter
 
 private String defaultTopic;//spring setter

public void setBrokerList(String brokerList) {
  this.brokerList = brokerList;
 }

public void setLocation(String location) {
  this.location = location;
 }

public void setDefaultTopic(String defaultTopic) {
  this.defaultTopic = defaultTopic;
 }

public KafkaProducerClient(){}
 
 public void init() throws Exception {
  Properties properties = new Properties();
  properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));
 
 
  if(brokerList != null) {
   properties.put("metadata.broker.list", brokerList);
  }

ProducerConfig config = new ProducerConfig(properties);
  inner = new Producer<String, String>(config);
 }

public void send(String message){
  send(defaultTopic,message);
 }
 
 public void send(Collection<String> messages){
  send(defaultTopic,messages);
 }
 
 public void send(String topicName, String message) {
  if (topicName == null || message == null) {
   return;
  }
  KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message);
  inner.send(km);
 }

public void send(String topicName, Collection<String> messages) {
  if (topicName == null || messages == null) {
   return;
  }
  if (messages.isEmpty()) {
   return;
  }
  List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
  int i= 0;
  for (String entry : messages) {
   KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry);
   kms.add(km);
   i++;
   if(i % 20 == 0){
    inner.send(kms);
    kms.clear();
   }
  }
 
  if(!kms.isEmpty()){
   inner.send(kms);
  }
 }

public void close() {
  inner.close();
 }

/**
  * @param args
  */
 public static void main(String[] args) {
  KafkaProducerClient producer = null;
  try {
   producer = new KafkaProducerClient();
   //producer.setBrokerList("");
   int i = 0;
   while (true) {
    producer.send("test-topic", "this is a sample" + i);
    i++;
    Thread.sleep(2000);
   }
  } catch (Exception e) {
   e.printStackTrace();
  } finally {
   if (producer != null) {
    producer.close();
   }
  }

}

}

3) spring配置

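<!-- class需填写KafkaProducerClient的完整类名;brokerList对应metadata.broker.list,其值应为broker的host:port列表 -->
<bean id="kafkaProducerClient" class="KafkaProducerClient" init-method="init" destroy-method="close">
        <property name="brokerList" value="${zookeeper_cluster}"></property>
        <property name="defaultTopic" value="${kafka_topic}"></property>
    </bean>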

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/8c7324087f12d2b47b50b7bc109c7ff5.html