如果是spout -> bolt或者bolt -> bolt,这个信息就是tuple的MessageId,其内部维护一个哈希表:
// map anchor to id private Map<Long, Long> _anchorsToIds;键为root-id,表示spout,值表示tuple在tuple-tree或者DAG的根(spout)或者经过的边(bolt),但这里没有利用任何常规意义上的“树”的算法,而是采用异或的方式来存储这个值:
spout -> bolt,值被初始化为一个long型64位整数.
bolt -> bolt,值被初始化为一个long型64位整数,并和_anchorsToIds中的旧值进行按位异或,将结果更新到_anchorsToIds中.
如果是spout -> acker,或者bolt -> acker,那么用于追踪的是tuple的values:
spout -> acker : [root-id (bit-xor-vals out-ids) task-id]
bolt -> acker : [root (bit-xor id ack-val) ..]
下面给出上面调用的bit-xor-vals和bit-xor方法的代码:
(defn bit-xor-vals [vals] (reduce bit-xor 0 vals)) (defn bit-xor "Bitwise exclusive or" {:inline (nary-inline 'xor) :inline-arities >1? :added "1.0"} ([x y] (. clojure.lang.Numbers xor x y)) ([x y & more] (reduce1 bit-xor (bit-xor x y) more))) 示例说起来有点抽象,看个例子。
假设我们有1个spout,n个bolt,1个acker:
1.spoutspout发送tuple到下游的bolts:
;; id_1是发送到bolt_1的tuple-id,依此类推 spout : ->bolt_1 : id_1 ->bolt_2 : id_2 .. ->bolt_n : id_n 2.boltbolt收到tuple,在execute方法中进行必要的处理,然后调用emit方法,最后调用ack方法:
;; bolt_1调用emit方法,追踪消息的这样一个值:让id_1和bid_1按位进行异或. ;; bid_1和id_1类似,是个long型的64位随机整数,在emit这一步生成 bolt_1 emit : id_1 ^ bid_1 ;; bolt_1调用ack方法,并将值表达为如下方式的异或链的结果 bolt_1 ack : 0 ^ bid_1 ^ id_1 ^ bid_1 = 0 ^ id_1以上,可以看出bolt进行了emit-ack组合后,其自身在异或链中的作用消失了,也就是说tuple在此bolt得到了处理。
(当然,此时的ack还没有得到acker的确认,假设acker确认了,那么上面所说的tuple在bolt得到了处理就成立了。)
来看看acker的确认。
3.ackeracker收到来自spout的tuple:
;; spout发消息给acker,tuple的MessageId包含下面的异或链的结果 spout -> acker : 0 ^ id_1 ^ id_2 ^ .. ^ id_n ;; acker收到来spout的消息,对tuple的ackVal进行处理,如下所示: acker : 0 ^ (0 ^ id_1 ^ id_2 ^ .. ^ id_n) = 0 ^ id_1 ^ id_2 ^ .. ^ id_nacker收到来自bolt的tuple:
;; bolt_1发消息给acker: bolt_1 -> acker : 0 ^ id_1 ;; acker维护的对应此tuple的源spout的ackVal : ackVal : 0 ^ id_1 ^ id_2 ^ .. ^ id_n ;; acker进行确认,也就是拿上面的两个值进行异或: acker : (0 ^ id_1) ^ (0 ^ id_1 ^ id_2 ^ .. ^ id_n) = 0 ^ id_2 ^ .. ^ id_n可以看出,bolt_1向acker请求ack,acker收到请求ack,异或之后,id_1的作用消失。也就是说,bolt_1已处理完毕这个tuple。
所以,在acker看来,如果某个bolt的处理完成,则此bolt在异或链中的作用就消失了。
如果所有的bolt 都得到处理,那么acker将会观察到ackVal值变成了0:
ackVal = 0 = (0 ^ id_1) ^ (0 ^ id_1 ^ .. ^ id_n) ^ .. ^ (0 ^ id_n) = (0 ^ 0) ^ (id_1 ^ id_1) ^ (id_2 ^ id_2) ^ .. ^ (id_n ^ id_n)如果出现了ackVal = 0,说明两个可能:
spout发送的tuple都处理完成,tuple-tree或者DAG已完成。