Spark2.1.0——深入理解事件总线

Spark2.1.0——深入理解事件总线 概览

  Spark程序在运行的过程中,Driver端的很多功能都依赖于事件的传递和处理,而事件总线在这中间发挥着至关重要的纽带作用。事件总线通过异步线程,提高了Driver执行的效率。

       Spark定义了一个特质ListenerBus,可以接收事件并且将事件提交到对应事件的监听器。为了对ListenerBus有个直观的理解,我们先来看看它的代码实现,见代码清单1。

代码清单1        ListenerBus的定义

private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { private[spark] val listeners = new CopyOnWriteArrayList[L] final def addListener(listener: L): Unit = { listeners.add(listener) } final def removeListener(listener: L): Unit = { listeners.remove(listener) } final def postToAll(event: E): Unit = { val iter = listeners.iterator while (iter.hasNext) { val listener = iter.next() try { doPostEvent(listener, event) } catch { case NonFatal(e) => logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e) } } } protected def doPostEvent(listener: L, event: E): Unit private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = { val c = implicitly[ClassTag[T]].runtimeClass listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwzsjz.html