MapReduce允许程序员能够容易地编写并行运行在大规模集群上处理大量数据的程序,确保程序的运行稳定可靠和具有容错处理能力。程序员编写的运行在MapReduce上的应用程序称为作业(job),Hadoop既支持用Java编写的job,也支持其它语言编写的作业,比如Hadoop Streaming(shell、python)和Hadoop Pipes(c++)。Hadoop-2.X不再保留Hadoop-1.X版本中的JobTracker和TaskTracker组件,但这并不意味着Hadoop-2.X不再支持MapReduce作业,相反Hadoop-2.X通过唯一的主ResourceManager、每个节点一个的从NodeManager和每个应用程序一个的MRAppMaster保留了对MapReduce作业的向后兼容。在新版本中MapReduce作业依然由Map和Reduce任务组成,Map依然接收由MapReduce框架将输入数据分割为数据块,然后Map任务以完全并行的方式处理这些数据块,接着MapReduce框架对Map任务的输出进行排序,并将结果做为Reduce任务的输入,最后由Reduce任务输出最终的结果,在整个执行过程中MapReduce框架负责任务的调度,监控和重新执行失败的任务等。
通常计算节点和存储节点是相同的,MapReduce框架会有效地将任务安排在存储数据的节点上,有助于降低传输数据时的带宽使用量。MapReduce应用程序通过实现或者继承合适的接口或类提供了map和reduce函数,这两个函数负责Map任务和Reduce任务。作业客户端将编写好的作业提交给ResourceManager,而不再是JobTracker,ResourceManager负责将作业分布到从节点上,调度和监控作业,为作业客户端提供状态和诊断信息。
MapReduce框架只处理<key, value>键值对,也就是将作业的输入视为一些键值对并输出键值对。做为键值的类必须可以被MapReduce框架序列化,因此需要实现Writable接口,常用的IntWritable,LongWritable和Text都是实现该接口的类。做为键的类除了要实现Writable接口外,还需要实现WritableComparable接口,实现该接口主要为了有助于排序,上面提到的三个类也都实现了该接口。
在简要介绍了MapReduce框架后,下面深入学习框架中的两个重要概念:Mapper和Reducer,正如上文提到了,它们组成了MapReduce作业并负责完成实际的业务逻辑处理。
Mapper是独立的任务,将输入记录转换为中间记录,即对输入的键值对进行处理,并输出为一组中间键值对,输出的键值对使用context.write(WritableComparable, Writable)方法收集起来,中间记录的键值类型不必与输入记录的键值类型相同,实际上也往往是不同的。一条输入记录经由Mapper处理后可能输出为0条或者多条中间记录。比如,如果输入记录不满足业务要求(没有包含特定的值或者包含了特定的值)的话,可以直接返回,则会输出0条记录,此时Mapper起了过滤器的作用。
接着MapReduce框架将与给定键相关联的所有中间值分组,然后传递给Reducer。用户可以通过Job.setGroupingComparatorClass(Class)方法指定Comparator来控制分组。Mapper的输出被排序然后按照Reducer分区,总的分区数与作业启动的Reducer任务数相同,程序员可以通过实现自定义的Partitioner控制输出的记录由哪个Reducer处理,默认使用的是HashPartitioner。程序员还可以通过Job.setCombinerClass(Class)指定一个combiner来执行中间输出的本地聚合,这有助于减少Mapper到Reducer的数据传输。Mapper的中间输出经过排序后总是保存为(key-len, key,value-len, value)的格式,应用程序可以通过Configuration控制是否将中间输出进行压缩,以及使用何种压缩方式,相关的几个参数有:mapreduce.map.output.compress、mapreduce.map.output.compress.codec。程序员通过Job.setMapperClass(Class)将Mapper传递给Job,MapReduce框架调用Mapper的map(WritableComparable, Writable, Context)处理该任务的价值对,应用程序可以覆盖cleanup(Context)方法实现任何需要的清理工作。
MapReduce框架为每个由作业的InputFormat生成的InputSplit启动一个map任务,因此总的map任务数量由输入数据大小决定,更准确说是由输入文件总的块数决定。虽然可以为较少使用CPU的map任务在节点上设置300个map任务,但每个节点更适合并行运行10-100个map任务。由于任务的启动需要花费一些时间,所以任务的运行最好至少需要1分钟,因为如果任务运行的时间很少,整个作业的时间将大部分消耗在任务的建立上面。
Reducer将具有相同键的一组中间值降低为一组更小数量的值,比如合并单词的数量等。一个作业启动的Reducer数量可以通过Job.setNumReduceTasks(int)或者mapred-site.xml中的参数mapreduce.job.reduces设置,但是更推荐前者,因为可以由程序员决定启动多少个reducer,而后者更多的是提供了一种默认值。程序员使用Job.setReducerClass(Class)将Reducer提交给作业,MapReduce框架为每对<key, (list of values)>调用reduce(WritableComparable, Iterable<Writable>, Context)方法,同Mapper一样,程序员也可以覆盖cleanup(Context)方法指定需要的清理工作。