Storm如何保证at least once语义?

前期收到的问题:

1、在Topology中我们可以指定spout、bolt的并行度,在提交Topology时Storm如何将spout、bolt自动发布到每个服务器并且控制服务的CPU、磁盘等资源的?

2、Storm处理消息时会根据Topology生成一棵消息树,Storm如何跟踪每个消息、如何保证消息不丢失以及如何实现重发消息机制?

本篇看看storm是通过什么机制来保证消息至少处理一次的语义的,并回答第2个问题。

storm中的一些原语

这里写图片描述


要说明上面的问题,得先了解storm中的一些原语,比如:

tuple和message
在storm中,消息是通过tuple来抽象表示的,每个tuple知道它从哪里来,应往哪里去,包含了其在tuple-tree(如果是anchored的话)或者DAG中的位置,等等信息。

spout
spout充当了tuple的发送源,spout通过和其它消息源,比如kafka交互,将消息封装为tuple,发送到流的下游。

bolt
bolt是tuple的实际处理单元,通过从spout或者另一个bolt接收tuple,进行业务处理,将自己加入tuple-tree(通过在emit方法中设置anchors)或DAG,然后继续将tuple发送到流的下游。

acker
acker是一种特殊的bolt,其接收来自spout和bolt的消息,主要功能是追踪tuple的处理情况,如果处理完成,会向tuple的源头spout发送确认消息,否则,会发送失败消息,spout收到失败的消息,根据配置和自定义的情况会进行消息的丢弃、重放处理。

spout、bolt、acker的关系

spout将tuple发送给流的下游的bolts.

bolt收到tuple,处理后发送给下游的bolts.

spout向acker发送请求ack的消息.

bolt向acker发送请求ack的消息.

acker向bolt和spout发送确认ack的消息.

简单的关系如下所示:

这里写图片描述

上图展示了spout、bolts等形成了一个DAG,如何追踪这个DAG的执行过程,就是storm保证仅处理一次消息的语义的机制所在。

storm如何追踪消息(tuple)的处理

spout在调用emit/emitDirect方法发送tuple时,会以单播或者广播的方式,将消息发送给流的下游的component/task/bolt,如果配置了acker,那么会在每次emit调用之后,向acker发送请求ack的消息:

;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; spout向acker发送请求ack消息 ;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; rooted?表示是否设置了acker (if (and rooted? (not (.isEmpty out-ids))) (do (.put pending root-id [task-id message-id {:stream out-stream-id :values values} (if (sampler) (System/currentTimeMillis))]) (task/send-unanchored task-data ;;表示这是一个流初始化的消息 ACKER-INIT-STREAM-ID ;;将下游组件的out-id和0组成一个异或链,发送给acker用于追踪 [root-id (bit-xor-vals out-ids) task-id] overflow-buffer)) ;; 如果没有配置acker,则调用自身的ack方法 (when message-id (ack-spout-msg executor-data task-data message-id {:stream out-stream-id :values values} (if (sampler) 0) "0:")))

从上面的代码可以看出,每次emit tuple后,spout会向acker发送一个流ID为ACKER-INIT-STREAM-ID的消息,用于将DAG或者tuple-tree中的节点信息交给acker,acker会利用这个信息来追踪tuple-tree或DAG的完成。

而spout调用emit/emitDirect方法,将tuple发到下游的bolts,也同时会发送用于追踪DAG完成情况的信息:

;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; spout向流的下游emit消息 ;;;;;;;;;;;;;;;;;;;;;;;;;;;; (let [tuple-id (if rooted? ;; 如果有acker,tuple的MessageId会包含一个<root-id,id>的哈希表 ;; root-id和id都是long型64位整数 (MessageId/makeRootId root-id id) (MessageId/makeUnanchored)) ;;实例化tuple out-tuple (TupleImpl. worker-context values task-id out-stream-id tuple-id)] ;; 发送至队列,最终发送给流的下游的task/bolt (transfer-fn out-task out-tuple overflow-buffer) ))

这个追踪信息是什么呢?

这里写图片描述

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/6b8f5d1c3da9df40bc3893e2e2fb5345.html