分析Hadoop自带WordCount例子的执行过程(5)

再看一下Reduce的实现:

/**
   * Reduce是一个内部静态类。作为统计单词数量的中间结果类,由于这个例子简单无须执行中间结果的合并。
   */
public static class Reduce extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {
   
    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output,
                       Reporter reporter) throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new IntWritable(sum));
    }
}
 

reduce方法的第二个参数为Iterator<IntWritable> values,是一个迭代器类型,即是多个value的迭代器,通过这个迭代器可以得到多个value。又由于第一个参数指定了key,那么这个迭代器就是与这个key相关的了,即每个value都是key的value,如果统计词频,只要将多个value进行求和运算即可。

最后同样要输出了,这次是输出到文件系统中的指定文件中了,因为是最终结果。

最后一个就是Map/Reduce的核心驱动部分了,目的就是要让WordCount这个工具正常地运行起来,看run方法的实现:

/**
   * map/reduce程序的驱动部分,用于实现提交map/reduce任务。
   */
public int run(String[] args) throws Exception {
    JobConf conf = new JobConf(getConf(), WordCount.class);
    conf.setJobName("wordcount");

    // the keys are words (strings)
    conf.setOutputKeyClass(Text.class);
    // the values are counts (ints)
    conf.setOutputValueClass(IntWritable.class);
   
    conf.setMapperClass(MapClass.class);       
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);
   
    List<String> other_args = new ArrayList<String>();
    for(int i=0; i < args.length; ++i) {
      try {
        if ("-m".equals(args[i])) {
          conf.setNumMapTasks(Integer.parseInt(args[++i]));
        } else if ("-r".equals(args[i])) {
          conf.setNumReduceTasks(Integer.parseInt(args[++i]));
        } else {
          other_args.add(args[i]);
        }
      } catch (NumberFormatException except) {
        System.out.println("ERROR: Integer expected instead of " + args[i]);
        return printUsage();
      } catch (ArrayIndexOutOfBoundsException except) {
        System.out.println("ERROR: Required parameter missing from " +
                           args[i-1]);
        return printUsage();
      }
    }
    // Make sure there are exactly 2 parameters left.
    if (other_args.size() != 2) {
      System.out.println("ERROR: Wrong number of parameters: " +
                         other_args.size() + " instead of 2.");
      return printUsage();
    }
    conf.setInputPath(new Path(other_args.get(0)));
    conf.setOutputPath(new Path(other_args.get(1)));
       
    JobClient.runJob(conf);
    return 0;
}

 

在run()方法中,值得注意的是JobConf这个类,它是一个任务配置类。它是Configuration的子类,因为在继承了Configuration的关于Hadoop的基本配置以外,还有自己的一些针对任务的相关配置。

JobConf类应该是相当重要的。我们主要围绕在WordCount这个工具中使用到的一些方法进行了解。

首先要实例化一个JobConf类的对象:

JobConf conf = new JobConf(getConf(), WordCount.class);  

通过这个初始化代码行来看一下JobConf类的构造方法:

public JobConf(Configuration conf, Class exampleClass) {
    this(conf);  
    setJarByClass(exampleClass);
}
 

首先, 调用该类的具有一个Configuration类型参数的构造方法,其实就是继承自Configuration类,如下所示:

public JobConf(Configuration conf) {
    super(conf);
}
 

然后,调用setJarByClass()方法,根据指定的类名称来设置当前运行任务的任务配置包含的Jar文件,方法如下所示:

public void setJarByClass(Class cls) {
    String jar = findContainingJar(cls);
    if (jar != null) {
      setJar(jar);
    }  
}
 

这里首先要查找包含的Jar文件(返回的是Jar文件的字符串描述),如果不空再调用 setJar(jar);为任务配置进行设置。

看一下如何进行查找的,在findContainingJar()方法中有实现过程:

private static String findContainingJar(Class my_class) {
    ClassLoader loader = my_class.getClassLoader();   // 获取到指定类Class my_class的类加载器
    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class"; // 获取到类文件
    try {
      for(Enumeration itr = loader.getResources(class_file);
          itr.hasMoreElements();) {
        URL url = (URL) itr.nextElement();
        if ("jar".equals(url.getProtocol())) { // 迭代出的URL是否支持jar协议
          String toReturn = url.getPath(); // 获取这个URL的path
          if (toReturn.startsWith("file:")) {
            toReturn = toReturn.substring("file:".length()); //   提取path中“file:”字符串后面的文件名字符串
          }
          toReturn = URLDecoder.decode(toReturn, "UTF-8"); // 解码
          return toReturn.replaceAll("!.*$", ""); // 格式化:去掉文件名称中的"!.*$"
        }
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    return null;
}
 

查找获得了一个jar字符串,然后setJar(jar) ,方法如下:

/**
   * Set the user jar for the map-reduce job.
   *
   * @param jar the user jar for the map-reduce job.
   */
public void setJar(String jar) { set("mapred.jar", jar); }
 

上面set("mapred.jar", jar);方法是继承自Configuration类的方法,如下所示:

/**
   * Set the <code>value</code> of the <code>name</code> property.
   *
   * @param name property name.
   * @param value property value.
   */
public void set(String name, String value) {
    getOverlay().setProperty(name, value);
    getProps().setProperty(name, value);
}
 

上面把set("mapred.jar", jar); 设置到Properties变量中了,而properties和overlay都是Configuration类的成员:

private Properties properties;
private Properties overlay;
 

到这里,JobConf conf已经进行了基本的任务配置,加载类设置。

接着就要继续更详细的配置了。

设置任务名称:

conf.setJobName("wordcount");  

setJobName()方法实现:

public void setJobName(String name) {
    set("mapred.job.name", name);
}
 

即,任务名称为wordcount。

设置任务的key的Class:

    // the keys are words (strings)
    conf.setOutputKeyClass(Text.class);
 

对应于:

  public void setOutputKeyClass(Class<? extends WritableComparable> theClass) {
    setClass("mapred.output.key.class", theClass, WritableComparable.class);
}
 

设置任务的value的Class:

    // the values are counts (ints)
    conf.setOutputValueClass(IntWritable.class);
 

对应于:

public void setOutputValueClass(Class<? extends Writable> theClass) {
    setClass("mapred.output.value.class", theClass, Writable.class);
}
 

设置MapperClass、CombinerClass、ReducerClass的Class:

    conf.setMapperClass(MapClass.class);       
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);
 

其中,CombinerClass就是使用的ReducerClass,就在ReducerClass中完成一次完成合并简化操作。

接着往下看,List<String> other_args是接收输入的一些特定参数,这里是指设置Map任务和Reduce任务的数量,即-m和-r参数,通过一个循环判断是否指定了这些参数,如果指定了,要分别将其设置到任务的配置中去,以便任务启动之时能够按照我们定制的方式进行执行。

        if ("-m".equals(args[i])) {
          conf.setNumMapTasks(Integer.parseInt(args[++i]));
        } else if ("-r".equals(args[i])) {
          conf.setNumReduceTasks(Integer.parseInt(args[++i]));
        } else {
          other_args.add(args[i]);
        }
 

一个是设置Map任务数量:

public void setNumMapTasks(int n) { setInt("mapred.map.tasks", n); }

 

另一个是设置Reduce任务数量:

public void setNumReduceTasks(int n) { setInt("mapred.reduce.tasks", n); }

 

使用命令行,要指定任务输入目录和输出目录:

conf.setInputPath(new Path(other_args.get(0)));
    conf.setOutputPath(new Path(other_args.get(1)));

 

设置输入目录的方法调用:

public void setInputPath(Path dir) {
    dir = new Path(getWorkingDirectory(), dir);
    set("mapred.input.dir", dir.toString());
}
 

设置任务输出目录的方法调用:

   public void setOutputPath(Path dir) {
    dir = new Path(getWorkingDirectory(), dir);
    set("mapred.output.dir", dir.toString());
}
 

它们都调用了一个得到当前工作目录的绝对路径的方法getWorkingDirectory(),如下所示:

/**
   * Get the current working directory for the default file system.
   *
   * @return the directory name.
   */
public Path getWorkingDirectory() {
    String name = get("mapred.working.dir");
    if (name != null) {
      return new Path(name);
    } else {
      try {
        Path dir = FileSystem.get(this).getWorkingDirectory();
        set("mapred.working.dir", dir.toString());
        return dir;
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
}
 

最后,任务配置完成后,进行启动:

JobClient.runJob(conf);  

这个启动过程可是非常复杂了,你可以通过JobClient类的runJob()方法看到。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/pxpfw.html