长话短说,我们部门一个同事找到我,说他的spark 2.3 structured streaming程序频繁报OOM,从来没有坚持过超过三四天的,叫帮看一下。
这种事情一般我是不愿意看的,因为大部分情况下spark oom就那么几种可能:
直奔源码: public class InMemoryStore implements KVStore { private Object metadata; //这里就是那个5个多g大的map private ConcurrentMap<Class<?>, InstanceList> data = new ConcurrentHashMap<>(); ...... }
/** * A KVStore wrapper that allows tracking the number of elements of specific types, and triggering * actions once they reach a threshold. This allows writers, for example, to control how much data * is stored by potentially deleting old data as new data is added. * * This store is used when populating data either from a live UI or an event log. On top of firing * triggers when elements reach a certain threshold, it provides two extra bits of functionality: * * - a generic worker thread that can be used to run expensive tasks asynchronously; the tasks can * be configured to run on the calling thread when more determinism is desired (e.g. unit tests). * - a generic flush mechanism so that listeners can be notified about when they should flush * internal state to the store (e.g. after the SHS finishes parsing an event log). * * The configured triggers are run on a separate thread by default; they can be forced to run on * the calling thread by setting the `ASYNC_TRACKING_ENABLED` configuration to `false`. */ private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore { import config._ private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]() private val flushTriggers = new ListBuffer[() => Unit]() private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) { ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker") } else { MoreExecutors.sameThreadExecutor() } @volatile private var stopped = false /** * Register a trigger that will be fired once the number of elements of a given type reaches * the given threshold. * * @param klass The type to monitor. * @param threshold The number of elements that should trigger the action. * @param action Action to run when the threshold is reached; takes as a parameter the number * of elements of the registered type currently known to be in the store. */ def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): Unit = { val existing = triggers.getOrElse(klass, Seq()) triggers(klass) = existing :+ Trigger(threshold, action) } ...... }这个类的方法里,我们需要关注的就是这个addTrigger方法,其注释也写的很明白,就是用来当保存的对象达到一定数目后触发的操作。
考虑到我们溢出的对象都是SparkPlanGraphNode,所以先看最下面我选中的蓝色那一行的代码: kvstore.addTrigger(classOf[SQLExecutionUIData], conf.get(UI_RETAINED_EXECUTIONS)) { count => cleanupExecutions(count) } private def cleanupExecutions(count: Long): Unit = { val countToDelete = count - conf.get(UI_RETAINED_EXECUTIONS) if (countToDelete <= 0) { return } val view = kvstore.view(classOf[SQLExecutionUIData]).index("completionTime").first(0L) val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_.completionTime.isDefined) //出错的就是这一行 toDelete.foreach { e => kvstore.delete(e.getClass(), e.executionId) } }