... // preprocessing
ChainMapper.addMapper(...);
ChainReducer.setReducer(...);
ChainReducer.addMapper(...);
... // postprocessing
// The signature of addMapper() is as follows:
public static void addMapper(Job job,
    Class<? extends Mapper> mclass,
    Class<? extends K1> inputKeyClass,
    Class<? extends V1> inputValueClass,
    Class<? extends K2> outputKeyClass,
    Class<? extends V2> outputValueClass,
    Configuration conf
)
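The addMapper() method shown above takes seven parameters. The first and last are the global Job and the local Configuration object, respectively. The second is the Mapper class, which does the data processing. The remaining four, inputKeyClass, inputValueClass, outputKeyClass, and outputValueClass, are the input and output key/value types of that Mapper class. (The older org.apache.hadoop.mapred version of addMapper() takes an eighth parameter, a boolean byValue flag placed before the configuration.) ChainReducer provides a dedicated setReducer() method for setting the single Reducer of the whole job; its syntax is similar to that of addMapper().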
Sample code for a linear chained MapReduce job is shown below.
// Uses the new API: org.apache.hadoop.mapreduce.* and org.apache.hadoop.mapreduce.lib.chain.*
public void function() throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJobName("chainjob");
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    // in and out are the job's input and output Path objects, assumed to be defined elsewhere
    FileInputFormat.addInputPath(job, in);
    FileOutputFormat.setOutputPath(job, out);
    // Add the Map1 stage to the job
    Configuration map1conf = new Configuration(false);
    ChainMapper.addMapper(job, Map1.class, LongWritable.class, Text.class, Text.class, Text.class, map1conf);
    // Add the Map2 stage to the job
    Configuration map2conf = new Configuration(false);
    ChainMapper.addMapper(job, Map2.class, Text.class, Text.class, LongWritable.class, Text.class, map2conf);
    // Add the Reduce stage to the job
    Configuration reduceconf = new Configuration(false);
    ChainReducer.setReducer(job, Reduce.class, LongWritable.class, Text.class, Text.class, Text.class, reduceconf);
    // Add the Map3 stage to the job
    Configuration map3conf = new Configuration(false);
    ChainReducer.addMapper(job, Map3.class, Text.class, Text.class, LongWritable.class, Text.class, map3conf);
    // Add the Map4 stage to the job
    Configuration map4conf = new Configuration(false);
    ChainReducer.addMapper(job, Map4.class, LongWritable.class, Text.class, LongWritable.class, Text.class, map4conf);
    job.waitForCompletion(true);
}
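In the chain above, the output key/value classes declared for each stage are the input key/value classes of the stage that follows it; a chain whose types do not line up this way cannot pass records from one stage to the next. The following plain-Java sketch illustrates that rule only. It does not use Hadoop: the `ChainTypeCheck` class, the `Stage` type, and the `firstMismatch` helper are hypothetical names introduced for illustration, and `Long` and `String` stand in for `LongWritable` and `Text`.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; none of these names are part of the Hadoop API.
final class ChainTypeCheck {

    // One stage of the chain, described by the same four classes addMapper() receives.
    static final class Stage {
        final String name;
        final Class<?> inKey, inValue, outKey, outValue;

        Stage(String name, Class<?> inKey, Class<?> inValue,
              Class<?> outKey, Class<?> outValue) {
            this.name = name;
            this.inKey = inKey;
            this.inValue = inValue;
            this.outKey = outKey;
            this.outValue = outValue;
        }
    }

    // Returns the index of the first stage whose input types cannot accept the
    // previous stage's output types, or -1 if every adjacent pair lines up.
    static int firstMismatch(List<Stage> chain) {
        for (int i = 1; i < chain.size(); i++) {
            Stage prev = chain.get(i - 1);
            Stage cur = chain.get(i);
            boolean keyOk = cur.inKey.isAssignableFrom(prev.outKey);
            boolean valueOk = cur.inValue.isAssignableFrom(prev.outValue);
            if (!keyOk || !valueOk) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Same shape as the sample job: Map1, Map2, Reduce, Map3, Map4.
        List<Stage> chain = Arrays.asList(
            new Stage("Map1", Long.class, String.class, String.class, String.class),
            new Stage("Map2", String.class, String.class, Long.class, String.class),
            new Stage("Reduce", Long.class, String.class, String.class, String.class),
            new Stage("Map3", String.class, String.class, Long.class, String.class),
            new Stage("Map4", Long.class, String.class, Long.class, String.class));
        System.out.println(firstMismatch(chain)); // prints -1: every stage lines up
    }
}
```

Running a check of this kind over the planned stage list before submitting the job is one way to catch a mismatched key or value class early.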
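Note: In any MapReduce job, the Map and Reduce phases can contain any number of Mappers, but there can be only one Reducer. A job that needs more than one Reducer therefore cannot be implemented with ChainMapper/ChainReducer.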
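The constraint in this note can be restated as a pattern: one or more Mappers, exactly one Reducer, then zero or more Mappers. The sketch below checks a planned stage sequence against that pattern in plain Java. It is an illustration under assumed conventions, not Hadoop code: the `ChainShape` class, the `isValidChain` helper, and the encoding of stages as the characters `M` and `R` are all hypothetical.

```java
import java.util.regex.Pattern;

// Plain-Java illustration only; none of these names are part of the Hadoop API.
final class ChainShape {

    // One or more Mappers, exactly one Reducer, then zero or more Mappers.
    private static final Pattern SHAPE = Pattern.compile("M+RM*");

    // stages holds one character per stage: 'M' for a Mapper, 'R' for a Reducer.
    static boolean isValidChain(String stages) {
        return SHAPE.matcher(stages).matches();
    }

    public static void main(String[] args) {
        // Map1, Map2, Reduce, Map3, Map4: fits in a single chained job.
        System.out.println(isValidChain("MMRMM")); // prints true
        // Two Reducers: cannot be expressed as a single chained job.
        System.out.println(isValidChain("MRMR"));  // prints false
    }
}
```

A sequence that fails this check has to be split into several jobs, each of which fits the pattern on its own.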