Spark SQL原理解析前言:
Spark SQL源码剖析(一)SQL解析框架Catalyst流程概述
Spark SQL源码解析(二)Antlr4解析Sql并生成树
Analysis阶段概述首先,这里需要引入一个新概念,前面介绍SQL parse阶段,会使用antlr4,将一条SQL语句解析成语法树,然后使用antlr4的访问者模式遍历生成语法树,也就是Logical Plan。但其实,SQL parse这一阶段生成的Logical Plan是被称为Unresolved Logical Plan。所谓unresolved,就是说SQL语句中的对象都是未解释的。
比如说一条语句SELECT col FROM sales,当我们不知道col的具体类型(Int,String,还是其他),甚至是否在sales表中有col这一个列的时候,就称之为是Unresolved的。
而在analysis阶段,主要就是解决这个问题,也就是将Unresolved的变成Resolved的。Spark SQL通过使用Catalyst rule和Catalog来跟踪数据源的table信息。并对Unresolved应用如下的rules(rule可以理解为一条一条的规则,当匹配到树某些节点的时候就会被应用)。
从Catalog中,查询Unresolved Logical Plan中对应的关系(relations)
根据输入属性名(比如上述的col列),映射到具体的属性
确定哪些属性引用相同的值并赋予它们唯一的ID(这个是论文中的内容,看不是很明白,不过主要是方便后面优化器实现的)
Propagating and coercing types through expressions,这个看着也是有点迷,大概是对数据进行强制转换,方便后续对1 + col 这样的数据进行处理。
而处理过后,就会真正生成一棵Resolved Logical Plan,接下来就去看看源码里面是怎么实现的吧。
Analysis阶段详细解析通过跟踪调用代码,在调用完SQL parse的内容后,就会跑去org.apache.spark.sql.execution.QueryExecution这个类中执行,后面包括Logical Optimization阶段,Physical Planning阶段,生成RDD任务阶段都是在这个类中进行调度的。不过此次只介绍Analysis。
在QueryExecution中,会去调用org.apache.spark.sql.catalyst.Analyzer这个类,这个类是继承自org.apache.spark.sql.catalyst.rules.RuleExecutor,记住这个,后面还有很多个阶段都是通过继承这个类实现的,实现原理也和Analysis阶段相似。
继承自RuleExecutor的类,包括这里的Analyzer类,都是在自身实现大量的rule,然后注册到batch变量中,这里大概贴点代码瞅瞅。
class Analyzer( catalog: SessionCatalog, conf: SQLConf, maxIterations: Int) extends RuleExecutor[LogicalPlan] with CheckAnalysis { ......其他代码 lazy val batches: Seq[Batch] = Seq( Batch("Hints", fixedPoint, new ResolveHints.ResolveBroadcastHints(conf), ResolveHints.ResolveCoalesceHints, ResolveHints.RemoveAllHints), Batch("Simple Sanity Check", Once, LookupFunctions), Batch("Substitution", fixedPoint, CTESubstitution, WindowsSubstitution, EliminateUnions, new SubstituteUnresolvedOrdinals(conf)), ......其他代码先大概说下batches这个变量吧,batches是由Batch的列表构成。而Batch的具体签名如下:
abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { ......其他代码 protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*) ......其他代码 }一个Batch由策略Strategy,和一组Rule构成,其中策略Strategy主要是区分迭代次数用的,按我的理解,某些rule可以迭代多次,越多次效果会越好,类似机器学习的学习过程。而策略Strategy会规定迭代一次还是固定次数。而rule就是具体的应用规则了,这个先略过。
在Analyzer这个类中,你会发现很大篇幅的代码都是各种各样rule的实现。然后最终,Analyzer会去调用super.execute()方法,也就是调用父类(RuleExecutor)的方法执行具体逻辑。而父类又会去调用这个batches变量,循环来与Sql Parse阶段生成的Unresolved Logical Plan做匹配,匹配到了就执行具体的验证。还是贴下代码看看吧。
abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { def execute(plan: TreeType): TreeType = { var curPlan = plan val queryExecutionMetrics = RuleExecutor.queryExecutionMeter //遍历Analyzer中定义的batchs变量 batches.foreach { batch => val batchStartPlan = curPlan var iteration = 1 var lastPlan = curPlan var continue = true // Run until fix point (or the max number of iterations as specified in the strategy. //这里的continue决定是否再次循环,由batch的策略(固定次数或单次),以及该batch对plan的作用效果这两者控制 while (continue) { //调用foldLeft让batch中每条rule应用于plan,然后就是执行对应rule规则逻辑了 curPlan = batch.rules.foldLeft(curPlan) { case (plan, rule) => val startTime = System.nanoTime() val result = rule(plan) val runTime = System.nanoTime() - startTime if (!result.fastEquals(plan)) { queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName) queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime) logTrace( s""" |=== Applying Rule ${rule.ruleName} === |${sideBySide(plan.treeString, result.treeString).mkString("\n")} """.stripMargin) } queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime) queryExecutionMetrics.incNumExecution(rule.ruleName) // Run the structural integrity checker against the plan after each rule. if (!isPlanIntegral(result)) { val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " + "the structural integrity of the plan is broken." throw new TreeNodeException(result, message, null) } result } iteration += 1 //策略的生效地方 if (iteration > batch.strategy.maxIterations) { // Only log if this is a rule that is supposed to run more than once. if (iteration != 2) { val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}" if (Utils.isTesting) { throw new TreeNodeException(curPlan, message, null) } else { logWarning(message) } } continue = false } if (curPlan.fastEquals(lastPlan)) { logTrace( s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.") continue = false } lastPlan = curPlan } if (!batchStartPlan.fastEquals(curPlan)) { logDebug( s""" |=== Result of Batch ${batch.name} === |${sideBySide(batchStartPlan.treeString, curPlan.treeString).mkString("\n")} """.stripMargin) } else { logTrace(s"Batch ${batch.name} has no effect.") } } curPlan } }