记录如何使用Kafka+Log4j实现集中日志管理的过程。
引言
前面写的《Spring+Log4j+ActiveMQ实现远程记录日志——实战+分析》得到了许多同学的认可,在认可的同时,也有同学提出可以使用Kafka来集中管理日志,于是今天就来学习一下。
特别说明,由于网络上关于Kafka+Log4j的完整例子并不多,我也是一边学习一边使用,因此如果有解释得不好或者错误的地方,欢迎批评指正,如果你有好的想法,也欢迎留言探讨。
第一部分 搭建Kafka环境
安装Kafka
下载:
tar zxf kafka-<VERSION>.tgz
cd kafka-<VERSION>
启动Zookeeper
启动Zookeeper前需要配置一下config/zookeeper.properties:
接下来启动Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
启动Kafka Server
启动Kafka Server前需要配置一下config/server.properties。主要配置以下几项,内容就不说了,注释里都很详细:
然后启动Kafka Server:
bin/kafka-server-start.sh config/server.properties
创建Topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看创建的Topic
>bin/kafka-topics.sh --list --zookeeper localhost:2181
启动控制台Producer,向Kafka发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
^C
启动控制台Consumer,消费刚刚发送的消息
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message
删除Topic
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
注:只有当delete.topic.enable=true时,该操作才有效
配置Kafka集群(单台机器上)
首先拷贝server.properties文件为多份(这里演示4个节点的Kafka集群,因此还需要拷贝3份配置文件):
cp config/server.properties config/server1.properties
cp config/server.properties config/server2.properties
cp config/server.properties config/server3.properties
修改server1.properties的以下内容:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
同理修改server2.properties和server3.properties的这些内容,并保持所有配置文件的zookeeper.connect属性都指向运行在本机的zookeeper地址localhost:2181。注意,由于这几个Kafka节点都将运行在同一台机器上,因此需要保证这几个值不同,这里以累加的方式处理。例如在server2.properties上:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
把server3.properties也配置好以后,依次启动这些节点:
bin/kafka-server-start.sh config/server1.properties &
bin/kafka-server-start.sh config/server2.properties &
bin/kafka-server-start.sh config/server3.properties &
Topic & Partition
Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。
现在在Kafka集群上创建备份因子为3,分区数为4的Topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 4 --topic kafka
说明:备份因子replication-factor越大,则说明集群容错性越强,就是当集群down掉后,数据恢复的可能性越大。所有的分区数里的内容共同组成了一份数据,分区数partions越大,则该topic的消息就越分散,集群中的消息分布就越均匀。
然后使用kafka-topics.sh的--describe参数查看一下Topic为kafka的详情:
输出的第一行是所有分区的概要,接下来的每一行是一个分区的描述。可以看到Topic为kafka的消息,PartionCount=4,ReplicationFactor=3正是我们创建时指定的分区数和备份因子。
另外:Leader是指负责这个分区所有读写的节点;Replicas是指这个分区所在的所有节点(不论它是否活着);ISR是Replicas的子集,代表存有这个分区信息而且当前活着的节点。
拿partition:0这个分区来说,该分区的Leader是server0,分布在id为0,1,2这三个节点上,而且这三个节点都活着。
再来看下Kafka集群的日志:
其中kafka-logs-0代表server0的日志,kafka-logs-1代表server1的日志,以此类推。