其实这个类的逻辑不难懂,就是遍历batchs变量,而每个batch又会去使用scala的foldLeft函数,遍历应用里面的每条rule。然后根据Batch的策略以及将新生成的Plan与旧的Plan比较,决定是否要再次遍历。然后最后将新生成的Plan输出。
如果不清楚scala的foldLeft函数内容,可以百度下看看,不难懂的。然后跟RuleExecutor有关的基本都是这个套路,区别只在于rule的不同。
接下来我们来看看具体是如果应用一条rule,将Unresolved LogicalPlan转换成Resolved LogicalPlan吧。
Rule介绍前面说到,在Analyzer中重写了Batchs变量,Batchs包含多个Batch,每个Batch又有多个Rule,所以不可能全部看过来,庆幸的是,要了解Unresolved LogicalPlan转换成Resolved LogicalPlan,只需要看一个Rule就行,那就是ResolveRelations这个Rule,我们就只介绍这个Rule来管中窥豹。
各自Rule基本都是object类型,也就是静态的,且继承自Rule这个抽象类,Rule很简单,就一个ruleName变量喝一个apply方法用以实现逻辑,然后就没了。所以重点还是在继承后的实现逻辑。
前面提到,从Unresolved到Resolved的过程,可以理解为就是将SQL语句中的类型和字段,映射到实体表中的字段信息。而存储实体表元数据信息的,是Catalog,到具体的类,是org.apache.spark.sql.catalyst.catalog.SessionCatalog。
我们来看看具体的逻辑代码:
object ResolveRelations extends Rule[LogicalPlan] { ......其他代码 def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved => EliminateSubqueryAliases(lookupTableFromCatalog(u)) match { case v: View => u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.") case other => i.copy(table = other) } case u: UnresolvedRelation => resolveRelation(u) } ......其他代码 }逻辑其实也蛮简单的,就是匹配UnresolvedRelation(就是Unresolved的节点),然后递归去Catlog中获取对应的元数据信息,递归将它及子节点变成Resoulved。不过还有个要提的是,SQL中对应的,有可能是文件数据,或是数据库中的表,抑或是视图(view),针对文件数据是不会转换的,转换成Resolved会在后面进行。而表和视图则会立即转换。
最后,接上一篇的例子,接着来看看,经过Analysis阶段后,LogicalPlan变成什么样吧,上一篇SQL parse使用的示例代码:
//生成DataFrame val df = Seq((1, 1)).toDF("key", "value") df.createOrReplaceTempView("src") //调用spark.sql val queryCaseWhen = sql("select key from src ")经过上次介绍的SQL parse后是变成这样:
'Project ['key] +- 'UnresolvedRelation `src`这里的涵义上篇已介绍,不再赘述,而经过本次的Analysis后,则会变成这样
Project [key#5] +- SubqueryAlias `src` +- Project [_1#2 AS key#5, _2#3 AS value#6] +- LocalRelation [_1#2, _2#3]可以发现,主要就是对UnresolvedRelation进行展开,现在我们可以发现src有两个字段,分别是key和value及其对应的别名(1#2,2#3)。这里还有一个SubqueryAlias,这个我也不是很明白,按源码里面的说法,这里的subquery仅用来提供属性的作用域信息,Analysis阶段过后就就可以将其删除,所以在Optimization阶段后会发现SubqueryAlias消失了。
小结OK,那今天就先介绍到这里吧,主要综述了Analysis的内容,然后介绍RuleExecution的逻辑,最后简单看了个Rule的具体内容以及承接SQL parse阶段的例子。有兴趣的童鞋可以自己去顺着思路翻源码看看。