MapReduce 计算模型非常适合在大量计算机组成的大规模集群上并行运行。图一中的每一个 Map 任务和每一个 Reduce 任务均可以同时运行于一个单独的计算结点上,可想而知其运算效率是很高的,那么这样的并行计算是如何做到的呢?
Hadoop 中的分布式文件系统 HDFS 由一个管理结点 ( NameNode )和N个数据结点 ( DataNode )组成,每个结点均是一台普通的计算机。在使用上同我们熟悉的单机上的文件系统非常类似,一样可以建目录,创建,复制,删除文件,查看文件内容等。但其底层实现上是把文件切割成 Block,然后这些 Block 分散地存储于不同的 DataNode 上,每个 Block 还可以复制数份存储于不同的 DataNode 上,达到容错容灾之目的。NameNode 则是整个 HDFS 的核心,它通过维护一些数据结构,记录了每一个文件被切割成了多少个 Block,这些 Block 可以从哪些 DataNode 中获得,各个 DataNode 的状态等重要信息。如果你想了解更多的关于 HDFS 的信息,可进一步阅读参考资料: The Hadoop Distributed File System:Architecture and Design
Hadoop 中有一个作为主控的 JobTracker,用于调度和管理其它的 TaskTracker, JobTracker 可以运行于集群中任一台计算机上。TaskTracker 负责执行任务,必须运行于 DataNode 上,即 DataNode 既是数据存储结点,也是计算结点。 JobTracker 将 Map 任务和 Reduce 任务分发给空闲的 TaskTracker, 让这些任务并行运行,并负责监控任务的运行情况。如果某一个 TaskTracker 出故障了,JobTracker 会将其负责的任务转交给另一个空闲的 TaskTracker 重新运行。
数据存储在哪一台计算机上,就由这台计算机进行这部分数据的计算,这样可以减少数据在网络上的传输,降低对网络带宽的需求。在 Hadoop 这样的基于集群的分布式并行系统中,计算结点可以很方便地扩充,而因它所能够提供的计算能力近乎是无限的,但是由是数据需要在不同的计算机之间流动,故网络带宽变成了瓶颈,是非常宝贵的,“本地计算”是最有效的一种节约网络带宽的手段,业界把这形容为“移动计算比移动数据更经济”。
图 2. 分布存储与并行计算
把原始大数据集切割成小数据集时,通常让小数据集小于或等于 HDFS 中一个 Block 的大小(缺省是 64M),这样能够保证一个小数据集位于一台计算机上,便于本地计算。有 M 个小数据集待处理,就启动 M 个 Map 任务,注意这 M 个 Map 任务分布于 N 台计算机上并行运行,Reduce 任务的数量 R 则可由用户指定。
把 Map 任务输出的中间结果按 key 的范围划分成 R 份( R 是预先定义的 Reduce 任务的个数),划分时通常使用 hash 函数如: hash(key) mod R,这样可以保证某一段范围内的 key,一定是由一个 Reduce 任务来处理,可以简化 Reduce 的过程。
在 partition 之前,还可以对中间结果先做 combine,即将中间结果中有相同 key的 <key, value> 对合并成一对。combine 的过程与 Reduce 的过程类似,很多情况下就可以直接使用 Reduce 函数,但 combine 是作为 Map 任务的一部分,在执行完 Map 函数后紧接着执行的。Combine 能够减少中间结果中 <key, value> 对的数目,从而减少网络流量。
Map 任务的中间结果在做完 Combine 和 Partition 之后,以文件形式存于本地磁盘。中间结果文件的位置会通知主控 JobTracker, JobTracker 再通知 Reduce 任务到哪一个 DataNode 上去取中间结果。注意所有的 Map 任务产生中间结果均按其 Key 用同一个 Hash 函数划分成了 R 份,R 个 Reduce 任务各自负责一段 Key 区间。每个 Reduce 需要向许多个 Map 任务结点取得落在其负责的 Key 区间内的中间结果,然后执行 Reduce 函数,形成一个最终的结果文件。
有 R 个 Reduce 任务,就会有 R 个最终结果,很多情况下这 R 个最终结果并不需要合并成一个最终结果。因为这 R 个最终结果又可以做为另一个计算任务的输入,开始另一个并行计算任务。