这里不只是局限于2台部署有Logstash程序的机器,DBus对Logstash数量不做限制,比如应用日志分布在几十上百台机器上,只需要在每台机器上部署Logstash程序,并将数据统一抽取到同一个Kafka Topic中,DBus就能够对所有主机的数据进行数据处理、监控、预警、统计等。
2.1 启动Logstash在启动Logstash程序后,我们就可以从topic : heartbeat_log_logstash中读取数据,数据样例如下:
1)心跳数据
2)普通日志数据
2.2 配置规则接下来,我们只需要在DBus Web中配置相应的规则就可以对数据进行处理了。
首先新建一个逻辑表sink_info_table,该表用来抽取sink事件的日志信息,然后配置该表的规则组(一个或多个,但所有的规则组过滤后的数据需要满足相同schema特性),heartbeat_log_logstash作为原始数据topic,我们可以实时的对数据进行可视化操作配置(所见即所得,即席验证)。
1)读取原始数据日志
可以看到由Logstash预先提取已经包含了log4j的基本信息,例如path、@timestamp、level等。但是数据日志的详细信息在字段log中。由于不同的数据日志输出是不一样的,因此可以看到log列数据是不同的。
2)提取感兴趣的列
假如我们对timestamp、log 等原始信息感兴趣,那么可以添加一个toIndex算子,来提取这些字段:
这里需要指出,我们考虑使用数组下标方式,是有原因的: - 并不是所有列本身自带列名(例如flume抽取的原始数据,或者split算子处理后的数据列); - 下标方式可以使用数组方式指定列(类似python方式, 例如:1:3表示1,2列); 因此后续操作全部基于数组下标方式访问。
执行规则,就可以看到被提取后的字段情况:
3)过滤需要的数据
在这个例子中,我们只对含有“Sink to influxdb OK!”的数据感兴趣。因此添加一个filter算子,提取第7列中包含”Sink to influxdb OK!”内容的行数据:
执行后,只有符合条件的日志行数据才会存在。
4)对特定列进行提取
添加一个select算子,我们对第1和3列的内容感兴趣,所以对这两列进行提取。
执行select算子,数据中就会只含有第1和3列了。
5)以正则表达式的方式处理数据
我们想从第1列的数据中提取符合特定正则表达式的值,使用regexExtract算子对数据进行过滤。正则表达式如下:http_code=(\d*).*type=(.*),ds=(.*),schema=(.*),table=(.*)\s.*errorCount=(\d*),用户可以写自定义的正则表达式。
执行后,就会获取正则表达式执行后的数据。
6)选择输出列
最后我们把感兴趣的列进行输出,使用saveAs算子, 指定列名和类型,方便于保存在关系型数据库中。
执行saveAs算子后,这就是处理好的最终输出数据样本。
2.3 查看结构化输出结果