Logstash读取Kafka数据写入HDFS详解 (2)

这里采用的方案是解析日志中的时间字段time_local,然后根据日志中的时间字段添加两个新字段index.date和index.hour来分别标识日期和小时,在output的时候使用这两个新加的字段做变量来生成文件

logstash filter配置如下:

filter { # 匹配原始日志中的time_local字段并设置为时间字段 # time_local字段为本地时间字段,没有8小时的时间差 date { match => ["time_local","dd/MMM/yyyy:HH:mm:ss Z"] target => "time_local" } # 添加一个index.date字段,值设置为time_local的日期 ruby { code => "event.set('index.date', event.get('time_local').time.localtime.strftime('%Y%m%d'))" } # 添加一个index.hour字段,值设置为time_local的小时 ruby { code => "event.set('index.hour', event.get('time_local').time.localtime.strftime('%H'))" } }

output的path中配置如下

path => "/logs/nginx/%{index.date}/%{index.hour}.log" HDFS记录多了时间和host字段

在没有指定codec的情况下,logstash会给每一条日志添加时间和host字段,例如:

源日志格式为

ops-coffee.cn | 192.168.105.91 | 19/Mar/2019:14:28:07 +0800 | GET / HTTP/1.1 | 304 | 0 | - | 0.000

经过logstash处理后多了时间和host字段

2019-03-19T06:28:07.510Z %{host} ops-coffee.cn | 192.168.105.91 | 19/Mar/2019:14:28:07 +0800 | GET / HTTP/1.1 | 304 | 0 | - | 0.000

如果不需要我们可以指定最终的format只取message,解决方法为在output中添加如下配置:

codec => line { format => "%{message}" } 同时output到ES和HDFS

在实际应用中我们需要同时将日志数据写入ES和HDFS,那么可以直接用下边的配置来处理

# cat config/indexer_rsyslog_nginx.conf input { kafka { bootstrap_servers => "localhost:9092" topics => ["rsyslog_nginx"] codec => "json" } } filter { date { match => ["time_local","dd/MMM/yyyy:HH:mm:ss Z"] target => "@timestamp" } ruby { code => "event.set('index.date', event.get('@timestamp').time.localtime.strftime('%Y%m%d'))" } ruby { code => "event.set('index.hour', event.get('@timestamp').time.localtime.strftime('%H'))" } } output { elasticsearch { hosts => ["192.168.106.203:9200"] index => "rsyslog-nginx-%{+YYYY.MM.dd}" } webhdfs { host => "master01" port => 50070 user => "hadmin" path => "/logs/nginx/%{index.date}/%{index.hour}.log" codec => "json" } }

这里我使用logstash的date插件将日志中的"time_local"字段直接替换为了@timestamp,这样做有什么好处呢?

logstash默认生成的@timestamp字段记录的时间是logstash接收到消息的时间,这个时间可能与日志产生的时间不同,而我们往往需要关注的时间是日志产生的时间,且在ELK架构中Kibana日志输出的默认顺序就是按照@timestamp来排序的,所以往往我们需要将默认的@timestamp替换成日志产生的时间,替换方法就用到了date插件,date插件的用法如下

date { match => ["time_local","dd/MMM/yyyy:HH:mm:ss Z"] target => "@timestamp" }

match:匹配日志中的时间字段,这里为time_local

target:将match匹配到的时间戳存储到给定的字段中,默认不指定的话就存到@timestamp字段

另外还有参数可以配置:timezone,locale,tag_on_failure等,具体可查看官方文档

Logstash读取Kafka数据写入HDFS详解

如果你觉得文章不错,请点右下角【在看】。如果你觉得读的不尽兴,推荐阅读以下文章:

ELK日志系统之使用Rsyslog快速方便的收集Nginx日志

ELK日志系统之通用应用程序日志接入方案

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpyyfj.html