Flume 自定义拦截器(2)

/**
    * 实现内部类接口
    */
    public static class Builder implements Interceptor.Builder{
        @Override
        public Interceptor build() {
            return new ParseLogByRule();
        }

@Override
        public void configure(Context context) {

}
    }

}

插件打包上传

编译打包拦截器插件,然后将打包后的插件和依赖的fastjson一起上传到flume lib目录

配置agent

TAILDIR Source ===>file Channel ===>Kafka Sink

cat agent.conf
    # source的名字
    agent.sources = s1
    # channels的名字
    agent.channels = c1
    # sink的名字
    agent.sinks = r1

# 指定source使用的channel
    agent.sources.s1.channels = c1
    # 指定sink使用的channel
    agent.sinks.r1.channel = c1

######## source相关配置 ########
    # source类型
    agent.sources.s1.type = TAILDIR
    # 元数据位置
    agent.sources.s1.positionFile = /Users/wangpei/tempData/flume/taildir_position.json
    # 监控的目录
    agent.sources.s1.filegroups = f1
    agent.sources.s1.filegroups.f1=/Users/wangpei/tempData/flume/data/.*log
    agent.sources.s1.fileHeader = true

######## interceptor相关配置 ########
    agent.sources.s1.interceptors = i1
    agent.sources.s1.interceptors.i1.type = com.flumePlugins.interceptor.ParseLogByRule$Builder

######## channel相关配置 ########
    # channel类型
    agent.channels.c1.type = file
    # 数据存放路径
    agent.channels.c1.dataDirs = /Users/wangpei/tempData/flume/filechannle/dataDirs
    # 检查点路径
    agent.channels.c1.checkpointDir = /Users/wangpei/tempData/flume/filechannle/checkpointDir
    # channel中最多缓存多少
    agent.channels.c1.capacity = 1000
    # channel一次最多吐给sink多少
    agent.channels.c1.transactionCapacity = 100

######## sink相关配置 ########
    # sink类型
    agent.sinks.r1.type = org.apache.flume.sink.kafka.KafkaSink
    # brokers地址
    agent.sinks.r1.kafka.bootstrap.servers = localhost:9092
    # topic
    agent.sinks.r1.kafka.topic = testTopic3
    # 压缩
    agent.sinks.r1.kafka.producer.compression.type = snappy

启动flume-agent

bin/flume-ng agent --conf conf/ -f conf/agent.conf -Dflume.root.logger=INFO,console -name agent

启动kafka-console-consumer

./kafka-console-consumer --topic testTopic3 --bootstrap-server localhost:9092

写入测试数据到监控目录

#日志数据
log='{
"host":"www.baidu.com",
"user_id":"197878787878787",
"items":[
    {
        "item_type":"clothes",
        "active_time":18989989
    },
    {
        "item_type":"car",
        "active_time":18989989
    }
 ]
}'
#日志追加到文件
echo $log>> /Users/wangpei/tempData/flume/data/test.log

查看发送到kafka中的数据

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/87807658233c3b75c02fa398430456b7.html