Using this sink requires Hadoop to be installed so that Flume can use the Hadoop jars to communicate with the HDFS cluster.
Hadoop therefore needs to be installed first; then add the following to /usr/local/apache-flume-1.3.1-bin/conf/flume-env.sh:
export HADOOP_HOME=/usr/local/hadoop
#Modify the configuration file
a1.sources.r1.type = syslogtcp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://master:9000/user/hadoop/flume/collected/
a1.sinks.k1.hdfs.filePrefix = Syslog
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
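The snippet above omits the agent's component declarations and the channel definition that hdfs.conf also needs before the agent will start. A minimal sketch of the missing pieces, assuming a memory channel named c1 (the capacity values are illustrative):
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#Memory channel buffering events between source and sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100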
#Start Flume agent a1
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f hdfs.conf -n a1 -Dflume.root.logger=INFO,console
#Generate a test syslog message (the source type is syslogtcp, so plain TCP nc is used)
echo "<37>hello via syslog to hdfs testing one" | nc localhost 5140
#Check the console output of the running agent; the file is created successfully
2013-05-29 00:53:58,078 (hdfs-k1-call-runner-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:208)] Creating hdfs://master:9000/user/hadoop/flume/collected//Syslog.1369814037714.tmp
2013-05-29 00:54:28,220 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:427)] Renaming hdfs://master:9000/user/hadoop/flume/collected/Syslog.1369814037714.tmp to hdfs://master:9000/user/hadoop/flume/collected/Syslog.1369814037714
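The rename happens about 30 seconds after the file is created because the HDFS sink rolls files on a timer: hdfs.rollInterval defaults to 30 seconds, alongside size-based (hdfs.rollSize) and event-count-based (hdfs.rollCount) rolling. A sketch of tuning these, with illustrative values that are not from the original test:
#Roll every 60 seconds only; 0 disables size- and count-based rolling
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0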
#View the file on Hadoop
./hadoop dfs -cat hdfs://172.25.4.35:9000/user/hadoop/flume/collected/Syslog.1369814037714
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable^;>Gv$hello via syslog to hdfs testing one
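The SEQ header shows that the HDFS sink writes Hadoop SequenceFiles by default, which is why the body is wrapped in binary framing. To store plain text instead, the file type can be switched; a sketch:
#Write events as an uncompressed text stream instead of a SequenceFile
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text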
#Modify the configuration file so that directories are generated automatically based on time
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://master:9000/user/hadoop/flume/collected/%Y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = Syslog.%{host}
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
#Generate a JSON-formatted POST request; if the timestamp parameter in the header is malformed it cannot be parsed
A 13-digit timestamp (millisecond precision) is required for the correct time to be resolved.
#Generate a 10-digit Unix timestamp (seconds) for the current time on Linux
date +%s
#Generate a 13-digit Unix timestamp (milliseconds) for the current time on Linux
date +%s%N|awk '{print substr($0,1,13)}'
curl -X POST -d '[{ "headers":{"timestamp":"1369818213654","host":"cc-staging-loginmgr2"},"body": "hello via post"}]' http://localhost:5140
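The two steps can be combined so the header always carries the current millisecond timestamp; a sketch assuming the agent is reachable on localhost:5140:
#Build a 13-digit timestamp and POST it in the event header
ts=$(date +%s%N | awk '{print substr($0,1,13)}')
curl -X POST -d "[{\"headers\":{\"timestamp\":\"${ts}\",\"host\":\"$(hostname)\"},\"body\":\"hello via post\"}]" http://localhost:5140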
#Check the console output of the running agent; the file is created successfully
2013-05-29 02:03:38,646 (hdfs-k1-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:208)] Creating hdfs://master:9000/user/hadoop/flume/collected/2013-05-29/0203/cc-staging-loginmgr2..1369818218614.tmp
2013-05-29 02:04:08,714 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:427)] Renaming hdfs://master:9000/user/hadoop/flume/collected/2013-05-29/0203/cc-staging-loginmgr2..1369818218614.tmp to hdfs://master:9000/user/hadoop/flume/collected/2013-05-29/0203/cc-staging-loginmgr2..1369818218614
#View the file on Hadoop
./hadoop dfs -ls hdfs://172.25.4.35:9000/user/hadoop/flume/collected/2013-05-29/0203
Found 1 items
-rw-r--r-- 3 root supergroup 129 2013-05-29 02:04 /user/hadoop/flume/collected/2013-05-29/0203/cc-staging-loginmgr2..1369818218614
#Test 2: Logger Sink
Logs events at INFO level. Typically useful for testing/debugging purposes.
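No sink configuration is shown for this test; a minimal sketch, assuming the same agent, channel, and syslogtcp source as above:
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
#Events are then printed on the console when the agent runs with -Dflume.root.logger=INFO,console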
#Test 3: Avro Sink
Flume events sent to this sink are turned into Avro events and sent to the configured hostname/port pair.
#Avro source configuration file
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4545
#Avro sink configuration file
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 172.25.4.23
a1.sinks.k1.port = 4545
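For the nc test further below to work, the agent running the Avro sink must also have a source of its own feeding channel c1; a sketch assuming a syslogtcp source on port 5140 in avro_sink.conf, mirroring the source used earlier:
a1.sources.r1.type = syslogtcp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1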
#First start the agent with the Avro source, which listens on the port
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f avro.conf -n a1 -Dflume.root.logger=INFO,console
#Then start the agent with the Avro sink
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f avro_sink.conf -n a1 -Dflume.root.logger=INFO,console
#You can see that the connection has been established
2013-06-02 19:23:00,237 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x7a0e28bf, /172.25.4.32:14894 => /172.25.4.23:4545] CONNECTED: /172.25.4.32:14894
#Generate a test log on the Avro sink agent
echo "<37>hello via avro sink" | nc localhost 5140
#On the Avro source agent you can see that the log event has arrived
2013-06-02 19:24:13,740 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 61 76 72 6F 20 73 hello via avro s }
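The headers come from the syslog priority of the test message: the value inside <37> decomposes as facility * 8 + severity, and 37 = 4 * 8 + 5, which is exactly the Facility=4 and Severity=5 shown in the event.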
#Test 4: File Roll Sink
Stores events on the local filesystem.
#Modify the configuration file
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
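How often this sink rolls to a new file is controlled by sink.rollInterval, which defaults to 30 seconds and matches the behavior observed below; a sketch of overriding it (the value is illustrative):
#Roll once a minute; sink.rollInterval = 0 disables rolling entirely
a1.sinks.k1.sink.rollInterval = 60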
#Start the agent with the file roll configuration file
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f file_roll.conf -n a1 -Dflume.root.logger=INFO,console
#Generate test logs
echo "<37>hello via file roll" | nc localhost 5140
echo "<37>hello via file roll 2" | nc localhost 5140
#Check that files are created under /var/log/flume; by default a new file is rolled every 30 seconds
-rw-r--r-- 1 root root 20 Jun 2 19:44 1370227443397-1
-rw-r--r-- 1 root root 0 Jun 2 19:44 1370227443397-2
-rw-r--r-- 1 root root 22 Jun 2 19:45 1370227443397-3
cat 1370227443397-1 1370227443397-3
hello via file roll
hello via file roll 2