EventLoop的源码:主要功能就是创建一个线程在后台判断是否Event进来,有的话则进行相应的处理
private[spark] abstract class EventLoop[E](name: String) extends Logging { private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]() private val stopped = new AtomicBoolean(false) private val eventThread = new Thread(name) { setDaemon(true) override def run(): Unit = { try { while (!stopped.get) { val event = eventQueue.take() try { onReceive(event) } catch { case NonFatal(e) => try { onError(e) } catch { case NonFatal(e) => logError("Unexpected error in " + name, e) } } } } catch { case ie: InterruptedException => // exit even if eventQueue is not empty case NonFatal(e) => logError("Unexpected error in " + name, e) } } } def start(): Unit = { if (stopped.get) { throw new IllegalStateException(name + " has already been stopped") } // Call onStart before starting the event thread to make sure it happens before onReceive onStart() eventThread.start() } def stop(): Unit = { if (stopped.compareAndSet(false, true)) { eventThread.interrupt() var onStopCalled = false try { eventThread.join() // Call onStop after the event thread exits to make sure onReceive happens before onStop onStopCalled = true onStop() } catch { case ie: InterruptedException => Thread.currentThread().interrupt() if (!onStopCalled) { // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since // it's already called. onStop() } } } else { // Keep quiet to allow calling `stop` multiple times. } } //将event放进eventQueue中,在eventThread进行相应的处理 def post(event: E): Unit = { eventQueue.put(event) } //返回eventThread是否存活 def isActive: Boolean = eventThread.isAlive //eventThread启动前调用 protected def onStart(): Unit = {} //在eventThred结束后调用 protected def onStop(): Unit = {} //实现类实现Event的处理逻辑 protected def onReceive(event: E): Unit //出错时的处理逻辑 protected def onError(e: Throwable): Unit }SparkSteaming运行流程分析以及CheckPoint操作 (4)
内容版权声明:除非注明,否则皆为本站原创文章。