Storm如何分配任务和负载均衡?

在上篇:Storm的基础框架分析

基本探讨了storm的:

worker、executor等组件的关系.

线程模型和消息系统.

任务分配流程.

topology提交到执行的过程.

但,感觉对nimbus、supervisor、并行度,任务分配和负载均衡的关系没有交代清楚,而且细节上也有些瑕疵,本篇做一个补充。

基础组件之间的关系

这里写图片描述

这里做一些补充:

worker是一个进程,由supervisor启动,并只负责处理一个topology,所以不会同时处理多个topology.

executor是一个线程,由worker启动,是运行task的物理容器,其和task是1 -> N关系.

component是对spout/bolt/acker的抽象.

task也是对spout/bolt/acker的抽象,不过是计算了并行度之后。component和task是1 -> N 的关系.

supervisor会定时从zookeeper获取topologies、已分配的任务分配信息assignments及各类心跳信息,以此为依据进行任务分配。

在supervisor周期性地进行同步时,会根据新的任务分配来启动新的worker或者关闭旧的worker,以响应任务分配和负载均衡。

worker通过定期的更新connections信息,来获知其应该通讯的其它worker。

worker启动时,会根据其分配到的任务启动一个或多个executor线程。这些线程仅会处理唯一的topology。

executor线程负责处理多个spouts或者多个bolts的逻辑,这些spouts或者bolts,也称为tasks。

并行度的计算 相关配置及参数的意义

具体有多少个worker,多少个executor,每个executor负责多少个task,是由配置和指定的parallelism-hint共同决定的,但指定的并行度并不一定等于实际运行中的数目。

1、TOPOLOGY-WORKERS参数指定了某个topology运行时需启动的worker数目.

2、parallelism-hint指定某个component(组件,如spout)的初始executor的数目.

3、TOPOLOGY-TASKS是component的tasks数,计算稍微复杂点:
(1). 如果未指定TOPOLOGY-TASKS,此值等于初始executors数.
(2). 如果已指定,和TOPOLOGY-MAX-TASK-PARALLELISM值进行比较,取小的那个作为实际的TOPOLOGY-TASKS.

用代码来表达就是:

(defn- component-parallelism [storm-conf component] (let [storm-conf (merge storm-conf (component-conf component)) num-tasks (or (storm-conf TOPOLOGY-TASKS) (num-start-executors component)) max-parallelism (storm-conf TOPOLOGY-MAX-TASK-PARALLELISM) ] (if max-parallelism (min max-parallelism num-tasks) num-tasks)))

4、对于acker这种特殊的bolt来说,其并行度计算如下:

(1). 如果指定了TOPOLOGY-ACKER-EXECUTORS,按这个值计算.
(2). 如果未指定,那么按TOPOLOGY-WORKERS的值来设置并行度,这种情况下,一个acker对应一个worker,显然,在计算任务繁重、数据量比较大的情况下,这是不合适的。

5、如果配置了NIMBUS-SLOTS-PER-TOPOLOGY,在提交topology到nimbus时,会验证topology所需的worker总数,如果超过了这个值,说明不能够满足需求,则抛出异常。

6、如果配置了NIMBUS-EXECUTORS-PER-TOPOLOGY,如第5点,会验证topology所需的executor总数,如果超出,也会抛出异常。

同时,需要注意,实际运行中,有可能出现并行的TASKS数小于指定的数量。
通过调用nimbus接口的rebalance或者do-rebalance操作,以上并行度可被动态改变。

并行度计算在任务分配中的体现

先回顾下任务分配中的几个主要角色:

这里写图片描述

接着看几段重要的并行度计算代码:

1、计算所有topology的topology-id到executors的映射关系:

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ; 计算所有tolopogy的topology-id到executors的映射 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defn- compute-topology->executors [nimbus storm-ids] "compute a topology-id -> executors map" (into {} (for [tid storm-ids] {tid (set (compute-executors nimbus tid))})))

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/61434a91b2a2da4b5a283496ca58ad6a.html