logstash操作
[root@localhost logstash]# pwd
/home/elk/logstash
[root@localhost logstash]# cat dev.conf
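# Logstash pipeline: consumes filebeat events from a Kafka topic, enriches
# them according to [fields][tag], and routes each tag to its own
# Elasticsearch index.
#
# NOTE(review): the kafka and elasticsearch connections below are configured
# without TLS or credentials — confirm both endpoints are reachable only from
# the network this Logstash host sits on.
input {
  kafka {
    bootstrap_servers => "192.168.1.70:9092"
    topics => ["filebeat-log"]
    codec => "json"
  }
}

filter {
  if [fields][tag] == "jpwebmap" {
    # NOTE(review): "jpwebmap" is enriched here but has no branch in the
    # output section, so these events are discarded after filtering —
    # confirm whether an output was intended.
    json {
      source => "message"
      remove_field => "message"
    }
    geoip {
      source => "client"
      target => "geoip"
      # Two add_field entries on the same key accumulate into a
      # [longitude, latitude] array; the mutate below converts both to float.
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float"]
    }
  }

  # The four catalina sources share identical timestamp handling, so one
  # branch replaces the four copies. New catalina tags go into this list
  # (and into the matching list in the output section).
  if [fields][tag] in ["54_tcp_catalina_out", "54_web_catalina_out", "55_tcp_catalina_out", "55_web_catalina_out"] {
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    # ISO8601 logdate becomes @timestamp (date's default target).
    date {
      match => ["logdate", "ISO8601"]
    }
    mutate {
      remove_field => [ "logdate" ]
    }
  }

  if [fields][tag] == "51_nginx80_access_log" {
    # Derive "src" (the log file's basename) from log.file.path: copy the
    # path into a scratch field, split on "/", keep the last element, then
    # drop the scratch field.
    mutate {
      add_field => { "spstr" => "%{[log][file][path]}" }
    }
    mutate {
      split => ["spstr" , "/"]
      # add_field runs after split, so [-1] is the last path component.
      add_field => ["src", "%{[spstr][-1]}" ]
    }
    mutate {
      # "agent" here is the beat metadata object; the grok below re-creates
      # "agent" as the HTTP user agent string.
      remove_field => [ "friends", "ecs", "agent" , "spstr" ]
    }
    grok {
      # NOTE(review): ":bytes" is not a grok conversion type (only int and
      # float are), so body_sent presumably stays a string — check the index
      # mapping before changing it to ":int".
      match => { "message" => "%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time}\] \"%{WORD:method} %{DATA:url} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent:bytes} \"%{DATA:referrer}\" \"%{DATA:agent}\" \"%{DATA:x_forwarded_for}\" \"%{NUMBER:request_time}\" \"%{DATA:upstream_addr}\" \"%{DATA:upstream_status}\"" }
      # Only dropped when the match succeeds; unparsed lines keep "message".
      remove_field => "message"
    }
    date {
      match => ["time", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }
    geoip {
      # NOTE(review): x_forwarded_for is a client-supplied header. It may
      # carry a comma-separated chain of addresses (the lookup then likely
      # fails and the event is tagged), and its value is spoofable unless a
      # trusted proxy overwrites it. Prefer remote_addr unless that proxy
      # guarantee holds.
      source => "x_forwarded_for"
      target => "geoip"
      database => "/home/elk/logstash/GeoLite2-City.mmdb"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float"]
    }
  }
}

output {
  if [fields][tag] == "wori" {
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "zabbix"
    }
  }

  # Index name equals the tag. The "in" guard keeps this an allowlist: only
  # the four known tags can select an index, exactly as the former per-tag
  # branches did.
  if [fields][tag] in ["54_tcp_catalina_out", "54_web_catalina_out", "55_tcp_catalina_out", "55_web_catalina_out"] {
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "%{[fields][tag]}"
    }
  }

  if [fields][tag] == "51_nginx80_access_log" {
    # stdout{} echoes every full event to Logstash's stdout; keep it only
    # while debugging the grok pattern.
    stdout{}
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "51_nginx80_access_log"
    }
  }
}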
其他的配置文件