到这里,如果还没报错,那你就幸运滴得到了一个Resolved(不残废的)Logical Plan了。这个Plan,再配上表达式求值器,你也可以折腾折腾在单机对表查询求值了。但是,我们不是做分布式系统的么?数据分析妹子已经看完《琅琊 榜》的片头了,你还在悠闲什么呢?
为了让妹子在看完电视剧之前算完几百G的数据,我们必须借助分布式的威力,毕竟单节点算的话够妹子看完整个琅琊榜剧集了。刚才生成的逻辑计划,之 所以称为逻辑计划,是因为它只是逻辑上看起来似乎能执行了(误),实际上我们并不知道具体这个东西怎么对应Spark或者MapReduce任务。
逻辑执行计划接下来需要转换成具体可以在分布式情况下执行的物理计划,你还缺少:怎么和引擎对接,怎么做表达式求值两个部分。
表达式求值有两种基本策略,一个是解释执行,直接把之前带来的表达式进行解释执行,这个是Hive现在的模式;另一个是代码生成,包括 SparkSQL,Impala,Drill等等号称新一代的引擎都是代码生成模式的(并且配合高速编译器)。不管是什么模式,你最终把表达式求值部分封 装成了类。代码可能长得类似如下:
// math_score * 1.2
val leftOp = row.get(1/* math_score column index */);
val result = if (leftOp == null) then null else leftOp * 1.2;
每个独立的SELECT项目都会生成这样一段表达式求值代码或者封装过的求值器。但是AVG怎么办?当初写wordcount的时候,我记得聚合计算需要分派在Map和Reduce两个阶段呀?这里就涉及到物理执行转换,涉及到分布式引擎的对接。
AVG这样的聚合计算,加上GROUP BY的指示,告诉了底层的分布式引擎你需要怎么做聚合。本质上来说AVG聚合需要拆分成Map阶段来计算累加,还有条目个数,以及Reduce阶段二次累加最后每个组做除法。
因此我们要算的AVG其实会进一步拆分成两个计划节点:Aggregates(Partial)和Aggregates(Final)。 Partial部分是我们计算局部累加的部分,每个Mapper节点都将执行,然后底层引擎会做一个Shuffle,将相同Key(在这里是Dept)的 行分发到相同的Reduce节点。这样经过最终聚合你才能拿到最后结果。
拆完聚合函数,如果只是上面案例给的一步SQL,那事情比较简单,如果还有多个子查询,那么你可能面临多次Shuffle,对于 MapReduce来说,每次Shuffle你需要一个MapReduce Job来支撑,因为MapReduce模型中,只有通过Reduce阶段才能做Shuffle操作,而对于Spark来说,Shuffle可以随意摆放, 不过你要根据Shuffle来拆分Stage。这样拆过之后,你得到一个多个MR Job串起来的DAG或者一个Spark多个Stage的DAG(有向无环图)。
还记得刚才的执行计划么?它最后变成了这样的物理执行计划:
TableScan->Project(dept, math_score * 1.2: expr1, eng_score * 0.8: expr2)
-> AggretatePartial(avg(expr1):avg1, avg(expr2):avg2, GROUP: dept)
-> ShuffleExchange(Row, KEY:dept)
-> AggregateFinal(avg1, avg2, GROUP:dept)
-> Project(dept, avg1 + avg2)
-> TableSink
这东西到底怎么在MR或者Spark中执行啊?对应Shuffle之前和之后,物理上它们将在不同批次的计算节点上执行。不管对应 MapReduce引擎还是Spark,它们分别是Mapper和Reducer,中间隔了Shuffle。上面的计划,会由 ShuffleExchange中间断开,分别发送到Mapper和Reducer中执行,当然除了上面的部分还有之前提到的求值类,也都会一起序列化发 送。
实际在MapReduce模型中,你最终执行的是一个特殊的Mapper和特殊的Reducer,它们分别在初始化阶段载入被序列化的Plan和求值器信息,然后在map和reduce函数中依次对每个输入求值;而在Spark中,你生成的是一个一个RDD变换操作。
比如一个Project操作,对于MapReduce来说,伪代码大概是这样的:
void configuration() {
context = loadContext()
}
void map(inputRow) {
outputRow = context.projectEvaluator (inputRow);
write(outputRow);
}
对于Spark,大概就是这样:
currentPlan.mapPartitions { iter =>
projection = loadContext()
iter.map { row => projection(row) } }
至此为止,引擎帮你愉快滴提交了Job,你的集群开始不紧不慢地计算了。