// Save to file, and reload it as an RDD
val broadcastedConf = rdd.context.broadcast(
new SerializableWritable(rdd.context.hadoopConfiguration))
//checkpoint数据到文件系统中
rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, broadcastedConf) _)
val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
if (newRDD.partitions.size != rdd.partitions.size) {
throw new SparkException(
"Checkpoint RDD " + newRDD + "(" + newRDD.partitions.size + ") has different " +
"number of partitions than original RDD " + rdd + "(" + rdd.partitions.size + ")")
}
// Change the dependencies and partitions of the RDD
RDDCheckpointData.synchronized {
cpFile = Some(path.toString)
cpRDD = Some(newRDD)
rdd.markCheckpointed(newRDD) // Update the RDD's dependencies and partitions
cpState = Checkpointed
}
logInfo("Done checkpointing RDD " + rdd.id + " to " + path + ", new parent is RDD " + newRDD.id)
}
// Get preferred location of a split after checkpointing
def getPreferredLocations(split: Partition): Seq[String] = {
RDDCheckpointData.synchronized {
cpRDD.get.preferredLocations(split)
}
}
def getPartitions: Array[Partition] = {
RDDCheckpointData.synchronized {
cpRDD.get.partitions
}
}
def checkpointRDD: Option[RDD[T]] = {
RDDCheckpointData.synchronized {
cpRDD
}
}
}
在CheckPointRDD中写文件的操作如下:
def writeToFile[T: ClassTag](
path: String,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
blockSize: Int = -1
)(ctx: TaskContext, iterator: Iterator[T]) {
val env = SparkEnv.get
val outputDir = new Path(path)
val fs = outputDir.getFileSystem(broadcastedConf.value.value)
val finalOutputName = splitIdToFile(ctx.partitionId)
val finalOutputPath = new Path(outputDir, finalOutputName)
val tempOutputPath =
new Path(outputDir, "." + finalOutputName + "-attempt-" + ctx.attemptNumber)
if (fs.exists(tempOutputPath)) {
throw new IOException("Checkpoint failed: temporary path " +
tempOutputPath + " already exists")
}
val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
val fileOutputStream = if (blockSize < 0) {
fs.create(tempOutputPath, false, bufferSize)
} else {
// This is mainly for testing purpose
fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize)
}
val serializer = env.serializer.newInstance()
val serializeStream = serializer.serializeStream(fileOutputStream)
serializeStream.writeAll(iterator)
serializeStream.close()
if (!fs.rename(tempOutputPath, finalOutputPath)) {
if (!fs.exists(finalOutputPath)) {
logInfo("Deleting tempOutputPath " + tempOutputPath)
fs.delete(tempOutputPath, false)
throw new IOException("Checkpoint failed: failed to save output of task: "
+ ctx.attemptNumber + " and final output path does not exist")
} else {
// Some other copy of this task must've finished before us and renamed it
logInfo("Final output path " + finalOutputPath + " already exists; not overwriting it")
fs.delete(tempOutputPath, false)
}
}
}
在RDD类的源码中,两个方法如下所示:
/**
* Performs the checkpointing of this RDD by saving this. It is called after a job using this RDD
* has completed (therefore the RDD has been materialized and potentially stored in memory).
* doCheckpoint() is called recursively on the parent RDDs.
*/
private[spark] def doCheckpoint() {
if (!doCheckpointCalled) {
doCheckpointCalled = true
if (checkpointData.isDefined) {
checkpointData.get.doCheckpoint()
} else {
dependencies.foreach(_.rdd.doCheckpoint())
}
}
}