JobTracker作为一个单独的JVM运行,其运行的main函数主要调用有下面两部分:
调用静态函数startTracker(new JobConf())创建一个JobTracker对象 调用JobTracker.offerService()函数提供服务在JobTracker的构造函数中,会生成一个taskScheduler成员变量,来进行Job的调度,默认为JobQueueTaskScheduler,也即按照FIFO的方式调度任务。
在offerService函数中,则调用taskScheduler.start(),在这个函数中,为JobTracker(也即taskScheduler的taskTrackerManager)注册了两个Listener:
JobQueueJobInProgressListener jobQueueJobInProgressListener用于监控job的运行状态 EagerTaskInitializationListener eagerTaskInitializationListener用于对Job进行初始化EagerTaskInitializationListener中有一个线程JobInitThread,不断得到jobInitQueue中的JobInProgress对象,调用JobInProgress对象的initTasks函数对任务进行初始化操作。
在上一节中,客户端调用了JobTracker.submitJob函数,此函数首先生成一个JobInProgress对象,然后调用addJob函数,其中有如下的逻辑:
synchronized (jobs) {
synchronized (taskScheduler) {
jobs.put(job.getProfile().getJobID(), job);
//对JobTracker的每一个listener都调用jobAdded函数
for (JobInProgressListener listener : jobInProgressListeners) {
listener.jobAdded(job);
}
}
}
EagerTaskInitializationListener的jobAdded函数就是向jobInitQueue中添加一个JobInProgress对象,于是自然触发了此Job的初始化操作,由JobInProgress得initTasks函数完成:
public synchronized void initTasks() throws IOException {
……
//从HDFS中读取job.split文件从而生成input splits
String jobFile = profile.getJobFile();
Path sysDir = new Path(this.jobtracker.getSystemDir());
FileSystem fs = sysDir.getFileSystem(conf);
DataInputStream splitFile =
fs.open(new Path(conf.get("mapred.job.split.file")));
JobClient.RawSplit[] splits;
try {
splits = JobClient.readSplitFile(splitFile);
} finally {
splitFile.close();
}
//map task的个数就是input split的个数
numMapTasks = splits.length;
//为每个map tasks生成一个TaskInProgress来处理一个input split
maps = new TaskInProgress[numMapTasks];
for(int i=0; i < numMapTasks; ++i) {
inputLength += splits[i].getDataLength();
maps[i] = new TaskInProgress(jobId, jobFile,
splits[i],
jobtracker, conf, this, i);
}
//对于map task,将其放入nonRunningMapCache,是一个Map<Node, List<TaskInProgress>>,也即对于map task来讲,其将会被分配到其input split所在的Node上。nonRunningMapCache将在JobTracker向TaskTracker分配map task的时候使用。
if (numMapTasks > 0) {
nonRunningMapCache = createCache(splits, maxLevel);
}
//创建reduce task
this.reduces = new TaskInProgress[numReduceTasks];
for (int i = 0; i < numReduceTasks; i++) {
reduces[i] = new TaskInProgress(jobId, jobFile,
numMapTasks, i,
jobtracker, conf, this);
//reduce task放入nonRunningReduces,其将在JobTracker向TaskTracker分配reduce task的时候使用。
nonRunningReduces.add(reduces[i]);
}
//创建两个cleanup task,一个用来清理map,一个用来清理reduce.
cleanup = new TaskInProgress[2];
cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0],
jobtracker, conf, this, numMapTasks);
cleanup[0].setJobCleanupTask();
cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
numReduceTasks, jobtracker, conf, this);
cleanup[1].setJobCleanupTask();
//创建两个初始化 task,一个初始化map,一个初始化reduce.
setup = new TaskInProgress[2];
setup[0] = new TaskInProgress(jobId, jobFile, splits[0],
jobtracker, conf, this, numMapTasks + 1 );
setup[0].setJobSetupTask();
setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
numReduceTasks + 1, jobtracker, conf, this);
setup[1].setJobSetupTask();
tasksInited.set(true);//初始化完毕
……
}