上一篇文章()说到了jobTracker中的submitJob()方法,这个方法最终会调用listener.jobAdded(job),将Job注册到TaskScheduler中,由其进行调度。今天接着研究。Hadoop中默认的TaskScheduler是JobQueueTaskScheduler,采用的是FIFO(先进先出)原则进行调度,还有FiarScheduler和CapacityTaskScheduler两种调度类(非hadoop自带,不过hadoop也把他们加入到类库中),这两个类可以在hadoop目录下的lib包下找到,源码在src/contrib下可以找到。主要对FairScheduler进行解读。
上文提到jobTracker最终将job注册到jobListener中,下面就来看看FairScheduler的JobListener。
1.FairScheduler.JobListener.addJob():这个方法比较简单,JobSchedulable mapSched = ReflectionUtils.newInstance(conf.getClass("mapred.jobtracker.jobSchedulable", JobSchedulable.class, JobSchedulable.class), conf)这里通过反射获得两个JobSchedulable对象,也就是默认的FairScheduler.JobSchedulable对象,一个是mapSched,一个是redSched,然后进行JobSchedulable的初始化,比较简单。infos.put(job, info)将job添加到infos(存放所有的jobInPorgress对象)中,同时将job添加到PoolScheduable中,主要是根据配置的poolName获取对应的pool。下面的是重点,update()方法,下面看看这个方法。
public void jobAdded(JobInProgress job) {
synchronized (FairScheduler.this) {
eventLog.log("JOB_ADDED", job.getJobID());
JobSchedulable mapSched = ReflectionUtils.newInstance(
conf.getClass("mapred.jobtracker.jobSchedulable", JobSchedulable.class,
JobSchedulable.class), conf);
mapSched.init(FairScheduler.this, job, TaskType.MAP);
JobSchedulable redSched = ReflectionUtils.newInstance(
conf.getClass("mapred.jobtracker.jobSchedulable", JobSchedulable.class,
JobSchedulable.class), conf);
redSched.init(FairScheduler.this, job, TaskType.REDUCE);
JobInfo info = new JobInfo(mapSched, redSched);
infos.put(job, info);
poolMgr.addJob(job); // Also adds job into the right PoolScheduable
update();
}
}
2.FairScheduler.update():跳过看不懂的,直接看poolMgr.reloadAllocsIfNecessary(),这个方法主要是读取FairScheduler的配置文件(fair-scheduler.xml),由mapred.fairscheduler.allocation.file参数设置,这里是根据配置文件的最后修改时间+ALLOC_RELOAD_INTERVAL决定是否重新加载配置文件,加载文件的时候就是简单地读取xml文件。接着看update方法,加载完配置文件之后会遍历infos(保存了FairScheduler所有的jobInProgress),遍历的时候去除成功了的job和失败了的job以及被kill掉的job,同时也会从pool中去掉该job。接下来就是updateRunnability(),这个方法会根据userMaxJob以及poolMaxJob数量进行判断是否启动job。
List<JobInProgress> toRemove = new ArrayList<JobInProgress>();
for (JobInProgress job: infos.keySet()) {
int runState = job.getStatus().getRunState();
if (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED
|| runState == JobStatus.KILLED) {
toRemove.add(job);
}
}
for (JobInProgress job: toRemove) {
jobNoLongerRunning(job);
}
3.FairScheduler.updateRunnability():第一步将所有infos中剩余的job(成功以及失败的任务会在update时清除)状态全部设为notrunning。接着对infos中的job进行排序,Collections.sort(jobs, new FifoJobComparator()),排序规则是FIFO原则(奇怪,不懂)。然后接着对jobs进行遍历,同时根据该job的提交用户和提交的pool的最大提交job数量决定是否将其添加到任务队列中(就是两个list),如果该job状态=RUNNING,则jobinfo.running=true,如果job状态=PREP(准备中),则对其进行初始化(注意这里只对job状态=RUNNING和PREP的job进行操作)。jobInitializer.initJob(jobInfo, job)进行job初始化,这里使用到jdk的threadPool(其实就是将thread加入到线程池中,由线程池绝对什么时候对其进行执行,总之都会调用thread的run方法),看看thread的run方法。run方法中调用ttm.initJob(job),此处的ttm就是jobTracker,现在回到jobTracker去。
if (userCount < poolMgr.getUserMaxJobs(user) &&
poolCount < poolMgr.getPoolMaxJobs(pool)) {
if (job.getStatus().getRunState() == JobStatus.RUNNING ||
job.getStatus().getRunState() == JobStatus.PREP) {
userJobs.put(user, userCount + 1);
poolJobs.put(pool, poolCount + 1);
JobInfo jobInfo = infos.get(job);
if (job.getStatus().getRunState() == JobStatus.RUNNING) {
jobInfo.runnable = true;
} else {
// The job is in the PREP state. Give it to the job initializer
// for initialization if we have not already done it.
if (jobInfo.needsInitializing) {
jobInfo.needsInitializing = false;
jobInitializer.initJob(jobInfo, job);
}
}
}
}
相关阅读:
Ubuntu 13.04上搭建Hadoop环境
Ubuntu 12.10 +Hadoop 1.2.1版本集群配置
搭建Hadoop环境(在Winodws环境下用虚拟机虚拟两个Ubuntu系统进行搭建)