4. JobTracker.initJob(): this mainly calls job.initTasks(), so next we step into JobInProgress.initTasks().
5. JobInProgress.initTasks(): first the job's priority is set via setPriority(this.priority). Then the split meta-info file is read with SplitMetaInfoReader.readSplitMetaInfo(); this is the method JobInProgress uses to read the split information, its read path mirrors the write path described earlier, and it is fairly straightforward. Once the split information is available, one map task (a TaskInProgress object) is created per split, and then nonRunningMapCache = createCache(splits, maxLevel) is executed; based on each split's location information, this method maps every host (and the nodes above it, up to maxLevel) to the map tasks whose splits are local to it, and stores that mapping in the cache. Next, the configured number of reduce tasks (also TaskInProgress objects) are created and added to the nonRunningReduces queue, and completedMapsForReduceSlowstart (how many map tasks must finish before reduce tasks may be launched) is computed from the mapred.reduce.slowstart.completed.maps parameter (a fraction, 5% by default). Finally, two setup tasks and two cleanup tasks are created, one each for the map and reduce sides. With that, initTasks is done; once initTasks finishes, JobTracker's initJob is essentially done as well, and so is FairScheduler's updateRunnability(). We now return to FairScheduler.update().
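Before moving on to update(), here is a small self-contained sketch to make the two less obvious pieces of initTasks() concrete. This is not the Hadoop source; SplitInfo, buildLocalityCache and slowstartThreshold are simplified stand-ins made up for illustration. It groups map-task indices by the hosts listed in each split's locations, which is the idea behind createCache(), and computes the reduce slow-start threshold from the mapred.reduce.slowstart.completed.maps fraction:

import java.util.*;

// Simplified stand-in for a split's meta info: only its preferred hosts.
class SplitInfo {
  final String[] locations;
  SplitInfo(String... locations) { this.locations = locations; }
}

public class InitTasksSketch {
  // Idea behind createCache(): for every host appearing in some split's
  // location list, remember which map tasks (by index) are local to it.
  // (The real cache also covers rack-level nodes, up to maxLevel.)
  static Map<String, List<Integer>> buildLocalityCache(SplitInfo[] splits) {
    Map<String, List<Integer>> cache = new HashMap<>();
    for (int i = 0; i < splits.length; i++) {
      for (String host : splits[i].locations) {
        cache.computeIfAbsent(host, h -> new ArrayList<>()).add(i);
      }
    }
    return cache;
  }

  // completedMapsForReduceSlowstart: how many maps must finish before
  // reduce tasks become eligible to launch.
  static int slowstartThreshold(double slowstartFraction, int numMapTasks) {
    return (int) Math.ceil(slowstartFraction * numMapTasks);
  }

  public static void main(String[] args) {
    SplitInfo[] splits = {
        new SplitInfo("hostA", "hostB"),
        new SplitInfo("hostB", "hostC"),
        new SplitInfo("hostA")
    };
    // e.g. {hostA=[0, 2], hostB=[0, 1], hostC=[1]} (map iteration order may vary)
    System.out.println(buildLocalityCache(splits));
    // with the 5% default and 3 maps, at least 1 map must complete first
    System.out.println(slowstartThreshold(0.05, 3));
  }
}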
6. FairScheduler.update(): the part of the method that runs after updateRunnability() returns:

// Update demands of jobs and pools
for (Pool pool: poolMgr.getPools()) {
  pool.getMapSchedulable().updateDemand();
  pool.getReduceSchedulable().updateDemand();
}
// Compute fair shares based on updated demands
List<PoolSchedulable> mapScheds = getPoolSchedulables(TaskType.MAP);
List<PoolSchedulable> reduceScheds = getPoolSchedulables(TaskType.REDUCE);
SchedulingAlgorithms.computeFairShares(
    mapScheds, clusterStatus.getMaxMapTasks());
SchedulingAlgorithms.computeFairShares(
    reduceScheds, clusterStatus.getMaxReduceTasks());
// Use the computed shares to assign shares within each pool
for (Pool pool: poolMgr.getPools()) {
  pool.getMapSchedulable().redistributeShare();
  pool.getReduceSchedulable().redistributeShare();
}
// Recompute preemption bookkeeping if preemption is turned on
if (preemptionEnabled)
  updatePreemptionVariables();
}
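The piece that is easiest to get lost in is SchedulingAlgorithms.computeFairShares(). As a rough mental model, here is a simplified, self-contained sketch; it is an illustration under stated assumptions, not the Hadoop implementation: it ignores minimum shares, assumes positive weights, and uses a made-up Sched stand-in with only a weight and a demand. The idea is to find a single ratio r such that giving every schedulable min(demand, weight * r) slots exactly consumes the available slots, or satisfies every demand when there is spare capacity; redistributeShare() then divides each pool's resulting share among the jobs inside it, which is what the "assign shares within each pool" comment above refers to.

import java.util.*;

public class FairShareSketch {
  // Minimal stand-in for a Schedulable: a weight and a task demand.
  static class Sched {
    final String name; final double weight; final int demand;
    double fairShare;
    Sched(String name, double weight, int demand) {
      this.name = name; this.weight = weight; this.demand = demand;
    }
  }

  // Total slots handed out if every schedulable gets min(demand, weight * r).
  static double slotsUsedWithRatio(List<Sched> scheds, double r) {
    double total = 0;
    for (Sched s : scheds) total += Math.min(s.demand, s.weight * r);
    return total;
  }

  // Binary-search for the ratio r at which the capacity (or total demand,
  // whichever is smaller) is exactly consumed, then set each fair share from it.
  static void computeFairShares(List<Sched> scheds, int totalSlots) {
    double totalDemand = 0;
    for (Sched s : scheds) totalDemand += s.demand;
    double cap = Math.min(totalDemand, totalSlots); // can't hand out more than is wanted
    double lo = 0, hi = 1;
    while (slotsUsedWithRatio(scheds, hi) < cap) hi *= 2; // find an upper bound for r
    for (int i = 0; i < 50; i++) {                        // bisect toward the target ratio
      double mid = (lo + hi) / 2;
      if (slotsUsedWithRatio(scheds, mid) < cap) lo = mid; else hi = mid;
    }
    for (Sched s : scheds) s.fairShare = Math.min(s.demand, s.weight * hi);
  }

  public static void main(String[] args) {
    List<Sched> pools = Arrays.asList(
        new Sched("poolA", 1.0, 100),  // wants far more than its share
        new Sched("poolB", 2.0, 100),  // double weight
        new Sched("poolC", 1.0, 5));   // small demand, satisfied in full
    computeFairShares(pools, 60);      // 60 map slots in the cluster
    for (Sched s : pools)
      System.out.printf("%s fair share = %.1f%n", s.name, s.fairShare);
    // poolC is capped at its demand (5); the remaining 55 slots are split
    // between poolA and poolB in proportion to their weights (~18.3 vs ~36.7).
  }
}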
I can't fully follow this part yet, so I'll stop here and come back to study it more carefully next time.