利用Rsyslog进行日志收集到Kafka

项目需要将日志收集起来做存储分析,数据的流向为rsyslog(收集) -> kafka(消息队列) -> logstash(清理) -> es、hdfs; 今天我们先将如何利用rsyslog进行日志收集到kafka。

一、环境准备

通过对 查看,得知 rsyslog对 kafka的支持是 v8.7.0版本后才提供的支持.通过 ChangeLog 也可以看出 V8.X的版本变化.
最新V8稳定版已经提供RPM包的Rsyslog-kafka插件了,直接yum安装即可,添加yum源:

[rsyslog_v8] name=Adiscon CentOS-$releasever - local packages for $basearch baseurl=http://rpms.adiscon.com/v8-stable/epel-$releasever/$basearch enabled=1 gpgcheck=0 gpgkey=http://rpms.adiscon.com/RPM-GPG-KEY-Adiscon protect=1

添加后 yum install rsyslog rsyslog-kafka.x86_64即可完成安装。

二、配置 1. 处理原则

input submit received messages to rulesets, zero or many

ruleset contains rule, rule consist of a filter and an action list

actions consist of the action call itself (e.g. ”:omusrmsg:”) as well as all action-defining configuration statements ($Action... directives)

2.

通常利用RainerScript type statements进行非常简洁明了的配置声明,例如:

mail.info /var/log/mail.log 3. 流程控制 4. 数据处理:支持set, unset, reset操作

备注: Only message json (CEE/Lumberjack) properties can be modified by the set, unset andreset statements

5. input

有很多种input模块, 我们以imfile模块为例, 此模块将所有的文本文件内容逐行转到syslog中.

input(type="imfile" tag="kafka" file="analyze.log" ruleset="imfile-kafka"[, Facility=local.7]) 6. outputs

也叫作actions, 处理动作,格式如下

action ( type="omkafka" topic="kafka_test" broker="10.120.169.149:9092" ) 7. Rulesets and Rules

Rulesets包括多条rule,一条规则就是rsyslog处理消息的一种方式, 每个规则包含filter和actions

input(type="imfile" tag="kafka" file="analyze.log" ruleset="rulesetname") ruleset(name="rulesetname") { action(type="omfile" file="/path/to/file") action(type="..." ...) /* and so on... */ }

通过input里面的ruleset配置,将输入流进入ruleset进行规则匹配,然后执行action操作,完成对流的处理。

8. Queue parameters

将不同的输入流进入不同的队列并行处理数据,通常在ruleset或者action中配置,默认只有一个队列。配置参数例子

action(type="omfwd" target="192.168.2.11" port="10514" protocol="tcp" queue.filename="forwarding" queue.size="1000000" queue.type="LinkedList" ) 9. templates

这是rsyslog一个重要的特性,它可以让用户自定义输入流格式,同样也可以用于动态生成日志文件, 默认是原始格式。
一般表达式如下:
template(parameters) { list-descriptions }

list : 列表模板,包含name, type="list", 多个constant和property对。

template(name="tpl1" type="list") { constant(value="Syslog MSG is: '") property(name="msg") constant(value="', ") property(name="timereported" dateFormat="rfc3339" caseConversion="lower") constant(value="\n") }

string: 字符串自定义格式模块, 由name, type="string", string="<onstant text and replacement variables>", 例如

%TIMESTAMP:::date-rfc3339% %HOSTNAME%%syslogtag%%msg:::sp-if-no-1st-sp%%msg:::drop-last-lf%\n"

将每个日志字段通过自定义变量和处理方式(property replacer)得到全局能读取的日志变量。

注意:

原始格式: v6之前的格式,$template strtpl,"PRI: %pri%, MSG: %msg%\n"。

利用action里的template参数将templates和action进行绑定,如
action(template=TEMPLATENAME,type="omfile" file="/var/log/all-msgs.log")

三. 实例

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/644d5f9f1c27d1dbd2afef06713ec62f.html