2、计算topology-id到executors的映射信息:
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ; 计算topology-id到executors的映射 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defn- compute-executors [nimbus storm-id] (let [conf (:conf nimbus) storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil) component->executors (:component->executors storm-base) storm-conf (read-storm-conf conf storm-id) topology (read-storm-topology conf storm-id) task->component (storm-task-info topology storm-conf)] (->> (storm-task-info topology storm-conf) reverse-map (map-val sort) (join-maps component->executors) (map-val (partial apply partition-fixed)) (mapcat second) (map to-executor-id) )))3、计算topology的任务信息 task-info,这里TOPOLOGY-TASKS就决定了每个组件component(spout、bolt)的并行度,或者说tasks数:
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ; 计算topology的task-info ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defn storm-task-info "Returns map from task -> component id" [^StormTopology user-topology storm-conf] (->> (system-topology! storm-conf user-topology) all-components ;; 获取每个组件的并行数 (map-val (comp #(get % TOPOLOGY-TASKS) component-conf)) (sort-by first) (mapcat (fn [[c num-tasks]] (repeat num-tasks c))) (map (fn [id comp] [id comp]) (iterate (comp int inc) (int 1))) (into {}) ))4、上述1��2、3段代码会在nimbus进行任务分配时调用,任务分配是通过mk-assignments函数来完成,调用过程用伪代码描述如下:
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ; nimbus进行任务分配 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; mk-assignments ;; 这一步计算topology的所有executor对应的node + port信息 ->compute-new-topology->executor->node+port ->compute-topology->executors -> ... nimbus进行任务分配这里回顾并补充下nimbus进行任务分配的主要流程:
任务分配的流程1、nimbus将一组node + port 称为worker-slot,由executor到worker-slot的映射信息,就决定executor将在哪台机器、哪个worker进程运行,随之spout、bolt、acker等位置也就确定了,如下图所示:
2、 nimbus是整个集群的控管核心,总体负责了topology的提交、运行状态监控、负载均衡及任务分配等工作。
3、nimbus分配的任务包含了topology代码所在的路径(在nimbus本地)、tasks、executors及workers信息。
worker由node + port及配置的worker数量来唯一确定。
任务信息Assignment结构如下:
3、supervisor负责实际的同步worker的操作。一个supervisor称为一个node。所谓同步worker,是指响应nimbus的任务分配,进行worker的建立、调度与销毁。
在收到任务时,如果相关的topology代码不在本地,supervisor会从nimbus下载代码并写入本地文件。
4、 通过node、port、host信息的计算,worker就知道和哪些机器进行通讯,而当负载均衡发生、任务被重新分配时,这些机器可能发生了变化,worker会通过周期性的调用refresh-connections来获知变化,并进行新连接的建立、废弃连接的销毁等工作,如下图所示:
任务分配的依据