Here an `if` condition on the event's tags routes logs of different types to different Elasticsearch indices.
```
input {
  beats {
    port => 5044
  }
}

filter {
  # Only events tagged with the site name are parsed; other events pass through untouched.
  if "www.xxx.com" in [tags] {
    grok {
      match => { "message" => "%{IP:remote_addr} (?:%{DATA:remote_user}|-) \[%{HTTPDATE:timestamp}\] %{IPORHOST:http_host} %{DATA:request_method} %{DATA:request_uri} %{NUMBER:status} (?:%{NUMBER:body_bytes_sent}|-) (?:%{DATA:request_time}|-) \"(?:%{DATA:http_referer}|-)\" \"%{DATA:http_user_agent}\" (?:%{DATA:http_x_forwarded_for}|-) \"(?:%{DATA:http_cookie}|-)\"" }
    }
    # grok captures strings; convert the numeric fields so aggregations work in Elasticsearch.
    mutate {
      convert => ["status", "integer"]
      convert => ["body_bytes_sent", "integer"]
      convert => ["request_time", "float"]
    }
    geoip {
      source => "remote_addr"
    }
    # Use the access-log time as @timestamp instead of the ingestion time.
    date {
      match => [ "timestamp", "dd/MMM/YYYY:HH:mm:ss Z" ]
    }
    useragent {
      source => "http_user_agent"
    }
  }
}

output {
  # The same tag check decides the target index.
  if "www.xxx.com" in [tags] {
    elasticsearch {
      hosts => ["10.1.128.101:9200"]
      index => "www.xxx.com_10.1.144.60"
      user => 'elastic'
      password => '123456'
    }
  }
  stdout {
    codec => rubydebug
  }
}
```
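To check the grok pattern and the tag-based routing before loading the config into Logstash, the following Python script reproduces the filter and output stages of the configuration above. It is an added example, not part of the original page or of Logstash itself: the grok patterns are approximated with hand-written regexes (an assumption, close to but not identical to Logstash's built-in definitions), the sample log line is constructed, and the `_grokparsefailure` handling mirrors grok's default behaviour of tagging an unparsed event and letting it continue to the output stage.

```python
import re
from datetime import datetime

# Approximations of the grok patterns the config relies on (assumption: close enough
# to Logstash's built-in definitions to validate access-log lines offline).
IP = r"(?:\d{1,3}\.){3}\d{1,3}"
HOSTNAME = r"[A-Za-z0-9](?:[A-Za-z0-9-]*[A-Za-z0-9])?(?:\.[A-Za-z0-9](?:[A-Za-z0-9-]*[A-Za-z0-9])?)*"
DATA = r".*?"
NUMBER = r"[-+]?(?:\d+(?:\.\d*)?|\.\d+)"
HTTPDATE = r"\d{1,2}/[A-Za-z]{3}/\d{4}:\d{2}:\d{2}:\d{2} [+-]\d{4}"

# Same field layout as the grok match in the config, written as Python named groups.
LOG_RE = re.compile(
    rf"^(?P<remote_addr>{IP}) (?:(?P<remote_user>{DATA})|-) "
    rf"\[(?P<timestamp>{HTTPDATE})\] (?P<http_host>{IP}|{HOSTNAME}) "
    rf"(?P<request_method>{DATA}) (?P<request_uri>{DATA}) (?P<status>{NUMBER}) "
    rf"(?:(?P<body_bytes_sent>{NUMBER})|-) (?:(?P<request_time>{DATA})|-) "
    rf"\"(?:(?P<http_referer>{DATA})|-)\" \"(?P<http_user_agent>{DATA})\" "
    rf"(?:(?P<http_x_forwarded_for>{DATA})|-) \"(?:(?P<http_cookie>{DATA})|-)\"$"
)

# Output routing table taken from the config: tag -> Elasticsearch index.
INDEX_BY_TAG = {"www.xxx.com": "www.xxx.com_10.1.144.60"}


def _num(value, cast):
    """mutate/convert equivalent; nginx's '-' (no value) becomes None instead of 0."""
    return None if value in (None, "-") else cast(value)


def parse_access_line(line):
    """grok + mutate + date equivalent. Returns a field dict, or None when the line does not match."""
    m = LOG_RE.match(line)
    if m is None:
        return None
    ev = m.groupdict()
    ev["status"] = int(ev["status"])
    ev["body_bytes_sent"] = _num(ev["body_bytes_sent"], int)
    ev["request_time"] = _num(ev["request_time"], float)
    # date { match => ["timestamp", "dd/MMM/YYYY:HH:mm:ss Z"] }
    ev["@timestamp"] = datetime.strptime(ev["timestamp"], "%d/%b/%Y:%H:%M:%S %z")
    return ev


def route_index(tags):
    """Output-stage if/else equivalent: pick the index from the tags, or None (stdout only)."""
    for tag in tags:
        if tag in INDEX_BY_TAG:
            return INDEX_BY_TAG[tag]
    return None


def process_line(line, tags):
    """Run one line through the filter and output stages. Returns (event, target_index)."""
    tags = list(tags)
    ev = parse_access_line(line)
    if ev is None:
        # grok adds _grokparsefailure and lets the event continue; the output condition
        # only looks at tags, so an unparsed line still lands in the same index.
        tags.append("_grokparsefailure")
        ev = {"message": line}
    ev["tags"] = tags
    return ev, route_index(tags)


# Constructed sample line in the layout the grok pattern expects (not real traffic).
SAMPLE_LINE = ('10.1.144.60 - [12/Jun/2017:10:15:32 +0800] www.xxx.com GET /index.html '
               '200 612 0.003 "-" "Mozilla/5.0" - "-"')

if __name__ == "__main__":
    for line, tags in [(SAMPLE_LINE, ["www.xxx.com"]), ("not an access log line", ["www.xxx.com"])]:
        event, index = process_line(line, tags)
        print(index, event)
```

Running the script shows the parsed event with converted types and the index it would be sent to; the second, unparseable line demonstrates that routing depends only on the tags, so a `_grokparsefailure` event still reaches the site's index unless the output condition also excludes that tag.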