用 Linux 和 Apache Hadoop 进行云计算(5)

创建 MapReduce 应用程序

MapReduce 应用程序必须具备 “映射” 和 “缩减” 的性质,也就是说任务或作业可以分割为小片段以进行并行处理。然后,可以缩减每个子任务的结果,得到原任务的结果。这种任务之一是网站关键字搜索。搜索和抓取任务可以分割为子任务并分配给从节点,然后在主节点上聚合所有结果并得到最终结果。

试用示例应用程序

Hadoop 附带一些用于测试的示例应用程序。其中之一是单词计数器,它统计某一单词在几个文件中出现的次数。通过运行这个应用程序检查 Hadoop 集群。

首先,把输入文件放在分布式文件系统中(conf/ 目录下面)。我们将统计单词在这些文件中出现的次数。

$ bin/hadoop fs –put conf input  

然后,运行这个示例应用程序,以下命令统计以 “dfs” 开头的单词出现的次数。

$ bin/hadoop jar hadoop-*-examples.jar grep input output 'dfs[a-z.]+'  

命令的输出说明映射和缩减过程。

前两个命令会在 HDFS 中生成两个目录,“input” 和 “output”。可以使用以下命令列出它们。

$ bin/hadoop fs –ls  

查看分布式文件系统中已经输出的文件。它以键-值对的形式列出以 “dfs*” 开头的单词出现的次数。

$ bin/hadoop fs -cat ouput/*  

现在,访问 JobTracker 站点查看完成的作业日志。

创建 Log Analyzer MapReduce 应用程序

现在创建一个 Portal (IBM WebSphere? Portal v6.0) Log Analyzer 应用程序,它与 Hadoop 中的 WordCount 应用程序有许多共同点。这个分析程序搜索所有 Portal 的 SystemOut*.log 文件,显示在特定的时间段内应用程序在 Portal 上启动了多少次。

在 Portal 环境中,所有日志分割为 5MB 的片段,很适合由几个节点并行地分析。


hadoop.sample.PortalLogAnalyzer.Java
public class PortalLogAnalyzer { public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private static String APP_START_TOKEN = "Application started:"; private Text application = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); if(line.indexOf(APP_START_TOKEN) > -1) { int startIndex = line.indexOf(APP_START_TOKEN); startIndex += APP_START_TOKEN.length(); String appName = line.substring(startIndex).trim(); application.set(appName); output.collect(application, new IntWritable(1)); } } } public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while(values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } public static void main(String[] args) throws IOException { JobConf jobConf = new JobConf(PortalLogAnalyzer.class); jobConf.setJobName("Portal Log Analizer"); jobConf.setOutputKeyClass(Text.class); jobConf.setOutputValueClass(IntWritable.class); jobConf.setMapperClass(Map.class); jobConf.setCombinerClass(Reduce.class); jobConf.setReducerClass(Reduce.class); jobConf.setInputFormat(TextInputFormat.class); jobConf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(jobConf, new Path(args[0])); FileOutputFormat.setOutputPath(jobConf, new Path(args[1])); JobClient.runJob(jobConf); } }  

对 Hadoop API 的完整解释请参见 Hadoop 网站上的 API 文档。这里只做简要说明。

Map 类实现映射功能,它搜索日志文件的每一行,寻找应用程序的名称。然后把应用程序名称以键-值对的形式放在输出集合中。

Reduce 类计算具有相同键(相同应用程序名称)的所有值的总和。因此,这个应用程序最终输出的键-值对表示每个应用程序在 Portal 上启动的次数。

Main 函数配置并运行 MapReduce 作业。

运行 PortalLogAnalyzer

首先,把这些 Java 代码复制到主节点并编译。把 Java 代码复制到 <hadoop_home>/workspace 目录中。对它执行编译并存档在一个 Jar 文件中,后面 hadoop 命令将运行这个文件。

$ mkdir classes $ javac –cp ../hadoop-0.19.1-core.jar –d classes hadoop/sample/PortalLogAnalyzer.java $ jar –cvf PortalLogAnalyzer.jar –C classes/ .  

把 Portal 日志复制到 workspace/input 中。假设有多个日志文件,其中包含 2009 年 5 月的所有日志。把这些日志放到 HDFS 中。

$ bin/hadoop fs –put workspace/input input2  

在运行 PortalLogAnalyzer 时,输出说明映射和缩减过程。

$ bin/hadoop jar workspace/PortalLogAnalizer.jar hadoop.sample.PortalLogAnalizer input2 output2  


图 3. 任务的输出

$ bin/hadoop jar workspace/PortalLogAnalizer.jar hadoop.sample.PortalLogAnalizer input2 output2


应用程序执行完之后,输出应该与图 4 相似。

$ bin/hadoop fs –cat output2/*  


图 4. 部分输出

$ bin/hadoop fs –cat output2/*


在访问 JobTracker 站点时,会看到另一个完成的作业。注意图 5 中的最后一行。


图 5. 完成的作业

另一个完成的作业

 

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wyjxps.html