项目基于maven构建,不得不说kafka Java客户端实在是太糟糕了;构建环境会遇到很多麻烦。建议参考如下pom.xml;其中各个依赖包必须版本协调一致。如果kafka client的版本和kafka server的版本不一致,将会有很多异常,比如"broker id not exists"等;因为kafka从0.7升级到0.8之后(正名为2.8.0),client与server通讯的protocol已经改变.
<dependencies>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.14</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.8.2</artifactId>
<version>0.8.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.3</version>
</dependency>
</dependencies>
四.Producer端代码
1) producer.properties文件:此文件放在/resources目录下
#partitioner.class=
##broker列表可以为kafka server的子集,因为producer需要从broker中获取metadata
##尽管每个broker都可以提供metadata,此处还是建议,将所有broker都列举出来
##此值,我们可以在spring中注入过来
##metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093
##,127.0.0.1:9093
##同步,建议为async
producer.type=sync
compression.codec=0
serializer.class=kafka.serializer.StringEncoder
##在producer.type=async时有效
#batch.num.messages=100
2) KafkaProducerClient.java代码样例
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
* User: guanqing-liu
*/
public class KafkaProducerClient {
private Producer<String, String> inner;
private String brokerList;//for metadata discovery,spring setter
private String location = "kafka-producer.properties";//spring setter
private String defaultTopic;//spring setter
public void setBrokerList(String brokerList) {
this.brokerList = brokerList;
}
public void setLocation(String location) {
this.location = location;
}
public void setDefaultTopic(String defaultTopic) {
this.defaultTopic = defaultTopic;
}
public KafkaProducerClient(){}
public void init() throws Exception {
Properties properties = new Properties();
properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));
if(brokerList != null) {
properties.put("metadata.broker.list", brokerList);
}
ProducerConfig config = new ProducerConfig(properties);
inner = new Producer<String, String>(config);
}
public void send(String message){
send(defaultTopic,message);
}
public void send(Collection<String> messages){
send(defaultTopic,messages);
}
public void send(String topicName, String message) {
if (topicName == null || message == null) {
return;
}
KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message);
inner.send(km);
}
public void send(String topicName, Collection<String> messages) {
if (topicName == null || messages == null) {
return;
}
if (messages.isEmpty()) {
return;
}
List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
int i= 0;
for (String entry : messages) {
KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry);
kms.add(km);
i++;
if(i % 20 == 0){
inner.send(kms);
kms.clear();
}
}
if(!kms.isEmpty()){
inner.send(kms);
}
}
public void close() {
inner.close();
}
/**
* @param args
*/
public static void main(String[] args) {
KafkaProducerClient producer = null;
try {
producer = new KafkaProducerClient();
//producer.setBrokerList("");
int i = 0;
while (true) {
producer.send("test-topic", "this is a sample" + i);
i++;
Thread.sleep(2000);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (producer != null) {
producer.close();
}
}
}
}
3) spring配置
<bean init-method="init" destroy-method="close">
<property value="${zookeeper_cluster}"></property>
<property value="${kafka_topic}"></property>
</bean>