在上篇:Storm的基础框架分析
基本探讨了storm的:
worker、executor等组件的关系.
线程模型和消息系统.
任务分配流程.
topology提交到执行的过程.
但,感觉对nimbus、supervisor、并行度,任务分配和负载均衡的关系没有交代清楚,而且细节上也有些瑕疵,本篇做一个补充。
基础组件之间的关系这里做一些补充:
worker是一个进程,由supervisor启动,并只负责处理一个topology,所以不会同时处理多个topology.
executor是一个线程,由worker启动,是运行task的物理容器,其和task是1 -> N关系.
component是对spout/bolt/acker的抽象.
task也是对spout/bolt/acker的抽象,不过是计算了并行度之后。component和task是1 -> N 的关系.
supervisor会定时从zookeeper获取topologies、已分配的任务分配信息assignments及各类心跳信息,以此为依据进行任务分配。
在supervisor周期性地进行同步时,会根据新的任务分配来启动新的worker或者关闭旧的worker,以响应任务分配和负载均衡。
worker通过定期的更新connections信息,来获知其应该通讯的其它worker。
worker启动时,会根据其分配到的任务启动一个或多个executor线程。这些线程仅会处理唯一的topology。
executor线程负责处理多个spouts或者多个bolts的逻辑,这些spouts或者bolts,也称为tasks。
并行度的计算 相关配置及参数的意义具体有多少个worker,多少个executor,每个executor负责多少个task,是由配置和指定的parallelism-hint共同决定的,但指定的并行度并不一定等于实际运行中的数目。
1、TOPOLOGY-WORKERS参数指定了某个topology运行时需启动的worker数目.
2、parallelism-hint指定某个component(组件,如spout)的初始executor的数目.
3、TOPOLOGY-TASKS是component的tasks数,计算稍微复杂点:
(1). 如果未指定TOPOLOGY-TASKS,此值等于初始executors数.
(2). 如果已指定,和TOPOLOGY-MAX-TASK-PARALLELISM值进行比较,取小的那个作为实际的TOPOLOGY-TASKS.
用代码来表达就是:
(defn- component-parallelism [storm-conf component] (let [storm-conf (merge storm-conf (component-conf component)) num-tasks (or (storm-conf TOPOLOGY-TASKS) (num-start-executors component)) max-parallelism (storm-conf TOPOLOGY-MAX-TASK-PARALLELISM) ] (if max-parallelism (min max-parallelism num-tasks) num-tasks)))4、对于acker这种特殊的bolt来说,其并行度计算如下:
(1). 如果指定了TOPOLOGY-ACKER-EXECUTORS,按这个值计算.
(2). 如果未指定,那么按TOPOLOGY-WORKERS的值来设置并行度,这种情况下,一个acker对应一个worker,显然,在计算任务繁重、数据量比较大的情况下,这是不合适的。
5、如果配置了NIMBUS-SLOTS-PER-TOPOLOGY,在提交topology到nimbus时,会验证topology所需的worker总数,如果超过了这个值,说明不能够满足需求,则抛出异常。
6、如果配置了NIMBUS-EXECUTORS-PER-TOPOLOGY,如第5点,会验证topology所需的executor总数,如果超出,也会抛出异常。
同时,需要注意,实际运行中,有可能出现并行的TASKS数小于指定的数量。
通过调用nimbus接口的rebalance或者do-rebalance操作,以上并行度可被动态改变。
先回顾下任务分配中的几个主要角色:
接着看几段重要的并行度计算代码:
1、计算所有topology的topology-id到executors的映射关系:
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ; 计算所有tolopogy的topology-id到executors的映射 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defn- compute-topology->executors [nimbus storm-ids] "compute a topology-id -> executors map" (into {} (for [tid storm-ids] {tid (set (compute-executors nimbus tid))})))