Twitter Storm如何保证消息不丢失

storm保证从spout发出的每个tuple都会被完全处理。这篇文章介绍storm是怎么做到这个保证的,以及我们使用者怎么做才能充分利用storm的可靠性特点。

一个tuple被”完全处理”是什么意思?

就如同蝴蝶效应一样,从spout发射的一个tuple可以引起其它成千上万个tuple因它而产生, 想想那个计算一篇文章中每个单词出现次数的topology.

1

2

3

4

5

6

7

8

9

 

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(1, new KestrelSpout("kestrel.backtype.com",

22133,

"sentence_queue",

new StringScheme()));

builder.setBolt(2, new SplitSentence(), 10)

.shuffleGrouping(1);

builder.setBolt(3, new WordCount(), 20)

.fieldsGrouping(2, new Fields("word"));

 

这个topology从一个Kestrel队列读取句子, 把每个句子分割成一个个单词, 然后发射这一个个单词: 一个源tuple(一个句子)引起后面很多tuple的产生(一个个单词), 这个消息流大概是这样的:

统计单词出现次数的tuple树

统计单词出现次数的tuple树

在storm里面一个tuple被完全处理的意思是: 这个tuple以及由这个tuple所导致的所有的tuple都被成功处理。而一个tuple会被认为处理失败了如果这个消息在timeout所指定的时间内没有成功处理。 而这个timetout可以通过来指定。

如果一个消息处理成功了或者失败了会发生什么?

FYI。 下面这个是spout要实现的接口:

1

2

3

4

5

6

7

8

 

public interface ISpout extends Serializable {

void open(Map conf, TopologyContext context,

SpoutOutputCollector collector);

void close();

void nextTuple();

void ack(Object msgId);

void fail(Object msgId);

}

 

首先storm通过调用spout的nextTuple方法来获取下一个tuple, Spout通过open方法参数里面提供的SpoutOutputCollector来发射新tuple到它的其中一个输出消息流, 发射tuple的时候spout会提供一个message-id, 后面我们通过这个message-id来追踪这个tuple。举例来说, KestrelSpout从kestrel队列里面读取一个消息,并且把kestrel提供的消息id作为message-id, 看例子:

1

 

_collector.emit(new Values("field1","field2", 3),msgId);

 

接下来, 这个发射的tuple被传送到消息处理者bolt那里, storm会跟踪由此所产生的这课tuple树。如果storm检测到一个tuple被完全处理了, 那么storm会以最开始的那个message-id作为参数去调用消息源的ack方法;反之storm会调用spout的fail方法。值得注意的一点是, storm调用ack或者fail的task始终是产生这个tuple的那个task。所以如果一个spout被分成很多个task来执行, 消息执行的成功失败与否始终会通知最开始发出tuple的那个task。

我们再以KestrelSpout为例来看看spout需要做些什么才能保证“一个消息始终被完全处理”, 当KestrelSpout从Kestrel里面读出一条消息, 首先它“打开”这条消息, 这意味着这条消息还在kestrel队列里面, 不过这条消息会被标示成“处理中”直到ack或者fail被调用。处于“处理中“状态的消息不会被发给其他消息处理者了;并且如果这个spout“断线”了, 那么所有处于“处理中”状态的消息会被重新标示成“等待处理”.

Storm的可靠性API

作为storm的使用者,有两件事情要做以更好的利用storm的可靠性特征。 首先,在你生成一个新的tuple的时候要通知storm; 其次,完成处理一个tuple之后要通知storm。 这样storm就可以检测整个tuple树有没有完成处理,并且通知源spout处理结果。storm提供了一些简洁的api来做这些事情。

由一个tuple产生一个新的tuple称为: anchoring。你发射一个新tuple的同时也就完成了一次anchoring。看下面这个例子: 这个bolt把一个包含一个句子的tuple分割成每个单词一个tuple。

01

02

03

04

05

06

07

08

09

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

 

public class SplitSentence implements IRichBolt {

OutputCollector _collector;

 

public void prepare(Map conf,

TopologyContext context,

OutputCollector collector) {

_collector = collector;

}

 

public void execute(Tuple tuple) {

String sentence = tuple.getString(0);

for(String word: sentence.split(" ")) {

_collector.emit(tuple, new Values(word));

}

_collector.ack(tuple);

}

 

public void cleanup() {

}

 

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("word"));

}

}

 

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/2524a197230067c15969e05665c20b7d.html