利用Hadoop MapReduce 做数据排序

由于Hadoop在reduce之后就不能对结果做什么了,所以只能分为两个job完成,第一个job统计次数,第二个job对第一个job的结果排序。 第一个job的就是hadoop最简单的例子countwords,我要说的是用hadoop对结果排序。 假设第一个job的结果输出如下:

part-r-0000文件内容: a 5 b 4 c 74 d 78 e 1 r 64 f 4

要做的就是按照每个词出现的次数降序排列。

**********************************分割线*****************************************

首先可能会出现这样的问题:

1.可能上一个job为多个reduce,也就是会产生多个结果文件,因为一个reduce就会生成一个结果文件,结果存放在上一个job输出目录下类似part-r-00的文件里。

2.需要排序的文件内容很大,所以需要考虑多个reduce的情况。

*********************************分割线*******************************

怎么去设计mapreduce

1.在map阶段按照行读取文本,然后调用map方法时把上一个job的结果颠倒,也就是map后结果应该是这样的

5    a
4    b
74    c
................
.........................
4    f

2.然后map后,hadoop会对结果进行分组,这时结果就会变成

(5:a) (4:b,f) (74:c)

3.然后按照reduce数目的大小自定义分区函数,让结果形成多个区间,比如我认为大于50的应该在一个区间,一共3个reduce,那么最后的数据应该是三个区间,大于50的直接分到第一个分区0,25到50之间的分到第二个分区1,小于25的分到第三个分区2.因为分区数和reduce数是相同的,所以不同的分区对应不同的reduce,因为分区是从0开始的,数据分区到分区0的会被分到第一个reduce处理,分区是1的会分到第2个reduce处理,依次类推。并且reduce对应着输出文件,所以,第一个reduce生成的文件就会是part-r-0000,第二个reduce对应的生成文件就会是part-r-0001,依次类推,所以reduce处理时只需要把key和value再倒过来直接输出。这样最后就会形成数目最大的字符串就会在第一个生成文件里,排好序的数据就是按照文件命名的顺序存放的。

**其实就是利用了hadoop分组的特点,会把key相同的字符串放到一个组里,然后我们把分组的数据用自己定义的排序函数按照key排序后,再按照分区函数分到不同的reduce,固然会是第一个reduce结果文件里面是最大数字的已排序集合,也就是说需要排好序的数据时只需要依次遍历reduce的结果文件part-r-0000,part-r0001,part-r-0002...。当然,如果只有一个reduce,那就正好是一个排好序的结果文件。**

代码如下:

*******************************分割线*****************************************

map:

/**
 * 把上一个mapreduce的结果的key和value颠倒,调到后就可以按照key排序了。
 *
 * @author zhangdonghao
 *
 */
    public class SortIntValueMapper extends
    Mapper<LongWritable, Text, IntWritable, Text> {
private final static IntWritable wordCount = new IntWritable(1);

private Text word = new Text();

public SortIntValueMapper() {
    super();
}

@Override
public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {

StringTokenizer tokenizer = new StringTokenizer(value.toString());
    while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken().trim());
        wordCount.set(Integer.valueOf(tokenizer.nextToken().trim()));
        context.write(wordCount, word);
    }
}
}

reudce:

/**
 * 把key和value颠倒过来输出
 * @author zhangdonghao
 *
 */
    public class SortIntValueReduce extends
    Reducer<IntWritable, Text, Text, IntWritable> {

private Text result = new Text();

@Override
public void reduce(IntWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    for (Text val : values) {
        result.set(val.toString());
        context.write(result, key);
    }

}
}

更多详情请继续阅读第2页的内容

推荐阅读

Hadoop 新 MapReduce 框架 Yarn 详解

Hadoop中HDFS和MapReduce节点基本简介

MapReduce的自制Writable分组输出及组内排序

MapReduce的一对多连接操作

Hadoop--两个简单的MapReduce程序

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/d2bb28488894da863ba4b951289ea1d6.html