前期收到的问题:
1、在Topology中我们可以指定spout、bolt的并行度,在提交Topology时Storm如何将spout、bolt自动发布到每个服务器并且控制服务的CPU、磁盘等资源的?
2、Storm处理消息时会根据Topology生成一棵消息树,Storm如何跟踪每个消息、如何保证消息不丢失以及如何实现重发消息机制?
上篇:Storm是如何保证at least once语义的
回答了第2个问题。
本篇来建立一个基本的背景,来大概看下构成storm流式计算能力的一些基础框架,并部分回答第一个问题。
worker、executor、task的关系worker是一个进程.
executor是一个线程,是运行tasks的物理容器.
task是对spout/bolt/acker等任务的逻辑抽象.
supervisor会定时从zookeeper获取拓补信息topologies、任务分配信息assignments及各类心跳信息,以此为依据进行任务分配。
在supervisor同步时,会根据新的任务分配情况来启动新的worker或者关闭旧的worker并进行负载均衡。
worker通过定期的更新connections信息,来获知其应该通讯的其它worker。
worker启动时,会根据其分配到的任务启动一个或多个executor线程。这些线程仅会处理唯一的topology。
如果有新的tolopogy被提交到集群,nimbus会重新分配任务,这个后面会说到。
executor线程负责处理多个spouts或者多个bolts的逻辑,这些spouts或者bolts,也称为tasks。
具体有多少个worker,多少个executor,每个executor负责多少个task,是由配置和指定的parallelism-hint共同决定的,但这个值并不一定等于实际运行中的数目。
如果计算出的总的executors超过了nimbus的限制,此topology将不会得到执行。
并行度的作用:
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ; 计算所有tolopogy的topology-id到executors的映射 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defn- compute-topology->executors [nimbus storm-ids] "compute a topology-id -> executors map" (into {} (for [tid storm-ids] {tid (set (compute-executors nimbus tid))}))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ; 计算topology-id到executors的映射 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defn- compute-executors [nimbus storm-id] (let [conf (:conf nimbus) storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil) component->executors (:component->executors storm-base) storm-conf (read-storm-conf conf storm-id) topology (read-storm-topology conf storm-id) task->component (storm-task-info topology storm-conf)] (->> (storm-task-info topology storm-conf) reverse-map (map-val sort) (join-maps component->executors) (map-val (partial apply partition-fixed)) (mapcat second) (map to-executor-id) ))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ; 计算topology的task-info ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defn storm-task-info "Returns map from task -> component id" [^StormTopology user-topology storm-conf] (->> (system-topology! storm-conf user-topology) all-components ;; 获取每个组件的并行数 (map-val (comp #(get % TOPOLOGY-TASKS) component-conf)) (sort-by first) (mapcat (fn [[c num-tasks]] (repeat num-tasks c))) (map (fn [id comp] [id comp]) (iterate (comp int inc) (int 1))) (into {}) ))