消息队列中间件(三)Kafka 入门指南 (2)

持久性和扩展性 - Kafka使用分布式提交日志,这意味着消息会尽可能快地保留在磁盘上,因此它是持久的。同时具有一定的容错性,Kafka支持在线的水平扩展,消息的自平衡。

高性能 - Kafka对于发布和订阅消息都具有高吞吐量。 即使存储了许多TB的消息,它也保持稳定的性能。且延迟低,适用高并发。时间复杂的为o(1)。

Kafka 应用

用于聚合分布式应用程序中的消息。进行操作监控。

用于跨组织的从多个服务收集日志,然后提供给多个服务器,解决日志聚合问题。

用于流处理,如Storm和Spark Streaming,从kafka中读取数据,然后处理在写入kafka供应用使用。

Kafka 安装 安装 Jdk

具体步骤此处不说。

安装 Kafka

直接官方网站下载对应系统的版本解压即可。
由于Kafka对于windows和Unix平台的控制脚本是不同的,因此如果是windows平台,要使用bin\windows\而不是bin/,并将脚本扩展名更改为.bat。以下命令是基于Unix平台的使用。

# 解压 tar -xzf kafka_2.11-2.0.0.tgz # 启动Zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties # 启动Kafka bin/kafka-server-start.sh config/server.properties # 或者后台启动 bin/kafka-server-start.sh config/server.properties &

让我们创建一个名为“test”的主题,它只包含一个分区,只有一个副本:

`> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

如果我们运行list topic命令,我们现在可以看到该主题:

`> bin/kafka-topics.sh --list --zookeeper localhost:2181 test

或者,您也可以将代理配置为在发布不存在的主题时自动创建主题,而不是手动创建主题。

查看Topic的信息

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic Hello-Kafka

运行生产者,然后在控制台中键入一些消息以发送到服务器。

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another message`

运行消费者,查看收到的消息

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning > This is a message > This is another message Kafka 工程实例 POM 依赖 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.1.0</version> </dependency> 生产者

编写生产者 Java 代码。关于 Properties 中的值的意思描述可以在官方文档中找到 。下面的生产者向 Kafka 推送了10条消息。

import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /** * <p> * Kafka生产者,发送10个数据 * * @Author niujinpeng * @Date 2018/11/16 15:45 */ public class MyProducer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.110.132:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), Integer.toString(i))); } producer.close(); } } 消费者

编写消费者 Java 代码。

import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; /** * <p> * Kafka消费者 * * @Author niujinpeng * @Date 2018/11/19 15:01 */ public class MyConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.110.132:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } }

可以在控制台看到成功运行后的输出,由 offset 可以看到已经消费了10条消息。

INFO | Kafka version : 2.0.0 INFO | Kafka commitId : 3402a8361b734732 INFO | Cluster ID: 0Xrk5M1CSJet0m1ut3zbiw INFO | [Consumer clientId=consumer-1, groupId=test] Discovered group coordinator 192.168.110.132:9092 (id: 2147483647 rack: null) INFO | [Consumer clientId=consumer-1, groupId=test] Revoking previously assigned partitions [] INFO | [Consumer clientId=consumer-1, groupId=test] (Re-)joining group INFO | [Consumer clientId=consumer-1, groupId=test] Successfully joined group with generation 4 INFO | [Consumer clientId=consumer-1, groupId=test] Setting newly assigned partitions [test-0] offset = 38, key = 0, value = 0 offset = 39, key = 1, value = 1 offset = 40, key = 2, value = 2 offset = 41, key = 3, value = 3 offset = 42, key = 4, value = 4 offset = 43, key = 5, value = 5 offset = 44, key = 6, value = 6 offset = 45, key = 7, value = 7 offset = 46, key = 8, value = 8 offset = 47, key = 9, value = 9 问题

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyfyjz.html