Hadoop学习总结:Map(2)

JobTracker作为一个单独的JVM运行,其运行的main函数主要调用有下面两部分:

调用静态函数startTracker(new JobConf())创建一个JobTracker对象 调用JobTracker.offerService()函数提供服务

在JobTracker的构造函数中,会生成一个taskScheduler成员变量,来进行Job的调度,默认为JobQueueTaskScheduler,也即按照FIFO的方式调度任务。

在offerService函数中,则调用taskScheduler.start(),在这个函数中,为JobTracker(也即taskScheduler的taskTrackerManager)注册了两个Listener:

JobQueueJobInProgressListener jobQueueJobInProgressListener用于监控job的运行状态 EagerTaskInitializationListener eagerTaskInitializationListener用于对Job进行初始化

EagerTaskInitializationListener中有一个线程JobInitThread,不断得到jobInitQueue中的JobInProgress对象,调用JobInProgress对象的initTasks函数对任务进行初始化操作。

在上一节中,客户端调用了JobTracker.submitJob函数,此函数首先生成一个JobInProgress对象,然后调用addJob函数,其中有如下的逻辑:

synchronized (jobs) {

synchronized (taskScheduler) {

jobs.put(job.getProfile().getJobID(), job);

    //对JobTracker的每一个listener都调用jobAdded函数

for (JobInProgressListener listener : jobInProgressListeners) {

listener.jobAdded(job);

}

}

}

 

EagerTaskInitializationListener的jobAdded函数就是向jobInitQueue中添加一个JobInProgress对象,于是自然触发了此Job的初始化操作,由JobInProgress得initTasks函数完成:

public synchronized void initTasks() throws IOException {

……

  //从HDFS中读取job.split文件从而生成input splits

String jobFile = profile.getJobFile();

Path sysDir = new Path(this.jobtracker.getSystemDir());

FileSystem fs = sysDir.getFileSystem(conf);

DataInputStream splitFile =

fs.open(new Path(conf.get("mapred.job.split.file")));

JobClient.RawSplit[] splits;

try {

splits = JobClient.readSplitFile(splitFile);

} finally {

splitFile.close();

}

  //map task的个数就是input split的个数

numMapTasks = splits.length;

  //为每个map tasks生成一个TaskInProgress来处理一个input split

maps = new TaskInProgress[numMapTasks];

for(int i=0; i < numMapTasks; ++i) {

inputLength += splits[i].getDataLength();

maps[i] = new TaskInProgress(jobId, jobFile,

splits[i],

jobtracker, conf, this, i);

}

  //对于map task,将其放入nonRunningMapCache,是一个Map<Node, List<TaskInProgress>>,也即对于map task来讲,其将会被分配到其input split所在的Node上。nonRunningMapCache将在JobTracker向TaskTracker分配map task的时候使用。

if (numMapTasks > 0) {
    nonRunningMapCache = createCache(splits, maxLevel);
  }

  //创建reduce task

this.reduces = new TaskInProgress[numReduceTasks];

for (int i = 0; i < numReduceTasks; i++) {

reduces[i] = new TaskInProgress(jobId, jobFile,

numMapTasks, i,

jobtracker, conf, this);

    //reduce task放入nonRunningReduces,其将在JobTracker向TaskTracker分配reduce task的时候使用。

nonRunningReduces.add(reduces[i]);

}

  //创建两个cleanup task,一个用来清理map,一个用来清理reduce.

cleanup = new TaskInProgress[2];

cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0],

jobtracker, conf, this, numMapTasks);

cleanup[0].setJobCleanupTask();

cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,

numReduceTasks, jobtracker, conf, this);

cleanup[1].setJobCleanupTask();

  //创建两个初始化 task,一个初始化map,一个初始化reduce.

setup = new TaskInProgress[2];

setup[0] = new TaskInProgress(jobId, jobFile, splits[0],

jobtracker, conf, this, numMapTasks + 1 );

setup[0].setJobSetupTask();

setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,

numReduceTasks + 1, jobtracker, conf, this);

setup[1].setJobSetupTask();

tasksInited.set(true);//初始化完毕

……

}

 

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/ppfzg.html