做项目时遇到一个问题,需要对接收到的日志数据做复杂逻辑处理并将一条转换成多条。
对比了td-agent,filebeat、flume日志采集工具。
td-agent核心部分是用C实现,而插件部分用了ruby,但ruby不熟;filebeat正则匹配很强大,但关于插件相关资料很少;flume插件却可以直接用java实现。于是决定通过自定义flume拦截器实现这一功能。
Flume拦截器
Flume的拦截器可删除或修改Event。
Timestamp 拦截器:在Event Header中添加时间戳。
Host 拦截器:在Event Header中添加agent运行机器的Host或IP。
Static 拦截器:在Event Header中添加自定义静态属性。
Remove Header拦截器:可移除Event Header中指定属性。
UUID拦截器:在Event Header中添加全局唯一UUID。
Search and Replace拦截器:基于正则搜索和替换字符串等。
Regex Filtering拦截器:基于正则过滤或反向过滤Event。
Regex Extractor拦截器:基于正则在Event Header添加指定的Key,并将匹配到的内容作为对应的Value。
自定义Flume拦截器
自定义拦截器,实现给每条数据添加公共字段,并将一条转换成多条。其他复杂逻辑类似。
package com.flumePlugins.interceptor;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.codec.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
* Author: Wang Pei
* Summary:
*/
public class ParseLogByRule implements Interceptor {
@Override
public void initialize() {
//pass
}
@Override
public void close() {
//pass
}
/**
* 解析单条event
* @param event
* @return
*/
@Override
public Event intercept(Event event) {
//输入
String inputeBody=null;
//输出
byte[] outputBoday=null;
//解析---这里定义对单条Event处理规则
try {
inputeBody=new String(event.getBody(), Charsets.UTF_8);
ArrayList<String> temp = new ArrayList<>();
JSONObject bodyObj = JSON.parseObject(inputeBody);
//1)公共字段
String host = bodyObj.getString("host");
String user_id = bodyObj.getString("user_id");
JSONArray data = bodyObj.getJSONArray("items");
//2)Json数组=>every json obj
for (Object item : data) {
JSONObject itemObj = JSON.parseObject(item.toString());
HashMap<String, Object> fields = new HashMap<>();
fields.put("host",host);
fields.put("user_id",user_id);
fields.put("item_type",itemObj.getString("item_type"));
fields.put("active_time",itemObj.getLongValue("active_time"));
temp.add(new JSONObject(fields).toJSONString());
}
//3)Json obj 拼接
outputBoday=String.join("\n",temp).getBytes();
}catch (Exception e){
System.out.println("输入数据:"+inputeBody);
e.printStackTrace();
}
event.setBody(outputBoday);
return event;
}
/**
* 解析一批event
* @param events
* @return
*/
@Override
public List<Event> intercept(List<Event> events) {
//输出---一批Event
ArrayList<Event> result = new ArrayList<>();
//输入---一批Event
try{
for (Event event : events) {
//一条条解析
Event interceptedEvent = intercept(event);
byte[] interceptedEventBody = interceptedEvent.getBody();
if(interceptedEventBody.length!=0){
String multiEvent = new String(interceptedEventBody, Charsets.UTF_8);
String[] multiEventArr = multiEvent.split("\n");
for (String needEvent : multiEventArr) {
SimpleEvent simpleEvent = new SimpleEvent();
simpleEvent.setBody(needEvent.getBytes());
result.add(simpleEvent);
}
}
}
}catch (Exception e){
e.printStackTrace();
}
return result;
}