运维故障排障速度往往与监控系统体系颗粒度成正比,监控到位才能快速排障
在部署这套系统之前,平台所有系统日志都由Graylog+Zabbix,针对日志出现的错误关键字进行告警,这种做法在运维工作开展过程中暴露出多个不足点,不详述;在考虑多方面原因后,最终对日志告警系统进行更换,选用的方案是:ELK + Kafka+ Filebeat + Elastalert
本文主要以两个需求为主轴做介绍
非工作时间服务器异常登录告警
系统日志出现错误关键字告警
架构

服务选型
nameversioninfoAmazon Elasticsearch Service v6.2 AWK官网部署教程
Logstash v6.2.3 选用与ES相同版本
Filebeat v6.2.3 选用与ES相同版本
Confluent(Kafka) v4.0 这里推荐 Confluent 的版本,Confluent 是 kafka 作者 Neha Narkhede 从 Linkedin 出来之后联合 LinkedIn 前员工创建的大数据公司,专注于 kafka 的企业应用。
Elastalert v0.1.29 原先考虑采用X-Pack但由于AWS目前还不支持
部署
本文采用的操作系统 :CentOS release 6.6
Filebeat
# 下载源
$ curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.2.3-x86_64.rpm
# 安装
$ sudo rpm -vi filebeat-6.2.3-x86_64.rpm
Logstash
# 导入Yum源
$ rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
$ cat <<EOF > /etc/yum.repos.d/logstash.repo
[logstash-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF
# 安装
yum install logstash -y
Elastalert
# pip直接安装
$ pip install elastalert
# 如果出现依赖包报错,以下为常用开发所需依赖包
$ yum install -y zlib openssl openssl-devel gcc gcc-c++ Xvfb libXfont Xorg libffi libffi-devel Python-cffi python-devel libxslt-devel libxml2-devel zlib-devel bzip2-devel xz-libs wget
配置
Filebeat
/etc/filebeat/filebeat.yml
filebeat.config:
  prospectors:
    path: /etc/filebeat/conf/*.yml
    reload.enabled: true
    reload.period: 10s
output.kafka:
  # kafkaNode为Kafaka服务所在服务器 
  hosts: ["kafkaNode:9092"]
# 索引取fields.out_topic
  topic: "%{[fields][out_topic]}"
  partition.round_robin:
    reachable_only: false
/etc/filebeat/conf/base.yml
# 收集系统日志
- type: log
  paths: 
    - /var/log/messages
    - /var/log/syslog*
  exclude_files: [".gz$"]
  exclude_lines: ["ssh_host_dsa_key"]
  tags: ["system_log"]
  scan_frequency: 1s
  fields:
    # 新增字段用于辨别来源客户端
    server_name: client01
# 索引
    out_topic: "system_log"
  multiline:
    pattern: "^\\s"
    match: after
# 收集登录日志
- type: log
  paths:
    - /var/log/secure*
    - /var/log/auth.log*
  tags: ["system_secure"]
  exclude_files: [".gz$"]
  scan_frequency: 1s
  fields:
    server_name: client01
    out_topic: "system_secure"
  multiline:
    pattern: "^\\s"
    match: after
Logstash
/etc/logstash/conf.d/system_log.conf
input {
    kafka {
        bootstrap_servers => "kafkaNode:9092"
        consumer_threads => 3
        topics => ["system_log"]
        auto_offset_reset => "latest"
        codec => "json"
    }
}
filter {
# 排除logstash日志
    if [source] == "/var/log/logstash-stdout.log" {
        drop {}
    }
if [fields][out_topic] == "system_log" {
        date {match => [ "[system][syslog][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]}
        grok {
            match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }
            pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
            remove_field => "message"
        }
    }
}

