Log Aggregation
kafka的特性决定它非常适合作为"日志收集中心";application可以将操作日志"批量""异步"的发送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/压缩消息等,这对producer端而言,几乎感觉不到性能的开支.此时consumer端可以使hadoop等其他系统化的存储和分析系统.
从上我们可以了解到,我们可以增加一个log4net kafkaappender 日志生产者通过这个appender将日志写入kafka,由于kafka批量提交、压缩的特性,因此对我们的应用服务器性能的开支很小。日志消费者端使用logstash订阅kafka中的消息,传送到elasticsearch中,通过kibana展示给我们。同时我们也可以通过kibana对我们的日志进行统计分析等。刚好可以解决我们上面的一些问题。整个流程大致如下图:
关于log4net for kafka appender,我自己写了一个,nuget上也有现成的包,大家需要可以去nuget上找一找。
3.2.搭建简单介绍一下搭建,搭建过程中采用Docker。
3.2.1 docker 安装kafka //下载 //下载zookeeper docker pull wurstmeister/zookeeper //下载kafka docker pull wurstmeister/kafka:2.11-0.11.0.3 //启动 //启动zookeeper docker run -d --name zookeeper --publish 2181:2181 --volume /etc/localtime:/etc/localtime wurstmeister/zookeeper //启动kafka docker run -d --name kafka --publish 9092:9092 \ --link zookeeper \ --env KAFKA_ZOOKEEPER_CONNECT=192.168.121.205:2181 \ --env KAFKA_ADVERTISED_HOST_NAME=192.168.121.205 \ --env KAFKA_ADVERTISED_PORT=9092 \ --volume /etc/localtime:/etc/localtime \ wurstmeister/kafka:2.11-0.11.0.3 //测试 //创建topic bin/kafka-topics.sh --create --zookeeper 192.168.121.205:2181 --replication-factor 1 --partitions 1 --topic mykafka //查看topic bin/kafka-topics.sh --list --zookeeper 192.168.121.205:2181 //创建生产者 bin/kafka-console-producer.sh --broker-list 192.168.121.205:9092 --topic mykafka //创建消费者 bin/kafka-console-consumer.sh --zookeeper 192.168.121.205:2181 --topic mykafka --from-beginning 3.2.2 Docker安装ELK //1.下载elk docker pull sebp/elk //2.启动elk //Elasticsearch至少需要单独2G的内存 //增加了一个volume绑定,以免重启container以后ES的数据丢失 docker run -d -p 5044:5044 -p 127.0.0.1:5601:5601 -p 127.0.0.1:9200:9200 -p 127.0.0.1:9300:9300 -v /var/data/elk:/var/lib/elasticsearch --name=elk sebp/elk //若启动过程出错一般是因为elasticsearch用户拥有的内存权限太小,至少需要262144 切换到root用户 执行命令: sysctl -w vm.max_map_count=262144 查看结果: sysctl -a|grep vm.max_map_count 显示: vm.max_map_count = 262144 上述方法修改之后,如果重启虚拟机将失效,所以: 解决办法: 在 /etc/sysctl.conf文件最后添加一行 vm.max_map_count=262144 即可永久修改配置使用
//进入容器 docker exec -it <container-name> /bin/bash //执行命令 /opt/logstash/bin/logstash -e 'input { stdin { } } output { elasticsearch { hosts => ["localhost"] } }' /* 注意:如果看到这样的报错信息 Logstash could not be started because there is already another instance using the configured data directory. If you wish to run multiple instances, you must change the "path.data" setting. 请执行命令:service logstash stop 然后在执行就可以了。 */测试
当命令成功被执行后,看到:Successfully started Logstash API endpoint {:port=>9600} 信息后,输入:this is a dummy entry 然后回车,模拟一条日志进行测试。
打开浏览器,输入:
这是我测试用的一个配置文件。
input { kafka{ //此处注意:logstash5.x版本以前kafka插件配置的是zookeeper地址,5.x以后配置的是kafka实例地址 bootstrap_servers =>["192.168.121.205:9092"] client_id => "test" group_id => "test" consumer_threads => 5 decorate_events => true topics => "logstash" } } filter{ json{ source => "message" } } output { elasticsearch { hosts => ["192.168.121.205"] index=> "hslog_2" codec => "json" } }配置文件启动logstash方式
/opt/logstash/bin/logstash -f "配置文件地址" 结束语