2.Flume Channels测试
#Memory Channel
The events are stored in a an in-memory queue with configurable max size. It’s ideal for flow that needs higher throughput and prepared to lose the staged data in the event of a agent failures
#flume channel selectors
# Replicating Channel Selector通道复制测试
#2个channel和2个sink的配置文件
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 172.25.4.23
a1.sinks.k1.port = 4545
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = 172.25.4.33
a1.sinks.k2.port = 4545
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
#查看是否都建立了连接
2013-06-04 00:01:53,467 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x122a0fad, /172.25.4.32:55518 => /172.25.4.23:4545] BOUND: /172.25.4.23:4545
2013-06-04 00:01:53,467 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x122a0fad, /172.25.4.32:55518 => /172.25.4.23:4545] CONNECTED: /172.25.4.32:55518
2013-06-04 00:01:53,773 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x021881a7, /172.25.4.32:23731 => /172.25.4.33:4545] BOUND: /172.25.4.33:4545
2013-06-04 00:01:53,773 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x021881a7, /172.25.4.32:23731 => /172.25.4.33:4545] CONNECTED: /172.25.4.32:23731
#生成测试log
echo "<37>hello via channel selector" | nc localhost 5140
#查看2个sink是否得到数据
2013-06-04 00:02:06,479 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 63 68 61 6E 6E 65 hello via channe }
2013-06-04 00:02:09,788 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 63 68 61 6E 6E 65 hello via channe }
#flume channel selectors
# Multiplexing Channel Selector 通道复用测试
#2个channel和2个sink的配置文件
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 5140
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2
a1.sources.r1.selector.default = c1
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 172.25.4.23
a1.sinks.k1.port = 4545
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = 172.25.4.33
a1.sinks.k2.port = 4545
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
#根据配置文件生成测试的header 为state的POST请求
curl -X POST -d '[{ "headers" :{"state" : "CZ"},"body" : "TEST1"}]' :5140
curl -X POST -d '[{ "headers" :{"state" : "US"},"body" : "TEST2"}]' :5140
curl -X POST -d '[{ "headers" :{"state" : "SH"},"body" : "TEST3"}]' :5140
#查看2个sink得到数据是否和配置文件一致
Sink1:
2013-06-04 23:45:35,296 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{state=CZ} body: 54 45 53 54 31 TEST1 }
2013-06-04 23:45:50,309 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{state=SH} body: 54 45 53 54 33 TEST3 }
Sink2:
2013-06-04 23:45:42,293 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{state=US} body: 54 45 53 54 32 TEST2 }