pom.xml文件如下:
所有kafka依赖的jar包都在com.sksamuel.kafka下面。其中kafka使用的版本是0.8.0-beta1,kafka是2.10。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 ">
<modelVersion>4.0.0</modelVersion>
<groupId>com.iflytek.cpcloud.kafka</groupId>
<artifactId>kafkatest</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kafkatest</name>
<url></url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.14</version>
</dependency>
<dependency>
<groupId>com.sksamuel.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.0-beta1</version>
</dependency>
</dependencies>
</project>
然后写一个kafka producer的测试程序如下:
package com.iflytek.cpcloud.kafka.kafkatest;
import Java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
* Test the Kafka Producer
* @author jcsong2
*
*/
public class ProducerTest {
public static void main(String[] args) {
Properties props = new Properties();
props.put("zk.connect", "192.168.20.99:2181");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "192.168.20.99:9092");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
for (int i = 0; i < 10; i++)
producer.send(new KeyedMessage<String, String>("test", "test" + i));
}
}
在consuemr端可以看到test0到test9十行输出。