Note: the source code below only shows the parts relevant to this analysis; everything else is omitted.
1 Flow analysis

Application entry point:

```scala
val sparkConf = new SparkConf().setAppName("SparkStreaming")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(batchDuration.toLong))
ssc.start()
ssc.awaitTermination()
```

Once ssc.start() is called, the program actually starts running.
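As a concrete reference, here is a minimal runnable sketch of such an entry point. The socket source on localhost:9999, the 5-second batch interval, and the object name SparkStreamingDemo are illustrative assumptions, not taken from the original:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("SparkStreaming")
    val sc = new SparkContext(sparkConf)
    // The batch interval passed here is what later drives the GenerateJobs timer
    val ssc = new StreamingContext(sc, Seconds(5))

    // At least one output operation must be registered, or the context
    // refuses to start; count().print() is the simplest choice
    val lines = ssc.socketTextStream("localhost", 9999) // hypothetical source
    lines.count().print()

    ssc.start()            // this call sets the whole machinery below in motion
    ssc.awaitTermination() // block until the context is stopped
  }
}
```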
Step 1:

StreamingContext.start() performs the following main work:
Calls JobScheduler.start()
Posts a StreamingListenerStreamingStarted message
StreamingContext.start():

```scala
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          ...
          scheduler.start()
          state = StreamingContextState.ACTIVE
          scheduler.listenerBus.post(
            StreamingListenerStreamingStarted(System.currentTimeMillis()))
        } catch {
          ...
        }
        StreamingContext.setActiveContext(this)
      }
      ...
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
```
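The state match above is what makes repeated or late calls safe; a hypothetical interaction:

```scala
ssc.start() // INITIALIZED -> ACTIVE: scheduler.start() runs and the start event is posted
ssc.start() // ACTIVE: merely logs "StreamingContext has already been started"
ssc.stop()
ssc.start() // STOPPED: throws IllegalStateException
```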
Step 2:

JobScheduler.start() performs the following main operations:
Creates an EventLoop to handle incoming JobSchedulerEvents; processEvent holds the actual handling logic
Calls jobGenerator.start()
JobScheduler.start():

```scala
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  // Create an event listener and start it
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()
  ...
  // Start the JobGenerator
  jobGenerator.start()
  ...
}
```
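JobScheduler and JobGenerator both build on the same EventLoop pattern: a single daemon thread drains a blocking queue and funnels every event through one processEvent method. The following simplified sketch illustrates the mechanism; Spark's actual org.apache.spark.util.EventLoop adds lifecycle hooks and more careful error handling, so treat this as an approximation:

```scala
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean

// Simplified event loop: post() enqueues, a daemon thread dequeues and
// dispatches to onReceive; errors are routed to onError
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      while (!stopped.get) {
        try {
          val event = eventQueue.take() // blocks until an event is posted
          try onReceive(event)
          catch { case t: Throwable => onError(t) }
        } catch {
          case _: InterruptedException => // stop() interrupts take(); loop exits
        }
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped.set(true); eventThread.interrupt() }
  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}
```

Because a single thread consumes the queue, all scheduling decisions are serialized without explicit locking.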
Step 3:

JobGenerator.start() performs the following main operations:
Creates an EventLoop[JobGeneratorEvent] to handle JobGeneratorEvent events
Starts the job generation work:
A timer periodically calls eventLoop.post(GenerateJobs(new Time(longTime)))
When the EventLoop created in JobGenerator.start() receives a GenerateJobs event, it executes generateJobs(time)
generateJobs() builds a JobSet, submits it via jobScheduler.submitJobSet(), and then posts a DoCheckpoint event to perform a checkpoint
JobGenerator.start():

```scala
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Touch checkpointWriter so it is instantiated; it persists checkpoint data
  checkpointWriter

  // Create the event listener that handles JobGeneratorEvents
  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    // Recover from a checkpoint
    restart()
  } else {
    // First start
    startFirstTime()
  }
}
```

On the first start, startFirstTime() is called. Its main task is to start the already initialized RecurringTimer (RecurringTimer.start()), which periodically posts GenerateJobs events. The period is ssc.graph.batchDuration.milliseconds, i.e. the batch interval you configured. When the EventLoop created in JobGenerator.start() receives a GenerateJobs event, it runs processEvent(), which in turn calls generateJobs(time) to do the actual job generation.
```scala
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}

private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
```
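To make the timing behaviour concrete, here is a simplified sketch of what RecurringTimer does: align the first trigger to the next multiple of the period, then fire the callback once per period on a dedicated daemon thread. The real org.apache.spark.streaming.util.RecurringTimer also takes a pluggable Clock and supports graceful stopping, so this is illustrative only:

```scala
class SimpleRecurringTimer(period: Long, callback: Long => Unit, name: String) {
  @volatile private var stopped = false

  // Like RecurringTimer.getStartTime: the next multiple of `period`,
  // so batch boundaries line up with the batch interval
  def getStartTime(): Long =
    (math.floor(System.currentTimeMillis().toDouble / period) + 1).toLong * period

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      var nextTime = getStartTime()
      while (!stopped) {
        val sleepMs = nextTime - System.currentTimeMillis()
        if (sleepMs > 0) Thread.sleep(sleepMs)
        callback(nextTime) // e.g. eventLoop.post(GenerateJobs(new Time(nextTime)))
        nextTime += period
      }
    }
  }

  def start(): Unit = thread.start()
  def stop(): Unit = stopped = true
}
```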
The main work of generateJobs:
Builds a JobSet and submits it via jobScheduler.submitJobSet()
Posts a DoCheckpoint event to perform a checkpoint
```scala
private def generateJobs(time: Time) {
  // Checkpoint all RDDs marked for checkpointing so their lineages are
  // truncated periodically
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated blocks
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
      PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
```
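For completeness: jobScheduler.submitJobSet() registers the JobSet and hands each Job to a thread pool. The sketch below is abridged from memory of Spark's JobScheduler, so treat the exact details as indicative rather than authoritative:

```scala
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    // jobExecutor is a fixed-size thread pool whose size is controlled by
    // spark.streaming.concurrentJobs (default 1), so batches normally run one at a time
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
```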