在前一篇文章中(见 ),我从客户端详细的讲解了Hadoop对Map执行框架的实现,即客户端可以自己定义或实现给map操作输入怎样的key-value值、map操作、如何根据map输出的key进行排序、如何对map输出的key-value集合进行合并等。用户在提交作业之前对它进行这些进行相应的配置即可。那么,TaskTracker在真正执行作业的任务时是如何根据用户的设置信息来把Map任务相关操作组合成一个有机的整体从而完成作业的map操作呢?这就是本文所要讨论的重点。
任何一个Map任务都被Hadoop抽象成一个MapTask对象,因此,这个Map任务也就是在对应的MapTask中调度执行的了。老规矩,还是先来看看与Map任务真正在TaskTracker端执行相关的具体类吧!
MapTask会在runNewMapper()方法中构造与map任务相关的组件,然后调用map任务执行器Mapper的run()方法来正式的执行map操作,这个过程如下:
下面,我就结合Hadoop的源代码来集体的讲一讲上面的这个过程:
1.构造map任务执行器Mapper
map任务执行器定义了如何对作业的输入数据进行map操作,这个map任务执行器一般是由用户自定义的,但很明显它必须要继承自org.apache.hadoop.mapreduce.Mapper类,并重写它的map(K,V,Context)方法。用户在自定义了自己的map操作之后就可以调用Job的setMapperClass()方法来设置作业的map操作类型。因此,MapTask在构造该任务执行器时通过一条简单的语句就可实例化一个mapper类型的对象:
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>mapper= (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
2.构造记录读取器RecordReader
记录读取器RecordReader用来读取map任务的原始输入数据,并把这些数据构造成一个key-value结合,这些key-value就是任务执行器Mapper中map(K,V,Context)方法需要的K,V输入。记录读取器RecordReader的创建可能有一点点的复杂,它首先必须来自一个输入格式化类InputFormat的实现,同时InputFormat的实现必须要定义如何对作业的原始输入数据进行切分,每一个切分被保存到对应的InputSplit实现对象中,也就是说一个InputFormat的实现对应一个RecordReader实现和一个InputSplit实现,然后我们可以再对这个RecordReader进行进一步的包装。所以在Hadoop的0.2.2版本实现中MapTask用到的最顶层的记录读取器的实现是NewTractingRecordReader,这个RecordReader实现就是简单地对最原始的RecordReader(直接来自于InputFormat的实现)进行了包装,另外它还包含一个任务报告器,因为可以根据输入数据的读取量来估计当前任务的执行进度。看一下它的代码实现:
//获取用户的InputFormat实现的实例
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>inputFormat= (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>) ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
//获取用户的InputSplit实现的实例,同时读入map任务对应的输入数据的位置等信息
org.apache.hadoop.mapreduce.InputSplit split = null;
DataInputBuffer splitBuffer = new DataInputBuffer();
splitBuffer.reset(rawSplit.getBytes(), 0, rawSplit.getLength());
SerializationFactory factory = new SerializationFactory(job);
Deserializer<? extends org.apache.hadoop.mapreduce.InputSplit> deserializer = (Deserializer<? extends org.apache.hadoop.mapreduce.InputSplit>) factory.getDeserializer(job.getClassByName(splitClass));
deserializer.open(splitBuffer);
split = deserializer.deserialize(null);
//创建记录读取器
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input = new NewTrackingRecordReader<INKEY,INVALUE> (inputFormat.createRecordReader(split, taskContext), reporter);
3.构造记录写入器(map任务输出收集器)RecordWriter
在Hadoop的0.2.2版本实现中MapTask用到的map任务输出收集器有两个实现:NewOutputCollector和NewDirectOutputCollector,当作业中没有设置reduce操作的时候就采用NewDirectOutputCollector实现,否则就采用NewDirectOutputCollector实现;当时无论是NewOutputCollector还是NewDirectOutputCollector,它们都包含一个任务报告器,这是因为可以通过map输出数据的收集情况来估计任务的执行进度。至于这个任务输出收集器在其内部到底干了些什么事,如排序、合并等,我将会后面的博文中讨论。
4.组装Map上下文执行环境Context
组装Map上下文执行环境Context很简单,将是要将与map相关的主要组件记录读取器RecordReader、记录写入器RecordWriter、任务报告器等放入Context对象中。
5.初始化记录读取器RecordReade
初始化RecordReade实际上就是对它做一些初始化工作,如:获取输入数据文件的物理位置、申请数据缓存的内存空间、打开输入文件等。
6.运行map任务执行器Maper
就是调用Maper用户实现的run()方法,该方法也很简单,代码如下:
public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
}