开始学习写一些MR编程实例,工作中即将使用(刚刚开始,如果有错误和建议,欢迎指出)
现在有一个文件,里面记录了全校所有学生各科成绩,求每个学生的平均成绩,格式如下
小明 语文 92
小明 数学 88
小明 英语 90
小强 语文 76
小强 数学 66
小强 英语 80
小木 语文 60
小木 数学 65
小木 英语 61
解决思路
Map阶段先将数据拆成key:姓名,value:课程_成绩的格式提供给reduce,默认的partitioner会将名字相同的学生发到同一个reduce上面
这样reduce可以根据总分/科目数计算平均成绩。
逻辑比较简单,
代码如下:
package com.test.mr2;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.Hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/*
* 计算学生课程平均成绩(某学生总分/课程数)
* 输入格式
*
* 小明 语文 92
* 小明 数学 88
* 小明 英语 90
* 小强 语文 76
* 小强 数学 66
* 小强 英语 80
* 小木 语文 60
* 小木 数学 65
* 小木 英语 61
*
* 输出
*
* 小明 90
* 小强 74
* 小木 62
*/
public class Average {
public static class AverMapper extends Mapper<Object, Text, Text, Text> {
@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer stringTokenizer = new StringTokenizer(line, "\n");
String name = "";
StringBuffer out = new StringBuffer(32);
while (stringTokenizer.hasMoreElements()) {
String tmp = stringTokenizer.nextToken();
StringTokenizer st = new StringTokenizer(tmp);
while (st.hasMoreElements()) {
name = st.nextToken();
out.append(st.nextToken());
out.append("_");
out.append(st.nextToken());
// 使用默认的hash partitioner将名字相同的同学发到一个reduce上
context.write(new Text(name), new Text(out.toString()));
}
}
}
}
public static class AverReducer extends
Reducer<Text, Text, Text, FloatWritable> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
Iterator<Text> it = values.iterator();
//计算每个key对应的记录条数和总分数
int count = 0;
int sum = 0;
while (it.hasNext()) {
String value = it.next().toString();
String[] strs = value.split("\\_");
if (strs.length < 2) {
continue;
}
try {
sum += Integer.parseInt(strs[1]);
} catch (Exception e) {
System.err.println(e.getMessage());
}
count++;
}
FloatWritable average = new FloatWritable(sum / count);
context.write(key, average);
}
}
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
System.out.println("Begin.....");
Configuration conf =new Configuration();
String[] arguments=new GenericOptionsParser(conf, args).getRemainingArgs();
if(arguments.length<2){
System.out.println("Usage:com.test.mr2.Average in out");
System.exit(1);
}
Job job=new Job(conf,"Average");
job.setJarByClass(Average.class);
job.setMapperClass(AverMapper.class);
job.setReducerClass(AverReducer.class);
job.setMapOutputValueClass(Text.class);
job.setMapOutputKeyClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FloatWritable.class);
FileInputFormat.addInputPath(job, new Path(arguments[0]));
FileOutputFormat.setOutputPath(job, new Path(arguments[1]));
System.exit(job.waitForCompletion(true)?0:1);
System.out.println("End.....");
}
}