In operations work, the speed of troubleshooting is often proportional to the granularity of the monitoring system: only when monitoring is in place can faults be located quickly.
Before this system was deployed, all platform system logs went through Graylog + Zabbix, which alerted on error keywords appearing in the logs. This approach exposed a number of shortcomings in day-to-day operations (not detailed here). After weighing several factors, the log alerting system was replaced; the chosen stack is: ELK + Kafka + Filebeat + Elastalert.
This article is organized around two requirements:
Alerting on abnormal server logins outside working hours
Alerting on error keywords appearing in system logs
Architecture
Service Selection
Name                         | Version | Notes
Amazon Elasticsearch Service | v6.2    | AWS official deployment guide
Logstash                     | v6.2.3  | Same version as ES
Filebeat                     | v6.2.3  | Same version as ES
Confluent (Kafka)            | v4.0    | The Confluent distribution is recommended here. Confluent is a big-data company focused on enterprise Kafka, founded by Kafka co-creator Neha Narkhede together with former LinkedIn colleagues after leaving LinkedIn.
Elastalert                   | v0.1.29 | X-Pack was considered initially, but AWS does not yet support it
Deployment
Operating system used in this article: CentOS release 6.6
Filebeat
# Download the package
$ curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.2.3-x86_64.rpm
# Install
$ sudo rpm -vi filebeat-6.2.3-x86_64.rpm
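On CentOS 6 the RPM installs a SysV init script, so the service can be enabled and started in the usual way (a sketch, assuming the default package layout):

```shell
# Enable Filebeat at boot and start it (SysV init on CentOS 6)
$ sudo chkconfig filebeat on
$ sudo service filebeat start
```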
Logstash
# Add the Yum repository
$ rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
$ cat <<EOF > /etc/yum.repos.d/logstash.repo
[logstash-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF
# Install
$ yum install logstash -y
Elastalert
# Install directly with pip
$ pip install elastalert
# If dependency errors occur, these are the commonly needed development packages
$ yum install -y zlib openssl openssl-devel gcc gcc-c++ Xvfb libXfont Xorg libffi libffi-devel python-cffi python-devel libxslt-devel libxml2-devel zlib-devel bzip2-devel xz-libs wget
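Besides the package itself, Elastalert needs a global config file before any rules can run. Below is a minimal sketch; the file path, ES endpoint, and intervals are assumptions to adapt to your environment:

```yaml
# /etc/elastalert/config.yaml (path is an assumption)
rules_folder: /etc/elastalert/rules   # directory holding the alert rule files
run_every:
  minutes: 1                          # how often Elastalert queries Elasticsearch
buffer_time:
  minutes: 15                         # window of data re-queried on each run
es_host: your-aws-es-endpoint         # assumption: your Amazon ES domain endpoint
es_port: 443
use_ssl: true
writeback_index: elastalert_status    # index Elastalert uses for its own state
```

After creating the config, `elastalert-create-index` can be run once to create the writeback index.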
Configuration
Filebeat
/etc/filebeat/filebeat.yml
filebeat.config:
  prospectors:
    path: /etc/filebeat/conf/*.yml
    reload.enabled: true
    reload.period: 10s

output.kafka:
  # kafkaNode is the server running the Kafka service
  hosts: ["kafkaNode:9092"]
  # the topic is taken from fields.out_topic
  topic: "%{[fields][out_topic]}"
  partition.round_robin:
    reachable_only: false
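Before shipping anything, the configuration can be sanity-checked with Filebeat's built-in test subcommands (available in 6.x):

```shell
# Validate the main config file
$ filebeat test config -c /etc/filebeat/filebeat.yml
# Check connectivity to the Kafka output
$ filebeat test output -c /etc/filebeat/filebeat.yml
```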
/etc/filebeat/conf/base.yml
# Collect system logs
- type: log
  paths:
    - /var/log/messages
    - /var/log/syslog*
  exclude_files: [".gz$"]
  exclude_lines: ["ssh_host_dsa_key"]
  tags: ["system_log"]
  scan_frequency: 1s
  fields:
    # extra field used to identify the source client
    server_name: client01
    # used as the Kafka topic
    out_topic: "system_log"
  multiline:
    pattern: "^\\s"
    match: after

# Collect login logs
- type: log
  paths:
    - /var/log/secure*
    - /var/log/auth.log*
  tags: ["system_secure"]
  exclude_files: [".gz$"]
  scan_frequency: 1s
  fields:
    server_name: client01
    out_topic: "system_secure"
  multiline:
    pattern: "^\\s"
    match: after
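To confirm that events are actually reaching Kafka, a quick check is to consume a few messages from the topic with the console consumer that ships with Confluent (the command name and flags assume a default Confluent package install):

```shell
# Read the first few messages from the system_log topic
$ kafka-console-consumer --bootstrap-server kafkaNode:9092 \
    --topic system_log --from-beginning --max-messages 5
```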
Logstash
/etc/logstash/conf.d/system_log.conf
input {
  kafka {
    bootstrap_servers => "kafkaNode:9092"
    consumer_threads => 3
    topics => ["system_log"]
    auto_offset_reset => "latest"
    codec => "json"
  }
}

filter {
  # Drop Logstash's own logs
  if [source] == "/var/log/logstash-stdout.log" {
    drop {}
  }
  if [fields][out_topic] == "system_log" {
    grok {
      match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }
      pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
      remove_field => "message"
    }
    # date must run after grok, since it parses the field grok extracts
    date { match => [ "[system][syslog][timestamp]", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ] }
  }
}
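The grok pattern above can be sanity-checked outside Logstash. Here is a rough Python equivalent of the same syslog pattern, run against a made-up sample line; the regex is an approximation of the grok semantics (SYSLOGTIMESTAMP, SYSLOGHOST, program, optional pid, message), not the exact library patterns:

```python
import re

# Approximate Python equivalent of the grok pattern:
# timestamp, hostname, program, optional [pid], then the rest of the message.
SYSLOG_RE = re.compile(
    r"(?P<timestamp>\w{3}\s+\d{1,2} \d{2}:\d{2}:\d{2}) "
    r"(?P<hostname>\S+) "
    r"(?P<program>[^\[\s:]+)(?:\[(?P<pid>\d+)\])?: "
    r"(?P<message>(?:.|\n)*)"
)

# Made-up sample line in the /var/log/secure style
sample = "Apr  3 10:15:42 client01 sshd[1234]: Failed password for root from 1.2.3.4"
m = SYSLOG_RE.match(sample)
print(m.group("timestamp"))                # Apr  3 10:15:42
print(m.group("program"), m.group("pid"))  # sshd 1234
```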