WordCount这个任务配置完成后,就要启动。这个启动过程可是非常复杂了,还要进行运行时设置,你可以通过JobClient类的runJob()方法看到。代码实现如下所示:
/**
* Utility that submits a job, then polls for progress until the job is
* complete.
*
* @param job the job configuration.
* @throws IOException
*/
public static RunningJob runJob(JobConf job) throws IOException {
JobClient jc = new JobClient(job);
boolean error = true;
RunningJob running = null;
String lastReport = null;
final int MAX_RETRIES = 5;
int retries = MAX_RETRIES;
TaskStatusFilter filter;
try {
filter = getTaskOutputFilter(job);
} catch(IllegalArgumentException e) {
LOG.warn("Invalid Output filter : " + e.getMessage() +
" Valid values are : NONE, FAILED, SUCCEEDED, ALL");
throw e;
}
try {
running = jc.submitJob(job);
String jobId = running.getJobID();
LOG.info("Running job: " + jobId);
int eventCounter = 0;
boolean profiling = job.getProfileEnabled();
Configuration.IntegerRanges mapRanges = job.getProfileTaskRange(true);
Configuration.IntegerRanges reduceRanges = job.getProfileTaskRange(false);
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
try {
if (running.isComplete()) {
break;
}
running = jc.getJob(jobId);
String report =
(" map " + StringUtils.formatPercent(running.mapProgress(), 0)+
" reduce " +
StringUtils.formatPercent(running.reduceProgress(), 0));
if (!report.equals(lastReport)) {
LOG.info(report);
lastReport = report;
}
TaskCompletionEvent[] events =
running.getTaskCompletionEvents(eventCounter);
eventCounter += events.length;
for(TaskCompletionEvent event : events){
TaskCompletionEvent.Status status = event.getTaskStatus();
if (profiling &&
(status == TaskCompletionEvent.Status.SUCCEEDED ||
status == TaskCompletionEvent.Status.FAILED) &&
(event.isMap ? mapRanges : reduceRanges).
isIncluded(event.idWithinJob())) {
downloadProfile(event);
}
switch(filter){
case NONE:
break;
case SUCCEEDED:
if (event.getTaskStatus() ==
TaskCompletionEvent.Status.SUCCEEDED){
LOG.info(event.toString());
displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
}
break;
case FAILED:
if (event.getTaskStatus() ==
TaskCompletionEvent.Status.FAILED){
LOG.info(event.toString());
// Displaying the task diagnostic information
String taskId = event.getTaskId();
String tipId = TaskInProgress.getTipId(taskId);
String[] taskDiagnostics =
jc.jobSubmitClient.getTaskDiagnostics(jobId, tipId,
taskId);
if (taskDiagnostics != null) {
for(String diagnostics : taskDiagnostics){
System.err.println(diagnostics);
}
}
// Displaying the task logs
displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
}
break;
case KILLED:
if (event.getTaskStatus() == TaskCompletionEvent.Status.KILLED){
LOG.info(event.toString());
}
break;
case ALL:
LOG.info(event.toString());
displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
break;
}
}
retries = MAX_RETRIES;
} catch (IOException ie) {
if (--retries == 0) {
LOG.warn("Final attempt failed, killing job.");
throw ie;
}
LOG.info("Communication problem with server: " +
StringUtils.stringifyException(ie));
}
}
if (!running.isSuccessful()) {
throw new IOException("Job failed!");
}
LOG.info("Job complete: " + jobId);
running.getCounters().log(LOG);
error = false;
} finally {
if (error && (running != null)) {
running.killJob();
}
jc.close();
}
return running;
}
在理解这个方法之前,我认为有必要了解一下JobClient类:
org.apache.Hadoop.mapred.JobClient
JobClient is the primary interface for the user-job to interact with the JobTracker. JobClient provides facilities to submit jobs, track their progress, access component-tasks' reports/logs, get the Map-Reduce cluster status information etc. The job submission process involves: Here is an example on how to use JobClient: At times clients would chain map-reduce jobs to accomplish complex tasks which cannot be done via a single map-reduce job. This is fairly easy since the output of the job, typically, goes to distributed file-system and that can be used as the input for the next job. However, this also means that the onus on ensuring jobs are complete (success/failure) lies squarely on the clients. In such situations the various job-control options are:
大致说明了如下内容:
1、JobClient可以实现提交多个Job,同时追踪这些Job的运行状态及其访问日志,可以获取Map-Reduce集群的状态信息。关于集群,这是相对基于分布式计算条件下,处理海量数据的情况下,可能一个任务会在一个集群中协调处理,这要涉及到任务调度,哪些结点来执行Map操作,哪些结点来执行Reduce操作,等等。
2、一个Job提交,会涉及到很多工作:核查该Job输入输出状态条件、计算Job分割数量、必要时配置分布式缓存、在DFS上为Map-Reduce系统拷贝配置文件等、提交任务并监控其执行过程和执行状态。
3、一个Job可以是Map任务,也可以是Reduce任务,都是通过JobClinet来管理维护的。
再看上面runJob()方法的代码:
首先根据一个JobConf实例,实例化一个JobClient对象:
JobClient jc = new JobClient(job);
通过JobClient jc调用JobClient的submitJob()方法提交一个Job,返回一个RunningJob running实例,RunningJob running包含了一个可以执行的Job的详细信息,比如,可以通过它来获取到这个Job的ID:
String jobId = running.getJobID();
通过下面调用来获取一个Job(可能是Map任务,也可能是Reduce任务)的范围:
Configuration.IntegerRanges mapRanges = job.getProfileTaskRange(true);
Configuration.IntegerRanges reduceRanges = job.getProfileTaskRange(false);
Configuration.IntegerRanges表示一个正整数范围,上面分别对应着Map任务的范围和Reduce任务的范围,通过这个范围来判断是否该从集群中的指定的结点上下载数据来执行调度程序分配的任务,如下所示:
if (profiling &&
(status == TaskCompletionEvent.Status.SUCCEEDED ||
status == TaskCompletionEvent.Status.FAILED) &&
(event.isMap ? mapRanges : reduceRanges).
isIncluded(event.idWithinJob())) {
downloadProfile(event);
}
也就是说,如果JobClient监视Job执行状态,如果执行到合适的时机,可以根据TaskCompletionEvent event来下载执行任务所需要的文件。
接着下面有一个switch语句,根据TaskStatusFilter filter = getTaskOutputFilter(job);获取到的任务执行状态来决定该做什么操作,其中,TaskStatusFilter是一个枚举类型:
public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
它定义了5种状态。
然后根据当前Job的执行状态来进行之后执行动作的转换,大致如下:
1、NONE 没有任务在执行,直接退出;
2、SUCCEEDED 任务执行成功,登录任务详情日志,最后退出;
3、FAILED 任务执行失败,对失败的任务运行状况进行诊断,将诊断信息写入日志,最后退出;
4、KILLED 任务中途终止,登录日志,最后退出;
5、ALL 任何情况下都可以用此状态来登录任务日志。
一个Job只有完成配置和调度之后,才能变为一个RunningJob running,runJob()方法中处理完成后返回的就是这个执行的Job。RunningJob是一个接口类,用来查询一个运行中的Map-ReduceJob的详细信息:
package org.apache.hadoop.mapred; import java.io.*; // 获取到一个提交的Job的配置的路径(path) // 获取显示追踪到的某个Job执行信息的URL /** /** // 检查一个Job是否完成 // 检查一个Job是否成功地完成了分配的任务 // 阻塞,直到Job完成 /**
public interface RunningJob {
// 获取Job的ID
public String getJobID();
// 获取Job的name
public String getJobName();
public String getJobFile();
public String getTrackingURL();
* Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
* and 1.0. When all map tasks have completed, the function returns 1.0.
*/
public float mapProgress() throws IOException;
* Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
* and 1.0. When all reduce tasks have completed, the function returns 1.0.
*/
public float reduceProgress() throws IOException;
public boolean isComplete() throws IOException;
public boolean isSuccessful() throws IOException;
public void waitForCompletion() throws IOException;
* Kill the running job. Blocks until all job tasks have been
* killed as well. If the job is no longer running, it simply returns.
*/
public void killJob() throws IOException;
/**
* Get events indicating completion (success/failure) of component tasks.
*/
public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom)
throws IOException;
/**
* Kill indicated task attempt.
*/
public void killTask(String taskId, boolean shouldFail) throws IOException;
// 获取Job的统计计数
public Counters getCounters() throws IOException;
}
实现该接口的只有一个实现类,是org.apache.hadoop.mapred.JobClient.NetworkedJob,如下所示:
/** /** /** public String getJobID() { public String getJobFile() { /** /** /** /** /** /** /** /** @Override
* A NetworkedJob is an implementation of RunningJob. It holds
* a JobProfile object to provide some info, and interacts with the
* remote service to provide certain functionality.
*/
class NetworkedJob implements RunningJob {
JobProfile profile;
JobStatus status;
long statustime;
* We store a JobProfile and a timestamp for when we last
* acquired the job profile. If the job is null, then we cannot
* perform any of the tasks. The job might be null if the JobTracker
* has completely forgotten about the job. (eg, 24 hours after the
* job completes.)
*/
public NetworkedJob(JobStatus job) throws IOException {
this.status = job;
this.profile = jobSubmitClient.getJobProfile(job.getJobId());
this.statustime = System.currentTimeMillis();
}
* Some methods rely on having a recent job profile object. Refresh
* it, if necessary
*/
synchronized void ensureFreshStatus() throws IOException {
if (System.currentTimeMillis() - statustime > MAX_JOBPROFILE_AGE) {
this.status = jobSubmitClient.getJobStatus(profile.getJobId());
this.statustime = System.currentTimeMillis();
}
}
return profile.getJobId();
}
public String getJobName() {
return profile.getJobName();
}
return profile.getJobFile();
}
* A URL where the job's status can be seen
*/
public String getTrackingURL() {
return profile.getURL().toString();
}
* A float between 0.0 and 1.0, indicating the % of map work
* completed.
*/
public float mapProgress() throws IOException {
ensureFreshStatus();
return status.mapProgress();
}
* A float between 0.0 and 1.0, indicating the % of reduce work
* completed.
*/
public float reduceProgress() throws IOException {
ensureFreshStatus();
return status.reduceProgress();
}
* Returns immediately whether the whole job is done yet or not.
*/
public synchronized boolean isComplete() throws IOException {
ensureFreshStatus();
return (status.getRunState() == JobStatus.SUCCEEDED ||
status.getRunState() == JobStatus.FAILED);
}
* True iff job completed successfully.
*/
public synchronized boolean isSuccessful() throws IOException {
ensureFreshStatus();
return status.getRunState() == JobStatus.SUCCEEDED;
}
* Blocks until the job is finished
*/
public void waitForCompletion() throws IOException {
while (!isComplete()) {
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
}
}
}
* Tells the service to terminate the current job.
*/
public synchronized void killJob() throws IOException {
jobSubmitClient.killJob(getJobID());
}
/**
* Kill indicated task attempt.
*/
public synchronized void killTask(String taskId, boolean shouldFail) throws IOException {
jobSubmitClient.killTask(taskId, shouldFail);
}
* Fetch task completion events from jobtracker for this job.
*/
public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
int startFrom) throws IOException{
return jobSubmitClient.getTaskCompletionEvents(
getJobID(), startFrom, 10);
}
public String toString() {
try {
ensureFreshStatus();
} catch (IOException e) {
}
return "Job: " + profile.getJobId() + "\n" +
"file: " + profile.getJobFile() + "\n" +
"tracking URL: " + profile.getURL() + "\n" +
"map() completion: " + status.mapProgress() + "\n" +
"reduce() completion: " + status.reduceProgress();
}
// 返回Job的统计计数
public Counters getCounters() throws IOException {
return jobSubmitClient.getJobCounters(getJobID());
}
}
在运行一个Job的时候,如果执行失败,则允许MAX_RETRIES=5,即可以最多尝试着执行5次,否则,表明执行该Job的结点已经无法完成该Job,集群中的Master会检测到该结点状态,从而放弃由该节点执行该Job,重新进行调度分配,将该Job的执行分配给其它空闲的结点来执行。