这里的planner是org.apache.spark.sql.execution.SparkPlanner这个类,而这个类继承自org.apache.spark.sql.catalyst.planning.QueryPlanner,plan()方法也是在父类QueryPlanner中实现的。和RuleExecution类似,QueryPlanner中有一个返回Seq[GenericStrategy[PhysicalPlan]]的方法:def strategies: Seq[GenericStrategy[PhysicalPlan]],这个方法会在子类(也就是SparkPlanner)重写,然后被QueryPlanner的plan()方法调用。
我们来看看SparkPlanner中strategies方法的重写,再来看QueryPlanner的plan()方法吧。
class SparkPlanner( val sparkContext: SparkContext, val conf: SQLConf, val experimentalMethods: ExperimentalMethods) extends SparkStrategies { ......其他代码 override def strategies: Seq[Strategy] = experimentalMethods.extraStrategies ++ extraPlanningStrategies ++ ( PythonEvals :: DataSourceV2Strategy :: FileSourceStrategy :: DataSourceStrategy(conf) :: SpecialLimits :: Aggregation :: Window :: JoinSelection :: InMemoryScans :: BasicOperators :: Nil) ......其他代码strategies()返回策略列表,是生成策略GenericStrategy,这是个具体的抽象类,位于org.apache.spark.sql.catalyst.planning包。所谓生成策略,就是决定如果根据Logical Plan生成Physical Plan的策略。比如上面介绍的join操作可以生成Broadcast join,Hash join,抑或是MergeSort join,就是一种生成策略,具体的类就是上面代码中的JoinSelection。每个生成策略GenericStrategy都是object,其apply()方法返回的是Seq[SparkPlan],这里的SparkPlan就是PhysicalPlan(注意:下文会将SparkPlan和PhysicalPlan混着用)。
明白了生成策略后,就可以来看看QueryPlanner的plan()方法了。
abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { ......其他代码 def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = { // Obviously a lot to do here still... // Collect physical plan candidates. val candidates = strategies.iterator.flatMap(_(plan)) //迭代调用并平铺,变成Iterator[SparkPlan] // The candidates may contain placeholders marked as [[planLater]], // so try to replace them by their child plans. val plans = candidates.flatMap { candidate => val placeholders = collectPlaceholders(candidate) if (placeholders.isEmpty) { // Take the candidate as is because it does not contain placeholders. Iterator(candidate) } else { // Plan the logical plan marked as [[planLater]] and replace the placeholders. placeholders.iterator.foldLeft(Iterator(candidate)) { case (candidatesWithPlaceholders, (placeholder, logicalPlan)) => // Plan the logical plan for the placeholder. val childPlans = this.plan(logicalPlan) candidatesWithPlaceholders.flatMap { candidateWithPlaceholders => childPlans.map { childPlan => // Replace the placeholder by the child plan candidateWithPlaceholders.transformUp { case p if p.eq(placeholder) => childPlan } } } } } } val pruned = prunePlans(plans) assert(pruned.hasNext, s"No plan for $plan") pruned } ......其他代码 }这里的流程其实不难,主要工作其实就是调用各个生成策略GenericStrategy的apply()方法,生成Iterator[SparkPlan]。后面很大部分代码是处理占位符,按我的理解,在生成Logical Plan的时候,可能有些无意义的占位符,这种需要使用子节点替换调它。倒数第三行prunePlans()方法按注释说是用来去掉bad plan的,但看实际代码只是原封不动返回。
这样最终就得到一个Iterator[SparkPlan],每个SparkPlan就是可执行的物理操作了。
大致流程就是如此,当然具体到一些生成策略没有细说,包括输入源策略,聚合策略等等,每一个都蛮复杂的,这里就不细说,有兴趣可以自行查阅。
对了,最后还要看看示例代码到这一步变成什么样了,先上示例代码:
//生成DataFrame val df = Seq((1, 1)).toDF("key", "value") df.createOrReplaceTempView("src") //调用spark.sql val queryCaseWhen = sql("select key from src ")经过Physical Planning阶段后,变成如下:
Project [_1#2 AS key#5] +- LocalTableScan [_1#2, _2#3]对比上面的optimized阶段,直观看就是LocalRelation变成LocalTableScan。变得更加具体了,但实际上,Project也变了,虽然打印名字相同,但一个的类型是Project,本质上是LogicalPlan。而一个是ProjectExec,本质上是SparkPlan(也就是PhysicalPlan)。这一点通过断点看的更清楚。
到这一步已经很解决终点了,后面再经过一个Preparations阶段就能生成RDD了,剩下的部分留待下篇介绍吧。