Twitter Storm: Transactional Topolgoy简介(2)

虽然这个设计比第一个设计好多了, 它仍然不是一个完美的方案。topology里面的worker会花费大量的时间等待计算的其它部分完成。 比如看下面的这个计算。

在bolt 1完成它的处理之后, 它需要等待剩下的bolt去处理当前batch, 直到发射下一个batch。

第三个设计(storm采用的设计)

一个我们需要意识到的比较重要的问题是,为了实现transactional的特性,在处理一批tuples的时候,不是所有的工作都需要强顺序性的。比如,当做一个全局计数应用的时候, 整个计算可以分为两个部分。

计算这个batch的局部数量。

把这个batch的局部数量更新到数据库里面去。

其中第二步在多个batch之前需要保证强的顺序性, 但是第一步并不许要, 所以我们可以把第一步并行化。所以当第一个batch在更新它的个数进入数据库的时候,第2到10个batch可以开始计算它们的局部数量了。

Storm通过把一个batch的计算分成两个阶段来实现上面所说的原理:

processing阶段: 这个阶段很多batch可以并行计算。

commit阶段: 这个阶段各个batch之间需要有强顺序性的保证。所以第二个batch必须要在第一个batch成功提交之后才能提交。

这两个阶段合起来称为一个transaction。许多batch可以在processing阶段的任何时刻并行计算,但是只有一个batch可以处在commit阶段。如果一个batch在processing或者commit阶段有任何错误, 那么整个transaction需要被replay。

设计细节

当使用Transactional Topologies的时候, storm为你做下面这些事情:

1) 管理状态: Storm把所有实现Transactional Topologies所必须的状态保存在zookeeper里面。 这包括当前transaction id以及定义每个batch的一些元数据。

2) 协调事务: Storm帮你管理所有事情, 以帮你决定在任何一个时间点是该proccessing还是该committing。

3) 错误检测: Storm利用acking框架来高效地检测什么时候一个batch被成功处理了,被成功提交了,或者失败了。Storm然后会相应地replay对应的batch。你不需要自己手动做任何acking或者anchoring — storm帮你搞定所有事情。

4) 内置的批处理API: Storm在普通bolt之上包装了一层API来提供对tuple的批处理支持。Storm管理所有的协调工作,包括决定什么时候一个bolt接收到一个特定transaction的所有tuple。Storm同时也会自动清理每个transaction所产生的中间数据。

5) 最后,需要注意的一点是Transactional Topologies需要一个可以完全重发(replay)一个特定batch的消息的队列系统(Message Queue)。Kestrel之类的技术做不到这一点。而Apache的Kafka对于这个需求来说是正合适的。storm-contrib里面的storm-kafka实现了这个。

一个基本的例子

你可以通过使用TransactionalTopologyBuilder来创建transactional topology. 下面就是一个transactional topology的定义, 它的作用是计算输入流里面的tuple的个数。这段代码来自storm-starter里面的TransactionalGlobalCount

1

2

3

4

5

6

7

8

 

MemoryTransactionalSpout spout = new MemoryTransactionalSpout(

DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);

TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder(

"global-count", "spout", spout, 3);

builder.setBolt("partial-count", new BatchCount(), 5)

.shuffleGrouping("spout");

builder.setBolt("sum", new UpdateGlobalCount())

.globalGrouping("partial-count");

 

TransactionalTopologyBuilder接受如下的参数

这个transaction topology的id

spout在整个topology里面的id。

一个transactional spout。

一个可选的这个transactional spout的并行度。

topology的id是用来在zookeeper里面保存这个topology的当前进度的,所以如果你重启这个topology, 它可以接着前面的进度继续执行。

一个transaction topology里面有一个唯一的TransactionalSpout, 这个spout是通过TransactionalTopologyBuilder的构造函数来制定的。在这个例子里面,MemoryTransactionalSpout被用来从一个内存变量里面读取数据(DATA)。第二个参数制定数据的fields, 第三个参数指定每个batch的最大tuple数量。关于如何自定义TransactionalSpout我们会在后面介绍。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/1dd5b802a91c5089205ed5d50f36d2ef.html