4. Flume Source Tests
Test 1:
An Avro client can send a given file to Flume; the Avro source receives it via the Avro RPC mechanism.
# Set up the Avro configuration file
[root@cc-staging-loginmgr2 conf]# cat avro.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# Start Flume agent a1
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f avro.conf -n a1 -Dflume.root.logger=INFO,console
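Before sending anything, it can be worth confirming that the Avro source is actually bound to its port; a quick check, assuming netstat is available on the host:
# optional sanity check: the source should be listening on 4141
netstat -tln | grep 4141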
# Create the test file
echo "hello world" > /usr/logs/log.10
# Send the file with avro-client
flume-ng avro-client -c . -H localhost -p 4141 -F /usr/logs/log.10
# Watch the console output in the agent's terminal
2013-05-27 01:11:45,852 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 hello world }
Test 2:
Exec source runs a given Unix command on start-up and expects that process to continuously produce data on standard out
# Modified configuration file (only the changed source section is shown)
[root@cc-staging-loginmgr2 conf]# cat exec.conf
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat /usr/logs/log.10
a1.sources.r1.channels = c1
# Start Flume agent a1
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f exec.conf -n a1 -Dflume.root.logger=INFO,console
# Append content to the file
echo "exec test" >> /usr/logs/log.10
# Watch the console output in the agent's terminal
2013-05-27 01:50:12,825 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 hello world }
2013-05-27 01:50:12,826 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 exec test }
# If you use the tail command instead, the file must hold enough content before any output appears
a1.sources.r1.command = tail -F /usr/logs/log.10
# Generate plenty of lines in the file
for i in {1..100};do echo "exec test$i" >> /usr/logs/log.10;echo $i;done
# The output can be seen on the console
2013-05-27 19:17:18,157 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:155)] Exec source starting with command:tail -n 5 -F /usr/logs/log.10
2013-05-27 19:19:50,334 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 37 exec test7 }
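Note that the exec source makes no delivery guarantee: if the agent dies, data produced by the command in the meantime is lost. Newer Flume releases (1.4 and later, so not necessarily the 1.3.1 build used here) can relaunch the command for you; a sketch, with property names to verify against your version's user guide:
# relaunch the command if it exits (assumed 1.4+ property)
a1.sources.r1.restart = true
# wait 10 seconds between relaunches (assumed 1.4+ property)
a1.sources.r1.restartThrottle = 10000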
Test 3:
Spooling directory source
This source lets you ingest data by dropping files in a spooling directory on disk. Unlike other asynchronous sources, this source avoids data loss even if Flume is restarted or fails.
SpoolSource watches the configured directory for newly added files and reads event data out of them. Two caveats: 1) a file copied into the spool directory must not be opened and edited afterwards (see the safe pattern below); 2) the spool directory must not contain subdirectories.
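Because of caveat 1, one safe pattern is to finish writing the file outside the spool directory and then move it in, since a rename within the same filesystem is atomic; for example:
# write the file elsewhere first, then move it into the spool directory atomically
echo "spool test0" > /usr/logs/spool0.log
mv /usr/logs/spool0.log /usr/logs/flumeSpool/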
# Modified configuration file (only the changed source section is shown)
[root@cc-staging-loginmgr2 conf]# cat spool.conf
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/logs/flumeSpool
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1
# Start Flume agent a1
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f spool.conf -n a1 -Dflume.root.logger=INFO,console
# Drop a new file into the spool directory
[root@cc-staging-loginmgr2 ~]# echo "spool test1" > /usr/logs/flumeSpool/spool1.log
# Watch the console output in the agent's terminal
2013-05-27 22:49:06,098 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.SpoolingFileLineReader.retireCurrentFile(SpoolingFileLineReader.java:229)] Preparing to move file /usr/logs/flumeSpool/spool1.log to /usr/logs/flumeSpool/spool1.log.COMPLETED
2013-05-27 22:49:06,101 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{file=/usr/logs/flumeSpool/spool1.log} body: 73 70 6F 6F 6C 20 74 65 73 74 31 spool test1 }
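The .COMPLETED rename seen above is the source's default way of marking consumed files; both the suffix and the file-path header key are configurable. A sketch (property names taken from the Flume user guide; verify them against your release):
# suffix appended to consumed files (default .COMPLETED)
a1.sources.r1.fileSuffix = .DONE
# header key holding the absolute path when fileHeader = true (default file)
a1.sources.r1.fileHeaderKey = file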
Test 4:
Netcat source: see the simple example in Part 3.
Test 5:
Syslog TCP source
# Modified configuration file (only the changed source section is shown)
[root@cc-staging-loginmgr2 conf]# cat syslog.conf
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
# Start Flume agent a1
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f syslog.conf -n a1 -Dflume.root.logger=INFO,console
# Generate a test syslog message; the <37> prefix is required because the source expects syslog wire-format data, otherwise the agent reports "Failed to extract syslog wire entry"
echo "<37>hello via syslog" | nc localhost 5140
# Watch the console output in the agent's terminal
2013-05-27 23:39:10,755 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 73 79 73 6C 6F 67 hello via syslog }
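The headers come straight from the priority prefix: priority = facility × 8 + severity, so <37> decodes to facility 4, severity 5, matching the Facility=4 and Severity=5 headers above. A user.notice message, for example, would carry <13>:
# <13> = facility 1 (user) * 8 + severity 5 (notice)
echo "<13>hello via syslog" | nc localhost 5140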
# For UDP, change the source configuration to:
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
# Generate a test syslog message over UDP
echo "<37>hello via syslog" | nc -u localhost 5140
Test 6:
HTTP source with the JSON handler
# Modified configuration file
[root@cc-staging-loginmgr2 conf]# cat post.conf
# Describe/configure the source
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
# Start Flume agent a1
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f post.conf -n a1 -Dflume.root.logger=INFO,console
# Send a JSON-formatted POST request
curl -X POST -d '[{ "headers" :{"namenode" : "namenode.example.com","datanode" : "random_datanode.example.com"},"body" : "really_random_body"}]' http://localhost:5140
# Watch the console output in the agent's terminal (the logger sink prints at most the first 16 bytes of the body, hence the truncated really_random_bo below)
2013-05-28 01:17:47,186 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{namenode=namenode.example.com, datanode=random_datanode.example.com} body: 72 65 61 6C 6C 79 5F 72 61 6E 64 6F 6D 5F 62 6F really_random_bo }
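Since the JSON handler takes an array, several events can be batched into a single POST; each array element becomes its own Flume event:
# two events in one request
curl -X POST -d '[{"headers":{"n":"1"},"body":"first event"},{"headers":{"n":"2"},"body":"second event"}]' http://localhost:5140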