用 Hadoop 进行分布式并行编程, 第 3 部分部署到分(4)

五 使用 IBM MapReduce Tools 部署分布式程序

第二篇文章中,已经介绍了 IBM MapReduce Tools 的基本功能和用法。现在我们重点介绍如何使用 IBM MapReduce Tools 将 MapReduce 程序远程部署到 Hadoop 分布式环境中去运行。

假定我们还是使用上一节部署完成的分布式环境,然后是在另一台机器上使用 Eclipse 开发 MapReduce 程序。

1. 定义 Hadoop server 的位置

首先请确保你的 Eclipse 已经安装了 IBM MapReduce Tools 这个插件。启动 Eclipse, 选择 Window -> Open Perspective ->other, 再从弹出框中选择 MapReduce, 这样 Eclipse 会进入专门的 MapReduce 视图 ( perspective )。

随后,请检查你的 MapReduce perspective中是否有一个专门的 MapReduce Servers view, 如果没有,请选择 Window -> Show View ->other, 再从弹出框中选择 MapReduce Tools 类别下面的 MapReduce Servers, 打开这个 view.

然后,请点击 MapReduce Servers view 右上角的蓝色图标,就会出现如图一所示的设置 Hadoop Server 的位置的界面。此处所说的 Hadoop server,具体到本文,就是 homer06 这台机器。在输入各项参数之后,请点击 ”Validate location” 按钮,检查是否能够正确的找到并连接上你的 Hadoop server. 如果出错,请尝试在命令行下执行命令:ssh the_hostname_of_your_hadoop_server, (或使用图形界面的 SSH 远程登录软件), 确保 ssh 能够连接成功。


图一 定义 Hadoop server 的位置

图一


2. 创立一个 MapReduce Project

在 Eclipse 中新创建一个 MapReduce Project, 将我们在第二篇文章中定义的 WordCount 类加到此 Project 中。这个类需要略作修改才能直接远程部署到我们已经搭建好的分布式环境中去运行,因为我们原来在 WordCount 程序中是通过读取命令行参数获得计算任务的输入路径和输出路径,而当前版本的 IBM MapReduce Tools 不支持远程部署时读取命令行参数。为测试的简便起见,我在程序中直接将输入路径定义为 input, 输出路径定义为 output。在测试 WordCount 程序之前,需要事先将需要做词频统计的一批文件拷贝到分布式文件系统的 input 目录下去。

完整的 WordCount 类的代码如代码清单 7 所示:


代码清单7
//import 语句省略 public class WordCount extends Configured implements Tool { public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); private String pattern="[^\\w]"; public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString().toLowerCase(); line = line.replaceAll(pattern, " "); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); output.collect(word, one); } } } public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } public int run(String[] args) throws Exception { Path tempDir = new Path("wordcount-temp-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); JobConf conf = new JobConf(getConf(), WordCount.class); try { conf.setJobName("wordcount"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setInputPath(new Path(args[0])); conf.setOutputPath(tempDir); conf.setOutputFormat(SequenceFileOutputFormat.class); JobClient.runJob(conf); JobConf sortJob = new JobConf(getConf(), WordCount.class); sortJob.setJobName("sort"); sortJob.setInputPath(tempDir); sortJob.setInputFormat(SequenceFileInputFormat.class); sortJob.setMapperClass(InverseMapper.class); sortJob.setNumReduceTasks(1); sortJob.setOutputPath(new Path(args[1])); sortJob.setOutputKeyClass(IntWritable.class); sortJob.setOutputValueClass(Text.class); sortJob.setOutputKeyComparatorClass(IntWritableDecreasingComparator.class); JobClient.runJob(sortJob); } finally { FileSystem.get(conf).delete(tempDir); } return 0; } private static class IntWritableDecreasingComparator extends IntWritable.Comparator { public int compare(WritableComparable a, WritableComparable b) { return -super.compare(a, b); } public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return -super.compare(b1, s1, l1, b2, s2, l2); } } public static void main(String[] args) throws Exception { String[] paths = {"input" , "output"}; int res = ToolRunner.run(new Configuration(), new WordCount(), paths); System.exit(res); } }  

3. 远程部署与运行

在左侧的 Project Explorer 中选中 WordCount 类,在右键弹出菜单中选择 Run As->Run on hadoop, 如图二 所示:


图二

图二


然后在 “select hadoop server” 弹出框中选择我们已经定义好的 Hadoop server, 点击 Finish 之后,MapReduce Tool 会自动将 WordCount project打包成一个 jar 并拷到远程 Hadoop server 上运行起来, 整个运行过程的输出在 Eclipse 的 console 中即可看到,非常方便。

4. 查看运行结果

当定义好 Hadoop server 的位置之后,在左侧的 Project Explorer 会出现一个新的 project( 项目名前面有一个蓝色的小象图标), , 通过这个 project 可以浏览 Hadoop 分布式文件系统中的文件。双击 output 目录下的 part-0000 文件,我们就可以直接在 Eclipse 中查看 WordCount 程序的输出结果,如图三所示:


图三

图三

 

linux

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/ppyfj.html