Flume-ng Distributed Environment Deployment and Configuration (2)

4. Flume Source Tests

Test 1:

Avro source: avro-client can send a given file to a Flume Avro source using the Avro RPC mechanism.

# Set up the Avro source configuration file

[root@cc-staging-loginmgr2 conf]# cat avro.conf

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = avro

a1.sources.r1.channels = c1

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 4141

# Describe the sink

a1.sinks.k1.type = logger

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

# Start flume agent a1

cd /usr/local/apache-flume-1.3.1-bin/conf

flume-ng agent -c . -f avro.conf -n a1 -Dflume.root.logger=INFO,console
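Here -c points flume-ng at the configuration directory, -f names the agent configuration file, -n selects which agent definition (a1) to run from that file, and -Dflume.root.logger=INFO,console raises the root logger so events are printed to the console.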

# Create the test file

echo "hello world" > /usr/logs/log.10

# Send the file with avro-client

flume-ng avro-client -c . -H localhost -p 4141 -F /usr/logs/log.10

# Check the console output in the terminal where the agent was started

2013-05-27 01:11:45,852 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 hello world }
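If the -F option is omitted, avro-client reads events from standard input instead, which gives an even quicker test against the same agent (a minimal sketch):

echo "hello from stdin" | flume-ng avro-client -c . -H localhost -p 4141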

Test 2:

The Exec source runs a given Unix command on start-up and expects that process to continuously produce data on standard out.

# Modified configuration file (only the source section is shown; the channel and sink match Test 1)

[root@cc-staging-loginmgr2 conf]# cat exec.conf

# Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = cat /usr/logs/log.10

a1.sources.r1.channels = c1

# Start flume agent a1

cd /usr/local/apache-flume-1.3.1-bin/conf

flume-ng agent -c . -f exec.conf -n a1 -Dflume.root.logger=INFO,console

# Append content to the file

echo "exec test" >> /usr/logs/log.10

# Check the console output in the terminal where the agent was started

2013-05-27 01:50:12,825 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 hello world }

2013-05-27 01:50:12,826 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 exec test }

# To use the tail command, the file must grow large enough for output to appear on the console

a1.sources.r1.command = tail -F /usr/logs/log.10

# Generate enough content in the file

for i in {1..100};do echo "exec test$i" >> /usr/logs/log.10;echo $i;done

# The output can be seen on the console

2013-05-27 19:17:18,157 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:155)] Exec source starting with command:tail -n 5 -F /usr/logs/log.10

2013-05-27 19:19:50,334 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 37 exec test7 }
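Note that if the agent dies, the tail process dies with it and anything written in the gap is lost. Later Flume releases add restart properties to the exec source that can soften this (a hedged sketch; confirm these properties in your version's user guide):

a1.sources.r1.restart = true

a1.sources.r1.restartThrottle = 10000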

Test 3:

Spooling directory source

This source lets you ingest data by dropping files in a spooling directory on disk. Unlike other asynchronous sources, this source avoids data loss even if Flume is restarted or fails.

SpoolSource watches the configured directory for newly added files and reads the data out of them. Two caveats: 1) files copied into the spool directory must not be opened or edited afterwards; 2) the spool directory must not contain subdirectories.

# Modified configuration file

[root@cc-staging-loginmgr2 conf]# cat spool.conf

# Describe/configure the source

a1.sources.r1.type = spooldir

a1.sources.r1.spoolDir = /usr/logs/flumeSpool

a1.sources.r1.fileHeader = true

a1.sources.r1.channels = c1
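The spooling source needs the directory to exist before the agent starts, so create it first if necessary:

mkdir -p /usr/logs/flumeSpool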

# Start flume agent a1

cd /usr/local/apache-flume-1.3.1-bin/conf

flume-ng agent -c . -f spool.conf -n a1 -Dflume.root.logger=INFO,console

# Drop a new file into the spool directory

[root@cc-staging-loginmgr2 ~]# echo "spool test1" > /usr/logs/flumeSpool/spool1.log

# Check the console output in the terminal where the agent was started

2013-05-27 22:49:06,098 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.SpoolingFileLineReader.retireCurrentFile(SpoolingFileLineReader.java:229)] Preparing to move file /usr/logs/flumeSpool/spool1.log to /usr/logs/flumeSpool/spool1.log.COMPLETED

2013-05-27 22:49:06,101 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{file=/usr/logs/flumeSpool/spool1.log} body: 73 70 6F 6F 6C 20 74 65 73 74 31 spool test1 }
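Since files must not be edited once they land in the spool directory (caveat 1 above), a safer pattern is to write the file elsewhere on the same filesystem and then mv it in; a rename within one filesystem is atomic (a minimal sketch):

echo "spool test2" > /usr/logs/spool2.log.tmp

mv /usr/logs/spool2.log.tmp /usr/logs/flumeSpool/spool2.log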

Test 4:

Netcat source: see the simple example in Part 3.
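For quick reference, a minimal netcat source section in the style of the other tests looks like this (a sketch; Part 3 has the full walkthrough):

a1.sources.r1.type = netcat

a1.sources.r1.bind = localhost

a1.sources.r1.port = 44444

a1.sources.r1.channels = c1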

Test 5:

Syslog tcp source

# Modified configuration file

[root@cc-staging-loginmgr2 conf]# cat syslog.conf

# Describe/configure the source

a1.sources.r1.type = syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1

# Start flume agent a1

cd /usr/local/apache-flume-1.3.1-bin/conf

flume-ng agent -c . -f syslog.conf -n a1 -Dflume.root.logger=INFO,console

# Generate a test syslog message. The <37> priority prefix is required because the source expects syslog wire-format data; without it the agent reports "Failed to extract syslog wire entry"

echo "<37>hello via syslog" | nc localhost 5140

# Check the console output in the terminal where the agent was started

2013-05-27 23:39:10,755 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 73 79 73 6C 6F 67 hello via syslog }
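The headers show how the <37> priority decodes: priority = facility × 8 + severity, so 37 = 4 × 8 + 5, i.e. facility 4 (security/authorization) and severity 5 (notice), which matches Facility=4 and Severity=5 in the output above.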

# For UDP, modify the configuration file

a1.sources.r1.type = syslogudp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1

# Generate a test syslog message (-u switches nc to UDP)

echo "<37>hello via syslog" | nc -u localhost 5140

Test 6:

HTTP source JSONHandler

# Modified configuration file

[root@cc-staging-loginmgr2 conf]# cat post.conf

# Describe/configure the source

a1.sources = r1

a1.channels = c1

a1.sources.r1.type = org.apache.flume.source.http.HTTPSource

a1.sources.r1.port = 5140

a1.sources.r1.channels = c1

# Start flume agent a1

cd /usr/local/apache-flume-1.3.1-bin/conf

flume-ng agent -c . -f post.conf -n a1 -Dflume.root.logger=INFO,console

# Generate a JSON-format POST request

curl -X POST -d '[{ "headers" :{"namenode" : "namenode.example.com","datanode" : "random_datanode.example.com"},"body" : "really_random_body"}]' http://localhost:5140

# Check the console output in the terminal where the agent was started (the logger sink prints only the first 16 bytes of the body, hence the truncated "really_random_bo")

2013-05-28 01:17:47,186 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{namenode=namenode.example.com, datanode=random_datanode.example.com} body: 72 65 61 6C 6C 79 5F 72 61 6E 64 6F 6D 5F 62 6F really_random_bo }
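The JSON handler accepts an array, so a single POST can carry several events; a sketch against the same endpoint:

curl -X POST -d '[{"headers":{"h1":"v1"},"body":"event one"},{"headers":{"h1":"v2"},"body":"event two"}]' http://localhost:5140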

 
