Spark内核解析 (13)

落盘的流程则比较简单,如果其存储级别符合_useDisk为true的条件,再根据其_deserialized判断是否是非序列化的形式,若是则对其进行序列化,最后将数据存储到磁盘,在Storage模块中更新其信息。

执行内存管理

执行内存主要用来存储任务再在执行Shuffle时占用的内存,Shuffle是按照一定规则对RDD数据重新分区的过程,Shuffle的Write和Read两阶段对执行内存的使用:

Shuffle Write

在map端会采用ExternalSorter进行外排,在内存中存储数据时主要占用堆内执行空间。

Shuffle Read

(1)在对reduce端的数据进行聚合时,要将数据交给Aggregator处理,在内存中存储数据时占用堆内执行空间。

(2)如果需要进行最终结果排序,则要将再次将数据交给ExternalSorter处理,占用堆内执行空间。

在ExternalSorter和Aggregator中,Spark会使用一种叫做AppendOnlyMap的哈希表在堆内执行内存中存储数据,但是Shuffle过程中所有数据并不能都保存到该哈希表中,当这个哈希表占用的内存会进行周期性地采样估算,当其大到一定程度,无法再从MemoryManager申请到新的执行内存时,Spark就会将其全部内容存储到磁盘文件中,这个过程被称为溢存(Spill),溢存到磁盘的文件最后会被归并(Merge)。

Spark的存储内存和执行内存有着截然不同的管理方式:对于存储内存来说,Spark用一个LinkedHashMap来集中管理所有的Block,Block由需要缓存的RDD的Partition转化而成;而对于执行内存,Spark用AppendOnlyMap来存储Shuffle过程中的数据,在Tungsten排序中甚至抽象称为页式内存管理,开辟了全新的JVM内存管理机制。

九、Spark核心组件解析

BlockManager数据存储与管理机制

BlockManager是整个Spark底层负责数据存储与管理的一个组件,Driver和Executor的所有数据都由对应的BlockManager进行管理。

Driver上有BlockManagerMaster,负责对各个节点上的BlockManager内部管理的数据的元数据进行维护,比如block的增删改等操作,都会在这里维护好元数据的变更。

每个节点都有一个BlockManager,每个BlockManager创建之后,第一件事即使去向BlockManagerMaster进行注册,此时BlockManagerMaster会为其创建对应的BlockManagerInfo。

img

BlockManagerMaster与BlockManager的关系非常像NameNode与DataNode的关系,BlockManagerMaster中保存BlockManager内部管理数据的元数据,进行维护,当BlockManager进行Block增删改等操作时,都会在BlockManagerMaster中进行元数据的变更,这与NameNode维护DataNode的元数据信息,DataNode中数据发生变化时NameNode中的元数据也会相应变化是一致的。

每个节点上都有一个BlockManager,BlockManager中有三个非常重要的组件:

DisStore:负责对磁盘数据进行读写;

MemoryStore:负责对内存数据进行读写;

BlockTransferService:负责建立BlockManager到远程其他节点的BlockManager的连接,负责对远程其他节点的BlockManager的数据进行读写;

每个BlockManager创建之后,做的第一件事就是向BlockManagerMaster进行注册,此时BlockManagerMaster会为其创建对应的BlockManagerInfo。

使用BlockManager进行写操作时,比如说,RDD运行过程中的一些中间数据,或者我们手动指定了persist(),会优先将数据写入内存中,如果内存大小不够,会使用自己的算法,将内存中的部分数据写入磁盘;此外,如果persist()指定了要replica,那么会使用BlockTransferService将数据replicate一份到其他节点的BlockManager上去。

使用BlockManager进行读操作时,比如说,shuffleRead操作,如果能从本地读取,就利用DisStore或者MemoryStore从本地读取数据,但是本地没有数据的话,那么会用BlockTransferService与有数据的BlockManager建立连接,然后用BlockTransferService从远程BlockManager读取数据;例如,shuffle Read操作中,很有可能要拉取的数据本地没有,那么此时就会从远程有数据的节点上,找那个节点的BlockManager来拉取需要的数据。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsxwxz.html