Analysis of Spark Persistence and Checkpoint Internals (2)

Continuing from part (1), the remainder of RDDCheckpointData.doCheckpoint() writes the RDD's data out to the checkpoint directory, rebuilds it as a CheckpointRDD, and then swaps the new RDD in as the parent of the checkpointed one:

    // Save to file, and reload it as an RDD
    val broadcastedConf = rdd.context.broadcast(
      new SerializableWritable(rdd.context.hadoopConfiguration))
    // Write the checkpointed data out to the file system
    rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, broadcastedConf) _)
    val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
    if (newRDD.partitions.size != rdd.partitions.size) {
      throw new SparkException(
        "Checkpoint RDD " + newRDD + "(" + newRDD.partitions.size + ") has different " +
          "number of partitions than original RDD " + rdd + "(" + rdd.partitions.size + ")")
    }

    // Change the dependencies and partitions of the RDD
    RDDCheckpointData.synchronized {
      cpFile = Some(path.toString)
      cpRDD = Some(newRDD)
      rdd.markCheckpointed(newRDD)  // Update the RDD's dependencies and partitions
      cpState = Checkpointed
    }
    logInfo("Done checkpointing RDD " + rdd.id + " to " + path + ", new parent is RDD " + newRDD.id)
  }

// Get preferred location of a split after checkpointing
  def getPreferredLocations(split: Partition): Seq[String] = {
    RDDCheckpointData.synchronized {
      cpRDD.get.preferredLocations(split)
    }
  }

  def getPartitions: Array[Partition] = {
    RDDCheckpointData.synchronized {
      cpRDD.get.partitions
    }
  }

  def checkpointRDD: Option[RDD[T]] = {
    RDDCheckpointData.synchronized {
      cpRDD
    }
  }
}
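
With those pieces in place, the user-facing flow is: set a checkpoint directory, mark the RDD with checkpoint(), and run an action. Below is a minimal driver-side sketch (the application name, local master, and /tmp/spark-checkpoints directory are illustrative, not taken from the source) showing when the checkpoint job actually runs and how the lineage is replaced afterwards:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("checkpoint-demo").setMaster("local[2]"))
sc.setCheckpointDir("/tmp/spark-checkpoints")   // illustrative path; use an HDFS directory in production

val rdd = sc.parallelize(1 to 1000, 4).map(_ * 2).filter(_ % 3 == 0)
rdd.cache()        // keep the data in memory so the checkpoint job does not recompute the lineage
rdd.checkpoint()   // only marks the RDD; nothing is written until an action runs

rdd.count()        // after this job finishes, doCheckpoint() launches the writeToFile job

println(rdd.isCheckpointed)   // true
println(rdd.toDebugString)    // the lineage now starts at the CheckpointRDD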

The file-writing operation in CheckpointRDD is as follows:

  def writeToFile[T: ClassTag](
      path: String,
      broadcastedConf: Broadcast[SerializableWritable[Configuration]],
      blockSize: Int = -1
    )(ctx: TaskContext, iterator: Iterator[T]) {
    val env = SparkEnv.get
    val outputDir = new Path(path)
    val fs = outputDir.getFileSystem(broadcastedConf.value.value)

    val finalOutputName = splitIdToFile(ctx.partitionId)
    val finalOutputPath = new Path(outputDir, finalOutputName)
    val tempOutputPath =
      new Path(outputDir, "." + finalOutputName + "-attempt-" + ctx.attemptNumber)

    if (fs.exists(tempOutputPath)) {
      throw new IOException("Checkpoint failed: temporary path " +
        tempOutputPath + " already exists")
    }
    val bufferSize = env.conf.getInt("spark.buffer.size", 65536)

    val fileOutputStream = if (blockSize < 0) {
      fs.create(tempOutputPath, false, bufferSize)
    } else {
      // This is mainly for testing purpose
      fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize)
    }
    val serializer = env.serializer.newInstance()
    val serializeStream = serializer.serializeStream(fileOutputStream)
    serializeStream.writeAll(iterator)
    serializeStream.close()

    if (!fs.rename(tempOutputPath, finalOutputPath)) {
      if (!fs.exists(finalOutputPath)) {
        logInfo("Deleting tempOutputPath " + tempOutputPath)
        fs.delete(tempOutputPath, false)
        throw new IOException("Checkpoint failed: failed to save output of task: "
          + ctx.attemptNumber + " and final output path does not exist")
      } else {
        // Some other copy of this task must've finished before us and renamed it
        logInfo("Final output path " + finalOutputPath + " already exists; not overwriting it")
        fs.delete(tempOutputPath, false)
      }
    }
  }
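
The write goes to a hidden temporary file and only becomes visible through the final rename, so a retried or speculative task attempt can never leave a half-written checkpoint file behind. The following standalone sketch (not Spark source; the atomicWrite helper and file names are made up for illustration) shows the same commit protocol against the plain Hadoop FileSystem API:

import java.io.IOException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper illustrating the "write a hidden temp file, then rename" commit.
def atomicWrite(dir: String, fileName: String, bytes: Array[Byte]): Unit = {
  val outputDir = new Path(dir)
  val fs        = outputDir.getFileSystem(new Configuration())
  val finalPath = new Path(outputDir, fileName)
  val tempPath  = new Path(outputDir, "." + fileName + ".tmp")  // hidden per-attempt file

  val out = fs.create(tempPath, false)             // overwrite = false: fail if it already exists
  try out.write(bytes) finally out.close()

  if (!fs.rename(tempPath, finalPath)) {           // the rename is the commit point
    if (fs.exists(finalPath)) {
      fs.delete(tempPath, false)                   // another attempt already committed; clean up
    } else {
      throw new IOException("Commit failed for " + finalPath)
    }
  }
}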

In the RDD class itself, the driver-side doCheckpoint() method is shown below:

  /**
   * Performs the checkpointing of this RDD by saving this. It is called after a job using this RDD
   * has completed (therefore the RDD has been materialized and potentially stored in memory).
   * doCheckpoint() is called recursively on the parent RDDs.
   */
  private[spark] def doCheckpoint() {
    if (!doCheckpointCalled) {
      doCheckpointCalled = true
      if (checkpointData.isDefined) {
        checkpointData.get.doCheckpoint()
      } else {
        dependencies.foreach(_.rdd.doCheckpoint())
      }
    }
  }
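
Note the else branch: when an action finishes, SparkContext calls doCheckpoint() on the final RDD, and the call only walks further up the dependency chain for RDDs that are not themselves marked for checkpointing. A small sketch (reusing the sc and checkpoint directory from the earlier sketch, both illustrative):

val parent = sc.parallelize(1 to 100, 2).map(_ + 1)
parent.checkpoint()                    // only the ancestor is marked

val child = parent.map(_ * 10)
child.count()                          // runJob -> child.doCheckpoint()
                                       //        -> dependencies.foreach(_.rdd.doCheckpoint())
                                       //        -> parent's checkpointData.get.doCheckpoint()

println(parent.isCheckpointed)         // true: the marked ancestor was checkpointed
println(child.isCheckpointed)          // false: the child itself was never marked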
