一个SparkSQL作业的一生

Spark是时下很火的计算框架,由UC Berkeley AMP Lab研发,并由原班人马创建的Databricks负责商业化相关事务。而SparkSQL则是Spark之上搭建的SQL解决方案,主打交互查询场景。

人人都说Spark/SparkSQL快,各种Benchmark满天飞,但是到底Spark/SparkSQL快么,或者快在哪里,似乎很少 有人说得清。因为Spark是基于内存的计算框架?因为SparkSQL有强大的优化器?本文将带你看一看一个SparkSQL作业到底是如何执行的,顺 便探讨一下SparkSQL和Hive On MapReduce比起来到底有何其别。

SQL On Hadoop的解决方案已经玲琅满目了,不管是元祖级的Hive,Cloudera的Impala,MapR的 Drill,Presto,SparkSQL甚至Apache Tajo,IBM BigSQL等等,各家公司都试图解决SQL交互场景的性能问题,因为原本的Hive On MapReduce实在太慢了。

那么Hive On MapReduce和SparkSQL或者其他交互引擎相比,慢在何处呢?让我们先看看一个SQL On Hadoop引擎到底如何工作的。

现在的SQL On Hadoop作业,前半段的工作原理都差不多,类似一个Compiler,分来分去都是这基层。

小红是数据分析,她某天写了个SQL来统计一个分院系的加权均值分数汇总。

SELECT dept, avg(math_score * 1.2) + avg(eng_score * 0.8) FROM studentsGROUP BY dept;

其中STUDENTS表是学生分数表(请不要在意这个表似乎不符合范式,很多Hadoop上的数据都不符合范式,因为Join成本高,而且我写表 介绍也会很麻烦)。她通过网易大数据的猛犸系统提交了这个查询到某个SQL On Hadoop平台执行,然后她放下工作,切到视频网页看一会《琅琊榜》。

在她看视频的时候,我们的SQL平台可是有很努力的工作滴。

首先是查询解析。

这里和很多Compiler类似,你需要一个Parser(就是著名的程序员约架专用项目),Parser(确切说是Lexer加 Parser)的作用是把一个字符串流变成一个一个Token,再根据语法定义生成一棵抽象语法树AST。这里不详细展开,童鞋们可以参考编译原理。比较 多的项目会选ANTLR(Hive啦,Presto啦等等),你可以用类似BNF的范式来写Parser规则,当然也有手写的比如SparkSQL。 AST会进一步包装成一个简单的基本查询信息对象,这个对象包含了一个查询基本的信息,比如基本语句的类型是SELECT还是INSERT,WHERE是 什么,GROUP BY是什么,如果有子查询,还需要递归进去,这个东西大致来说就是所谓的逻辑计划。

TableScan(students)

-> Project(dept, avg(math_score * 1.2) + avg(eng_score * 0.8))

->TableSink

上面是无责任示意,具体到某个SQL引擎会略有不同,但是基本上都会这么干。如果你想找一个代码干净易懂的SQL引擎,可以参考Presto(可以算我读过的开源代码写的最漂亮的了)。

到上面为止,你已经把字符串转换成一个所谓的LogicalPlan,这个Plan距离可以求值来说还比较残疾。最基本来说,我还不知道dept 是个啥吧,math_score是神马类型,AVG是个什么函数,这些都不明了。这样的LogicalPlan可以称为Unresolved(残疾 的)Logical Plan。

缺少的是所谓的元数据信息,这里主要包含两部分:表的Schema和函数信息。表的Schema信息主要包含表的列定义(名字,类型),表的物理位置,格式,如何读取;函数信息是函数签名,类的位置等。

有了这些,SQL引擎需要再一次遍历刚才的残废计划,进行一次深入的解析。最重要的处理是列引用绑定和函数绑定。列引用绑定决定了一个表达式的 类型。而有了类型你可以做函数绑定。函数绑定几乎是这里最关键的步骤,因为普通函数比如CAST,和聚合函数比如这里的AVG,分析函数比如Rank以及 Table Function比如explode都会用完全不同的方式求值,他们会被改写成独立的计划节点,而不再是普通的Expression节点。除此之外,还需 要进行深入的语义检测。比如GROUP BY是否囊括了所有的非聚合列,聚合函数是否内嵌了聚合函数,以及最基本的类型兼容检查,对于强类型的系统,类型不一致比如date = ‘2015-01-01’需要报错,对于弱类型的系统,你可以添加CAST来做Type(类型) Coerce(苟合)。

然后我们得到了一个尚未优化的逻辑计划:

TableScan(students=>dept:String, eng_score:double, math_score:double)

->Project(dept, math_score * 1.2:expr1, eng_score * 0.8:expr2)

->Aggregate(avg(expr1):expr3, avg(expr2):expr4, GROUP:dept)

->Project(dept, expr3+expr4:avg_result)

->TableSink(dept, avg_result->Client)

所以我们可以开始上肉戏了?还早呢。刚才的计划,还差得很远,作为一个SQL引擎,没有优化怎么好见人?不管是SparkSQL还是Hive,都 有一套优化器。大多数SQL on Hadoop引擎都有基于规则的优化,少数复杂的引擎比如Hive,拥有基于代价的优化。规则优化很容易实现,比如经典的谓词下推,可以把Join查询的 过滤条件推送到子查询预先计算,这样JOIN时需要计算的数据就会减少(JOIN是最重的几个操作之一,能用越少的数据做JOIN就会越快),又比如一些 求值优化,像去掉求值结果为常量的表达式等等。基于代价的优化就复杂多了,比如根据JOIN代价来调整JOIN顺序(最经典的场景),对SparkSQL 来说,代价优化是最简单的根据表大小来选择JOIN策略(小表可以用广播分发),而没有JOIN顺序交换这些,而JOIN策略选择则是在随后要解释的物理 执行计划生成阶段。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/011d8eff370e89876d6504c77804f926.html