我们知道,在reduce之前,MP框架会对收到的<K,V>对按K进行排序,而对于一个特定的K来说,它的List<V>是没有被排过序的,就是说这些V是无序的,因为它们来自不同的Map端,而且很多应用也不依赖于K所对应的list<V>的顺序,但是有一些应用就要就要依赖于相同K的V的顺序,而且还要把他们聚合在一起,下面会提出这样一个问题,是参考Hadoop The Defiinitive Guide的第八章 (下载见 )。
1. 问题的提出对于如下数据,我们要计算出每一年的最高温度值:(1900,34) (1900,32) .... (1950, 0) (1950, 22) (1950, −11) (1949, 111) (1949, 78)
计算结果可能如下:
1901 317 1902 244 1903 289 1904 256 ... 1949 111
我们一般的方法是把key设置成年份,把value设置成温度,在reduce的时候去遍历所有相同key的value值,找出最大的那个值,在Reduce返回的时候,只collect这个最大的值,这是一种办法,但是这种办法在效率上相对比较差,也不够灵活,下面我们来看看怎么使用Secondary Sort来解决这个问题2. Secondary SortSecondary Sort 实际上就是一种对Value进行二次排序,然后按key的特定部分进行聚合的方法,这里用到了一个组合Key的概念,就是把K与要排序的Value组合在一起,生成一个新的Key值,上面的例子中,新的组合key为<1900,32>,也就是<年份,温度>的组合,(1900, 35°C),(1900, 34°C),这样组合以后,生成一个新的key,但是这样组合以后,它们会被切分到不同的Reduce上,所以我们这里要写一个根据新组合的key的第一个参数(年份)来进行相应的partitioner,在JobConf中可以使用setPartitionerClass来进行设置,这样可以解决相同年份的key会被聚合在同一个Reduce上,但是还没有解析在同一个Reduce上,把部分key相同的记录聚合(group)在一起,所以这里我们要设置一个group的比较器,这样就可以把相同年份的记录聚合在一起,但对于相同key(这里是指key中第一个参数相同)的排序问题,我们要使用一个KeyComparator比较器来做,就是在group中对key进行二次排序,在上面例子中就是按key中第二个参数温度来降序排序,这里要注意的是这里的输入是key,而不是value,这就是我们为什么把value组合在key一起的原因,而写这个比较方法的时候,还要注意一定要符合Group方法的原因,如果group是按key的第一个参数来得,那这里key的比较就要在第一个参数相同的情况下,才能会第二个参数(value)进行比较,我想这里解释了为什么这种排序叫Secondary Sort的原因吧,在上面的例子中,key的比较是先比较第一个参数(年份),如果第一个参数相同,再比较第二个参数(温度),按第二个参数降序排列。
所以一般要使用Secondary Sort,在JobConf要配置这三个参数
setPartitionerClass // 这个是用来设置key的切分,上面例子中是按key中的第一个参数来切分 setOutputValueGroupingComparator // 这里设置group,就是按key的哪一个参数进行聚合,上面的例子中也是按第一个参数年份进行聚合 setOutputKeyComparatorClass // 这个是设置key的比较器,设置聚合的key的一个二次排序方法 3. 代码分析
Example 8-9. Application to find the maximum temperature by sorting temperatures in the key public class MaxTemperatureUsingSecondarySort extends Configured implements Tool { // Map任务 static class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntPair, NullWritable> { private NcdcRecordParser parser = new NcdcRecordParser(); public void map(LongWritable key, Text value, OutputCollector<IntPair, NullWritable> output, Reporter reporter) throws IOException { parser.parse(value); // 解析输入的文本 if (parser.isValidTemperature()) { // 这里把年份与温度组合成一个key,value为空 output.collect(new IntPair(parser.getYearInt(),+ parser.getAirTemperature()), NullWritable.get()); } } } // Reduce任务 static class MaxTemperatureReducer extends MapReduceBase implements Reducer<IntPair, NullWritable, IntPair, NullWritable> { public void reduce(IntPair key, Iterator<NullWritable> values, OutputCollector<IntPair, NullWritable> output, Reporter reporter) throws IOException { // 输出聚合的key值,这里的key是先按年份进行聚合,所我们会看到相同所有年份相同的key会聚合在一起,而这些聚合后的key按温度进行降序按列 // 所以聚合中第一个key为温度最高的,所以这里输出的key为这一年中温度最高的值 output.collect(key, NullWritable.get()); } } // 切分器,这里是按年份* 127 % reduceNum来进行切分的 public static class FirstPartitioner implements Partitioner<IntPair, NullWritable> { @Override public void configure(JobConf job) {} @Override public int getPartition(IntPair key, NullWritable value, int numPartitions) { return Math.abs(key.getFirst() * 127) % numPartitions; } } // 聚合key的一个比较器 public static class KeyComparator extends WritableComparator { protected KeyComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable w1, WritableComparable w2) { IntPair ip1 = (IntPair) w1; IntPair ip2 = (IntPair) w2; // 这里要注意的是,一定要在聚合参数相同的情况下,再比较另一个参数 // 这里是先比较年份,再比较温度,按温度降序排序 int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst()); if (cmp != 0) { return cmp; } return -IntPair.compare(ip1.getSecond(), ip2.getSecond()); //reverse } } // 设置聚合比较器 public static class GroupComparator extends WritableComparator { protected GroupComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable w1, WritableComparable w2) { IntPair ip1 = (IntPair) w1; IntPair ip2 = (IntPair) w2; // 这里是按key的第一个参数来聚合,就是年份 return IntPair.compare(ip1.getFirst(), ip2.getFirst()); } } @Override public int run(String[] args) throws IOException { JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args); if (conf == null) { return -1; } conf.setMapperClass(MaxTemperatureMapper.class); conf.setPartitionerClass(FirstPartitioner.class); // 设置切分器 conf.setOutputKeyComparatorClass(KeyComparator.class); // 设置key的比较器 conf.setOutputValueGroupingComparator(GroupComparator.class); // 设置聚合比较器 conf.setReducerClass(MaxTemperatureReducer.class); conf.setOutputKeyClass(IntPair.class); // 设置key的一个组合类型,如里这个类型实现了WritableComparable<T>的话,那就不要设置setOutputKeyComparatorClass了. conf.setOutputValueClass(NullWritable.class); // 输出的value为NULL,因为这里的实际value已经组合到了key中 JobClient.runJob(conf); return 0; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args); System.exit(exitCode); } }
输出结果如下: