Spark SQL源码解析(四)Optimization和Physical Planning阶段解析

Spark SQL原理解析前言:

Spark SQL源码剖析(一)SQL解析框架Catalyst流程概述

Spark SQL源码解析(二)Antlr4解析Sql并生成树

Spark SQL源码解析(三)Analysis阶段分析

前面已经介绍了SQL parse,将一条SQL语句使用antlr4解析成语法树并使用访问者模式生成Unresolved LogicalPlan,然后是Analysis阶段将Unresolved LogicalPlan转换成Resolved LogicalPlan。这一篇我们介绍Optimization阶段,和生成Physical Planning阶段。

经过这两个阶段后,就差不多要到最后转换成Spark的RDD任务了。

Spark SQL Optimization阶段概述

先来看看Logical Optimization阶段。

上一篇我们讨论了Analysis阶段如何生成一个真正的Logical Plan树。这一阶段听名字就知道是优化阶段,Spark SQL中有两个部分的优化,第一部分就是这里,是rule-base阶段的优化,就是根据各种关系代数的优化规则,对生成的Logical Plan适配,匹配到就进行相应的优化逻辑。这些规则大概有:投影消除,constant folding,替换null值,布尔表达式简化等等。当然大部分规则细节我也不是很清楚,仅仅能从名字推断一二。这

同时还可以添加自己的优化rule,也比较容易实现,论文中就给出了一段自定义优化rule的代码:

object DecimalAggregates extends Rule[LogicalPlan] { /** Maximum number of decimal digits in a Long */ val MAX_LONG_DIGITS = 18 def apply(plan: LogicalPlan): LogicalPlan = { plan transformAllExpressions { case Sum(e @ DecimalType.Expression(prec , scale)) if prec + 10 <= MAX_LONG_DIGITS => MakeDecimal(Sum(UnscaledValue(e)), prec + 10, scale) } }

这段代码的大意是自定义了一个rule,如果匹配到SUM的表达式,那就执行相应的逻辑,论文里描述这里是找到对应的小数并将其转换为未缩放的64位LONG。具体逻辑看不是很明白不过不重要,重要的是编写自己的优化rule很方便就是。

顺便点一下另一种优化,名字叫做cost-base优化(CBO),是发生在Physical Planning阶段的,这里就先卖个关子,后面说到的时候再讨论吧。

然后看到源码的时候,会发现Optimizer这个类也是继承自RuleExecutor,继承这个类之后的流程基本都是一样的。前面分析Analysis阶段的时候已经有详细介绍过这个流程,这里就不展开说了。

其实这优化器的重点应该是各种优化规则,这里我觉得更多的是设计到关系代数表达式优化理论方面的知识,这部分我也不甚精通,所以也就不说了。对这块感兴趣的童鞋可以看看网上别人的文章,这里顺便列几个可能有帮助的博客,

sparksql 中外连接查询中的谓词下推处理

数据库查询优化入门: 代数与物理优化基础

「 数据库原理 」查询优化(关系代数表达式优化)

下面还是来看看最开始的例子进行Optimization阶段后会变成什么样吧,先看看之前的示例代码:

val df = Seq((1, 1)).toDF("key", "value") df.createOrReplaceTempView("src") val queryCaseWhen = sql("select key from src ")

然后在Optimization优化阶段后,变成了:

Project [_1#2 AS key#5] +- LocalRelation [_1#2, _2#3]

好吧,看起来没什么变化,与Analysis阶段相比,也就少了个SubqueryAlias ,符合预期。不过也对,就一条SELECT语句能优化到哪去啊。

Physical Planning生成阶段概述

相比较于Logical Plan,Physical plan算是Spark可以去执行的东西了,当然本质上它也是一棵树。

前面说到,Spark有一种cost-based的优化。主要就在这一阶段,在这一阶段,会生成一个或多个Physical Plan,然后使用cost model预估各个Physical Plan的处理性能,最后选择一个最优的Physical Plan。这里最主要优化的是join操作,当触发join操作的时候,会根据左右两边的数据集判断,然后决定使用Broadcast join,还是传统的Hash join,抑或是MergeSort join,有关这几种join的区别这里就不详细解释了,有兴趣童鞋可以百度看看。

除了cost-based优化,这一阶段也依旧会有rule-based优化,所以说RuleExecutor这个类是很重要的,前面提到的Analysis阶段也好,Optimization阶段也好,包括这里的Physical Plan阶段,只要是涉及到rule-based优化,都会跟RuleExecutor这个类扯上关系。当然这样无疑是极大使用了面向对象的特性,不同的阶段编写不同的rule就行,一次编写,到处复用。

Physical Planning源码分析

首先是在QueryExecution中调度,

class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { ......其他代码 lazy val sparkPlan: SparkPlan = { SparkSession.setActiveSession(sparkSession) // TODO: We use next(), i.e. take the first plan returned by the planner, here for now, // but we will implement to choose the best plan. planner.plan(ReturnAnswer(optimizedPlan)).next() } ......其他代码 }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpxwsf.html