An Analysis of Spark Persistence and Checkpoint Internals

When using Spark persistence, we persist frequently reused data with cache() or persist(). But because that data is kept in memory, it is lost when a node or an executor dies, and the RDD must then be recomputed. If some upstream RDD took a long time to compute, this recomputation wastes a lot of time. For such cases we sometimes use the checkpoint operation to persist data into a fault-tolerant file system such as HDFS. Although checkpointing costs some performance (disk I/O), it avoids large amounts of repeated computation, so trading that performance cost for time efficiency is often worthwhile.
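For example (a minimal sketch; the input path is hypothetical and sc is a local SparkContext), cache() and persist() differ only in that persist() lets you choose a storage level:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("persist-demo").setMaster("local[2]"))

    val parsed = sc.textFile("hdfs:///data/events")   // hypothetical input
      .map(_.split(","))                              // assume an expensive transformation

    // cache() is shorthand for persist(StorageLevel.MEMORY_ONLY): blocks live in
    // executor memory and disappear if the executor dies.
    parsed.cache()
    // Alternatively: parsed.persist(StorageLevel.MEMORY_AND_DISK)

    parsed.count()   // the first action computes the RDD and fills the cache
    parsed.count()   // served from the cache, no recomputation

    sc.stop()
  }
}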

After we have cached an RDD, a read first goes to the CacheManager, which then asks the BlockManager for the copy cached in memory or on disk. If the RDD was never cached, or the cached copy has been lost, Spark falls back to looking up the data in the checkpoint's fault-tolerant file system; if that is not found either, the RDD is recomputed from its lineage.
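In the Spark 1.x source, this lookup order is implemented by RDD.iterator() and RDD.computeOrReadCheckpoint(); roughly (a simplified paraphrase, not the verbatim source):

// Simplified paraphrase of the Spark 1.x read path inside the RDD class.
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    // Cached RDD: the CacheManager asks the BlockManager for a memory/disk
    // copy of the block, and computes (and re-caches) the partition on a miss.
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
  if (isCheckpointed) {
    // The parent has been rewired to a CheckpointRDD that reads from HDFS.
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)   // last resort: recompute from the lineage
  }
}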

How checkpointing works
1. In your code, set a checkpoint directory on the SparkContext, e.g. an HDFS directory, via setCheckpointDir().
2. Call the checkpoint() method on every RDD you want checkpointed (see the usage sketch after this list).
3. RDDCheckpointData (an internal Spark API) takes over your RDD and marks it as "marked for checkpointing", ready to be checkpointed.
4. After your job has run, finalRDD.doCheckpoint() is invoked; it scans backwards along the RDD lineage, and any RDD found marked for checkpointing receives a second mark, "checkpointing in progress", meaning it is now undergoing the checkpoint operation.
5. Once the job has finished, an internal follow-up job is launched to write the data of every RDD marked "checkpointing in progress" out to HDFS files. (If the RDD was cached earlier, its data is fetched straight from the cache and written to HDFS; if not, the RDD is computed all over again and then checkpointed.)
6. The parent RDDs of the checkpointed RDD are replaced with a CheckpointRDD, forcibly rewriting your RDD's lineage. From then on, if the RDD's cached data cannot be retrieved, Spark goes through this upstream CheckpointRDD to fetch the checkpoint data from the fault-tolerant file system, e.g. HDFS.
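Putting the six steps together, a typical usage pattern looks like the sketch below. The paths and RDD names are hypothetical; note the cache() before checkpoint(), which lets the internal job of step 5 read from the cache instead of recomputing the lineage.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD implicits for Spark 1.x

object CheckpointDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("checkpoint-demo"))

    // Step 1: point the SparkContext at a fault-tolerant checkpoint directory.
    sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")   // hypothetical path

    val expensive = sc.textFile("hdfs:///data/logs")       // hypothetical input
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // Cache first, so the internal checkpoint job (step 5) reads the cached
    // blocks instead of recomputing the whole lineage a second time.
    expensive.cache()

    // Step 2: mark the RDD for checkpointing; nothing is written yet.
    expensive.checkpoint()

    // Steps 4-6 happen after this action: doCheckpoint() walks the lineage,
    // an internal job writes the RDD to HDFS, and the lineage is rewired.
    expensive.count()

    println(expensive.isCheckpointed)     // true once the internal job is done
    println(expensive.getCheckpointFile)  // Some(hdfs://.../rdd-<id>)

    sc.stop()
  }
}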

The RDDCheckpointData source (from Spark 1.x) is as follows:

import scala.reflect.ClassTag

import org.apache.hadoop.fs.Path

import org.apache.spark._   // Logging, SparkException, etc. (imports abridged)

/**
 * Enumeration to manage state transitions of an RDD through checkpointing
 * [ Initialized --> marked for checkpointing --> checkpointing in progress --> checkpointed ]
 */
private[spark] object CheckpointState extends Enumeration {
  type CheckpointState = Value
  val Initialized, MarkedForCheckpoint, CheckpointingInProgress, Checkpointed = Value
}

/**
 * This class contains all the information related to RDD checkpointing. Each instance of this
 * class is associated with a RDD. It manages process of checkpointing of the associated RDD,
 * as well as, manages the post-checkpoint state by providing the updated partitions,
 * iterator and preferred locations of the checkpointed RDD.
 */
private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
  extends Logging with Serializable {

  import CheckpointState._

  // The checkpoint state of the associated RDD.
  var cpState = Initialized

  // The file to which the associated RDD has been checkpointed to
  @transient var cpFile: Option[String] = None

  // The CheckpointRDD created from the checkpoint file, that is, the new parent of the associated RDD.
  var cpRDD: Option[RDD[T]] = None

  // Mark the RDD for checkpointing
  def markForCheckpoint() {
    RDDCheckpointData.synchronized {
      if (cpState == Initialized) cpState = MarkedForCheckpoint
    }
  }

  // Is the RDD already checkpointed
  def isCheckpointed: Boolean = {
    RDDCheckpointData.synchronized { cpState == Checkpointed }
  }

  // Get the file to which this RDD was checkpointed to as an Option
  def getCheckpointFile: Option[String] = {
    RDDCheckpointData.synchronized { cpFile }
  }

  // Do the checkpointing of the RDD. Called after the first job using that RDD is over.
  def doCheckpoint() {
    // If it is marked for checkpointing AND checkpointing is not already in progress,
    // then set it to be in progress, else return
    RDDCheckpointData.synchronized {
      if (cpState == MarkedForCheckpoint) {
        cpState = CheckpointingInProgress
      } else {
        return
      }
    }

    // Create the output path for the checkpoint
    val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
    // Get a FileSystem handle for the checkpoint path (e.g. HDFS)
    val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
    if (!fs.mkdirs(path)) {
      throw new SparkException("Failed to create checkpoint path " + path)
    }
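    // The excerpt above is truncated here; the remainder of doCheckpoint(),
    // abridged from the Spark 1.x source (a sketch, not a verbatim quote),
    // runs the internal job that writes every partition to the checkpoint
    // directory, wraps the result in a CheckpointRDD, and swaps it in as the
    // RDD's new parent.

    // Save to file, and reload it as an RDD
    val broadcastedConf = rdd.context.broadcast(
      new SerializableWritable(rdd.context.hadoopConfiguration))
    rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, broadcastedConf) _)
    val newRDD = new CheckpointRDD[T](rdd.context, path.toString)

    // Change the dependencies and partitions of the RDD: the CheckpointRDD
    // becomes the one and only parent in the lineage (step 6 above).
    RDDCheckpointData.synchronized {
      cpFile = Some(path.toString)
      cpRDD = Some(newRDD)
      rdd.markCheckpointed(newRDD)   // clears the old dependencies and partitions
      cpState = Checkpointed
    }
    logInfo("Done checkpointing RDD " + rdd.id + ", new parent is RDD " + newRDD.id)
  }
}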
