日志分析工具ELK配置详解(4)


5.5.2 收集elasticsearch的error日志
此处把上个system日志和这个error(java程序日志)日志,放在一起。使用if判断,两种日志分别写到不同索引中.此处的type(固定的就是type,不可更改)不可以和日志格式的任何一个域(可以理解为字段)的名称重复,也就是说日志的域不可以有type这个名称。
vim all.conf
input {
file {
path => "/var/log/nginx/access.log"
type => "nginx"
start_position => "beginning"
}
file {
path => "/var/log/elasticsearch/chinasoft_elk_cluster.log"
type => "es-error"
start_position => "beginning"
}
}
output {
if [type] == "nginx" {
elasticsearch {
hosts => ["192.168.3.17:9200"]
index => "nginx-%{+YYYY.MM.dd}"
}
}
if [type] == "es-error" {
elasticsearch {
hosts => ["192.168.3.17:9200"]
index => "es-error-%{+YYYY.MM.dd}"
}
}
}
5.6 把多行整个报错收集到一个事件中
5.6.1举例说明
以at.org开头的内容都属于同一个事件,但是显示在不同行,这样的日志格式看起来很不方便,所以需要把他们合并到一个事件中

日志分析工具ELK配置详解


5.6.2引入codec的multiline插件
官方文档提供
input {
stdin {
codec => multiline {
` pattern => "pattern, a regexp"
negate => "true" or "false"
what => "previous" or "next"`
}
}
}
regrxp:使用正则,什么情况下把多行合并起来
negate:正向匹配和反向匹配
what:合并到当前行还是下一行
在标准输入和标准输出中测试以证明多行收集到一个日志成功
vim muliline.conf
input {
stdin {
codec => multiline {
pattern => "^\["
negate => true
what => "previous"
}
}
}
output {
stdout {
codec => "rubydebug"
}
}
# /opt/logstash/bin/logstash -f muliline.conf 
Settings: Default pipeline workers: 2
Pipeline main started
[1    
Received an event that has a different character encoding than you configured. {:text=>"\\xE3[1\\n", :expected_charset=>"UTF-8", :level=>:warn}
[2    
{
    "@timestamp" => "2016-10-28T06:19:59.275Z",
       "message" => "\\xE3[1\\n",
      "@version" => "1",
          "host" => "node1.chinasoft.com"
}
{
chinasoft            
chinasoft.com
123456
[3
{
    "@timestamp" => "2016-10-28T06:21:13.812Z",
       "message" => "[2\n{\nchinasoft\nchinasoft.com\n123456",
      "@version" => "1",
          "tags" => [
        [0] "multiline"
    ],
          "host" => "node1.chinasoft.com"
}
继续将上述实验结果放到all.conf的es-error索引中
vim all.conf
input {
file {
path => "/var/log/nginx/access.log"
type => "nginx"
start_position => "beginning"
}
file {
path => "/var/log/elasticsearch/chuck-clueser.log"
type => "es-error"
start_position => "beginning"
codec => multiline {
pattern => "^\["
negate => true
what => "previous"
}
}
}
output {
if [type] == "nginx" {
elasticsearch {
hosts => ["192.168.3.17:9200"]
index => "nginx-%{+YYYY.MM.dd}"
}
}
if [type] == "es-error" {
elasticsearch {
hosts => ["192.168.3.17:9200"]
index => "es-error-%{+YYYY.MM.dd}"
}
}
}
六、熟悉kibana
6.1 编辑kinaba配置文件使之生效
# grep '^[a-Z]' /usr/local/kibana/config/kibana.yml
server.port: 5601 # kibana端口
server.host: "0.0.0.0" 对外服务的主机
elasticsearch.url: "http://192.168.3.17:9200" # 和elasticsearch联系
kibana.index: " .kibana # 在elasticsearch中添加.kibana索引


开启一个screen,并启动kibana
yum install -y screen
# screen
# /usr/local/kibana/bin/kibana
使用crtl +a+d退出screen
使用浏览器打开192.168.3.17:5601
6.2 验证error的muliline插件生效
在kibana中添加一个es-error索引

日志分析工具ELK配置详解



可以看到默认的字段
选择discover查看
验证error的muliline插件生效(即过滤条件,将多行错误转为一行)
七、logstash收集nginx、syslog和tcp日志
7.1收集nginx的访问日志
在这里使用codec的json插件将日志的域进行分段,使用key-value的方式,使日志格式更清晰,易于搜索,还可以降低cpu的负载
更改nginx的配置文件的日志格式,使用json
vim /etc/nginx/nginx.conf
log_format json '{ "@timestamp": "$time_local", '
'"@fields": { '
'"remote_addr": "$remote_addr", '
'"remote_user": "$remote_user", '
'"body_bytes_sent": "$body_bytes_sent", '
'"request_time": "$request_time", '
'"status": "$status", '
'"request": "$request", '
'"request_method": "$request_method", '
'"http_referrer": "$http_referer", '
'"body_bytes_sent":"$body_bytes_sent", '
'"http_x_forwarded_for": "$http_x_forwarded_for", '
'"http_user_agent": "$http_user_agent" } }';
# access_log /var/log/nginx/access_json.log main;
access_log /var/log/nginx/access.log json;
重新启动nginx
# nginx -t
nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
nginx: configuration file /etc/nginx/nginx.conf test is successful
# ss -tunlp|grep nginx
tcp    LISTEN     0      128                    *:80                    *:*      users:(("nginx",13590,6),("nginx",13591,6))
日志格式显示如下

日志分析工具ELK配置详解


使用logstash将nginx访问日志收集起来,继续写到all.conf中
将nginx-log加入kibana中并显示

日志分析工具ELK配置详解


7.2 收集系统syslog日志
前文中已经使用文件file的形式收集了系统日志/var/log/messages,但是实际生产环境是需要使用syslog插件直接收集
修改syslog的配置文件,把日志信息发送到514端口上
# vim /etc/rsyslog.conf

90 *.* @@192.168.3.17

# service rsyslog restart
将system-syslog放到all.conf中,启动all.conf

input {
syslog {
type => "system-syslog"
host => "192.168.3.17"
port => "514"
}
file {
path => "/var/log/nginx/access.log"
type => "nginx"
start_position => "beginning"
}
file {
path => "/var/log/elasticsearch/chuck-clueser.log"
type => "es-error"
start_position => "beginning"
codec => multiline {
pattern => "^\["
negate => true
what => "previous"
}
}
}
output {
if [type] == "nginx" {
elasticsearch {
hosts => ["192.168.3.17:9200"]
index => "nginx-%{+YYYY.MM.dd}"
}
}
if [type] == "es-error" {
elasticsearch {
hosts => ["192.168.3.17:9200"]
index => "es-error-%{+YYYY.MM.dd}"
}
}
if [type] == "system-syslog" {
elasticsearch {
hosts => ["192.168.3.17:9200"]
index => "system-syslog-%{+YYYY.MM.dd}"
}
}
}
在elasticsearch插件中就可见到增加的system-syslog索引

7.3 收集tcp日志

日志分析工具ELK配置详解


编写tcp.conf

# vim tcp.conf
input {
tcp {
host => "192.168.3.17"
port => "6666"
}
}
output {
stdout {
codec => "rubydebug"
}
}
使用nc对6666端口写入数据
# nc 192.168.3.17 6666 </var/log/yum.log
将信息输入到tcp的伪设备中
将信息输入到tcp的伪设备中
# echo "chinasoft" >/dev/tcp/192.168.3.17/6666
八、logstash解耦之消息队列
8.1 图解使用消息队列架构
数据源Datasource把数据写到input插件中,output插件使用消息队列把消息写入到消息队列Message Queue中,Logstash indexing Instance启动logstash使用input插件读取消息队列中的信息,Fliter插件过滤后在使用output写入到elasticsearch中。
如果生产环境中不适用正则grok匹配,可以写Python脚本从消息队列中读取信息,输出到elasticsearch中

日志分析工具ELK配置详解

日志分析工具ELK配置详解

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/00075c0035f4d4f5cd9f2c3bcb230e58.html