Apache Kafka:下一代分布式消息系统(4)

Kafka生产者代码示例

/** * Instantiates a new Kafka producer. * * @param topic the topic * @param directoryPath the directory path */ public KafkaMailProducer(String topic, String directoryPath) { props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("metadata.broker.list", "localhost:9092"); producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props)); this.topic = topic; this.directoryPath = directoryPath; } public void run() { Path dir = Paths.get(directoryPath); try { new WatchDir(dir).start(); new ReadDir(dir).start(); } catch (IOException e) { e.printStackTrace(); } }

上面的代码片断展示了Kafka生产者API的基本用法,例如设置生产者的属性,包括发布哪个话题的消息,可以使用哪个序列化类以及代理的相关信息。这个类的基本功能是从邮件目录读取邮件消息文件,然后作为消息发布到Kafka代理。目录通过java.nio.WatchService类监视,一旦新的邮件消息Dump到该目录,就会被立即读取并作为消息发布到Kafka代理。

Kafka消费者代码示例

public KafkaMailConsumer(String topic) { consumer = Kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig()); this.topic = topic; } /** * Creates the consumer config. * * @return the consumer config */ private static ConsumerConfig createConsumerConfig() { Properties props = new Properties(); props.put("zookeeper.connect", KafkaMailProperties.zkConnect); props.put("group.id", KafkaMailProperties.groupId); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); return new ConsumerConfig(props); } public void run() { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0); ConsumerIterator<byte[], byte[]> it = stream.iterator(); while (it.hasNext()) System.out.println(new String(it.next().message())); }

上面的代码演示了基本的消费者API。正如我们前面提到的,消费者需要设置消费的消息流。在Run方法中,我们进行了设置,并在控制台打印收到的消息。在我的项目中,我们将其输入到解析系统以提取OTC定价。

在当前的质量保证系统中,我们使用Kafka作为消息服务器用于概念验证(Proof of Concept,POC)项目,它的整体性能优于JMS消息服务。其中一个我们感到非常兴奋的特性是消息的再消费(re-consumption),这让我们的解析系统可以按照业务需求重新解析某些消息。基于Kafka这些很好的效果,我们正计划使用它,而不是用Nagios系统,去做日志聚合与分析。

总结

Kafka是一种处理大量数据的新型系统。Kafka基于拉的消费模型让消费者以自己的速度处理消息。如果处理消息时出现了异常,消费者始终可以选择再消费该消息。

关于作者

Apache Kafka:下一代分布式消息系统

Abhishek Sharma是金融领域产品的自然语言处理(NLP)、机器学习和解析程序员。他为多个公司提供算法设计和解析开发。Abhishek的兴趣包括分布式系统、自然语言处理和使用机器算法进行大数据分析。

相关阅读

分布式发布订阅消息系统 Kafka 架构设计

Apache Kafka 代码实例

Apache Kafka 教程笔记

Apache kafka原理与特性(0.8V) 

Kafka部署与代码实例 

Kafka介绍和集群环境搭建 

Kafka 的详细介绍请点这里
Kafka 的下载地址请点这里

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/7fcfa478f0db699e0ee73954f12a4b16.html