Spark Streaming Execution Flow Analysis and Checkpoint Operations

Note: the source code below includes only the parts relevant to this analysis; everything else is omitted.

1 Flow Analysis

Application entry point:

val sparkConf = new SparkConf().setAppName("SparkStreaming")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(batchDuration.toLong))
ssc.start()
ssc.awaitTermination()

Once ssc.start() is called, the program truly begins to run.

Step 1:
StreamingContext.start() performs the following main work:

Calls JobScheduler.start()

Posts the StreamingListenerStreamingStarted message

StreamingContext.start():

def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          ...
          scheduler.start()
          state = StreamingContextState.ACTIVE
          scheduler.listenerBus.post(
            StreamingListenerStreamingStarted(System.currentTimeMillis()))
        } catch {
          ...
        }
        StreamingContext.setActiveContext(this)
      }
      ...
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
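The INITIALIZED branch above takes a global ACTIVATION_LOCK and asserts that no other context is active, which is how Spark enforces at most one started StreamingContext per JVM. A minimal, Spark-free sketch of that guard pattern (the object and field names here are illustrative, not Spark's actual internals):

```scala
import java.util.concurrent.atomic.AtomicReference

// Sketch of the "only one active context" guard: a lock object plus a
// shared slot holding the currently active instance.
object ActivationGuard {
  private val ACTIVATION_LOCK = new Object
  private val activeContext = new AtomicReference[AnyRef](null)

  def activate(ctx: AnyRef): Unit = ACTIVATION_LOCK.synchronized {
    // mirrors assertNoOtherContextIsActive(): fail fast if already active
    require(activeContext.get() == null,
      "Only one StreamingContext may be started in this JVM")
    activeContext.set(ctx) // mirrors setActiveContext(this)
  }

  def deactivate(): Unit = ACTIVATION_LOCK.synchronized {
    activeContext.set(null)
  }
}

// usage: the second activation attempt is rejected
ActivationGuard.activate(new Object)
val rejected =
  try { ActivationGuard.activate(new Object); false }
  catch { case _: IllegalArgumentException => true }
println(s"second activation rejected: $rejected")
```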

Step 2:
JobScheduler.start() performs the following main operations:

Creates an EventLoop to handle incoming JobSchedulerEvents; processEvent contains the actual handling logic

Calls jobGenerator.start()

JobScheduler.start():

def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  // create an event listener and start it
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()
  ...
  // start the JobGenerator
  jobGenerator.start()
  ...
}
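The EventLoop used above is essentially a daemon thread draining a queue of events and dispatching each one to onReceive. A self-contained sketch of that pattern in plain Scala (it imitates the shape of Spark's internal EventLoop, but is a simplified illustration, not Spark's actual implementation):

```scala
import java.util.concurrent.{BlockingQueue, CountDownLatch, LinkedBlockingDeque}
import scala.util.control.NonFatal

// Sketch of the EventLoop pattern: post() enqueues without blocking,
// and a single daemon thread drains the queue and calls onReceive.
abstract class EventLoop[E](name: String) {
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
  @volatile private var stopped = false

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) {
          val event = eventQueue.take() // blocks until an event arrives
          try onReceive(event)
          catch { case NonFatal(t) => onError(t) }
        }
      } catch { case _: InterruptedException => () } // woken up by stop()
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped = true; eventThread.interrupt() }
  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}

// usage sketch: process three posted events
val processed = new CountDownLatch(3)
val loop = new EventLoop[String]("demo-loop") {
  override protected def onReceive(event: String): Unit = processed.countDown()
  override protected def onError(e: Throwable): Unit = e.printStackTrace()
}
loop.start()
Seq("a", "b", "c").foreach(loop.post)
processed.await()
loop.stop()
```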

Step 3:
JobGenerator.start() performs the following main operations:

Creates an EventLoop[JobGeneratorEvent] to handle JobGeneratorEvents

Starts the job generation work

Creates a timer that periodically executes eventLoop.post(GenerateJobs(new Time(longTime)))

When the EventLoop created in JobGenerator.start() receives a GenerateJobs event, it executes generateJobs(time)

generateJobs() builds a JobSet, submits it via jobScheduler.submitJobSet(), and then posts a DoCheckpoint event to trigger a checkpoint

JobGenerator.start():

def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // create the checkpointWriter, which persists checkpoint data
  checkpointWriter

  // create an event listener that handles JobGeneratorEvents
  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    // recover from checkpoint
    restart()
  } else {
    // first start
    startFirstTime()
  }
}

On first startup, startFirstTime() is called. Its main job is to start the already-initialized RecurringTimer, which periodically posts GenerateJobs events; the period is ssc.graph.batchDuration.milliseconds, i.e. the batch interval you configured. When the EventLoop created in JobGenerator.start() receives a GenerateJobs event, it runs processEvent(), which in turn calls generateJobs(time) to carry out the job generation work.

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}

private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
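Note that timer.getStartTime() aligns the first GenerateJobs event to the next multiple of the batch interval, and each subsequent callback fires exactly one period later. A self-contained sketch of that scheduling logic (the rounding formula mirrors the idea behind RecurringTimer's start-time alignment, but the class itself is simplified for illustration):

```scala
// Sketch of RecurringTimer's scheduling math: the first trigger time is
// rounded up to the next multiple of `period`, then triggers repeat every
// `period` milliseconds.
class RecurringTimerSketch(period: Long, callback: Long => Unit) {
  @volatile private var stopped = false
  private var nextTime = 0L

  // Next multiple of `period` strictly after `now`.
  def getStartTime(now: Long): Long =
    (math.floor(now.toDouble / period) + 1).toLong * period

  private val thread = new Thread("RecurringTimerSketch") {
    setDaemon(true)
    override def run(): Unit = {
      while (!stopped) {
        val sleepMs = nextTime - System.currentTimeMillis()
        if (sleepMs > 0) Thread.sleep(sleepMs)
        callback(nextTime) // e.g. eventLoop.post(GenerateJobs(new Time(nextTime)))
        nextTime += period // schedule the next batch boundary
      }
    }
  }

  def start(startTime: Long): Unit = { nextTime = startTime; thread.start() }
  def stop(): Unit = stopped = true
}

// usage: with a 1000 ms batch interval, a start at t = 1234 ms aligns to 2000 ms
val timer = new RecurringTimerSketch(1000L, t => println(s"GenerateJobs($t)"))
println(timer.getStartTime(1234L)) // 2000
```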

The main work of generateJobs:

Builds a JobSet and submits it via jobScheduler.submitJobSet()

Posts a DoCheckpoint event to trigger a checkpoint

private def generateJobs(time: Time) {
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated blocks
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
      PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
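The DoCheckpoint event posted at the end ultimately has the checkpointWriter serialize the streaming state and persist it to the checkpoint directory. A common pattern for this kind of write, and a reasonable mental model for what the writer does, is "write to a temp file, then atomically rename", so a crash mid-write never leaves a truncated checkpoint visible. A simplified, Spark-free sketch of that pattern (the class, file naming, and payload type are illustrative, not Spark's actual Checkpoint/CheckpointWriter code):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.file.{Files, Path, StandardCopyOption}

// Sketch of a checkpoint writer: serialize the payload, write it to a
// temp file, then atomically move it into place under the batch time.
class CheckpointWriterSketch(checkpointDir: Path) {
  Files.createDirectories(checkpointDir)

  def write(batchTime: Long, payload: Serializable): Path = {
    // serialize the checkpoint payload to bytes
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(payload)
    oos.close()

    val tmp  = checkpointDir.resolve(s"checkpoint-$batchTime.tmp")
    val dest = checkpointDir.resolve(s"checkpoint-$batchTime")
    Files.write(tmp, bos.toByteArray)
    // readers only ever see a complete checkpoint file
    Files.move(tmp, dest, StandardCopyOption.ATOMIC_MOVE)
    dest
  }
}

// usage: write one checkpoint for batch time 1000
val dir = Files.createTempDirectory("ckpt-demo")
val writer = new CheckpointWriterSketch(dir)
val written = writer.write(1000L, "serialized-graph-state")
println(written)
```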
