ng Distributed Environment Deployment and Configuration (4)

2. Flume Channels tests

#Memory Channel

Events are stored in an in-memory queue with a configurable maximum size. It is ideal for flows that need higher throughput and can tolerate losing the staged data if the agent fails.
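
The trade-off described above can be illustrated with a small Python sketch. It is a toy model under stated assumptions, not Flume's implementation: the class name MemoryChannelSketch and the use of OverflowError are hypothetical, and the capacity argument only mirrors the capacity property used in the configuration files in this article.

```python
from collections import deque


class MemoryChannelSketch:
    """Toy bounded in-memory queue; NOT Flume's actual Memory Channel."""

    def __init__(self, capacity):
        self.capacity = capacity   # maximum number of staged events
        self._queue = deque()      # lives only in process memory

    def put(self, event):
        # Refuse new events once the configured maximum size is reached.
        if len(self._queue) >= self.capacity:
            raise OverflowError("channel is full")
        self._queue.append(event)

    def take(self):
        # Return the oldest staged event, or None when the queue is empty.
        return self._queue.popleft() if self._queue else None

    def __len__(self):
        return len(self._queue)


channel = MemoryChannelSketch(capacity=2)
channel.put("event-1")
channel.put("event-2")
try:
    channel.put("event-3")     # third event exceeds the capacity of 2
    overflowed = False
except OverflowError:
    overflowed = True

first = channel.take()         # oldest event leaves the queue first
```

Because the queue exists only in the memory of the process, anything still staged in it is gone if the process dies, which is the data-loss risk mentioned above.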

#flume channel selectors

# Replicating Channel Selector: channel replication test

# Configuration file for 2 channels and 2 sinks

# Name the components on this agent

a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1 c2

# Describe/configure the source

a1.sources.r1.type = syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.selector.type = replicating

a1.sources.r1.channels = c1 c2

# Describe the sink

a1.sinks.k1.type = avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = 172.25.4.23

a1.sinks.k1.port = 4545

a1.sinks.k2.type = avro

a1.sinks.k2.channel = c2

a1.sinks.k2.hostname = 172.25.4.33

a1.sinks.k2.port = 4545

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory

a1.channels.c2.capacity = 1000

a1.channels.c2.transactionCapacity = 100

# Check that both connections were established

2013-06-04 00:01:53,467 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x122a0fad, /172.25.4.32:55518 => /172.25.4.23:4545] BOUND: /172.25.4.23:4545

2013-06-04 00:01:53,467 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x122a0fad, /172.25.4.32:55518 => /172.25.4.23:4545] CONNECTED: /172.25.4.32:55518

2013-06-04 00:01:53,773 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x021881a7, /172.25.4.32:23731 => /172.25.4.33:4545] BOUND: /172.25.4.33:4545

2013-06-04 00:01:53,773 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x021881a7, /172.25.4.32:23731 => /172.25.4.33:4545] CONNECTED: /172.25.4.32:23731

# Generate a test log entry

echo "<37>hello via channel selector" | nc localhost 5140
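
The <37> prefix in the test message is the syslog priority value, which encodes the facility and the severity as facility * 8 + severity. A minimal Python sketch of the decoding follows; the function name decode_priority is hypothetical and chosen only for illustration.

```python
def decode_priority(pri):
    """Split a syslog priority value into (facility, severity)."""
    # PRI = facility * 8 + severity, so divmod by 8 recovers both parts.
    facility, severity = divmod(pri, 8)
    return facility, severity


facility, severity = decode_priority(37)
print(facility, severity)  # prints: 4 5
```

These two values correspond to the Facility=4 and Severity=5 headers that appear on the events logged by the sinks.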

# Check whether both sinks received the data

2013-06-04 00:02:06,479 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 63 68 61 6E 6E 65 hello via channe }

2013-06-04 00:02:09,788 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 63 68 61 6E 6E 65 hello via channe }
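
The behavior confirmed by the two log lines above, where the same event reaches both sinks, can be sketched in Python as follows. This is an illustration of the replicating idea only, not Flume's code: the function name replicate and the use of plain lists as channels are assumptions.

```python
def replicate(event, channels):
    """Append the same event to every configured channel."""
    for staged_events in channels.values():
        staged_events.append(event)


# Channel names mirror the c1 and c2 channels of the configuration file.
channels = {"c1": [], "c2": []}

event = {
    "headers": {"Severity": "5", "Facility": "4"},
    "body": "hello via channel selector",
}
replicate(event, channels)
```

After the call, both c1 and c2 hold one copy of the event, so each downstream sink has something to deliver.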

#flume channel selectors

# Multiplexing Channel Selector: channel multiplexing test

# Configuration file for 2 channels and 2 sinks

a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1 c2

# Describe/configure the source

a1.sources.r1.type = org.apache.flume.source.http.HTTPSource

a1.sources.r1.port = 5140

a1.sources.r1.host = 0.0.0.0

a1.sources.r1.selector.type = multiplexing

a1.sources.r1.channels = c1 c2

a1.sources.r1.selector.header = state

a1.sources.r1.selector.mapping.CZ = c1

a1.sources.r1.selector.mapping.US = c2

a1.sources.r1.selector.default = c1

# Describe the sink

a1.sinks.k1.type = avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = 172.25.4.23

a1.sinks.k1.port = 4545

a1.sinks.k2.type = avro

a1.sinks.k2.channel = c2

a1.sinks.k2.hostname = 172.25.4.33

a1.sinks.k2.port = 4545

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory

a1.channels.c2.capacity = 1000

a1.channels.c2.transactionCapacity = 100

# Send test POST requests whose state header exercises the mappings in the configuration file

curl -X POST -d '[{ "headers" :{"state" : "CZ"},"body" : "TEST1"}]' :5140

curl -X POST -d '[{ "headers" :{"state" : "US"},"body" : "TEST2"}]' :5140

curl -X POST -d '[{ "headers" :{"state" : "SH"},"body" : "TEST3"}]' :5140
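
The same request bodies can be built programmatically. The Python sketch below only constructs the request and does not send it. The URL http://localhost:5140 is an assumption (the host is not shown in the commands above), the helper name build_request is hypothetical, and the Content-Type header is a choice made for this sketch rather than something the test above relied on.

```python
import json
import urllib.request


def build_request(state, body, url="http://localhost:5140"):
    """Build (but do not send) a POST request carrying one JSON event."""
    # The payload is a JSON array of events, each with headers and a body,
    # matching the shape used in the curl commands.
    payload = json.dumps([{"headers": {"state": state}, "body": body}])
    return urllib.request.Request(
        url,
        data=payload.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


requests_to_send = [
    build_request("CZ", "TEST1"),
    build_request("US", "TEST2"),
    build_request("SH", "TEST3"),
]
```

Passing each request to urllib.request.urlopen would send it, provided an agent is actually listening at the assumed address.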

# Check that the data received by the two sinks matches the configuration file

Sink1:

2013-06-04 23:45:35,296 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{state=CZ} body: 54 45 53 54 31 TEST1 }

2013-06-04 23:45:50,309 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{state=SH} body: 54 45 53 54 33 TEST3 }

Sink2:

2013-06-04 23:45:42,293 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{state=US} body: 54 45 53 54 32 TEST2 }
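
The routing seen above (CZ and SH on Sink1, US on Sink2) follows from the selector settings: CZ maps to c1, US maps to c2, and SH has no mapping, so it falls back to the default channel c1. A minimal Python sketch of that lookup is shown below. It is a simplification, not Flume's implementation: the function name select_channel is hypothetical, and each header value maps to exactly one channel here.

```python
def select_channel(event, header, mapping, default):
    """Pick a channel name from the value of one event header."""
    value = event["headers"].get(header)
    # Unmapped (or missing) header values fall back to the default channel.
    return mapping.get(value, default)


# Mirrors selector.header, selector.mapping.* and selector.default.
mapping = {"CZ": "c1", "US": "c2"}

events = [
    {"headers": {"state": "CZ"}, "body": "TEST1"},
    {"headers": {"state": "US"}, "body": "TEST2"},
    {"headers": {"state": "SH"}, "body": "TEST3"},
]

routed = {"c1": [], "c2": []}
for event in events:
    channel_name = select_channel(event, "state", mapping, default="c1")
    routed[channel_name].append(event["body"])

print(routed)  # prints: {'c1': ['TEST1', 'TEST3'], 'c2': ['TEST2']}
```

The result matches the sink logs: TEST1 and TEST3 travel through c1 to Sink1, and TEST2 travels through c2 to Sink2.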

 
