ng分布式环境的部署和配置(6)

2. Event Serializers测试

Body Text Serializer

Alias: text. This interceptor writes the body of the event to an output stream without any transformation or modification(body中的内容变成文本内容)

#配置文件

a1.sources.r1.type = org.apache.flume.source.http.HTTPSource

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1

# Describe the sink

a1.sinks.k1.type = file_roll

a1.sinks.k1.channel = c1

a1.sinks.k1.sink.directory = /var/log/flume

a1.sinks.k1.sink.serializer = text

a1.sinks.k1.sink.serializer.appendNewline = false

#生成测试log

curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : "TEST1 BODY TEXT"}]' :5140

curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : "TEST2 BODY TEXT"}]' :5140

curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : "TEST3 BODY TEXT"}]' :5140

#查看file roll 文件中的文本内容

cat /var/log/flume/1370675739270-1

TEST1 BODY TEXT

TEST2 BODY TEXT

TEST3 BODY TEXT

#Avro Event Serializer

Alias: avro_event. This interceptor serializes Flume events into an Avro container file

flume event变成avro 中包含的文件

 

1.Flume Interceptors测试

Timestamp Interceptor

This interceptor inserts into the event headers, the time in millis at which it processes the event. This interceptor inserts a header with key timestamp whose value is the relevant timestamp

Host Interceptor

This interceptor inserts the hostname or IP address of the host that this agent is running on. It inserts a header with key host or a configured key whose value is the hostname or IP address of the host

#配置文件

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = syslogtcp

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 5140

a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1 i2

a1.sources.r1.interceptors.i1.preserveExisting = false

a1.sources.r1.interceptors.i1.type = timestamp

a1.sources.r1.interceptors.i2.type = host

a1.sources.r1.interceptors.i2.hostHeader = hostname

a1.sources.r1.interceptors.i2.useIP = false

# Describe the sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.channel = c1

a1.sinks.k1.hdfs.path = hdfs://master:9000/user/Hadoop/flume/collected/%Y-%m-%d/%H%M

a1.sinks.k1.hdfs.filePrefix = %{hostname}.

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

#启动agent

cd /usr/local/apache-flume-1.3.1-bin/conf

flume-ng agent -c . -f dynamic_intercept.conf -n a1 -Dflume.root.logger=INFO,console

#生成测试log

echo "<37>test dynamic interceptor" | nc localhost 5140

#查看hdfs生成的文件,可以看到timestamphostname都已经生成在header里面,可以根据自定义的格式生成文件夹

./hadoop dfs -ls hdfs://172.25.4.35:9000/user/hadoop/flume/collected/2013-06-16/2331/

Found 1 items

-rw-r--r-- 3 root supergroup 140 2013-06-16 23:32 /user/hadoop/flume/collected/2013-06-16/2331/cc-staging-loginmgr2..1371450697118

Static Interceptor

Static interceptor allows user to append a static header with static value to all events

#配置文件

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = static

a1.sources.r1.interceptors.i1.key = datacenter

a1.sources.r1.interceptors.i1.value = NEW_YORK

# Describe the sink

a1.sinks.k1.type = logger

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

#启动agent

cd /usr/local/apache-flume-1.3.1-bin/conf

flume-ng agent -c . -f dynamic_intercept.conf -n a1 -Dflume.root.logger=INFO,console

#生成测试log

echo "<37>test1 static interceptor" | nc localhost 5140

#查看console输出结果

2013-06-17 00:15:38,453 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4, datacenter=NEW_YORK} body: 74 65 73 74 31 20 73 74 61 74 69 63 20 69 6E 74 test1 static int }

 

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/01816d07650288561a369ee403b724a0.html