TaskScheduler内幕天机解密:Spark shell案例运行日志详解、TaskScheduler和SchedulerBackend、FIFO与FAIR、Task运行时本地性算法详解等 (4)

如果有新的 ExecutorBackend 分配给我们的 Job 此时会调用 ExecutorAdded 来获得最新的完整的可用计算资源。
[下图是 TaskSetManager.scala 中 executorAdded 方法]

TaskScheduler内幕天机解密:Spark shell案例运行日志详解、TaskScheduler和SchedulerBackend、FIFO与FAIR、Task运行时本地性算法详解等

优先本地性从高到低依次为:PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY. 其中 NO_PREF 是指机器本地性。一台机器通常就只有一个 Node。我们追求的是 Node 的本地性高于机器本地性。每个 Task 默认是采用一个线程进行计算的。
[下图是 TaskSetManager.scala 中 computeValidLocalityLevels 方法]

TaskScheduler内幕天机解密:Spark shell案例运行日志详解、TaskScheduler和SchedulerBackend、FIFO与FAIR、Task运行时本地性算法详解等

从 TaskSchedulerImpl.scala 中的 resourceOffers 后续调用了 resourceOfferSignleTask 来确定了具体任务运行在那台机制上
[下图是 TaskSchedulerImpl.scala 中 resourceOfferSingleTask 方法]

TaskScheduler内幕天机解密:Spark shell案例运行日志详解、TaskScheduler和SchedulerBackend、FIFO与FAIR、Task运行时本地性算法详解等

通过调用 TaskSetManager 的 resourceOffer 最终确定每个 Task 具体运行在哪个 ExecutorBackend 的具体 Locality Level;

在第7步调用makeOffers方法后,再通过 launchTasks 把任务发送给 ExecutorBackend 去执行
[下图是 CoarseGrainedSchedulerBackend.scala 中 launchTasks 方法]

TaskScheduler内幕天机解密:Spark shell案例运行日志详解、TaskScheduler和SchedulerBackend、FIFO与FAIR、Task运行时本地性算法详解等

DAGScheduler 是从数据层面考虑 preferredLocation 的,而 TaskScheduler 是从具体计算 Task 的角度考虑计算的本地性 Task 进行广播时候的 AkkFrameSize 大小是 128MB,这样改好处是可以广播大任务。如果任务大于等于 128 MB - 200K 的话则 Task 会直接被丢弃掉。如果小于 128 MB - 200K 会通过 CoarseGraninedExecutorBackend 去 launchTask 到具体的 ExecutorBackend 上。
[下图是 CoarseGrainedSchedulerBackend.scala 中 akkaFrameSize 变量]

TaskScheduler内幕天机解密:Spark shell案例运行日志详解、TaskScheduler和SchedulerBackend、FIFO与FAIR、Task运行时本地性算法详解等


[下图是 AkkaUtils.scala 中 maxFrameSizeBytes 方法]

TaskScheduler内幕天机解密:Spark shell案例运行日志详解、TaskScheduler和SchedulerBackend、FIFO与FAIR、Task运行时本地性算法详解等

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zgjzsf.html