安装6.5版本的
[root@localhost filebeat]# pwd
/usr/local/filebeat
[root@localhost filebeat]# cat filebeat.yml
# filebeat.yml (Filebeat 6.5 per the heading above): tails two Tomcat catalina.out
# files and ships them to Kafka. fields.tag is the only routing key; every Logstash
# filter/output branch in dev.conf matches on [fields][tag].
# NOTE(review): no multiline.* settings are present, so each line of a Java stack
# trace in catalina.out is presumably shipped as its own event — confirm that is wanted.
filebeat.prospectors:
- type: log
  paths:
    - /opt/logs/workphone-tcp/catalina.out
  fields:
    tag: 54_tcp_catalina_out          # matched by the 54_tcp_catalina_out branch in dev.conf
- type: log
  paths:
    - /opt/logs/workphone-webservice/catalina.out
  fields:
    tag: 54_web_catalina_out          # matched by the 54_web_catalina_out branch in dev.conf
name: 192.168.1.54                    # shipper name reported in events; the host IP is used here
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 3
output.kafka:
  # NOTE(review): no ssl.* or username/password keys are set, so the Tomcat logs travel
  # to the broker unauthenticated and in clear text; acceptable only if
  # 192.168.1.70:9092 is confined to the logging network.
  hosts: ["192.168.1.70:9092"]
  topic: "filebeat-log"               # consumed by the kafka input in dev.conf
  partition.hash:
    reachable_only: true              # hash partitioning, but only over partitions currently reachable
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 1                    # wait for the partition leader's ack only
[root@localhost filebeat]#
安装完成后去logstash编辑配置文件
logstash操作
[root@localhost logstash]# pwd
/home/elk/logstash
[root@localhost logstash]# cat dev.conf
# dev.conf: consumes the "filebeat-log" topic written by filebeat.yml, routes on
# [fields][tag] (set per prospector in Filebeat) and writes one index per tag.
input {
  kafka{
    # NOTE(review): no ssl_*/sasl_* options are set, so this consumer reads from the
    # broker unauthenticated and in clear text; verify that is intended for this network.
    bootstrap_servers => "192.168.1.70:9092"
    topics => ["filebeat-log"]
    codec => "json"
  }
}
filter {
  if [fields][tag]=="jpwebmap" {
    # message is expected to hold a JSON document; it is parsed in place and the
    # raw string dropped.
    json{
      source => "message"
      remove_field => "message"
    }
    geoip {
      source => "client"
      target => "geoip"
      # Two add_field calls on the same key build an array; longitude is added first,
      # then latitude — the [lon, lat] order an Elasticsearch geo_point array expects.
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float"]
    }
  }
  # The four catalina.out tags below share one recipe: pull the leading ISO8601
  # timestamp of the line into @timestamp, then drop the scratch field.
  if [fields][tag] == "54_tcp_catalina_out"{
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "ISO8601"]
    }
    mutate {
      remove_field => [ "logdate" ]
    }
  }
  if [fields][tag] == "54_web_catalina_out"{
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "ISO8601"]
    }
    mutate {
      remove_field => [ "logdate" ]
    }
  }
  if [fields][tag] == "55_tcp_catalina_out"{
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "ISO8601"]
    }
    mutate {
      remove_field => [ "logdate" ]
    }
  }
  if [fields][tag] == "55_web_catalina_out"{
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "ISO8601"]
    }
    mutate {
      remove_field => [ "logdate" ]
    }
  }
  if [fields][tag] == "51_nginx80_access_log" {
    # Copy the source file path so it can be split without touching [log][file][path].
    mutate {
      add_field => { "spstr" => "%{[log][file][path]}" }
    }
    mutate {
      # NOTE(review): the separator "http://www.likecs.com/" looks like an artifact of
      # the site that mirrored this page; the stated intent (last path element -> src)
      # suggests a plain "/" — confirm against the original file.
      split => ["spstr" , "http://www.likecs.com/"] # save the last element of the array as the api_method.
      add_field => ["src", "%{[spstr][-1]}" ]
    }
    mutate{
      # Drops Filebeat's own agent/ecs objects, presumably so the grok below can reuse
      # the name "agent" for the HTTP User-Agent string.
      remove_field => [ "friends", "ecs", "agent" , "spstr" ]
    }
    grok {
      # nginx access line: combined format extended with X-Forwarded-For, request_time
      # and the upstream address/status.
      match => { "message" => "%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time}\] \"%{WORD:method} %{DATA:url} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent:bytes} \"%{DATA:referrer}\" \"%{DATA:agent}\" \"%{DATA:x_forwarded_for}\" \"%{NUMBER:request_time}\" \"%{DATA:upstream_addr}\" \"%{DATA:upstream_status}\"" }
      remove_field => "message"
    }
    date {
      match => ["time", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }
    geoip {
      # NOTE(review): X-Forwarded-For is a request header; unless a trusted proxy in front
      # of nginx always overwrites it, the client controls it, so this geo lookup is only as
      # trustworthy as that header. A comma-separated multi-hop value will not resolve
      # as a single IP either — confirm what sits in front of nginx.
      source => "x_forwarded_for"
      target => "geoip"
      database => "/home/elk/logstash/GeoLite2-City.mmdb"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float"]
    }
  }
}
output {
  # NOTE(review): none of the elasticsearch outputs below carry user/password or ssl
  # settings, so 192.168.1.70:9200 is written to unauthenticated and in clear text;
  # make sure that port is not reachable from outside the logging segment.
  if [fields][tag] == "wori"{
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "zabbix"
    }
  }
  if [fields][tag] == "54_tcp_catalina_out"{
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "54_tcp_catalina_out"
    }
  }
  if [fields][tag] == "54_web_catalina_out"{
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "54_web_catalina_out"
    }
  }
  if [fields][tag] == "55_tcp_catalina_out"{
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "55_tcp_catalina_out"
    }
  }
  if [fields][tag] == "55_web_catalina_out"{
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "55_web_catalina_out"
    }
  }
  if [fields][tag] == "51_nginx80_access_log" {
    # stdout{} echoes every access-log event (client IPs included) into the Logstash
    # process log; debugging aid, remove once the pipeline is verified.
    stdout{}
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "51_nginx80_access_log"
    }
  }
}
其他的配置文件
index.conf
# index.conf: unconditional filter that derives src (last element of the log file path)
# for every event; ouput.conf below uses src in the java index name.
filter {
  mutate {
    add_field => { "spstr" => "%{[log][file][path]}" }
  }
  mutate {
    # NOTE(review): same mirrored-page artifact as in dev.conf — the separator is
    # presumably "/" in the original; confirm.
    split => ["spstr" , "http://www.likecs.com/"] # save the last element of the array as the api_method.
    add_field => ["src", "%{[spstr][-1]}" ]
  }
  mutate{
    remove_field => [ "friends", "ecs", "agent" , "spstr" ]
  }
}
java.conf
# java.conf: timestamp extraction for events tagged "java", same recipe as the
# catalina.out branches in dev.conf.
filter {
  if [fields][tag] == "java"{
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "ISO8601"]
    }
    mutate {
      remove_field => [ "logdate" ]
    }
  } #End if
}
kafkainput.conf
# kafkainput.conf: single consumer group over three production topics.
# NOTE(review): as in dev.conf, no ssl_*/sasl_* options are set — the production
# log stream from 172.16.11.68:9092 is read unauthenticated and in clear text.
input {
  kafka{
    bootstrap_servers => "172.16.11.68:9092"
    #topics => ["ql-prod-tomcat" ]
    topics => ["ql-prod-dubbo","ql-prod-nginx","ql-prod-tomcat" ]
    codec => "json"
    consumer_threads => 5
    decorate_events => true
    #auto_offset_reset => "latest"
    group_id => "logstash"
    #client_id => ""
    ############################# HELK Optimizing Latency #############################
    fetch_min_bytes => "1"
    request_timeout_ms => "305000"
    ############################# HELK Optimizing Availability #############################
    session_timeout_ms => "10000"
    max_poll_records => "550"
    max_poll_interval_ms => "300000"
  }
}
# Earlier variant kept for reference (different topics, 15 threads, explicit offset reset).
#input {
#  kafka{
#    bootstrap_servers => "172.16.11.68:9092"
#    topics => ["ql-prod-java-dubbo","ql-prod","ql-prod-java" ]
#    codec => "json"
#    consumer_threads => 15
#    decorate_events => true
#    auto_offset_reset => "latest"
#    group_id => "logstash-1"
#    ############################# HELK Optimizing Latency #############################
#    fetch_min_bytes => "1"
#    request_timeout_ms => "305000"
#    ############################# HELK Optimizing Availability #############################
#    session_timeout_ms => "10000"
#    max_poll_records => "550"
#    max_poll_interval_ms => "300000"
#  }
#}
nginx.conf
# nginx.conf: same access-log parsing as the 51_nginx80_access_log branch in dev.conf,
# keyed on the "nginx-access" tag; only the GeoLite2 database path differs.
filter {
  if [fields][tag] == "nginx-access" {
    mutate {
      add_field => { "spstr" => "%{[log][file][path]}" }
    }
    mutate {
      # NOTE(review): same mirrored-page artifact as above — separator presumably "/"; confirm.
      split => ["spstr" , "http://www.likecs.com/"] # save the last element of the array as the api_method.
      add_field => ["src", "%{[spstr][-1]}" ]
    }
    mutate{
      remove_field => [ "friends", "ecs", "agent" , "spstr" ]
    }
    grok {
      match => { "message" => "%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time}\] \"%{WORD:method} %{DATA:url} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent:bytes} \"%{DATA:referrer}\" \"%{DATA:agent}\" \"%{DATA:x_forwarded_for}\" \"%{NUMBER:request_time}\" \"%{DATA:upstream_addr}\" \"%{DATA:upstream_status}\"" }
      remove_field => "message"
    }
    date {
      match => ["time", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }
    geoip {
      # NOTE(review): X-Forwarded-For is client-influenced unless a trusted proxy always
      # overwrites it; see the same caveat on the dev.conf geoip block.
      source => "x_forwarded_for"
      target => "geoip"
      database => "/opt/logstash-6.2.4/GeoLite2-City.mmdb"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float"]
    }
  } #endif
}
ouput.conf
output{
  if [fields][tag] == "nginx-access" {
    # stdout{} echoes every access-log event into the Logstash process log; debugging aid.
    stdout{}
    elasticsearch {
      # NOTE(review): the built-in "elastic" superuser and its password are hard-coded
      # here and have been published together with this page — treat the credential as
      # compromised: rotate it, give the pipeline a least-privilege indexing user instead
      # of the superuser, and load the secret from the Logstash keystore
      # (password => "${ES_PWD}") rather than from the config file.
      user => elastic
      password => WR141bp2sveJuGFaD4oR
      hosts => ["172.16.11.67:9200"]
      # Daily index per application name carried in fields.proname.
      index => "logstash-%{[fields][proname]}-%{+YYYY.MM.dd}"
    }
  }
  #stdout{}
  if [fields][tag] == "java" {
    elasticsearch {
      # NOTE(review): same hard-coded superuser credential as above; the same rotation
      # and keystore advice applies.
      user => elastic
      password => WR141bp2sveJuGFaD4oR
      hosts => ["172.16.11.66:9200","172.16.11.68:9200"]
      # Index name is built from the host name and the log file name (src from
      # index.conf). Elasticsearch index names must be lowercase, so an uppercase host
      # name would make the bulk request fail — presumably hosts here are lowercase; confirm.
      index => "%{[host][name]}-%{[src]}"
    }
  }
}