摘要:Hadoop之MapReduce程序包括三个部分:Mapper,Reducer和作业执行。本文介绍和分析MapReduce程序三部分结构。
关键词:MapReduce Mapper Reducer 作业执行
MapReduce程序包括三个部分,分别是Mapper,Reducer和作业执行。
--------------------------------------分割线 --------------------------------------
Ubuntu 12.10 +Hadoop 1.2.1版本集群配置
--------------------------------------分割线 --------------------------------------
Mapper
一个类要充当Mapper需要继承MapReduceBase并实现Mapper接口。
Mapper接口负责数据处理阶段。它采用形式为Mapper<K1,V1,K2,V2>的Java泛型。这里的键类和值类分别实现了WritableComparable接口和Writable接口。Mapper接口只有一个map()方法,用于处理一个单独的键值对。map()方法形式如下。
public void map(K1 key, V1 value, OutputCollector<K2,V2> output ,Reporter reporter ) throws IOException
或者
public void map(K1 key, V1 value, Context context) throws IOException, InterruptedException
该函数处理一个给定的键/值对(K1, V1),生成一个键/值对(K2, V2)的列表(该列表也可能为空)。
Hadoop提供的一些有用的Mapper实现,包括IdentityMapper,InverseMapper,RegexMapper和TokenCountMapper等。
Reducer
一个类要充当Reducer需要继承MapReduceBase并实现Reducer接口。
Reduce接口有一个reduce()方法,其形式如下。
public void reduce(K2 key , Iterator<V2> value, OutputCollector<K3, V3> output, Reporter reporter) throws IOException
或者
public void reduce(K2 key, Iterator<V2> value, Context context) throws IOException, InterruptedException
当Reducer任务接受来自各个Mapper的输出时,它根据键/值对中的键对输入数据进行排序,并且把具有相同键的值进行归并,然后调用reduce()函数,通过迭代处理那些与指定键相关联的值,生成一个列表<K3, V3>(可能为空)。
Hadoop提供一些有用Reducer实现,包括IdentityReducer和LongSumReducer等。
作业执行
在run()方法中,通过传递一个配置好的作业给JobClient.runJob()以启动MapReduce作业。run()方法里,需要为每个作业定制基本参数,包括输入路径、输出路径、Mapper类和Reducer类。
一个典型的MapReduce程序基本模型如下。
public class MyJob extends Configured implements Tool {
/* mapreduce程序中Mapper*/
public static class MapClass extends MapReduceBase implements Mapper<Text,Text,Text,Text> {
public void map(Text key, Text value,
OutputCollector<Text,Text> output,
Reporter reporter) throws IOException {
//添加Mapper内处理代码
}
}
/*MapReduce程序中Reducer*/
public static class Reduce extends MapReduceBase
implements Reducer<Text,Text,Text,Text> {
public void reduce<Text key,Iterator<Text> values,
OutputCollector<Text,Text>output,Reporter reporter)
throws IOException {
//添加Reducer内处理代码
}
}
/*MapReduce程序中作业执行*/
public int run(String[] args) throws Exception {
//添加作业执行代码
return 0;
}
}