用 Hadoop 进行分布式并行编程, 第 2 部分程序实例

简介: Hadoop 是一个实现了 MapReduce 计算模型的开源分布式并行编程框架,借助于 Hadoop, 程序员可以轻松地编写分布式并行程序,将其运行于计算机集群上,完成海量数据的计算。在本文中,详细介绍了如何针对一个具体的并行计算任务,基于 Hadoop 编写程序,如何使用 IBM MapReduce Tools 在 Eclipse 环境中编译并运行 Hadoop 程序。

本文相关附件(改进的 wordcount 程序 wordcount.zip 与 IBM MapReduce Tools mapreduce_plugin.zip )下载

免费下载地址在

用户名与密码都是

具体下载目录在 /pub/2011/12/03/用 Hadoop 进行分布式并行编程/

前言

在上一篇文章:“用 Hadoop 进行分布式并行编程 第一部分 基本概念与安装部署”中(见  ),介绍了 MapReduce 计算模型,分布式文件系统 HDFS,分布式并行计算等的基本原理, 并且详细介绍了如何安装 Hadoop,如何运行基于 Hadoop 的并行程序。在本文中,将针对一个具体的计算任务,介绍如何基于 Hadoop 编写并行程序,如何使用 IBM 开发的 Hadoop Eclipse plugin 在 Eclipse 环境中编译并运行程序。

分析 WordCount 程序

我们先来看看 Hadoop 自带的示例程序 WordCount,这个程序用于统计一批文本文件中单词出现的频率,完整的代码可在下载的 Hadoop 安装包中得到(在 src/examples 目录中)。

1.实现Map类

见代码清单1。这个类实现 Mapper 接口中的 map 方法,输入参数中的 value 是文本文件中的一行,利用 StringTokenizer 将这个字符串拆成单词,然后将输出结果 <单词,1> 写入到 org.apache.hadoop.mapred.OutputCollector 中。OutputCollector 由 Hadoop 框架提供, 负责收集 Mapper 和 Reducer 的输出数据,实现 map 函数和 reduce 函数时,只需要简单地将其输出的 <key,value> 对往 OutputCollector 中一丢即可,剩余的事框架自会帮你处理好。

代码中 LongWritable, IntWritable, Text 均是 Hadoop 中实现的用于封装 Java 数据类型的类,这些类都能够被串行化从而便于在分布式环境中进行数据交换,你可以将它们分别视为 long, int, String 的替代品。Reporter 则可用于报告整个应用的运行进度,本例中未使用。


代码清单1
public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); output.collect(word, one); } } }  

2.实现 Reduce 类

见代码清单 2。这个类实现 Reducer 接口中的 reduce 方法, 输入参数中的 key, values 是由 Map 任务输出的中间结果,values 是一个 Iterator, 遍历这个 Iterator, 就可以得到属于同一个 key 的所有 value. 此处,key 是一个单词,value 是词频。只需要将所有的 value 相加,就可以得到这个单词的总的出现次数。


代码清单 2
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } }  

3.运行 Job

在 Hadoop 中一次计算任务称之为一个 job, 可以通过一个 JobConf 对象设置如何运行这个 job。此处定义了输出的 key 的类型是 Text, value 的类型是 IntWritable, 指定使用代码清单1中实现的 MapClass 作为 Mapper 类, 使用代码清单2中实现的 Reduce 作为 Reducer 类和 Combiner 类, 任务的输入路径和输出路径由命令行参数指定,这样 job 运行时会处理输入路径下的所有文件,并将计算结果写到输出路径下。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/ppyff.html