Building MapReduce Applications with Hadoop (2)

FileInputFormat: We define a FileInputFormat to read all of the files in a specified directory (passed to the MapReduce application as its first argument) and pass them to a TextInputFormat (see Listing 1) for distribution to our mappers.

TextInputFormat: Hadoop's default InputFormat is the TextInputFormat, which reads one line at a time and returns the byte offset as the key (LongWritable) and the line of text as the value (Text).
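As a rough illustration (the file contents below are invented and not part of the article's example), the following stand-alone snippet mimics what the TextInputFormat hands to the mapper: it walks two lines of text and prints the (byte offset, line) pairs corresponding to the (LongWritable, Text) key/value pairs map() would receive.

public class TextInputFormatSketch {
    public static void main( String[] args ) {
        // Two invented lines standing in for an input file
        String[] lines = { "twas brillig and the slithy toves",
                           "did gyre and gimble in the wabe" };
        long offset = 0;
        for( String line : lines ) {
            // This (offset, line) pair has the shape of the key/value the mapper sees
            System.out.println( "(" + offset + ", \"" + line + "\")" );
            offset += line.length() + 1; // +1 for the newline character
        }
    }
}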

Word Count Mapper: This is a class we write to tokenize the single line of text passed to it by the InputFormat into words, and then emit each word paired with the number "1" to note that we have seen the word.
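To make that concrete, here is a small sketch (the input line is invented) that applies the same lower-casing, apostrophe stripping, and tokenization used by the mapper in Listing 1 and prints the (word, 1) pairs it would emit:

import java.util.StringTokenizer;

public class MapperSketch {
    public static void main( String[] args ) {
        // An invented line of input text
        String text = "Hello, Hadoop -- hello MapReduce!".toLowerCase();
        text = text.replaceAll( "'", "" );
        text = text.replaceAll( "[^a-zA-Z]", " " ); // every non-letter becomes a space
        StringTokenizer st = new StringTokenizer( text );
        while( st.hasMoreTokens() ) {
            // Mirrors output.collect( word, one ) in Listing 1
            System.out.println( "(" + st.nextToken() + ", 1)" );
        }
    }
}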

Combiner: In a development environment we don't need a combiner, but the combiner (or the combiner's functionality) is implemented by the reducer (described later in this article) and runs on the local node before the key/value pairs are passed to the reducer. Employing a combiner can improve performance dramatically, but you need to ensure that combining your results does not break your reducer: for the reducer to be usable as a combiner, its operation must be associative; otherwise, the maps sent to the reducer will not produce the correct result.
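For example (the counts are invented), suppose the word "hadoop" appears three times on one node and twice on another. Without a combiner the reducer receives hadoop -> [1, 1, 1, 1, 1] and sums the list to 5. With the reducer reused as a combiner, the first node pre-sums its [1, 1, 1] to 3 and the second pre-sums its [1, 1] to 2, so the reducer receives hadoop -> [3, 2] and still arrives at 5. Because addition is associative, both paths agree; a non-associative operation would make them diverge.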

Word Count Reducer: The word count reducer receives a map from each word to a list of all the counts for that word observed by the mappers. Without a combiner, the reducer would receive a word and a collection of "1"s; but because we are letting the reducer double as the combiner, what it receives instead is a collection of partial counts that need to be added together.

TextOutputFormat: In this example we use the TextOutputFormat class, telling it that the keys will be of type Text and the values of type IntWritable.

FileOutputFormat: The TextOutputFormat sends its formatted output to a FileOutputFormat, which writes the results into an "output" directory that it creates itself.
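Concretely, the TextOutputFormat writes each key/value pair as the key, a tab character, and the value on a line of its own, so a fragment of the resulting output file would look something like this (the words and counts are invented):

    brillig	1
    gyre	1
    the	2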

You may be wondering why we call our Strings "Text" and our numbers "IntWritable" and "LongWritable". The reason is that for values to be passed across the Hadoop Distributed File System (HDFS) in a distributed fashion, there are specific rules that define how they are serialized. Fortunately, Hadoop provides wrappers for the common types, and if you need to develop your own, it provides a Writable interface that you can implement to do so.
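As a sketch only (this class is hypothetical and not part of the article's example), a custom value type would implement Writable by serializing its fields in write() and restoring them in readFields(); a custom key type would additionally need to implement WritableComparable so that keys can be sorted.

import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class WordStats implements Writable {
    private int count;         // how many times a word was seen
    private long totalLength;  // combined length of those occurrences

    public void write( DataOutput out ) throws IOException {
        out.writeInt( count );
        out.writeLong( totalLength );
    }

    public void readFields( DataInput in ) throws IOException {
        count = in.readInt();
        totalLength = in.readLong();
    }
}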

Listing 1 shows the source code for our first MapReduce application.

package com.geekcap.hadoopexamples;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

/**
 * Created by IntelliJ IDEA.
 * User: shaines
 * Date: 12/9/12
 * Time: 9:25 PM
 * To change this template use File | Settings | File Templates.
 */
public class WordCount extends Configured implements Tool {

    public static class MapClass extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {

        private Text word = new Text();
        private final static IntWritable one = new IntWritable( 1 );

        public void map( LongWritable key, // Offset into the file
                         Text value,
                         OutputCollector<Text, IntWritable> output,
                         Reporter reporter) throws IOException {
            // Get the value as a String
            String text = value.toString().toLowerCase();

            // Replace all non-characters
            text = text.replaceAll( "'", "" );
            text = text.replaceAll( "[^a-zA-Z]", " " );

            // Iterate over all of the words in the string
            StringTokenizer st = new StringTokenizer( text );
            while( st.hasMoreTokens() ) {
                // Get the next token and set it as the text for our "word" variable
                word.set( st.nextToken() );

                // Output this word as the key and 1 as the value
                output.collect( word, one );
            }
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce( Text key, Iterator<IntWritable> values,
                            OutputCollector<Text, IntWritable> output,
                            Reporter reporter) throws IOException {
            // Iterate over all of the values (counts of occurrences of this word)
            int count = 0;
            while( values.hasNext() ) {
                // Add the value to our count
                count += values.next().get();
            }

            // Output the word with its count (wrapped in an IntWritable)
            output.collect( key, new IntWritable( count ) );
        }
    }

    public int run(String[] args) throws Exception {
        // Create a configuration
        Configuration conf = getConf();

        // Create a job from the default configuration that will use the WordCount class
        JobConf job = new JobConf( conf, WordCount.class );

        // Define our input path as the first command line argument and our output path as the second
        Path in = new Path( args[0] );
        Path out = new Path( args[1] );

        // Create File Input/Output formats for these paths (in the job)
        FileInputFormat.setInputPaths( job, in );
        FileOutputFormat.setOutputPath( job, out );

        // Configure the job: name, mapper, reducer, and combiner
        job.setJobName( "WordCount" );
        job.setMapperClass( MapClass.class );
        job.setReducerClass( Reduce.class );
        job.setCombinerClass( Reduce.class );

        // Configure the output
        job.setOutputFormat( TextOutputFormat.class );
        job.setOutputKeyClass( Text.class );
        job.setOutputValueClass( IntWritable.class );

        // Run the job
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // Start the WordCount MapReduce application
        int res = ToolRunner.run( new Configuration(), new WordCount(), args );
        System.exit( res );
    }
}
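As a usage sketch (the jar name is an assumption, not taken from the article), the application would be packaged into a jar and launched with the input directory and output directory as its two command-line arguments, for example:

    hadoop jar hadoop-wordcount.jar com.geekcap.hadoopexamples.WordCount input output

The output directory must not already exist when the job starts; when the job finishes, the word counts end up in part files (such as part-00000) inside that directory.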

Notes
