Kafka的概念和入门
Kafka是一个消息系统。由LinkedIn于2011年设计开发。
Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下:
以时间复杂度O(1)的方式提供消息持久化能力,即使对TB级以上数据页能保证常数时间复杂度的访问性能。
高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上的消息传输。
支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序传输。
同时支持离线数据处理和实时数据处理。
支持在线水平扩展。
消费者是采用pull模式从Broker订阅消息。
模式 优点 缺点pull模式 消费者可以根据自己的消费能力决定拉取的策略 没有消息的时候会空轮询(kafka为了避免,有个参数可以阻塞直到新消息到达)
push模式 及时性高 消费能力远低于生产能力时,就会导致客户端消息堆积,甚至服务崩溃。服务端需要维护每次传输状态,以防消息传递失败好进行重试。
Kafka的基本概念
Broker:Kafka集群包含一个或多个服务器,这种服务器被称为broker
Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。
Partition:Partition是物理上的概念,每个Topic包含一个或多个Partition
Producer:负责发布消息到Kafka broker
Consumer:消息消费者,向Kafka broker读取消息的客户端
Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)
单机部署结构
集群部署结构
Topic和Partition
一个Topic可以包含一个或者多个Partition。因为一台机器的存储和性能是有限的,多个Partition是为了支持水平扩展和并行处理。
Partition和Replica
分为多个Partition可以将消息压力分到多个机器上,但是如果其中一个partition数据丢了,那总体数据就少了一块。所以又引入了Replica的概念。
每个partition都可以通过副本因子添加多个副本。这样就算有一台机器故障了,其他机器上也有备份的数据
集群环境下的3分区,3副本:
源码图:
下载kafka
我用的2.7.0版本,下载后解压
注意选择Binary downloads而不是Source download
2. 进入conf/server.properties文件,打开如下配置
listeners=PLAINTEXT://localhost:9092
启动zookeeper
自行安装,我用的3.7版本的zookeeper
4. 启动kafka
bin/kafka-server-start.sh config/server.properties
Kafka命令行
查看topic
bin/kafka-topics.sh --zookeeper localhost:2181 --list
创建topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test1 --partitions 4 --replication-factor 1
查看topic信息
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test1
消费命令
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test1
生产命令
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test1
简单性能测试:
bin/kafka-producer-perf-test.sh --topic test1 --num-records 100000 --record-size 1000 --throughput 2000 --producer-props bootstrap.servers=localhost:9092
bin/kafka-consumer-perf-test.sh --bootstrap-server localhost:9092 --topic test1 -- fetch-size 1048576 --messages 100000 --threads 1
Java客户端
生产者:
public class SimpleKafkaProducer { public static void main(String[] args) { Properties properties=new Properties(); properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.setProperty("bootstrap.servers","192.168.157.200:9092"); KafkaProducer producer=new KafkaProducer(properties); ProducerRecord record=new ProducerRecord("test1","这是一条消息"); producer.send(record); producer.close(); } }