Java.conf
# Java application logs: replace the ingest-time @timestamp with the
# timestamp that is embedded in the log line itself.
filter {
  if [fields][tag] == "java" {
    # Capture the first ISO8601-looking timestamp of the raw line into the
    # temporary field "logdate". Lines without a timestamp are left as they
    # are (apart from grok's failure tag) and keep their original @timestamp.
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    # No explicit target is set, so the parsed value goes to the date
    # filter's default target. Formats are tried in order; the first one
    # that parses wins.
    # TIMESTAMP_ISO8601 also matches space-separated timestamps that carry
    # no fractional seconds (or no seconds at all), e.g. "2019-05-01 12:30:45".
    # The "ISO8601" format alone does not parse those, so they are listed
    # explicitly as fallbacks.
    date {
      match => ["logdate", "ISO8601", "yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd HH:mm"]
    }
    # "logdate" was only a scratch field for the date filter; drop it so it
    # is not indexed.
    mutate {
      remove_field => [ "logdate" ]
    }
  } #End if
}
kafkainput.conf
# Kafka consumer feeding the pipeline. Every setting below is passed to the
# kafka input plugin unchanged; numeric tunables are given as strings, as in
# the original configuration.
input {
  kafka {
    # ---- Broker, consumer group and subscriptions ----
    bootstrap_servers => "172.16.11.68:9092"
    group_id => "logstash"
    # Single-topic variant kept for reference:
    #topics => ["ql-prod-tomcat" ]
    topics => ["ql-prod-dubbo","ql-prod-nginx","ql-prod-tomcat" ]
    # Optional settings that are currently left at the plugin defaults:
    #client_id => ""
    #auto_offset_reset => "latest"

    # ---- Decoding and parallelism ----
    # Messages are expected to be JSON documents.
    codec => "json"
    # Ask the plugin to attach Kafka metadata to each event.
    decorate_events => true
    consumer_threads => 5

    ############################# HELK Optimizing Latency #############################
    fetch_min_bytes => "1"
    request_timeout_ms => "305000"

    ############################# HELK Optimizing Availability #############################
    session_timeout_ms => "10000"
    max_poll_records => "550"
    max_poll_interval_ms => "300000"
  }
}
#input {
# kafka{
# bootstrap_servers => "172.16.11.68:9092"
# topics => ["ql-prod-java-dubbo","ql-prod","ql-prod-java" ]
# codec => "json"
# consumer_threads => 15
# decorate_events => true
# auto_offset_reset => "latest"
# group_id => "logstash-1"
# ############################# HELK Optimizing Latency #############################
# fetch_min_bytes => "1"
# request_timeout_ms => "305000"
# ############################# HELK Optimizing Availability #############################
# session_timeout_ms => "10000"
# max_poll_records => "550"
# max_poll_interval_ms => "300000"
# }
#}
nginx.conf
# nginx access logs: derive the source file name, parse the access-log line
# into fields, take @timestamp from the request time and add GeoIP data.
filter {
  if [fields][tag] == "nginx-access" {
    # Step 1: copy the log file path into the scratch field "spstr".
    # This is a separate mutate because add_field is applied only after a
    # filter's own operations, so the split below needs the field to exist
    # already.
    mutate {
      add_field => { "spstr" => "%{[log][file][path]}" }
    }
    # Step 2: split the path on "/" and keep the last element (the file
    # name) in "src".
    mutate {
      split => ["spstr" , "/"]
      add_field => ["src", "%{[spstr][-1]}" ]
    }
    # Step 3: drop the scratch field and shipper metadata. "agent" has to go
    # before grok runs, because grok stores the HTTP user agent in a field
    # of the same name.
    mutate {
      remove_field => [ "friends", "ecs", "agent" , "spstr" ]
    }
    # Parse the access-log line. The raw "message" is removed only when the
    # pattern matches.
    # body_sent is cast with ":int": grok's type slot supports only "int"
    # and "float", so the previous ":bytes" suffix was ignored and the value
    # was indexed as a string.
    grok {
      match => { "message" => "%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time}\] \"%{WORD:method} %{DATA:url} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent:int} \"%{DATA:referrer}\" \"%{DATA:agent}\" \"%{DATA:x_forwarded_for}\" \"%{NUMBER:request_time}\" \"%{DATA:upstream_addr}\" \"%{DATA:upstream_status}\"" }
      remove_field => "message"
    }
    # Use the request time from the log line as the event timestamp.
    date {
      match => ["time", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }
    # Look up the client address taken from the X-Forwarded-For column.
    # On a successful lookup the two add_field entries append to the same
    # field, giving [geoip][coordinates] = [longitude, latitude].
    geoip {
      source => "x_forwarded_for"
      target => "geoip"
      database => "/opt/logstash-6.2.4/GeoLite2-City.mmdb"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    # add_field produces strings; turn the coordinates back into numbers.
    mutate {
      convert => [ "[geoip][coordinates]", "float"]
    }
  } #endif
}
output.conf
# Route events to Elasticsearch according to the tag set by the shipper.
output {
  if [fields][tag] == "nginx-access" {
    # Debug console output is disabled: it printed every event to stdout in
    # addition to indexing it. Uncomment only while troubleshooting.
    #stdout{}
    elasticsearch {
      # NOTE(review): credentials are stored in clear text in this file.
      # Consider moving them to the Logstash keystore or to environment
      # variables (${VAR} substitution) and rotating the password.
      user => "elastic"
      password => "WR141bp2sveJuGFaD4oR"
      hosts => ["172.16.11.67:9200"]
      # One index per project and day.
      index => "logstash-%{[fields][proname]}-%{+YYYY.MM.dd}"
    }
  }
  #stdout{}
  if [fields][tag] == "java" {
    elasticsearch {
      # NOTE(review): same clear-text credentials as in the branch above.
      user => "elastic"
      password => "WR141bp2sveJuGFaD4oR"
      hosts => ["172.16.11.66:9200","172.16.11.68:9200"]
      # NOTE(review): [src] is only populated by the nginx-access filter in
      # the configuration visible here. If no other filter sets it for java
      # events, the index name will contain the literal text "%{[src]}".
      # Confirm where [src] is set for java events. The name also has no
      # date suffix, so each host/src pair writes to one ever-growing index.
      index => "%{[host][name]}-%{[src]}"
    }
  }
}
Linux公社的RSS地址:https://www.linuxidc.com/rssFeed.aspx