虽然这个设计比第一个设计好多了, 它仍然不是一个完美的方案。topology里面的worker会花费大量的时间等待计算的其它部分完成。 比如看下面的这个计算。
在bolt 1完成它的处理之后, 它需要等待剩下的bolt去处理当前batch, 直到发射下一个batch。
第三个设计(storm采用的设计)一个我们需要意识到的比较重要的问题是,为了实现transactional的特性,在处理一批tuples的时候,不是所有的工作都需要强顺序性的。比如,当做一个全局计数应用的时候, 整个计算可以分为两个部分。
计算这个batch的局部数量。
把这个batch的局部数量更新到数据库里面去。
其中第二步在多个batch之前需要保证强的顺序性, 但是第一步并不许要, 所以我们可以把第一步并行化。所以当第一个batch在更新它的个数进入数据库的时候,第2到10个batch可以开始计算它们的局部数量了。
Storm通过把一个batch的计算分成两个阶段来实现上面所说的原理:
processing阶段: 这个阶段很多batch可以并行计算。
commit阶段: 这个阶段各个batch之间需要有强顺序性的保证。所以第二个batch必须要在第一个batch成功提交之后才能提交。
这两个阶段合起来称为一个transaction。许多batch可以在processing阶段的任何时刻并行计算,但是只有一个batch可以处在commit阶段。如果一个batch在processing或者commit阶段有任何错误, 那么整个transaction需要被replay。
设计细节当使用Transactional Topologies的时候, storm为你做下面这些事情:
1) 管理状态: Storm把所有实现Transactional Topologies所必须的状态保存在zookeeper里面。 这包括当前transaction id以及定义每个batch的一些元数据。
2) 协调事务: Storm帮你管理所有事情, 以帮你决定在任何一个时间点是该proccessing还是该committing。
3) 错误检测: Storm利用acking框架来高效地检测什么时候一个batch被成功处理了,被成功提交了,或者失败了。Storm然后会相应地replay对应的batch。你不需要自己手动做任何acking或者anchoring — storm帮你搞定所有事情。
4) 内置的批处理API: Storm在普通bolt之上包装了一层API来提供对tuple的批处理支持。Storm管理所有的协调工作,包括决定什么时候一个bolt接收到一个特定transaction的所有tuple。Storm同时也会自动清理每个transaction所产生的中间数据。
5) 最后,需要注意的一点是Transactional Topologies需要一个可以完全重发(replay)一个特定batch的消息的队列系统(Message Queue)。Kestrel之类的技术做不到这一点。而Apache的Kafka对于这个需求来说是正合适的。storm-contrib里面的storm-kafka实现了这个。
一个基本的例子你可以通过使用TransactionalTopologyBuilder来创建transactional topology. 下面就是一个transactional topology的定义, 它的作用是计算输入流里面的tuple的个数。这段代码来自storm-starter里面的TransactionalGlobalCount。
1 2 3 4 5 6 7 8
MemoryTransactionalSpout spout = new MemoryTransactionalSpout( DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH); TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder( "global-count", "spout", spout, 3); builder.setBolt("partial-count", new BatchCount(), 5) .shuffleGrouping("spout"); builder.setBolt("sum", new UpdateGlobalCount()) .globalGrouping("partial-count");
TransactionalTopologyBuilder接受如下的参数
这个transaction topology的id
spout在整个topology里面的id。
一个transactional spout。
一个可选的这个transactional spout的并行度。
topology的id是用来在zookeeper里面保存这个topology的当前进度的,所以如果你重启这个topology, 它可以接着前面的进度继续执行。
一个transaction topology里面有一个唯一的TransactionalSpout, 这个spout是通过TransactionalTopologyBuilder的构造函数来制定的。在这个例子里面,MemoryTransactionalSpout被用来从一个内存变量里面读取数据(DATA)。第二个参数制定数据的fields, 第三个参数指定每个batch的最大tuple数量。关于如何自定义TransactionalSpout我们会在后面介绍。