Hadoop Map/Reduce执行全流程关键代码

Hadoop Map/Reduce执行全流程关键代码

JobClient.runJob(conf) | 运行job  

|-->JobClient jc = new JobClient(job);   |-->RunningJob rj = jc.submitJob(job);       |-->submitJobInternal(job);           |-->int reduces = job.getNumReduceTasks();           |-->JobContext context = new JobContext(job, jobId);           |-->maps = writeOldSplits(job, submitSplitFile);           |-->job.setNumMapTasks(maps);           |-->job.writeXml(out);           |-->JobStatus status = jobSubmitClient.submitJob(jobId);      JobTracker.submitJob(JobId) |提交job   |-->JobInProgress job = new JobInProgress(jobId, thisthis.conf);   |-->checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);  |检查权限   |-->checkMemoryRequirements(job);  |检查内存需求   |-->addJob(jobId, job);  |添加至job队列       |-->jobs.put(job.getProfile().getJobID(), job);       |--> for (JobInProgressListener listener : jobInProgressListeners) |添加至监听器,供调度使用           |-->listener.jobAdded(job);      JobTracker.heartbeat()  |JobTracker启动后供TaskTracker以RPC方式来调用,返回Response集合   |-->List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();   |-->tasks = taskScheduler.assignTasks(taskTrackerStatus);  |通过调度器选择合适的tasks   |-->for (Task task : tasks)       |-->expireLaunchingTasks.addNewTask(task.getTaskID());       |-->actions.add(new LaunchTaskAction(task));  |实际actions还会添加commmitTask等   |-->response.setHeartbeatInterval(nextInterval);   |-->response.setActions(actions.toArray(new TaskTrackerAction[actions.size()]));   |-->return response;         TaskTracker.offerService |TaskTracker启动后通过offerservice()不断发心跳至JobTracker中   |-->transmitHeartBeat()       |-->HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, justStarted, justInited,askForNewTask, heartbeatResponseId);   |-->TaskTrackerAction[] actions = heartbeatResponse.getActions();   |-->for(TaskTrackerAction action: actions)       |-->if (action instanceof LaunchTaskAction)           |-->addToTaskQueue((LaunchTaskAction)action);  |添加至执行Queue,根据map/reduce task分别添加               |-->if (action.getTask().isMapTask()) {                   |-->mapLauncher.addToTaskQueue(action);                       |-->TaskInProgress tip = registerTask(action, this);                       |-->tasksToLaunch.add(tip);                       |-->tasksToLaunch.notifyAll();  |唤醒阻塞进程               |-->else                    |-->reduceLauncher.addToTaskQueue(action);      TaskLauncher.run()   |--> while (tasksToLaunch.isEmpty())                 |-->tasksToLaunch.wait();   |-->tip = tasksToLaunch.remove(0);   |-->startNewTask(tip);       |-->localizeJob(tip);           |-->launchTaskForJob(tip, new JobConf(rjob.jobConf));                |-->tip.setJobConf(jobConf);               |-->tip.launchTask();  |TaskInProgress.launchTask()                   |-->this.runner = task.createRunner(TaskTracker.thisthis); |区分map/reduce                   |-->this.runner.start();   MapTaskRunner.run()  |执行MapTask   |-->File workDir = new File(lDirAlloc.getLocalPathToRead()  |准备执行路径   |-->String jar = conf.getJar();  |准备jar包   |-->File jvm = new File(new File(System.getProperty("java.home"), "bin"), "java");  |获取jvm   |-->vargs.add(Child.class.getName());  |添加参数,Child类作为main主函数启动   |-->tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf, pidFile);  |添加至内存管理   |-->jvmManager.launchJvm(this, jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,  |统一纳入jvm管理器当中并启动                   workDir, env, pidFile, conf));           |-->mapJvmManager.reapJvm(t, env);  |区分map/reduce操作      JvmManager.reapJvm()  |   |--> while (jvmIter.hasNext())       |-->JvmRunner jvmRunner = jvmIter.next().getValue();       |-->JobID jId = jvmRunner.jvmId.getJobId();       |-->setRunningTaskForJvm(jvmRunner.jvmId, t);   |-->spawnNewJvm(jobId, env, t);       |-->JvmRunner jvmRunner = new JvmRunner(env,jobId);           |-->jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);       |-->jvmRunner.start();   |执行JvmRunner的run()方法           |-->jvmRunner.run()               |-->runChild(env);                   |-->List<String> wrappedCommand =  TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,                            env.logSize, env.pidFile);  |选取main函数                   |-->shexec.execute();  |执行                   |-->int exitCode = shexec.getExitCode(); |获取执行状态值                   |--> updateOnJvmExit(jvmId, exitCode, killed); |更新Jvm状态      Child.main() 执行Task(map/reduce)   |-->JVMId jvmId = new JVMId(firstTaskid.getJobID(),firstTaskid.isMap(),jvmIdInt);   |-->TaskUmbilicalProtocol umbilical = (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,           TaskUmbilicalProtocol.versionID, address, defaultConf);   |--> while (true)        |-->JvmTask myTask = umbilical.getTask(jvmId);       |-->task = myTask.getTask();       |-->taskid = task.getTaskID();       |-->TaskRunner.setupWorkDir(job);       |-->task.run(job, umbilical);   |以maptask为例           |-->TaskReporter reporter = new TaskReporter(getProgress(), umbilical);           |-->if (useNewApi)               |-->runNewMapper(job, split, umbilical, reporter);           |-->else               |-->runOldMapper(job, split, umbilical, reporter);                   |-->inputSplit = (InputSplit) ReflectionUtils.newInstance(job.getClassByName(splitClass), job);                   |-->MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =  ReflectionUtils.newInstance(job.getMapRunnerClass(), job);                   |-->runner.run(in, new OldOutputCollector(collector, conf), reporter);      MapRunner.run()   |--> K1 key = input.createKey();   |-->V1 value = input.createValue();   |-->while (input.next(key, value))        |-->mapper.map(key, value, output, reporter);       |--> if(incrProcCount)            |-->reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,                    |-->SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);   |-->mapper.close();     

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/a2376b3906fcb39e814f46513a613447.html