Hadoop1.1.2开发笔记详细记录(6)

人类学习的方式在很大程度上始于模仿,“古者包犠氏之王天下也……作结绳而为网罟,以佃以渔,盖取诸离”,古人从自然法则中求生存,逐步走出蒙昧,人法地,地法天,天法道,道法自然。(历代对本句训诂汗牛充栋,还不如本人的解释来得直接 ,顺便鄙视一下那些训诂专家,小题大做,愚不可及)

本文要做的,是先模仿几个 Hadoop 的示例程序(example),以增强对 Hadoop 编程的感悟能力。

通过下面几个示例,可以加深对 MapReduce 具体处理过程的理解,包括输入输出的类型以及 shuffle 的作用。

1 数据去重

 

public class Dedup { //map将输入中的value复制到输出数据的key上,并直接输出 public static class Map extends Mapper<Object,Text,Text,Text>{ private static Text line=new Text();//每行数据 //实现map函数 public void map(Object key,Text value,Context context) throws IOException,InterruptedException{ line=value; context.write(line, new Text("")); } } //reduce将输入中的key复制到输出数据的key上,并直接输出 public static class Reduce extends Reducer<Text,Text,Text,Text>{ //实现reduce函数 public void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException{ context.write(key, new Text("")); } } /** * @param args */ public static void main(String[] args) throws Exception { // TODO Auto-generated method stub Configuration conf = new Configuration(); String[] ioArgs=new String[]{"dedup_in","dedup_out"}; String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: Data Deduplication <in> <out>"); System.exit(2); } Job job = new Job(conf, "Data Deduplication"); job.setJarByClass(Dedup.class); //设置Map、Combine和Reduce处理类 job.setMapperClass(Map.class); job.setCombinerClass(Reduce.class); job.setReducerClass(Reduce.class); //设置输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //设置输入和输出目录 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

 

2 数据排序

 

public class Sort { // map将输入中的value化成IntWritable类型,作为输出的key public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> { private static IntWritable data = new IntWritable(); // 实现map函数 public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); data.set(Integer.parseInt(line)); context.write(data, new IntWritable(1)); } } // reduce将输入中的key复制到输出数据的key上, // 然后根据输入的value-list中元素的个数决定key的输出次数 // 用全局linenum来代表key的位次 public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> { private static IntWritable linenum = new IntWritable(1); // 实现reduce函数 public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { for (IntWritable val : values) { context.write(linenum, key); linenum = new IntWritable(linenum.get() + 1); } } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { // TODO Auto-generated method stub Configuration conf = new Configuration(); String[] ioArgs = new String[] { "sort_in", "sort_out" }; String[] otherArgs = new GenericOptionsParser(conf, ioArgs) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: Data Sort <in> <out>"); System.exit(2); } Job job = new Job(conf, "Data Sort"); job.setJarByClass(Sort.class); // 设置Map和Reduce处理类 job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); // 设置输出类型 job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); // 设置输入和输出目录 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

 

3 平均成绩

 

public class Score { public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> { // 实现map函数 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 将输入的纯文本文件的数据转化成String String line = value.toString(); // 将输入的数据首先按行进行分割 StringTokenizer tokenizerArticle = new StringTokenizer(line, "\n"); // 分别对每一行进行处理 while (tokenizerArticle.hasMoreElements()) { // 每行按空格划分 StringTokenizer tokenizerLine = new StringTokenizer( tokenizerArticle.nextToken()); String strName = tokenizerLine.nextToken();// 学生姓名部分 String strScore = tokenizerLine.nextToken();// 成绩部分 Text name = new Text(strName); int scoreInt = Integer.parseInt(strScore); // 输出姓名和成绩 context.write(name, new IntWritable(scoreInt)); } } } public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> { // 实现reduce函数 public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; int count = 0; Iterator<IntWritable> iterator = values.iterator(); while (iterator.hasNext()) { sum += iterator.next().get();// 计算总分 count++;// 统计总的科目数 } int average = (int) sum / count;// 计算平均成绩 context.write(key, new IntWritable(average)); } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { // TODO Auto-generated method stub Configuration conf = new Configuration(); String[] ioArgs = new String[] { "score_in", "score_out" }; String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: Score Average <in> <out>"); System.exit(2); } Job job = new Job(conf, "Score Average"); job.setJarByClass(Score.class); // 设置Map、Combine和Reduce处理类 job.setMapperClass(Map.class); job.setCombinerClass(Reduce.class); job.setReducerClass(Reduce.class); // 设置输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 将输入的数据集分割成小数据块splites,提供一个RecordReader的实现 
job.setInputFormatClass(TextInputFormat.class); // 提供一个RecordWriter的实现,负责数据输出 job.setOutputFormatClass(TextOutputFormat.class); // 设置输入和输出目录 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

 

4 倒排索引

 

public class InvertedIndex { public static class Map extends Mapper<Object, Text, Text, Text> { private Text keyInfo = new Text(); // 存储单词和URL组合 private Text valueInfo = new Text(); // 存储词频 private FileSplit split; // 存储Split对象 // 实现map函数 public void map(Object key, Text value, Context context) throws IOException, InterruptedException { // 获得<key,value>对所属的FileSplit对象 split = (FileSplit) context.getInputSplit(); StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { // key值由单词和URL组成,如"MapReduce:file1.txt" // 获取文件的完整路径 // keyInfo.set(itr.nextToken()+":"+split.getPath().toString()); // 这里为了好看,只获取文件的名称。 int splitIndex = split.getPath().toString().indexOf("file"); keyInfo.set(itr.nextToken() + ":" + split.getPath().toString().substring(splitIndex)); // 词频初始化为1 valueInfo.set("1"); context.write(keyInfo, valueInfo); } } } public static class Combine extends Reducer<Text, Text, Text, Text> { private Text info = new Text(); // 实现reduce函数 public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // 统计词频 int sum = 0; for (Text value : values) { sum += Integer.parseInt(value.toString()); } int splitIndex = key.toString().indexOf(":"); // 重新设置value值由URL和词频组成 info.set(key.toString().substring(splitIndex + 1) + ":" + sum); // 重新设置key值为单词 key.set(key.toString().substring(0, splitIndex)); context.write(key, info); } } public static class Reduce extends Reducer<Text, Text, Text, Text> { private Text result = new Text(); // 实现reduce函数 public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // 生成文档列表 String fileList = new String(); for (Text value : values) { fileList += value.toString() + ";"; } result.set(fileList); context.write(key, result); } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { // TODO Auto-generated method stub Configuration conf = new Configuration(); String[] ioArgs = 
new String[] { "index_in", "index_out" }; String[] otherArgs = new GenericOptionsParser(conf, ioArgs) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: Inverted Index <in> <out>"); System.exit(2); } Job job = new Job(conf, "Inverted Index"); job.setJarByClass(InvertedIndex.class); // 设置Map、Combine和Reduce处理类 job.setMapperClass(Map.class); job.setCombinerClass(Combine.class); job.setReducerClass(Reduce.class); // 设置Map输出类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // 设置Reduce输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // 设置输入和输出目录 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/97a8ba88fb7f856122c09f7b1ef29b3d.html