Spark Streaming Execution Flow Analysis and Checkpoint Operations (2)

First operation: the main work of jobScheduler.submitJobSet() is to iterate over the jobs in the JobSet, wrap each one in a JobHandler, and hand the JobHandler to jobExecutor for execution. JobHandler implements Runnable; its run() mainly does the following three things (see the code after this list):

Post a JobStarted event

Execute Job.run()

Post a JobCompleted event

def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}

private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._

  def run() {
    val oldProps = ssc.sparkContext.getLocalProperties
    try {
      var _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobStarted(job, clock.getTimeMillis())) // post the JobStarted event
        SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
          job.run()
        }
        _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobCompleted(job, clock.getTimeMillis())) // post the JobCompleted event
        }
      } else {
        // JobScheduler has been stopped
      }
    } finally {
      ssc.sparkContext.setLocalProperties(oldProps)
    }
  }
}
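One detail worth noting: jobExecutor is a fixed-size daemon thread pool whose size comes from the spark.streaming.concurrentJobs configuration (default 1), so by default the JobHandlers above execute one batch at a time. A minimal runnable sketch of raising that parallelism, assuming local mode (the app name and queue contents are arbitrary placeholders):

import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ConcurrentJobsExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("concurrent-jobs-demo")
      // size of JobScheduler's jobExecutor pool; the default of 1
      // makes the JobHandlers run one batch at a time
      .set("spark.streaming.concurrentJobs", "2")

    val ssc = new StreamingContext(conf, Seconds(1))

    // a trivial input so the context has an output operation to run
    val queue = mutable.Queue[RDD[Int]](ssc.sparkContext.makeRDD(1 to 100))
    ssc.queueStream(queue).foreachRDD(rdd => println(rdd.count()))

    ssc.start()
    ssc.awaitTermination()
  }
}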

Second operation: posting the DoCheckpoint event

The core of an EventLoop is its processEvent(event) method. The EventLoop created in JobScheduler.start() handles three event types, JobStarted, JobCompleted, and ErrorReported (we return to it in step 4 below). DoCheckpoint, however, is a JobGeneratorEvent: it is handled by the EventLoop that JobGenerator.start() creates, and on receiving it that loop calls doCheckpoint():

// JobGenerator.processEvent()
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    ...
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    ...
  }
}
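For context on where this event comes from: JobGenerator posts it to itself. Once generateJobs(time) has submitted the batch's JobSet, it enqueues a DoCheckpoint for the same batch time (and metadata cleanup later posts a second one with clearCheckpointDataLater = true). The shape below is abridged from the Spark 2.x source; error-handling details are elided:

// JobGenerator.generateJobs(), abridged
private def generateJobs(time: Time) {
  ...
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to this batch
    graph.generateJobs(time)                                 // generate jobs using allocated blocks
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  // every batch ends by requesting a checkpoint for its own batch time
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}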

doCheckpoint() calls graph.updateCheckpointData to refresh the checkpoint metadata, then calls checkpointWriter.write to persist the Checkpoint:

private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
  if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    logInfo("Checkpointing graph for time " + time)
    // update the checkpoint data held by the DStream graph
    ssc.graph.updateCheckpointData(time)
    // persist the checkpoint to the file system
    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  } else if (clearCheckpointDataLater) {
    markBatchFullyProcessed(time)
  }
}
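Note that shouldCheckpoint is only true when the application has actually enabled checkpointing, i.e. a checkpoint directory has been set (ssc.checkpointDuration then defaults to the batch duration). A minimal sketch of enabling it on the user side, using the public API; the host, port, and directory path are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("checkpoint-demo")
val ssc = new StreamingContext(conf, Seconds(5))

// setting a checkpoint directory is what makes shouldCheckpoint true,
// so doCheckpoint() actually writes a Checkpoint for each batch
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint") // placeholder path

val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

// stateful operators additionally require RDD checkpointing; the
// interval must be a multiple of the batch/slide duration
val updateFunc = (values: Seq[Int], state: Option[Int]) =>
  Some(values.sum + state.getOrElse(0))
val counts = words.map((_, 1)).updateStateByKey[Int](updateFunc)
counts.checkpoint(Seconds(10))
counts.print()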

Updating the checkpoint data boils down to DStreamGraph.updateCheckpointData:

def updateCheckpointData(time: Time) {
  logInfo("Updating checkpoint data for time " + time)
  this.synchronized {
    outputStreams.foreach(_.updateCheckpointData(time))
  }
  logInfo("Updated checkpoint data for time " + time)
}

outputStreams.foreach(_.updateCheckpointData(time)) invokes updateCheckpointData on each output DStream. That method mainly calls checkpointData.update(currentTime) to refresh the stream's own checkpoint data, and then recursively updates every DStream it depends on.

// every DStream keeps the RDDs it has generated, keyed by batch time
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()

private[streaming] def updateCheckpointData(currentTime: Time) {
  logDebug(s"Updating checkpoint data for time $currentTime")
  checkpointData.update(currentTime)
  dependencies.foreach(_.updateCheckpointData(currentTime))
  logDebug(s"Updated checkpoint data for time $currentTime: $checkpointData")
}
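The recursion is a simple depth-first walk over the DStream dependency graph: each stream first snapshots its own state, then asks its parents to do the same. A standalone sketch of that traversal pattern, using a hypothetical Node class (not Spark code), just to make the visiting order concrete:

// hypothetical miniature of the updateCheckpointData traversal:
// every node updates its own checkpoint state, then recurses
// into the streams it depends on, exactly as DStream does
case class Node(name: String, dependencies: Seq[Node]) {
  def updateCheckpointData(time: Long): Unit = {
    println(s"updating checkpoint data of $name for time $time")
    dependencies.foreach(_.updateCheckpointData(time))
  }
}

val input  = Node("SocketInputDStream", Nil)
val mapped = Node("MappedDStream", Seq(input))
val output = Node("ForEachDStream", Seq(mapped))

// DStreamGraph only calls the output streams; the parents are
// reached through the dependencies chain
output.updateCheckpointData(1000L)
// prints: ForEachDStream -> MappedDStream -> SocketInputDStream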

Next, let's look at checkpointData.update(currentTime). In DStream we find the following definition:

private[streaming] val checkpointData = new DStreamCheckpointData(this)

That leads us to DStreamCheckpointData.update. DStreamCheckpointData also has subclasses that customize what gets saved and how (FileInputDStream, for instance, defines its own FileInputDStreamCheckpointData):

// key: batch time, value: checkpoint file path
@transient private var timeToCheckpointFile = new HashMap[Time, String]
// key: batch time, value: time of the oldest checkpointed RDD in that batch
@transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time]

protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]]

def update(time: Time) {
  // collect the RDDs that have been checkpointed from the DStream;
  // generatedRDDs is a HashMap[Time, RDD[T]]
  val checkpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined)
    .map(x => (x._1, x._2.getCheckpointFile.get))
  logDebug("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n"))

  // add the checkpoint files to the HashMap that will be serialized
  if (!checkpointFiles.isEmpty) {
    currentCheckpointFiles.clear()
    currentCheckpointFiles ++= checkpointFiles
    // record the files in timeToCheckpointFile as well
    timeToCheckpointFile ++= currentCheckpointFiles
    // key: the time passed in, value: time of the earliest checkpointed RDD
    timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering)
  }
}
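The getCheckpointFile used above is the public RDD API: it returns Some(path) only after an RDD marked with rdd.checkpoint() has been materialized by an action. A minimal runnable illustration in plain Spark, assuming local mode (the checkpoint directory is a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setMaster("local[2]").setAppName("rdd-checkpoint-demo"))

sc.setCheckpointDir("/tmp/rdd-checkpoints") // placeholder directory

val rdd = sc.parallelize(1 to 10).map(_ * 2)
rdd.checkpoint()                // only marks the RDD; nothing is written yet
println(rdd.getCheckpointFile)  // None: no job has run

rdd.count()                     // first action materializes the checkpoint
println(rdd.getCheckpointFile)  // Some(file:/tmp/rdd-checkpoints/...)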

Step 4: job completion
As noted under generateJobs above, jobScheduler.submitJobSet() wraps each Job in a JobHandler and hands it to jobExecutor. The JobHandler posts a JobStarted event, runs the job, and posts a JobCompleted event once it finishes; when JobScheduler's EventLoop receives JobCompleted, it calls handleJobCompletion:

// JobScheduler.processEvent()
private def processEvent(event: JobSchedulerEvent) {
  try {
    event match {
      case JobStarted(job, startTime) => handleJobStart(job, startTime)
      case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
      case ErrorReported(m, e) => handleError(m, e)
    }
  } catch {
    case e: Throwable => reportError("Error in job scheduler", e)
  }
}
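handleJobCompletion updates the batch's JobSet and, once every job in the batch has finished, notifies the listener bus. From application code, the corresponding public hook is the StreamingListener API; a minimal sketch of observing batch completion (assuming an existing StreamingContext named ssc):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// the scheduler posts StreamingListenerBatchCompleted once all of a
// batch's jobs have finished; this listener just logs each batch
class BatchLogger extends StreamingListener {
  override def onBatchCompleted(
      batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    println(s"batch ${info.batchTime} done, " +
      s"processing took ${info.processingDelay.getOrElse(-1L)} ms")
  }
}

// registration:
// ssc.addStreamingListener(new BatchLogger)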
