6.JobTracker.main():在实例化jobTracker之后,会调用tracker.offerService()方法,之后main方法就没什么了,下面看看tracker.offerService()这个方法。
public static void main(String argv[]
) throws IOException, InterruptedException {
StringUtils.startupShutdownMessage(JobTracker.class, argv, LOG);
try {
if(argv.length == 0) {
JobTracker tracker = startTracker(new JobConf());
tracker.offerService();
}
else {
if ("-dumpConfiguration".equals(argv[0]) && argv.length == 1) {
dumpConfiguration(new PrintWriter(System.out));
}
else {
System.out.println("usage: JobTracker [-dumpConfiguration]");
System.exit(-1);
}
}
} catch (Throwable e) {
LOG.fatal(StringUtils.stringifyException(e));
System.exit(-1);
}
}
7.JobTracker.offerService():这个方法中有一些其他东西,略掉,只看taskScheduler.start()这个方法,因为这里只是想分析下JobTracker提交job的过程,所以省去很多复杂的东西。
8.taskScheduler.start():这个方法就是启动TaskScheduler,这个方法不同taskScheduler也不同,但是统一的还是会有一个taskTrackerManager.addJobInProgressListener(jobListener)这个操作,taskTrackerManager就是jobTracker(第5步),这句的意思是为jobTracker添加jobListener,用来监听job的。这句的内部就是调用jobTracker的jobInProgressListeners集合的add(listener)方法。
到这里可以说看完了整个JobTracker的启动过程,虽然很浅显,但是对于后面将要分析的内容,这些就够了。下面来看看job的提交过程,也就是jobTracker的submit()方法。
1.jobTracker.submit():第一步是checkSafeMode(),检查是否在安全模式,在安全模式则抛出异常。然后执行jobInfo = new JobInfo(jobId, new Text(ugi.getShortUserName()),new Path(jobSubmitDir),生成一个jobInfo对象,jobInfo主要保存job的id,user,jobSubmitDir(也就是job的任务目录,上一篇文章提到)。接着是判断job是否可被recovered(job失败的时候尝试再次执行),如果允许的话(默认允许),则将jobInfo对象序列化到job-info文件中。接着到达最关键的地方,job = new JobInProgress(this, this.conf, jobInfo, 0, ts),为job实例化一个JobInProgress对象,这个对象将会对job以后的所有情况进行负责,如初始化,执行等。下面看看JobInProgress对象的初始化操作。
2.JobInProgress:这里看下将job.xml下载到本地的操作。然后就是job的队列信息,默认的队列名是default,Queue queue = this.jobtracker.getQueueManager().getQueue(queueName),这个主要是根据Hadoop所使用的taskScheduler有关,具体不研究。剩下的是一些参数的初始化,如map的数目,reduce的数目等。这里还有个设置job的优先级的,默认是normal。this.priority = conf.getJobPriority();this.status.setJobPriority(this.priority);还有检查taskLimit的操作,就是检查map+reduce的任务数是否超出mapred.jobtracker.maxtasks.per.job设置的值,默认是-1,就是没有限制的意思。回到jobTracker.submit()方法
this.localJobFile = default_conf.getLocalPath(JobTracker.SUBDIR
+"/"+jobId + ".xml");
Path jobFilePath = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
jobFile = jobFilePath.toString();
fs.copyToLocalFile(jobFilePath, localJobFile);
conf = new JobConf(localJobFile);