Picking up the questions left open in the previous post.
for (Pool pool: poolMgr.getPools()) {
  pool.getMapSchedulable().updateDemand();
  pool.getReduceSchedulable().updateDemand();
}
This loop refreshes each pool's slot demand. pool.getMapSchedulable().updateDemand() and pool.getReduceSchedulable().updateDemand() are essentially identical, so we only need to walk through one of them.
7. PoolSchedulable.updateDemand(): The first statement, poolMgr.getMaxSlots(pool.getName(), taskType), fetches the pool's maximum slot count from the configuration file that was loaded earlier (covered in the previous post). Each PoolSchedulable holds multiple JobSchedulable objects, added in JobListener.addJob(); one JobSchedulable corresponds to one JobInProgress. The method then calls JobSchedulable.updateDemand() to refresh each JobSchedulable's slot demand:
public void updateDemand() {
  // limit the demand to maxTasks
  int maxTasks = poolMgr.getMaxSlots(pool.getName(), taskType);
  demand = 0;
  for (JobSchedulable sched: jobScheds) {
    sched.updateDemand();
    demand += sched.getDemand();
    if (demand >= maxTasks) {
      demand = maxTasks;
      break;
    }
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("The pool " + pool.getName() + " demand is " + demand
        + "; maxTasks is " + maxTasks);
  }
}
8. JobSchedulable.updateDemand(): The first step is to check whether this JobSchedulable's job is RUNNING; if not, it demands no slots. Next it distinguishes Map from Reduce. For Reduce, reducers may only start once finishedMapTasks + failedMapTIPs >= completedMapsForReduceSlowstart (computed as the "mapred.reduce.slowstart.completed.maps" parameter value * numMapTasks); until that threshold is met, the reduce demand stays at zero. Map tasks have no such gate, so their demand is computed directly.

TaskInProgress[] tips = (taskType == TaskType.MAP ? job.getTasks(TaskType.MAP) : job.getTasks(TaskType.REDUCE)) fetches the corresponding TaskInProgress (TIP) array; boolean speculationEnabled = (taskType == TaskType.MAP ? job.getMapSpeculativeExecution() : job.getReduceSpeculativeExecution()) checks whether speculative execution is enabled; double avgProgress = (taskType == TaskType.MAP ? job.getStatus().mapProgress() : job.getStatus().reduceProgress()) reads the average map/reduce progress, i.e. how much of the map/reduce phase is done.

The method then computes each TIP's slot demand. For a TIP that is unfinished but running, demand += tip.getActiveTasks().size() counts the slots it needs: a TIP's active tasks are added by tip.addRunningTask(), which is called from FairScheduler.assignTasks() during task scheduling, so the number of active attempts is the number of slots that TIP occupies. If speculative execution is enabled, one extra slot is added for a possible speculative attempt. Summing over all TIPs gives one JobSchedulable's total slot demand, and summing the JobSchedulables gives the pool's total demand; as soon as that sum reaches maxTasks (the pool's slot cap), PoolSchedulable.updateDemand() stops early. Now back to FairScheduler.update().
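The demand-counting loop described above can be sketched as a self-contained snippet. Tip and JobDemand are simplified stand-ins, not the actual Hadoop classes; the real code also consults tip.canBeSpeculated() with avgProgress before granting the speculative slot, which is reduced here to the speculation flag alone:

```java
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for TaskInProgress: just the fields the loop reads.
class Tip {
    final boolean complete;   // tip.isComplete()
    final boolean running;    // tip.isRunning()
    final int activeTasks;    // tip.getActiveTasks().size()

    Tip(boolean complete, boolean running, int activeTasks) {
        this.complete = complete;
        this.running = running;
        this.activeTasks = activeTasks;
    }
}

class JobDemand {
    // One slot per active attempt of a running TIP (plus one for a possible
    // speculative attempt), and one slot for each TIP not yet launched.
    static int countDemand(List<Tip> tips, boolean speculationEnabled) {
        int demand = 0;
        for (Tip tip : tips) {
            if (tip.complete) continue;      // finished TIPs need nothing
            if (tip.running) {
                demand += tip.activeTasks;
                if (speculationEnabled) {
                    demand += 1;             // room for a backup attempt
                }
            } else {
                demand += 1;                 // waiting TIP needs one slot
            }
        }
        return demand;
    }

    public static void main(String[] args) {
        List<Tip> tips = Arrays.asList(
            new Tip(true, false, 0),    // done
            new Tip(false, true, 1),    // running with one attempt
            new Tip(false, false, 0));  // not yet launched
        System.out.println(countDemand(tips, false)); // prints 2
        System.out.println(countDemand(tips, true));  // prints 3
    }
}
```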
9. FairScheduler.update():
List<PoolSchedulable> mapScheds = getPoolSchedulables(TaskType.MAP);
List<PoolSchedulable> reduceScheds = getPoolSchedulables(TaskType.REDUCE);
SchedulingAlgorithms.computeFairShares(
    mapScheds, clusterStatus.getMaxMapTasks());
SchedulingAlgorithms.computeFairShares(
    reduceScheds, clusterStatus.getMaxReduceTasks());
// Use the computed shares to assign shares within each pool
for (Pool pool: poolMgr.getPools()) {
  pool.getMapSchedulable().redistributeShare();
  pool.getReduceSchedulable().redistributeShare();
}
if (preemptionEnabled)
  updatePreemptionVariables();
This is the heart of the FairScheduler: the fair-share computation. The first two lines collect all the map-side and reduce-side PoolSchedulables; each pool holds exactly one map PoolSchedulable and one reduce PoolSchedulable. The next two calls hand them to the SchedulingAlgorithms class, which performs the actual share distribution.
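Before diving into the helpers, the overall algorithm can be illustrated with a stripped-down, self-contained version. Sched and FairShares are hypothetical stand-ins for Schedulable and SchedulingAlgorithms; the iteration count (25) and the saturation guard are choices made for this sketch, not values taken from the Hadoop source:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Schedulable: weight, minShare, demand, fairShare.
class Sched {
    final double weight, minShare, demand;
    double fairShare;
    Sched(double weight, double minShare, double demand) {
        this.weight = weight; this.minShare = minShare; this.demand = demand;
    }
}

class FairShares {
    // Share at a given weight-to-slot ratio, clamped to [minShare, demand].
    static double computeShare(Sched s, double w2sRatio) {
        return Math.min(Math.max(s.weight * w2sRatio, s.minShare), s.demand);
    }

    // Total slots consumed if every schedulable got its share at this ratio.
    static double slotsUsed(List<Sched> scheds, double w2sRatio) {
        double total = 0;
        for (Sched s : scheds) total += computeShare(s, w2sRatio);
        return total;
    }

    // Binary-search the ratio at which the shares consume totalSlots,
    // then assign each schedulable its share at that ratio.
    static void computeFairShares(List<Sched> scheds, double totalSlots) {
        double rMax = 1.0;
        while (slotsUsed(scheds, rMax) < totalSlots
               && rMax < 1e9) {          // stop growing once demands saturate
            rMax *= 2.0;
        }
        double left = 0.0, right = rMax;
        for (int i = 0; i < 25; i++) {   // 25 halvings: ample precision
            double mid = (left + right) / 2.0;
            if (slotsUsed(scheds, mid) < totalSlots) left = mid;
            else right = mid;
        }
        for (Sched s : scheds) s.fairShare = computeShare(s, right);
    }

    public static void main(String[] args) {
        // Two pools, weights 1 and 2, plenty of demand, 30 slots to split.
        Sched a = new Sched(1.0, 0.0, 1000.0);
        Sched b = new Sched(2.0, 0.0, 1000.0);
        computeFairShares(Arrays.asList(a, b), 30.0);
        System.out.println(a.fairShare + " " + b.fairShare); // ~10 and ~20
    }
}
```

The demand clamp is what makes the search necessary: a pool that cannot use its weighted share gives the excess back, which raises the ratio for everyone else until the slots are fully consumed.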
10. SchedulingAlgorithms.computeFairShares(): computeFairShares() searches for a weight-to-slot ratio (w2sRatio) at which the schedulables' shares exactly consume the available slots. The helper below sums the slots taken at a candidate ratio:
private static double slotsUsedWithWeightToSlotRatio(double w2sRatio,
    Collection<? extends Schedulable> schedulables) {
  double slotsTaken = 0;
  for (Schedulable sched: schedulables) {
    double share = computeShare(sched, w2sRatio);
    slotsTaken += share;
  }
  return slotsTaken;
}
It calls computeShare() to work out, for the candidate w2sRatio (the number of slots granted per unit of weight), how many slots each Schedulable would receive in proportion to its weight, and returns the total. computeFairShares() adjusts w2sRatio by binary search until this total matches the slots available (or every demand is satisfied).
11. SchedulingAlgorithms.computeShare(): The first line, double share = sched.getWeight() * w2sRatio, multiplies the Schedulable's weight by the ratio. A pool's weight is set per pool in fair-scheduler.xml and defaults to 1.0. The raw share is then floored by the minimum share, share = Math.max(share, sched.getMinShare()) (minShare defaults to 0), and capped by the demand, share = Math.min(share, sched.getDemand()), giving the final share value. For job-level schedulables the weight comes from FairScheduler.getJobWeight():
public double getJobWeight(JobInProgress job, TaskType taskType) {
  if (!isRunnable(job)) {
    // Job won't launch tasks, but don't return 0 to avoid division errors
    return 1.0;
  } else {
    double weight = 1.0;
    if (sizeBasedWeight) {
      // Set weight based on runnable tasks
      JobInfo info = infos.get(job);
      int runnableTasks = (taskType == TaskType.MAP) ?
          info.mapSchedulable.getDemand() :
          info.reduceSchedulable.getDemand();
      weight = Math.log1p(runnableTasks) / Math.log(2);
    }
    weight *= getPriorityFactor(job.getPriority());
    if (weightAdjuster != null) {
      // Run weight through the user-supplied weightAdjuster
      weight = weightAdjuster.adjustWeight(job, taskType, weight);
    }
    return weight;
  }
}
And computeShare() itself:

private static double computeShare(Schedulable sched, double w2sRatio) {
  double share = sched.getWeight() * w2sRatio;
  share = Math.max(share, sched.getMinShare());
  share = Math.min(share, sched.getDemand());
  return share;
}