CentOS 7.6 部署ELK日志分析系统步骤(3)

logstash操作
[root@localhost logstash]# pwd
/home/elk/logstash
[root@localhost logstash]# cat dev.conf
input {
  kafka{
    bootstrap_servers => "192.168.1.70:9092"
    topics => ["filebeat-log"]
    codec => "json"
  }
}
filter {
        if [fields][tag]=="jpwebmap" {
            json{
                source => "message"
                remove_field => "message"
            }
            geoip {
            source => "client"
            target => "geoip"
            add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
            add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
            }
            mutate {
                convert => [ "[geoip][coordinates]", "float"]
                }
        }
    if [fields][tag] == "54_tcp_catalina_out"{
            grok {
                match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
            }
            date {
                match => ["logdate", "ISO8601"]
            }
            mutate {
                remove_field => [ "logdate" ]
            }
    }
    if [fields][tag] == "54_web_catalina_out"{
                grok {
                        match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
                }
                date {
                        match => ["logdate", "ISO8601"]
                }
                mutate {
                        remove_field => [ "logdate" ]
                }
        }
    if [fields][tag] == "55_tcp_catalina_out"{
                grok {
                        match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
                }
                date {
                        match => ["logdate", "ISO8601"]
                }
                mutate {
                        remove_field => [ "logdate" ]
                }
        }
        if [fields][tag] == "55_web_catalina_out"{
                grok {
                        match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
                }
                date {
                        match => ["logdate", "ISO8601"]
                }
                mutate {
                        remove_field => [ "logdate" ]
                }
        }
    if [fields][tag] == "51_nginx80_access_log" {
            mutate {
                add_field => { "spstr" => "%{[log][file][path]}" }
            }
            mutate {
                split => ["spstr" , "/"]
                # save the last element of the array as the api_method.
                add_field => ["src", "%{[spstr][-1]}" ]
            }
            mutate{
                remove_field => [ "friends", "ecs", "agent" , "spstr" ]
            }
            grok {
                match => { "message" => "%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time}\] \"%{WORD:method} %{DATA:url} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent:bytes} \"%{DATA:referrer}\" \"%{DATA:agent}\" \"%{DATA:x_forwarded_for}\" \"%{NUMBER:request_time}\" \"%{DATA:upstream_addr}\" \"%{DATA:upstream_status}\"" }
                remove_field => "message"
            }
            date {
                    match => ["time", "dd/MMM/yyyy:HH:mm:ss Z"]
                    target => "@timestamp"
            }
            geoip {
                source => "x_forwarded_for"
                target => "geoip"
                database => "/home/elk/logstash/GeoLite2-City.mmdb"
                add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
                add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
            }
            mutate {
                convert => [ "[geoip][coordinates]", "float"]
            }
    }
}
output {
if [fields][tag] == "wori"{
  elasticsearch {
  hosts => ["192.168.1.70:9200"]
  index => "zabbix"
      }
  }
if [fields][tag] == "54_tcp_catalina_out"{
  elasticsearch {
  hosts => ["192.168.1.70:9200"]
  index => "54_tcp_catalina_out"
      }
  }
if [fields][tag] == "54_web_catalina_out"{
  elasticsearch {
  hosts => ["192.168.1.70:9200"]
  index => "54_web_catalina_out"
      }
  }
if [fields][tag] == "55_tcp_catalina_out"{
  elasticsearch {
  hosts => ["192.168.1.70:9200"]
  index => "55_tcp_catalina_out"
      }
  } 
if [fields][tag] == "55_web_catalina_out"{
  elasticsearch {
  hosts => ["192.168.1.70:9200"]
  index => "55_web_catalina_out"
      }
  }
if [fields][tag] == "51_nginx80_access_log" {
      stdout{}
    elasticsearch {
    hosts => ["192.168.1.70:9200"]
    index => "51_nginx80_access_log"
    }
  }
}


其他的配置文件

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/74a3530b6ad41bc74ffb3b8a6b3fe632.html