再看一下Reduce的实现:
/**
* Reduce是一个内部静态类。作为统计单词数量的中间结果类,由于这个例子简单无须执行中间结果的合并。
*/
public static class Reduce extends MapReduceBase
implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
reduce方法的第二个参数为Iterator<IntWritable> values,是一个迭代器类型,即是多个value的迭代器,通过这个迭代器可以得到多个value。又由于第一个参数指定了key,那么这个迭代器就是与这个key相关的了,即每个value都是key的value,如果统计词频,只要将多个value进行求和运算即可。
最后同样要输出了,这次是输出到文件系统中的指定文件中了,因为是最终结果。
最后一个就是Map/Reduce的核心驱动部分了,目的就是要让WordCount这个工具正常地运行起来,看run方法的实现:
/**
* map/reduce程序的驱动部分,用于实现提交map/reduce任务。
*/
public int run(String[] args) throws Exception {
JobConf conf = new JobConf(getConf(), WordCount.class);
conf.setJobName("wordcount");
// the keys are words (strings)
conf.setOutputKeyClass(Text.class);
// the values are counts (ints)
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(MapClass.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
List<String> other_args = new ArrayList<String>();
for(int i=0; i < args.length; ++i) {
try {
if ("-m".equals(args[i])) {
conf.setNumMapTasks(Integer.parseInt(args[++i]));
} else if ("-r".equals(args[i])) {
conf.setNumReduceTasks(Integer.parseInt(args[++i]));
} else {
other_args.add(args[i]);
}
} catch (NumberFormatException except) {
System.out.println("ERROR: Integer expected instead of " + args[i]);
return printUsage();
} catch (ArrayIndexOutOfBoundsException except) {
System.out.println("ERROR: Required parameter missing from " +
args[i-1]);
return printUsage();
}
}
// Make sure there are exactly 2 parameters left.
if (other_args.size() != 2) {
System.out.println("ERROR: Wrong number of parameters: " +
other_args.size() + " instead of 2.");
return printUsage();
}
conf.setInputPath(new Path(other_args.get(0)));
conf.setOutputPath(new Path(other_args.get(1)));
JobClient.runJob(conf);
return 0;
}
在run()方法中,值得注意的是JobConf这个类,它是一个任务配置类。它是Configuration的子类,因为在继承了Configuration的关于Hadoop的基本配置以外,还有自己的一些针对任务的相关配置。
JobConf类应该是相当重要的。我们主要围绕在WordCount这个工具中使用到的一些方法进行了解。
首先要实例化一个JobConf类的对象:
JobConf conf = new JobConf(getConf(), WordCount.class);
通过这个初始化代码行来看一下JobConf类的构造方法:
public JobConf(Configuration conf, Class exampleClass) {
this(conf);
setJarByClass(exampleClass);
}
首先, 调用该类的具有一个Configuration类型参数的构造方法,其实就是继承自Configuration类,如下所示:
public JobConf(Configuration conf) {
super(conf);
}
然后,调用setJarByClass()方法,根据指定的类名称来设置当前运行任务的任务配置包含的Jar文件,方法如下所示:
public void setJarByClass(Class cls) {
String jar = findContainingJar(cls);
if (jar != null) {
setJar(jar);
}
}
这里首先要查找包含的Jar文件(返回的是Jar文件的字符串描述),如果不空再调用 setJar(jar);为任务配置进行设置。
看一下如何进行查找的,在findContainingJar()方法中有实现过程:
private static String findContainingJar(Class my_class) {
ClassLoader loader = my_class.getClassLoader(); // 获取到指定类Class my_class的类加载器
String class_file = my_class.getName().replaceAll("\\.", "/") + ".class"; // 获取到类文件
try {
for(Enumeration itr = loader.getResources(class_file);
itr.hasMoreElements();) {
URL url = (URL) itr.nextElement();
if ("jar".equals(url.getProtocol())) { // 迭代出的URL是否支持jar协议
String toReturn = url.getPath(); // 获取这个URL的path
if (toReturn.startsWith("file:")) {
toReturn = toReturn.substring("file:".length()); // 提取path中“file:”字符串后面的文件名字符串
}
toReturn = URLDecoder.decode(toReturn, "UTF-8"); // 解码
return toReturn.replaceAll("!.*$", ""); // 格式化:去掉文件名称中的"!.*$"
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return null;
}
查找获得了一个jar字符串,然后setJar(jar) ,方法如下:
/**
* Set the user jar for the map-reduce job.
*
* @param jar the user jar for the map-reduce job.
*/
public void setJar(String jar) { set("mapred.jar", jar); }
上面set("mapred.jar", jar);方法是继承自Configuration类的方法,如下所示:
/**
* Set the <code>value</code> of the <code>name</code> property.
*
* @param name property name.
* @param value property value.
*/
public void set(String name, String value) {
getOverlay().setProperty(name, value);
getProps().setProperty(name, value);
}
上面把set("mapred.jar", jar); 设置到Properties变量中了,而properties和overlay都是Configuration类的成员:
private Properties properties;
private Properties overlay;
到这里,JobConf conf已经进行了基本的任务配置,加载类设置。
接着就要继续更详细的配置了。
设置任务名称:
conf.setJobName("wordcount");
setJobName()方法实现:
public void setJobName(String name) {
set("mapred.job.name", name);
}
即,任务名称为wordcount。
设置任务的key的Class:
// the keys are words (strings)
conf.setOutputKeyClass(Text.class);
对应于:
public void setOutputKeyClass(Class<? extends WritableComparable> theClass) {
setClass("mapred.output.key.class", theClass, WritableComparable.class);
}
设置任务的value的Class:
// the values are counts (ints)
conf.setOutputValueClass(IntWritable.class);
对应于:
public void setOutputValueClass(Class<? extends Writable> theClass) {
setClass("mapred.output.value.class", theClass, Writable.class);
}
设置MapperClass、CombinerClass、ReducerClass的Class:
conf.setMapperClass(MapClass.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
其中,CombinerClass就是使用的ReducerClass,就在ReducerClass中完成一次完成合并简化操作。
接着往下看,List<String> other_args是接收输入的一些特定参数,这里是指设置Map任务和Reduce任务的数量,即-m和-r参数,通过一个循环判断是否指定了这些参数,如果指定了,要分别将其设置到任务的配置中去,以便任务启动之时能够按照我们定制的方式进行执行。
if ("-m".equals(args[i])) {
conf.setNumMapTasks(Integer.parseInt(args[++i]));
} else if ("-r".equals(args[i])) {
conf.setNumReduceTasks(Integer.parseInt(args[++i]));
} else {
other_args.add(args[i]);
}
一个是设置Map任务数量:
public void setNumMapTasks(int n) { setInt("mapred.map.tasks", n); }
另一个是设置Reduce任务数量:
public void setNumReduceTasks(int n) { setInt("mapred.reduce.tasks", n); }
使用命令行,要指定任务输入目录和输出目录:
conf.setInputPath(new Path(other_args.get(0)));
conf.setOutputPath(new Path(other_args.get(1)));
设置输入目录的方法调用:
public void setInputPath(Path dir) {
dir = new Path(getWorkingDirectory(), dir);
set("mapred.input.dir", dir.toString());
}
设置任务输出目录的方法调用:
public void setOutputPath(Path dir) {
dir = new Path(getWorkingDirectory(), dir);
set("mapred.output.dir", dir.toString());
}
它们都调用了一个得到当前工作目录的绝对路径的方法getWorkingDirectory(),如下所示:
/**
* Get the current working directory for the default file system.
*
* @return the directory name.
*/
public Path getWorkingDirectory() {
String name = get("mapred.working.dir");
if (name != null) {
return new Path(name);
} else {
try {
Path dir = FileSystem.get(this).getWorkingDirectory();
set("mapred.working.dir", dir.toString());
return dir;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
最后,任务配置完成后,进行启动:
JobClient.runJob(conf);
这个启动过程可是非常复杂了,你可以通过JobClient类的runJob()方法看到。