2. Event Serializer Tests
Body Text Serializer
Alias: text. This serializer writes the body of the event to an output stream without any transformation or modification (i.e., the event body is written out as plain text).
# Configuration file
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Generate test log events
curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : "TEST1 BODY TEXT"}]' http://localhost:5140
curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : "TEST2 BODY TEXT"}]' http://localhost:5140
curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : "TEST3 BODY TEXT"}]' http://localhost:5140
# Check the text content of the file_roll output file
cat /var/log/flume/1370675739270-1
TEST1 BODY TEXT
TEST2 BODY TEXT
TEST3 BODY TEXT
Avro Event Serializer
Alias: avro_event. This serializer writes Flume events into an Avro container file (i.e., each Flume event becomes a record in the Avro container file).
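For reference, a minimal sketch of using this serializer with the file_roll sink from the Body Text Serializer test above; only the serializer line changes, and the directory is illustrative:
# Sketch only: write events into an Avro container file instead of plain text
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = avro_event
The rolled files are Avro container files rather than plain text, so they are inspected with Avro tooling (for example avro-tools' tojson command) rather than cat.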
1. Flume Interceptor Tests
Timestamp Interceptor
This interceptor inserts into the event headers the time, in milliseconds, at which it processes the event. It adds a header with key timestamp whose value is that timestamp.
Host Interceptor
This interceptor inserts the hostname or IP address of the host that the agent is running on. It adds a header with key host (or a configured key) whose value is that hostname or IP address.
# Configuration file
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader = hostname
a1.sources.r1.interceptors.i2.useIP = false
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://master:9000/user/Hadoop/flume/collected/%Y-%m-%d/%H%M
a1.sinks.k1.hdfs.filePrefix = %{hostname}.
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# Start the agent
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f dynamic_intercept.conf -n a1 -Dflume.root.logger=INFO,console
# Generate a test log event
echo "<37>test dynamic interceptor" | nc localhost 5140
# Check the file generated on HDFS: the timestamp and hostname have been inserted into the headers, so directories are created according to the custom path format
./hadoop dfs -ls hdfs://172.25.4.35:9000/user/hadoop/flume/collected/2013-06-16/2331/
Found 1 items
-rw-r--r-- 3 root supergroup 140 2013-06-16 23:32 /user/hadoop/flume/collected/2013-06-16/2331/cc-staging-loginmgr2..1371450697118
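To look at the stored event itself, the same path can be read back with dfs -cat (a sketch reusing the file from the listing above; note that with the HDFS sink's default hdfs.fileType of SequenceFile the output is SequenceFile-framed rather than plain text):
./hadoop dfs -cat hdfs://172.25.4.35:9000/user/hadoop/flume/collected/2013-06-16/2331/cc-staging-loginmgr2..1371450697118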
Static Interceptor
The Static Interceptor allows the user to append a static header with a static value to all events.
# Configuration file
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# Start the agent
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f dynamic_intercept.conf -n a1 -Dflume.root.logger=INFO,console
# Generate a test log event
echo "<37>test1 static interceptor" | nc localhost 5140
# Check the console output
2013-06-17 00:15:38,453 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4, datacenter=NEW_YORK} body: 74 65 73 74 31 20 73 74 61 74 69 63 20 69 6E 74 test1 static int }
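Because every event now carries datacenter=NEW_YORK in its headers, the static header can be referenced just like the timestamp and hostname headers in the earlier test, for example to route events into per-datacenter directories. A sketch, reusing the HDFS sink from the interceptor test; the path layout is illustrative, and the date escapes still require a timestamp header (timestamp interceptor or hdfs.useLocalTimeStamp):
# Sketch: partition the HDFS output by the static datacenter header
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://master:9000/user/Hadoop/flume/collected/%{datacenter}/%Y-%m-%d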