Spark SQL源码解析(五)SparkPlan准备和执行阶段 (2)

这里实际上是调用了之前生成的SparkPlan的execute()方法,这个方法最终会再调用它的doExecute()方法,而这个方法是各个子类自己实现的,也就是说,不同的SparkPlan执行的doExecute()是不一样的。

通过上面的阶段,我们得到了一棵4层的树,不过其中WholeStageCodegenExec和InputAdapter是为Codegen优化生成的,这里就不讨论了,忽略这两个其实结果是一样的。也就是说这里只介绍ProjectExec和LocalTableScanExec两个SparkPlan的doExecute()方法。

先是ProjectExec这个SparkPlan,我们看看它的doExecute()代码。

case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryExecNode with CodegenSupport { ......其他代码 protected override def doExecute(): RDD[InternalRow] = { child.execute().mapPartitionsWithIndexInternal { (index, iter) => val project = UnsafeProjection.create(projectList, child.output, subexpressionEliminationEnabled) project.initialize(index) iter.map(project) } } ......其他代码 }

可以看到它是先递归去调用child(也就是LocalTableScanExec)的doExecute()方法,还是得先去看看LocalTableScanExec生成什么东西呀。

case class LocalTableScanExec( output: Seq[Attribute], @transient rows: Seq[InternalRow]) extends LeafExecNode { ......其他代码 private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows, numParallelism) protected override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") rdd.map { r => numOutputRows += 1 r } } ......其他代码

可以看到最底层的rdd就是在这里实现的,LocalTableScanExec一开始就会生成一个lazy的rdd,在需要的时候返回。而在doExecute()方法中的numOutputRows可以理解为仅是一个测量值,暂时不用理会。总之这里我们就发现LocalTableScanExec的doExecute()其实就是返回一个parallelize生成的rdd。然后再回到ProjectExec去。

它调用child.execute().mapPartitionsWithIndexInternal ,这里的mapPartitionsWithIndexInternal和rdd的mapPartitionsWithIndex是类似的,区别只在于mapPartitionsWithIndexInternal只会在内部模块使用,如果有童鞋不明白mapPartitionsWithIndex这个API,可以百度查查看。然后重点看mapPartitionsWithIndexInternal的内部逻辑。

child.execute().mapPartitionsWithIndexInternal { (index, iter) => val project = UnsafeProjection.create(projectList, child.output, subexpressionEliminationEnabled) project.initialize(index) iter.map(project) }

这里最后一行iter.map(project),其实还是scala的语法糖,实际大概是这样iter.map(i ⇒ project.apply(i))。就是调用project的apply方法,对每行数据处理。然后通过追踪,可以发现project的实例是InterpretedUnsafeProjection,我们看看它的apply方法。

class InterpretedUnsafeProjection(expressions: Array[Expression]) extends UnsafeProjection { ......其他代码 override def apply(row: InternalRow): UnsafeRow = { // Put the expression results in the intermediate row. var i = 0 while (i < numFields) { values(i) = expressions(i).eval(row) i += 1 } // Write the intermediate row to an unsafe row. rowWriter.reset() writer(intermediate) rowWriter.getRow() } ......其他代码

这里其实重点在最后三行,就是将结果写入到result row,再返回回去。当执行完毕的时候,就会得到最终的RDD[InternalRow],再剩下的,就交给spark core去处理了。

小结

OK,那到这里基本就把Spark整个流程给讲完了,回顾一下整个流程。

catalyst流程

其实可以发现流程是挺简单的,很多其他SQL解析框架(比如calcite)也是类似的流程,只是在设计上在某些方面的取舍会有偏差。而后深入到代码的时候容易陷入一些细节中,当然这几篇也省略了很多细节,很多时候细节才是真正精髓的地方,以后有如果涉及到的时候再写文章讨论吧(/偷笑)。如果在开放过程中涉及到SQL解析这方面的开放,应该都会是在优化方面,也就是Optimization阶段增加或处理Rule,这块就需要对代数优化理论和代码有一些了解了。

限于本人水平,介绍spark sql的这几篇文章难免有疏漏和不足的地方,欢迎在评论区评论,先谢过了~~

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpxjsg.html