The handleJobCompletion method calls jobSet.handleJobCompletion(job) and, at the end, checks whether the whole jobSet has completed; if so, it calls jobGenerator.onBatchCompletion(jobSet.time):
private def handleJobCompletion(job: Job, completedTime: Long) {
  val jobSet = jobSets.get(job.time)
  jobSet.handleJobCompletion(job)
  job.setEndTime(completedTime)
  listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
  logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
  if (jobSet.hasCompleted) {
    listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
  }
  job.result match {
    case Failure(e) =>
      reportError("Error running job " + job, e)
    case _ =>
      // if every job in the JobSet has completed, perform the cleanup below
      if (jobSet.hasCompleted) {
        jobSets.remove(jobSet.time)
        jobGenerator.onBatchCompletion(jobSet.time)
        logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
          jobSet.totalDelay / 1000.0, jobSet.time.toString,
          jobSet.processingDelay / 1000.0
        ))
      }
  }
}
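For reference, the jobSet.handleJobCompletion(job) and jobSet.hasCompleted calls above are plain bookkeeping over the jobs of one batch. The following is a deliberately simplified sketch of that bookkeeping; DemoJob and SimpleJobSet are hypothetical names, and the real JobSet additionally records submission and processing times, which is where totalDelay and processingDelay in the log message come from.

import scala.collection.mutable.HashSet

// Hypothetical stand-ins used only for illustration.
case class DemoJob(id: Int)

class SimpleJobSet(val time: Long, jobs: Seq[DemoJob]) {
  private val incompleteJobs = new HashSet[DemoJob]()
  private var processingEndTime = -1L
  incompleteJobs ++= jobs

  // called once per finished job; remembers when the last job of the batch ended
  def handleJobCompletion(job: DemoJob): Unit = synchronized {
    incompleteJobs -= job
    if (incompleteJobs.isEmpty) processingEndTime = System.currentTimeMillis()
  }

  // the batch counts as done only when every job has reported completion;
  // this is the condition guarding jobGenerator.onBatchCompletion above
  def hasCompleted: Boolean = synchronized { incompleteJobs.isEmpty }
}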
Now follow jobGenerator.onBatchCompletion(jobSet.time) into JobGenerator:

def onBatchCompletion(time: Time) {
  eventLoop.post(ClearMetadata(time))
}

The posted ClearMetadata event is picked up by JobGenerator.processEvent(), which dispatches to clearMetadata(time):
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
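The four match arms correspond to the JobGeneratorEvent messages posted onto the JobGenerator's event loop. Below is a minimal reconstruction of that event hierarchy, with names and parameters taken directly from the match arms above; the actual definitions live alongside JobGenerator in Spark's source.

import org.apache.spark.streaming.Time

// Event types implied by the match arms in processEvent above.
sealed trait JobGeneratorEvent
case class GenerateJobs(time: Time) extends JobGeneratorEvent
case class ClearMetadata(time: Time) extends JobGeneratorEvent
case class DoCheckpoint(time: Time, clearCheckpointDataLater: Boolean) extends JobGeneratorEvent
case class ClearCheckpointData(time: Time) extends JobGeneratorEvent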
When the ClearMetadata event arrives, clearMetadata() either checkpoints the metadata or deletes it:

private def clearMetadata(time: Time) {
  ssc.graph.clearMetadata(time)
  // If checkpointing is enabled, then checkpoint,
  // else mark batch to be fully processed
  if (shouldCheckpoint) {
    // checkpointing is needed, so post a DoCheckpoint event
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
  } else {
    // checkpointing is disabled, so the metadata is simply deleted
  }
}

2 Summary

At this point we have walked through Spark Streaming's startup, job generation, job completion, and checkpoint operations. To wrap up, the run flow of a Spark Streaming program is as follows (a minimal driver sketch follows the list):
1. StreamingContext.start() starts the JobScheduler.
2. JobScheduler startup:
   - JobScheduler creates an EventLoop[JobSchedulerEvent] to handle JobStarted and JobCompleted events.
   - JobScheduler starts the JobGenerator.
3. JobGenerator startup:
   - JobGenerator creates an EventLoop[JobGeneratorEvent] to handle the GenerateJobs, ClearMetadata, DoCheckpoint and ClearCheckpointData events.
   - JobGenerator creates a RecurringTimer that periodically posts GenerateJobs events, driving the periodic creation and execution of jobs.
4. Job generation in the JobGenerator:
   - generateJobs() builds a JobSet and submits it through JobScheduler.submitJobSet.
   - submitJobSet wraps each Job in a JobHandler and hands the JobHandler to the jobExecutor thread pool for execution.
   - A DoCheckpoint event is posted to checkpoint the metadata.
5. Job completion:
   - When a job finishes, the JobHandler posts a JobCompleted event. JobScheduler's EventLoop handles it and, once the whole JobSet has completed, triggers a ClearMetadata event so that the metadata is checkpointed or deleted.
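All of this machinery is set in motion by a few lines of user code. The sketch below is a hypothetical minimal driver (the host, port, and checkpoint path are made-up values): the batch duration passed to the StreamingContext becomes the RecurringTimer period, ssc.checkpoint(...) turns on the DoCheckpoint path, and ssc.start() kicks off the JobScheduler/JobGenerator chain summarized above.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MinimalStreamingApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("MinimalStreamingApp")
    // the 5-second batch duration becomes the period of the GenerateJobs timer
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("/tmp/streaming-checkpoint")   // enables the DoCheckpoint branch

    val lines = ssc.socketTextStream("localhost", 9999)
    lines.flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()             // starts JobScheduler, which in turn starts JobGenerator
    ssc.awaitTermination()  // block until the context is stopped
  }
}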
Appendix: the source code of RecurringTimer and EventLoop, with brief comments.

The RecurringTimer code is as follows:
private[streaming] class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
  extends Logging {

  // create a thread that runs loop()
  private val thread = new Thread("RecurringTimer - " + name) {
    setDaemon(true)
    override def run() { loop }
  }

  @volatile private var prevTime = -1L
  @volatile private var nextTime = -1L
  @volatile private var stopped = false

  // the first tick is aligned to the next multiple of period
  def getStartTime(): Long = {
    (math.floor(clock.getTimeMillis().toDouble / period) + 1).toLong * period
  }

  def getRestartTime(originalStartTime: Long): Long = {
    val gap = clock.getTimeMillis() - originalStartTime
    (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
  }

  // start() mainly starts the thread, which runs loop()
  def start(startTime: Long): Long = synchronized {
    nextTime = startTime
    thread.start()
    logInfo("Started timer for " + name + " at time " + nextTime)
    nextTime
  }

  def start(): Long = {
    start(getStartTime())
  }

  def stop(interruptTimer: Boolean): Long = synchronized {
    if (!stopped) {
      stopped = true
      if (interruptTimer) {
        thread.interrupt()
      }
      thread.join()
      logInfo("Stopped timer for " + name + " after time " + prevTime)
    }
    prevTime
  }

  // wait until nextTime, run the callback once, then advance nextTime by one period
  private def triggerActionForNextInterval(): Unit = {
    clock.waitTillTime(nextTime)
    callback(nextTime)
    prevTime = nextTime
    nextTime += period
    logDebug("Callback for " + name + " called at time " + prevTime)
  }

  // periodically run the callback; in JobGenerator this posts GenerateJobs(time).
  // One extra interval is triggered after stop is requested (when not interrupted).
  private def loop() {
    try {
      while (!stopped) {
        triggerActionForNextInterval()
      }
      triggerActionForNextInterval()
    } catch {
      case e: InterruptedException =>
    }
  }
}
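For context, this is roughly how JobGenerator instantiates the timer (paraphrased, not quoted verbatim from JobGenerator's source): the period is the batch duration, and the callback only posts a GenerateJobs event, so the timer thread itself never performs the job-generation work.

// Approximate wiring inside JobGenerator:
private val timer = new RecurringTimer(
  clock,                                  // a SystemClock by default
  ssc.graph.batchDuration.milliseconds,   // batch interval = timer period
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))),
  "JobGenerator")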