Flume-NG Distributed Environment Deployment and Configuration (3)

#Test 1: HDFS Sink

Using this sink requires Hadoop to be installed so that Flume can use the Hadoop jars to communicate with the HDFS cluster.

So Hadoop needs to be installed first.

Add the following to /usr/local/apache-flume-1.3.1-bin/conf/flume-env.sh:

export HADOOP_HOME=/usr/local/hadoop
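To sanity-check the setting, source the env file and confirm the Hadoop core jar is visible (a hypothetical check; the exact jar name depends on the Hadoop version installed):

# Load the Flume environment and look for the Hadoop core jar
source /usr/local/apache-flume-1.3.1-bin/conf/flume-env.sh
ls $HADOOP_HOME/hadoop-core-*.jar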

#Edit the configuration file (hdfs.conf)

a1.sources.r1.type = syslogtcp

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 5140

a1.sources.r1.channels = c1

a1.sinks.k1.type = hdfs

a1.sinks.k1.channel = c1

a1.sinks.k1.hdfs.path = hdfs://master:9000/user/hadoop/flume/collected/

a1.sinks.k1.hdfs.filePrefix = Syslog

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue = 10

a1.sinks.k1.hdfs.roundUnit = minute
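For reference, the snippet above omits the agent skeleton and the channel definition; a complete hdfs.conf would also need something like the following (a minimal sketch, assuming a memory channel named c1):

# Name the agent's components and define the memory channel assumed above
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100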

#Start Flume agent a1

cd /usr/local/apache-flume-1.3.1-bin/conf

flume-ng agent -c . -f hdfs.conf -n a1 -Dflume.root.logger=INFO,console

#Test by generating a syslog message

echo "<37>hello via syslog to hdfs testing one" | nc -u localhost 5140

#Check the console output in the terminal where the agent was started; the file is created successfully

2013-05-29 00:53:58,078 (hdfs-k1-call-runner-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:208)] Creating hdfs://master:9000/user/hadoop/flume/collected//Syslog.1369814037714.tmp

2013-05-29 00:54:28,220 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:427)] Renaming hdfs://master:9000/user/hadoop/flume/collected/Syslog.1369814037714.tmp to hdfs://master:9000/user/hadoop/flume/collected/Syslog.1369814037714

#View the file on Hadoop

./hadoop dfs -cat hdfs://172.25.4.35:9000/user/hadoop/flume/collected/Syslog.1369814037714

SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable^;>Gv$hello via syslog to hdfs testing one
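The SEQ header above shows that the HDFS sink writes Hadoop SequenceFiles by default. To store events as plain text instead, the sink exposes fileType and writeFormat options (a sketch; verify against the HDFS sink section of the Flume 1.3.1 user guide):

a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text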

#Edit the configuration file so directories are created automatically based on time

a1.sources.r1.type = org.apache.flume.source.http.HTTPSource

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 5140

a1.sources.r1.channels = c1

# Describe the sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.channel = c1

a1.sinks.k1.hdfs.path = hdfs://master:9000/user/hadoop/flume/collected/%y-%m-%d/%H%M/%S

a1.sinks.k1.hdfs.filePrefix = Syslog.%{host}

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue = 10

a1.sinks.k1.hdfs.roundUnit = minute
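The %y-%m-%d/%H%M/%S escapes are resolved from each event's timestamp header. For sources whose events lack that header, Flume's built-in timestamp interceptor can add it on the source (a sketch using the standard interceptor):

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp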

#Generate a JSON-formatted POST request; if the timestamp parameter in the header is malformed, it cannot be parsed

A 13-digit timestamp (including milliseconds) is required for the correct time to be parsed.

#Generate the current time as a 10-digit Unix timestamp on Linux

date +%s

#Generate the current time as a 13-digit Unix timestamp on Linux

date +%s%N | awk '{print substr($0,1,13)}'
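With GNU date, the same 13-digit value can also be produced directly (assuming GNU coreutils, whose %N escape supports a precision like %3N for milliseconds):

date +%s%3N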

curl -X POST -d '[{ "headers":{"timestamp":"1369818213654","host":"cc-staging-loginmgr2"},"body": "hello via post"}]' http://localhost:5140
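The HTTP source's default JSONHandler accepts a JSON array, so several events can be posted in one request (a sketch reusing the same hostname and header layout as above):

curl -X POST -d '[{"headers":{"timestamp":"1369818213654","host":"cc-staging-loginmgr2"},"body":"event one"},{"headers":{"timestamp":"1369818213655","host":"cc-staging-loginmgr2"},"body":"event two"}]' http://localhost:5140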

#Check the console output in the terminal where the agent was started; the file is created successfully

2013-05-29 02:03:38,646 (hdfs-k1-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:208)] Creating hdfs://master:9000/user/hadoop/flume/collected/2013-05-29/0203/cc-staging-loginmgr2..1369818218614.tmp

2013-05-29 02:04:08,714 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:427)] Renaming hdfs://master:9000/user/hadoop/flume/collected/2013-05-29/0203/cc-staging-loginmgr2..1369818218614.tmp to hdfs://master:9000/user/hadoop/flume/collected/2013-05-29/0203/cc-staging-loginmgr2..1369818218614

#View the files on Hadoop

./hadoop dfs -ls hdfs://172.25.4.35:9000/user/hadoop/flume/collected/2013-05-29/0203

Found 1 items

-rw-r--r-- 3 root supergroup 129 2013-05-29 02:04 /user/hadoop/flume/collected/2013-05-29/0203/cc-staging-loginmgr2..1369818218614

#Test 2: Logger Sink

Logs events at the INFO level. Typically useful for testing/debugging purposes.
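No configuration is listed for this test; a minimal logger sink section would look like the following (a sketch following the same agent layout as Test 1):

a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1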

#Test 3: Avro Sink

Flume events sent to this sink are turned into Avro events and sent to the configured hostname / port pair.

#Avro Source configuration file (avro.conf)

a1.sources.r1.type = avro

a1.sources.r1.channels = c1

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 4545

#Avro Sink configuration file (avro_sink.conf)

a1.sinks.k1.type = avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = 172.25.4.23

a1.sinks.k1.port = 4545
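The sending agent also needs its own source to accept test traffic; since the nc test below targets port 5140, avro_sink.conf presumably pairs the avro sink with a syslogtcp source (a sketch of the assumed full file, with a memory channel):

# Assumed full avro_sink.conf: syslogtcp in, avro out
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 172.25.4.23
a1.sinks.k1.port = 4545
a1.channels.c1.type = memory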

#Start the Avro Source first so it is listening on the port

cd /usr/local/apache-flume-1.3.1-bin/conf

flume-ng agent -c . -f avro.conf -n a1 -Dflume.root.logger=INFO,console

#Then start the Avro Sink

cd /usr/local/apache-flume-1.3.1-bin/conf

flume-ng agent -c . -f avro_sink.conf -n a1 -Dflume.root.logger=INFO,console

#The log shows the connection has been established

2013-06-02 19:23:00,237 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x7a0e28bf, /172.25.4.32:14894 => /172.25.4.23:4545] CONNECTED: /172.25.4.32:14894

#Generate a test log on the Avro Sink host

echo "<37>hello via avro sink" | nc localhost 5140

#On the Avro Source side, you can see the log event has arrived

2013-06-02 19:24:13,740 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 61 76 72 6F 20 73 hello via avro s }
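An alternative to nc is Flume's built-in Avro client, which reads a file and sends each line as an event to an Avro source (assuming a test file /tmp/test.log exists):

flume-ng avro-client -c . -H 172.25.4.23 -p 4545 -F /tmp/test.log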

#Test 4: File Roll Sink

Stores events on the local filesystem.

#Edit the configuration file (file_roll.conf)

a1.sinks.k1.type = file_roll

a1.sinks.k1.channel = c1

a1.sinks.k1.sink.directory = /var/log/flume
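The 30-second rolling mentioned below comes from the file_roll sink's default rollInterval; it can be tuned, or set to 0 to disable time-based rolling entirely (a sketch stating the default explicitly):

# Roll to a new file every 30 seconds (the default); 0 disables rolling
a1.sinks.k1.sink.rollInterval = 30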

#Start the agent with the file roll configuration file

cd /usr/local/apache-flume-1.3.1-bin/conf

flume-ng agent -c . -f file_roll.conf -n a1 -Dflume.root.logger=INFO,console

#Generate test logs

echo "<37>hello via file roll" | nc localhost 5140

echo "<37>hello via file roll 2" | nc localhost 5140

#Check whether files are created under /var/log/flume; by default a new file is rolled every 30 seconds

-rw-r--r-- 1 root root 20 Jun 2 19:44 1370227443397-1

-rw-r--r-- 1 root root 0 Jun 2 19:44 1370227443397-2

-rw-r--r-- 1 root root 22 Jun 2 19:45 1370227443397-3

cat 1370227443397-1 1370227443397-3

hello via file roll

hello via file roll 2

 
