Twitter Storm源代码分析之CoordinatedBolt

关于Twitter Storm的新特性:Transactional Topology被问到的最多的问题是: Storm是怎么知道一个Bolt处理完成了它所有的tuple的?其实要做到这一点还是有蛮多事情要做的, 幸运的是Storm已经提供了一个Bolt,帮我们把这些事情都做掉了。这个牛逼的bolt就是
CoordinatedBolt. 重要的是CoordinatedBolt的实现也是在storm的原语:spout, bolt这些基础之上的 — 也就是说即使作者不提供,我们自己也可以实现。我们来看看这个类的实现原理。

虽然CoordinatedBolt所发挥的作用很牛逼,但是其实它的原理并不是很复杂。它现在被用在两个场景里面:

DRPC

Transactional Topology

在看CoordinatedBolt的原理之前,我们先看看到底什么叫”处理完了”, 到底处理完什么了?
其实CoordinatedBolt对于业务不是完全没有侵入的,要使用CoordinatedBolt提供的功能,你必须要保证你的每个bolt发送的每个tuple的第一个field是request-id, 那么所谓的”做完了”的意思是说当前这个bolt对于当前这个”request-id”所需要做的工作做完了。 这个request-id在DRPC里面代表一个DRPC请求;在Transactional Topology里面代表一个batch.

CoordinatedBolt的原理是这样的:

对于用户在DRPC, Transactional Topology里面的Bolt,都被CoordinatedBolt包装了一层:也就是说在DRPC, Transactional Topology里面的topology里面运行的已经不是用户提供的原始的Bolt, 而是一堆CoordinatedBolt, CoordinatedBolt把这些Bolt的事务都代理了。

有了这个代理层,CoordinatedBolt就可以做它的工作了。

它会在自己这里维护以下几个数据:

哪些上游task要给我发tuple?(通过构造topology的时候所提供的grouping信息可以得知)

我给哪些下游task发tuple? (同样通过grouping信息可以得知)

每个CoordinatedBolt在每次真正bolt发出一个tuple之后,它都会记录下,这个tuple发给哪个task了。

等它所有的tuple都发送完了之后(怎么知道发送完了?等会再说,少安毋躁),它通过另外一个特殊的stream以emitDirect的方式告诉所有它发送过tuple的task,它发送了多少tuple给它。

一个bolt在接到所有的上游task发送的tuple个数信息之后,对比它接收到的tuple数量,如果数量对上了,说明它接收到了所有的tuple — 它处理完成了。

这样它处理完成了,它可以重复上面的步骤通知它的下游,它的下游再通知它的下游的下游等等。

总结一下,每个tuple怎么知道自己处理完成了的?都是靠它的上游通知的。所以只要一个bolt有上游,它就能够知道自己什么时候完成。

那总有一个bolt是没有上游的 — 最上面那个bolt。那么这个bolt是怎么知道自己处理完成的呢?靠的是storm的ack系统 — 只要它ack了它的上游(某个非CoordinatedBolt, 在DRPC里面就是PrepareRequest)发送过来的tuple, 它就完成处理这个tuple了。 — 也就是说对于最上面那个bolt来说它只要处理完一个tuple(相对于它的下游要处理很多tuple才算完成)

具体原理如下图:

正如我们在上面讨论到底什么叫”做完成”的概念的时候,我们说了,CoordinatedBolt的使用对于业务是有侵入的:你必须要在你的每个tuple的第一个字段带上当前request-id, 否则CoordinatedBolt就跟踪不了了。 一种更优雅的方式是网络协议栈里面IP, TCP协议的处理方式。IP包在TCP包的外面包上IP层需要的信息,而不要求把IP层需要的信息掺杂在TCP的包字段里面,TCP层在发送数据的时候只组装TCP的那些字段,到了IP层自动加上IP层的信息。而IP层把数据包传给TCP层之前也自动去掉IP层的那些信息,TCP只会看到自己层的那些字段,毫无侵入。。作者对于这个问题提了一些改进的措施在这里

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/b2b3d00a6c2a28cdc86bdc3a3c714a72.html