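5. Verifying the Processing Flow
As the code above shows, I placed a log marker in each component to trace how MapReduce handles secondary sort from start to finish. Let's look at the Map-side and Reduce-side logs in turn.
(1) Map-side log analysis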
2014-03-18 17:07:45,278 INFO org.apache.hadoop.util.NativeCodeLoader: Loaded the native-hadoop library
2014-03-18 17:07:45,432 WARN org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Source name ugi already exists!
2014-03-18 17:07:45,501 INFO org.apache.hadoop.util.ProcessTree: setsid exited with exit code 0
2014-03-18 17:07:45,506 INFO org.apache.hadoop.mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@69b01afa
2014-03-18 17:07:45,584 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb = 100
2014-03-18 17:07:45,618 INFO org.apache.hadoop.mapred.MapTask: data buffer = 79691776/99614720
2014-03-18 17:07:45,618 INFO org.apache.hadoop.mapred.MapTask: record buffer = 262144/327680
2014-03-18 17:07:45,626 WARN org.apache.hadoop.io.compress.snappy.LoadSnappy: Snappy native library not loaded
2014-03-18 17:07:45,634 INFO com.mr.SecondSortMR: ---------enter map function flag---------
2014-03-18 17:07:45,634 INFO com.mr.DefinedPartition: --------enter DefinedPartition flag--------
2014-03-18 17:07:45,634 INFO com.mr.DefinedPartition: --------out DefinedPartition flag--------
2014-03-18 17:07:45,634 INFO com.mr.SecondSortMR: ---------out map function flag---------
2014-03-18 17:07:45,634 INFO com.mr.SecondSortMR: ---------enter map function flag---------
2014-03-18 17:07:45,635 INFO com.mr.DefinedPartition: --------enter DefinedPartition flag--------
2014-03-18 17:07:45,635 INFO com.mr.DefinedPartition: --------out DefinedPartition flag--------
2014-03-18 17:07:45,635 INFO com.mr.SecondSortMR: ---------out map function flag---------
2014-03-18 17:07:45,635 INFO com.mr.SecondSortMR: ---------enter map function flag---------
2014-03-18 17:07:45,635 INFO com.mr.DefinedPartition: --------enter DefinedPartition flag--------
2014-03-18 17:07:45,635 INFO com.mr.DefinedPartition: --------out DefinedPartition flag--------
2014-03-18 17:07:45,635 INFO com.mr.SecondSortMR: ---------out map function flag---------
2014-03-18 17:07:45,635 INFO com.mr.SecondSortMR: ---------enter map function flag---------
2014-03-18 17:07:45,635 INFO com.mr.DefinedPartition: --------enter DefinedPartition flag--------
2014-03-18 17:07:45,635 INFO com.mr.DefinedPartition: --------out DefinedPartition flag--------
2014-03-18 17:07:45,635 INFO com.mr.SecondSortMR: ---------out map function flag---------
2014-03-18 17:07:45,635 INFO com.mr.SecondSortMR: ---------enter map function flag---------
2014-03-18 17:07:45,636 INFO com.mr.DefinedPartition: --------enter DefinedPartition flag--------
2014-03-18 17:07:45,636 INFO com.mr.DefinedPartition: --------out DefinedPartition flag--------
2014-03-18 17:07:45,636 INFO com.mr.SecondSortMR: ---------out map function flag---------
2014-03-18 17:07:45,636 INFO com.mr.SecondSortMR: ---------enter map function flag---------
2014-03-18 17:07:45,636 INFO com.mr.DefinedPartition: --------enter DefinedPartition flag--------
2014-03-18 17:07:45,636 INFO com.mr.DefinedPartition: --------out DefinedPartition flag--------
2014-03-18 17:07:45,636 INFO com.mr.SecondSortMR: ---------out map function flag---------
2014-03-18 17:07:45,636 INFO com.mr.SecondSortMR: ---------enter map function flag---------
2014-03-18 17:07:45,636 INFO com.mr.DefinedPartition: --------enter DefinedPartition flag--------
2014-03-18 17:07:45,636 INFO com.mr.DefinedPartition: --------out DefinedPartition flag--------
2014-03-18 17:07:45,636 INFO com.mr.SecondSortMR: ---------out map function flag---------
2014-03-18 17:07:45,636 INFO com.mr.SecondSortMR: ---------enter map function flag---------
2014-03-18 17:07:45,637 INFO com.mr.DefinedPartition: --------enter DefinedPartition flag--------
2014-03-18 17:07:45,637 INFO com.mr.DefinedPartition: --------out DefinedPartition flag--------
2014-03-18 17:07:45,637 INFO com.mr.SecondSortMR: ---------out map function flag---------
2014-03-18 17:07:45,637 INFO org.apache.hadoop.mapred.MapTask: Starting flush of map output
2014-03-18 17:07:45,651 INFO com.mr.DefinedComparator: ---------enter DefinedComparator flag---------
2014-03-18 17:07:45,651 INFO com.mr.DefinedComparator: ---------out DefinedComparator flag---------
2014-03-18 17:07:45,651 INFO com.mr.DefinedComparator: ---------enter DefinedComparator flag---------
2014-03-18 17:07:45,651 INFO com.mr.DefinedComparator: ---------out DefinedComparator flag---------
2014-03-18 17:07:45,651 INFO com.mr.DefinedComparator: ---------enter DefinedComparator flag---------
2014-03-18 17:07:45,651 INFO com.mr.DefinedComparator: ---------out DefinedComparator flag---------
2014-03-18 17:07:45,651 INFO com.mr.DefinedComparator: ---------enter DefinedComparator flag---------
2014-03-18 17:07:45,651 INFO com.mr.DefinedComparator: ---------out DefinedComparator flag---------
2014-03-18 17:07:45,651 INFO com.mr.DefinedComparator: ---------enter DefinedComparator flag---------
2014-03-18 17:07:45,651 INFO com.mr.DefinedComparator: ---------out DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------enter DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------out DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------enter DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------out DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------enter DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------out DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------enter DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------out DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------enter DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------out DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------enter DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------out DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------enter DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------out DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------enter DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------out DefinedComparator flag---------
2014-03-18 17:07:45,656 INFO org.apache.hadoop.mapred.MapTask: Finished spill 0
2014-03-18 17:07:45,661 INFO org.apache.hadoop.mapred.Task: Task:attempt_201312292019_13586_m_000000_0 is done. And is in the process of commiting
2014-03-18 17:07:48,494 INFO org.apache.hadoop.mapred.Task: Task 'attempt_201312292019_13586_m_000000_0' done.
2014-03-18 17:07:48,526 INFO org.apache.hadoop.mapred.TaskLogsTruncater: Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
2014-03-18 17:07:48,548 INFO org.apache.hadoop.io.nativeio.NativeIO: Initialized cache for UID to User mapping with a cache timeout of 14400 seconds.
2014-03-18 17:07:48,548 INFO org.apache.hadoop.io.nativeio.NativeIO: Got UserName hadoop for UID 1000 from the native implementation
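The map-side log makes the order easy to see. Each record first enters the map function, and the custom partitioner is invoked on it right away: the partitioner's enter/out markers fall between the map function's enter and out markers, so partitioning happens while the map call is still emitting its output. Only after all input records have gone through the map function and the partitioner (after "Starting flush of map output" and before "Finished spill 0") is the custom secondary-sort comparator called to sort the output.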
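The call order seen in the log can be mimicked with a small plain-Java simulation. This is only a sketch, not the job's actual code and not Hadoop API: `MapSideFlowSketch`, `Key`, `partition`, and `compareKeys` are hypothetical stand-ins, the composite key layout (a string field plus an integer field) is an assumption, and the input order in `main` is made up. It records which component is entered and when, and orders the buffer by partition first and then by the sort comparator.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of the map-side call order. This is not Hadoop code:
// every class and method name here is a hypothetical stand-in.
class MapSideFlowSketch {

    // Hypothetical composite key (first field, second field) plus its partition.
    static final class Key {
        final String first;
        final int second;
        int partition;
        Key(String first, int second) { this.first = first; this.second = second; }
    }

    // Records the order in which the simulated components are entered.
    static final List<String> events = new ArrayList<>();

    // Stand-in for a custom partitioner: partitions on the first field only.
    static int partition(Key key, int numReducers) {
        events.add("partition");
        return (key.first.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    // Stand-in for a custom sort comparator: first field, then second field.
    static int compareKeys(Key a, Key b) {
        events.add("compare");
        int c = a.first.compareTo(b.first);
        return c != 0 ? c : Integer.compare(a.second, b.second);
    }

    // Feeds "first second" lines through map -> partition, then sorts at flush.
    static List<Key> run(String[] lines, int numReducers) {
        events.clear();
        List<Key> buffer = new ArrayList<>();
        for (String line : lines) {
            events.add("map");
            String[] parts = line.trim().split("\\s+");
            Key key = new Key(parts[0], Integer.parseInt(parts[1]));
            key.partition = partition(key, numReducers); // runs when map emits
            buffer.add(key);
        }
        events.add("flush");
        // The buffer is ordered by partition first, then by the sort comparator.
        buffer.sort((a, b) -> a.partition != b.partition
                ? Integer.compare(a.partition, b.partition)
                : compareKeys(a, b));
        return buffer;
    }

    public static void main(String[] args) {
        // Eight illustrative records; the input order is made up.
        String[] input = {"sort2 77", "sort1 2", "sort6 221", "sort2 3",
                          "sort1 1", "sort6 20", "sort2 54", "sort6 22"};
        for (Key k : run(input, 1)) {
            System.out.println(k.first + "\t" + k.second);
        }
        System.out.println(events);
    }
}
```

The printed event list starts with alternating `map` and `partition` entries, then a single `flush`, and only then the `compare` entries, which is the same shape as the log above. The number of comparisons depends on the sort algorithm, so it will not necessarily match the count in the real log.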
(2) Reduce-side log analysis
2014-03-18 17:07:51,266 INFO org.apache.hadoop.util.NativeCodeLoader: Loaded the native-hadoop library
2014-03-18 17:07:51,418 WARN org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Source name ugi already exists!
2014-03-18 17:07:51,486 INFO org.apache.hadoop.util.ProcessTree: setsid exited with exit code 0
2014-03-18 17:07:51,491 INFO org.apache.hadoop.mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@28bb494b
2014-03-18 17:07:51,537 INFO org.apache.hadoop.mapred.ReduceTask: ShuffleRamManager: MemoryLimit=195749472, MaxSingleShuffleLimit=48937368
2014-03-18 17:07:51,542 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201312292019_13586_r_000000_0 Thread started: Thread for merging on-disk files
2014-03-18 17:07:51,542 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201312292019_13586_r_000000_0 Thread started: Thread for merging in memory files
2014-03-18 17:07:51,542 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201312292019_13586_r_000000_0 Thread waiting: Thread for merging on-disk files
2014-03-18 17:07:51,543 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201312292019_13586_r_000000_0 Need another 1 map output(s) where 0 is already in progress
2014-03-18 17:07:51,543 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201312292019_13586_r_000000_0 Thread started: Thread for polling Map Completion Events
2014-03-18 17:07:51,543 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201312292019_13586_r_000000_0 Scheduled 0 outputs (0 slow hosts and0 dup hosts)
2014-03-18 17:07:56,544 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201312292019_13586_r_000000_0 Scheduled 1 outputs (0 slow hosts and0 dup hosts)
2014-03-18 17:07:57,553 INFO org.apache.hadoop.mapred.ReduceTask: GetMapEventsThread exiting
2014-03-18 17:07:57,553 INFO org.apache.hadoop.mapred.ReduceTask: getMapsEventsThread joined.
2014-03-18 17:07:57,553 INFO org.apache.hadoop.mapred.ReduceTask: Closed ram manager
2014-03-18 17:07:57,553 INFO org.apache.hadoop.mapred.ReduceTask: Interleaved on-disk merge complete: 0 files left.
2014-03-18 17:07:57,553 INFO org.apache.hadoop.mapred.ReduceTask: In-memory merge complete: 1 files left.
2014-03-18 17:07:57,577 INFO org.apache.hadoop.mapred.Merger: Merging 1 sorted segments
2014-03-18 17:07:57,577 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 130 bytes
2014-03-18 17:07:57,583 INFO org.apache.hadoop.mapred.ReduceTask: Merged 1 segments, 130 bytes to disk to satisfy reduce memory limit
2014-03-18 17:07:57,584 INFO org.apache.hadoop.mapred.ReduceTask: Merging 1 files, 134 bytes from disk
2014-03-18 17:07:57,584 INFO org.apache.hadoop.mapred.ReduceTask: Merging 0 segments, 0 bytes from memory into reduce
2014-03-18 17:07:57,584 INFO org.apache.hadoop.mapred.Merger: Merging 1 sorted segments
2014-03-18 17:07:57,586 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 130 bytes
2014-03-18 17:07:57,599 INFO com.mr.DefinedGroupSort: -------enter DefinedGroupSort flag-------
2014-03-18 17:07:57,599 INFO com.mr.DefinedGroupSort: -------Grouping result:0-------
2014-03-18 17:07:57,599 INFO com.mr.DefinedGroupSort: -------out DefinedGroupSort flag-------
2014-03-18 17:07:57,599 INFO com.mr.DefinedGroupSort: -------enter DefinedGroupSort flag-------
2014-03-18 17:07:57,599 INFO com.mr.DefinedGroupSort: -------Grouping result:-1-------
2014-03-18 17:07:57,599 INFO com.mr.DefinedGroupSort: -------out DefinedGroupSort flag-------
2014-03-18 17:07:57,600 INFO com.mr.SecondSortMR: ---------enter reduce function flag---------
2014-03-18 17:07:57,600 INFO com.mr.SecondSortMR: reduce Input data:{[sort1,2],[1,2]}
2014-03-18 17:07:57,600 INFO com.mr.SecondSortMR: ---------out reduce function flag---------
2014-03-18 17:07:57,600 INFO com.mr.DefinedGroupSort: -------enter DefinedGroupSort flag-------
2014-03-18 17:07:57,600 INFO com.mr.DefinedGroupSort: -------Grouping result:0-------
2014-03-18 17:07:57,600 INFO com.mr.DefinedGroupSort: -------out DefinedGroupSort flag-------
2014-03-18 17:07:57,600 INFO com.mr.DefinedGroupSort: -------enter DefinedGroupSort flag-------
2014-03-18 17:07:57,600 INFO com.mr.DefinedGroupSort: -------Grouping result:0-------
2014-03-18 17:07:57,600 INFO com.mr.DefinedGroupSort: -------out DefinedGroupSort flag-------
2014-03-18 17:07:57,601 INFO com.mr.DefinedGroupSort: -------enter DefinedGroupSort flag-------
2014-03-18 17:07:57,601 INFO com.mr.DefinedGroupSort: -------Grouping result:-4-------
2014-03-18 17:07:57,601 INFO com.mr.DefinedGroupSort: -------out DefinedGroupSort flag-------
2014-03-18 17:07:57,601 INFO com.mr.SecondSortMR: ---------enter reduce function flag---------
2014-03-18 17:07:57,601 INFO com.mr.SecondSortMR: reduce Input data:{[sort2,77],[3,54,77]}
2014-03-18 17:07:57,601 INFO com.mr.SecondSortMR: ---------out reduce function flag---------
2014-03-18 17:07:57,601 INFO com.mr.DefinedGroupSort: -------enter DefinedGroupSort flag-------
2014-03-18 17:07:57,601 INFO com.mr.DefinedGroupSort: -------Grouping result:0-------
2014-03-18 17:07:57,601 INFO com.mr.DefinedGroupSort: -------out DefinedGroupSort flag-------
2014-03-18 17:07:57,601 INFO com.mr.DefinedGroupSort: -------enter DefinedGroupSort flag-------
2014-03-18 17:07:57,601 INFO com.mr.DefinedGroupSort: -------Grouping result:0-------
2014-03-18 17:07:57,601 INFO com.mr.DefinedGroupSort: -------out DefinedGroupSort flag-------
2014-03-18 17:07:57,601 INFO com.mr.SecondSortMR: ---------enter reduce function flag---------
2014-03-18 17:07:57,601 INFO com.mr.SecondSortMR: reduce Input data:{[sort6,221],[20,22,221]}
2014-03-18 17:07:57,601 INFO com.mr.SecondSortMR: ---------out reduce function flag---------
2014-03-18 17:07:57,641 INFO org.apache.hadoop.mapred.Task: Task:attempt_201312292019_13586_r_000000_0 is done. And is in the process of commiting
2014-03-18 17:08:00,668 INFO org.apache.hadoop.mapred.Task: Task attempt_201312292019_13586_r_000000_0 is allowed to commit now
2014-03-18 17:08:00,682 INFO org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Saved output of task 'attempt_201312292019_13586_r_000000_0' to /user/hadoop/z.zeng/output23
2014-03-18 17:08:03,593 INFO org.apache.hadoop.mapred.Task: Task 'attempt_201312292019_13586_r_000000_0' done.
2014-03-18 17:08:03,596 INFO org.apache.hadoop.mapred.TaskLogsTruncater: Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
2014-03-18 17:08:03,615 INFO org.apache.hadoop.io.nativeio.NativeIO: Initialized cache for UID to User mapping with a cache timeout of 14400 seconds.
2014-03-18 17:08:03,615 INFO org.apache.hadoop.io.nativeio.NativeIO: Got UserName hadoop for UID 1000 from the native implementation
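The first thing the Reduce-side log shows is that grouping and the reduce function both run only after the shuffle has finished. It is just as easy to see that as soon as one group is complete, the reduce function is called once to process and output that group. A note on the grouping comparator's return value: two keys are placed in the same group only when it returns 0, and a nonzero result (-1 and -4 in the log) closes the current group. The log also shows that a group of n values produces n-1 zero results from the grouping comparator, that is, n-1 comparisons merge the n values into one group.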
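The grouping behaviour described above can be reproduced with a small plain-Java simulation. This is a sketch under assumptions, not the job's actual `DefinedGroupSort` and not Hadoop API: `GroupingSketch`, `groupCompare`, and `group` are hypothetical names, and the grouping comparator is assumed to compare only the first field of the key as a string. The keys and values are the ones shown in the reduce input lines of the log.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of reduce-side grouping. Not Hadoop code: all names
// here are hypothetical stand-ins.
class GroupingSketch {

    // Every value returned by the simulated grouping comparator, in call order.
    static final List<Integer> results = new ArrayList<>();

    // Assumed grouping rule: compare only the first field of the key.
    static int groupCompare(String a, String b) {
        int r = a.compareTo(b);
        results.add(r);
        return r;
    }

    // Walks already-sorted keys and cuts a new group whenever the comparator
    // returns a nonzero value for the current key and the next one.
    static List<List<Integer>> group(String[] firsts, int[] seconds) {
        results.clear();
        List<List<Integer>> groups = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        for (int i = 0; i < firsts.length; i++) {
            current.add(seconds[i]);
            boolean last = (i == firsts.length - 1);
            if (last || groupCompare(firsts[i], firsts[i + 1]) != 0) {
                groups.add(current);          // this is where reduce() would run
                current = new ArrayList<>();
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        String[] firsts = {"sort1", "sort1", "sort2", "sort2", "sort2",
                           "sort6", "sort6", "sort6"};
        int[] seconds = {1, 2, 3, 54, 77, 20, 22, 221};
        System.out.println(group(firsts, seconds)); // [[1, 2], [3, 54, 77], [20, 22, 221]]
        System.out.println(results);                // [0, -1, 0, 0, -4, 0, 0]
    }
}
```

The sequence of comparator results, 0, -1, 0, 0, -4, 0, 0, is the same as the "Grouping result" lines in the log: each group of n values yields n-1 zeros, a nonzero result ends the first two groups, and the last group ends when the input runs out.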
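So, judging from the map-side and reduce-side logs, the overall flow by which the MapReduce framework handles secondary sort matches the diagram I drew above, and the flow as a whole is correct.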
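6. Summary
This article started from the execution flow of the MapReduce framework, analyzed how to implement secondary sort, implemented it in code, and verified the whole flow. I also have a complaint to make: many articles online cover secondary sort in MapReduce, but their descriptions of the framework's overall processing flow are full of errors and omissions, and their final flow descriptions have no evidence to back them up. So do not rely entirely on learning material found online. Bring your own thinking to it, and verify the important claims with code or hands-on experiments. One more thing: today, in a Hadoop discussion group, I heard a few people debating whether we still need to learn to write MapReduce programs now that we have Hive. My view is this: I do not believe a programmer who cannot write a good MapReduce program will write good Hive statements. At the very least they know nothing about the overall execution flow, let alone performance, and they may not even understand the most common problem of all, data skew.
If anything in this article is wrong, please point it out so we can learn together!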