如果有新的 ExecutorBackend 分配给我们的 Job 此时会调用 ExecutorAdded 来获得最新的完整的可用计算资源。
[下图是 TaskSetManager.scala 中 executorAdded 方法]
优先本地性从高到低依次为:PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY. 其中 NO_PREF 是指机器本地性。一台机器通常就只有一个 Node。我们追求的是 Node 的本地性高于机器本地性。每个 Task 默认是采用一个线程进行计算的。
[下图是 TaskSetManager.scala 中 computeValidLocalityLevels 方法]
从 TaskSchedulerImpl.scala 中的 resourceOffers 后续调用了 resourceOfferSignleTask 来确定了具体任务运行在那台机制上
[下图是 TaskSchedulerImpl.scala 中 resourceOfferSingleTask 方法]
通过调用 TaskSetManager 的 resourceOffer 最终确定每个 Task 具体运行在哪个 ExecutorBackend 的具体 Locality Level;
在第7步调用makeOffers方法后,再通过 launchTasks 把任务发送给 ExecutorBackend 去执行
[下图是 CoarseGrainedSchedulerBackend.scala 中 launchTasks 方法]
DAGScheduler 是从数据层面考虑 preferredLocation 的,而 TaskScheduler 是从具体计算 Task 的角度考虑计算的本地性 Task 进行广播时候的 AkkFrameSize 大小是 128MB,这样改好处是可以广播大任务。如果任务大于等于 128 MB - 200K 的话则 Task 会直接被丢弃掉。如果小于 128 MB - 200K 会通过 CoarseGraninedExecutorBackend 去 launchTask 到具体的 ExecutorBackend 上。
[下图是 CoarseGrainedSchedulerBackend.scala 中 akkaFrameSize 变量]
[下图是 AkkaUtils.scala 中 maxFrameSizeBytes 方法]