MapReduce和spark的shuffle过程详解 (3)

Reduce任务拖取某个Map 对应的数据,如果在内存中能放得下这次数据的话就直接把数据写到内存中。Reduce要向每个Map去拖取数据,在内存中每个Map对应一块数据,当内存 中存储的Map数据占用空间达到一定程度的时候,开始启动内存中merge,把内存中的数据merge输出到磁盘上一个文件中。

如果在内存 中不能放得下这个Map的数据的话,直接把Map数据写到磁盘上,在本地目录创建一个文件,从HTTP流中读取数据然后写到磁盘,使用的缓存区大小是 64K。拖一个Map数据过来就会创建一个文件,当文件数量达到一定阈值时,开始启动磁盘文件merge,把这些文件合并输出到一个文件。

有些Map的数据较小是可以放在内存中的,有些Map的数据较大需要放在磁盘上,这样最后Reduce任务拖过来的数据有些放在内存中了有些放在磁盘上,最后会对这些来一个全局合并。

Merge Sort

这里使用的Merge和Map端使用的Merge过程一样。Map的输出数据已经是有序的,Merge进行一次合并排序,所谓Reduce端的 sort过程就是这个合并的过程。一般Reduce是一边copy一边sort,即copy和sort两个阶段是重叠而不是完全分开的。

Reduce端的Shuffle过程至此结束。

Spark的Shuffle过程 Shuffle Writer

Spark丰富了任务类型,有些任务之间数据流转不需要通过Shuffle,但是有些任务之间还是需要通过Shuffle来传递数据,比如wide dependency的group by key。

Spark中需要Shuffle 输出的Map任务会为每个Reduce创建对应的bucket,Map产生的结果会根据设置的partitioner得到对应的bucketId,然后填 充到相应的bucket中去。每个Map的输出结果可能包含所有的Reduce所需要的数据,所以每个Map会创建R个bucket(R是reduce的 个数),M个Map总共会创建M*R个bucket。

Map创建的bucket其实对应磁盘上的一个文 件,Map的结果写到每个bucket中其实就是写到那个磁盘文件中,这个文件也被称为blockFile,是Disk Block Manager管理器通过文件名的Hash值对应到本地目录的子目录中创建的。每个Map要在节点上创建R个磁盘文件用于结果输出,Map的结果是直接输 出到磁盘文件上的,100KB的内存缓冲是用来创建Fast Buffered OutputStream输出流。这种方式一个问题就是Shuffle文件过多。

MapReduce和spark的shuffle过程详解

针对上述Shuffle 过程产生的文件过多问题,Spark有另外一种改进的Shuffle过程:consolidation Shuffle,以期显著减少Shuffle文件的数量。在consolidation Shuffle中每个bucket并非对应一个文件,而是对应文件中的一个segment部分。Job的map在某个节点上第一次执行,为每个 reduce创建bucket对应的输出文件,把这些文件组织成ShuffleFileGroup,当这次map执行完之后,这个 ShuffleFileGroup可以释放为下次循环利用;当又有map在这个节点上执行时,不需要创建新的bucket文件,而是在上次的 ShuffleFileGroup中取得已经创建的文件继续追加写一个segment;当前次map还没执行完,ShuffleFileGroup还没有 释放,这时如果有新的map在这个节点上执行,无法循环利用这个ShuffleFileGroup,而是只能创建新的bucket文件组成新的 ShuffleFileGroup来写输出。

MapReduce和spark的shuffle过程详解

比 如一个Job有3个Map和2个reduce:(1) 如果此时集群有3个节点有空槽,每个节点空闲了一个core,则3个Map会调度到这3个节点上执行,每个Map都会创建2个Shuffle文件,总共创 建6个Shuffle文件;(2) 如果此时集群有2个节点有空槽,每个节点空闲了一个core,则2个Map先调度到这2个节点上执行,每个Map都会创建2个Shuffle文件,然后其 中一个节点执行完Map之后又调度执行另一个Map,则这个Map不会创建新的Shuffle文件,而是把结果输出追加到之前Map创建的Shuffle 文件中;总共创建4个Shuffle文件;(3) 如果此时集群有2个节点有空槽,一个节点有2个空core一个节点有1个空core,则一个节点调度2个Map一个节点调度1个Map,调度2个Map的 节点上,一个Map创建了Shuffle文件,后面的Map还是会创建新的Shuffle文件,因为上一个Map还正在写,它创建的 ShuffleFileGroup还没有释放;总共创建6个Shuffle文件。

Shuffle Fetcher

Reduce去拖Map的输出数据,Spark提供了两套不同的拉取数据框架:通过socket连接去取数据;使用netty框架去取数据。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zgfwfg.html