MapReduce编程实例(一)

开始学习写一些MR编程实例,工作中即将使用(刚刚开始,如果有错误和建议,欢迎指出)

现在有一个文件,里面记录了全校所有学生各科成绩,求每个学生的平均成绩,格式如下

小明 语文 92
小明 数学 88
小明 英语 90
小强 语文 76
小强 数学 66
小强 英语 80
小木 语文 60
小木 数学 65
小木 英语 61


解决思路

Map阶段先将数据拆成key:姓名,value:课程_成绩的格式提供给reduce,默认的partitioner会将名字相同的学生发到同一个reduce上面

这样reduce可以根据总分/科目数计算平均成绩。

逻辑比较简单,

代码如下:

package com.test.mr2;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.Hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/*
 * 计算学生课程平均成绩(某学生总分/课程数)
 * 输入格式
 *
 * 小明 语文 92
 * 小明 数学 88
 * 小明 英语 90
 * 小强 语文 76
 * 小强 数学 66
 * 小强 英语 80
 * 小木 语文 60
 * 小木 数学 65
 * 小木 英语 61
 *
 * 输出
 *
 * 小明 90
 * 小强 74
 * 小木 62
 */
public class Average {

public static class AverMapper extends Mapper<Object, Text, Text, Text> {
  @Override
  protected void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {
   String line = value.toString();
   StringTokenizer stringTokenizer = new StringTokenizer(line, "\n");
   String name = "";
   StringBuffer out = new StringBuffer(32);
   while (stringTokenizer.hasMoreElements()) {
    String tmp = stringTokenizer.nextToken();
    StringTokenizer st = new StringTokenizer(tmp);
    while (st.hasMoreElements()) {
     name = st.nextToken();
     out.append(st.nextToken());
     out.append("_");
     out.append(st.nextToken());
     // 使用默认的hash partitioner将名字相同的同学发到一个reduce上
     context.write(new Text(name), new Text(out.toString()));
    }
   }
  }

}

public static class AverReducer extends
   Reducer<Text, Text, Text, FloatWritable> {
  @Override
  protected void reduce(Text key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {
   Iterator<Text> it = values.iterator();
   //计算每个key对应的记录条数和总分数
   int count = 0;
   int sum = 0;
   while (it.hasNext()) {
    String value = it.next().toString();
    String[] strs = value.split("\\_");
    if (strs.length < 2) {
     continue;
    }
    try {
     sum += Integer.parseInt(strs[1]);
    } catch (Exception e) {
     System.err.println(e.getMessage());
    }
    count++;
   }
   FloatWritable average = new FloatWritable(sum / count);
   context.write(key, average);
  }
 }

public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  System.out.println("Begin.....");
  Configuration conf =new Configuration();
  String[] arguments=new GenericOptionsParser(conf, args).getRemainingArgs();
  if(arguments.length<2){
   System.out.println("Usage:com.test.mr2.Average in out");
   System.exit(1);
  }
  Job job=new Job(conf,"Average");
  job.setJarByClass(Average.class);
  job.setMapperClass(AverMapper.class);
  job.setReducerClass(AverReducer.class);
  job.setMapOutputValueClass(Text.class);
  job.setMapOutputKeyClass(Text.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(FloatWritable.class);
  FileInputFormat.addInputPath(job, new Path(arguments[0]));
  FileOutputFormat.setOutputPath(job, new Path(arguments[1]));
  System.exit(job.waitForCompletion(true)?0:1);
  System.out.println("End.....");
 }

}

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/6a2b24717b95095055caec9a4f5fb842.html