Original article from 慢慢的回味; please credit the source when reposting.
Permalink: Spark Plan
This article lists the class hierarchy of SparkPlan and then walks through one core plan, WholeStageCodegenExec.
SparkPlan subclass hierarchy
SparkPlan
- OutputFakerExec
- EventTimeWatermarkExec
- CodegenSupport: DeserializeToObjectExec, GenerateExec, HashAggregateExec, RangeExec, ExpandExec, BaseLimitExec (GlobalLimitExec, LocalLimitExec), DataSourceScanExec (FileSourceScanExec, RowDataSourceScanExec), WholeStageCodegenExec, SortExec, ProjectExec, BroadcastHashJoinExec, InputAdapter, DebugExec (in package$), SampleExec, SerializeFromObjectExec, SortMergeJoinExec, MapElementsExec, FilterExec
- StatefulOperator: StateStoreSaveExec, StateStoreRestoreExec
- UnionExec
- ExecutedCommandExec
- UnaryExecNode: DeserializeToObjectExec, SortAggregateExec, GenerateExec, CollectLimitExec, FlatMapGroupsInRExec, HashAggregateExec, ExpandExec, BaseLimitExec (GlobalLimitExec, LocalLimitExec), Exchange (ShuffleExchange, BroadcastExchangeExec), WholeStageCodegenExec, AppendColumnsExec, SortExec, MapGroupsExec, ProjectExec, ObjectConsumerExec (SerializeFromObjectExec, MapPartitionsExec, AppendColumnsWithObjectExec, MapElementsExec), InsertIntoHiveTable, DebugExec (in package$), InputAdapter, SampleExec, CoalesceExec, ExceptionInjectingOperator, ObjectHashAggregateExec, ReferenceSort, ScriptTransformation, StateStoreSaveExec, WindowExec, SubqueryExec, TakeOrderedAndProjectExec, FilterExec, StateStoreRestoreExec
- BinaryExecNode: BroadcastNestedLoopJoinExec, CoGroupExec, ShuffledHashJoinExec, CartesianProductExec, BroadcastHashJoinExec, SortMergeJoinExec, BatchEvalPythonExec
- FastOperator
- LeafExecNode: ReusedExchangeExec, InMemoryTableScanExec, PlanLater, RangeExec, LocalTableScanExec, DataSourceScanExec (FileSourceScanExec, RowDataSourceScanExec), RDDScanExec, MyPlan, ExternalRDDScanExec, HiveTableScanExec, StreamingRelationExec, DummySparkPlan
- ObjectProducerExec: DeserializeToObjectExec, FlatMapGroupsInRExec, CoGroupExec, MapGroupsExec, ExternalRDDScanExec, MapPartitionsExec, MapElementsExec
CodegenSupport
WholeStageCodegenExec
Code in QueryExecution.scala.
/**
 * Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal
 * row format conversions as needed.
 */
protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
  preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
}
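A quick way to observe the effect of prepareForExecution is to compare queryExecution.sparkPlan (the planner's output) with queryExecution.executedPlan (after the preparation rules have run). A minimal sketch, assuming a local SparkSession named spark:

// Plan chosen by the planner, before the preparation rules run: no codegen stages yet.
val qe = spark.range(10).filter("id > 5").queryExecution
println(qe.sparkPlan.treeString)
// Plan after prepareForExecution: the Range/Filter pipeline is now wrapped in a WholeStageCodegen stage.
println(qe.executedPlan.treeString)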
When preparing for execution, the rule CollapseCodegenStages finds the chained plans that support codegen and collapses them into a single WholeStageCodegenExec.
Code in WholeStageCodegen.scala.
/**
 * Find the chained plans that support codegen, collapse them together as WholeStageCodegen.
 */
case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {

  private def supportCodegen(e: Expression): Boolean = e match {
    case e: LeafExpression => true
    // CodegenFallback requires the input to be an InternalRow
    case e: CodegenFallback => false
    case _ => true
  }

  private def numOfNestedFields(dataType: DataType): Int = dataType match {
    case dt: StructType => dt.fields.map(f => numOfNestedFields(f.dataType)).sum
    case m: MapType => numOfNestedFields(m.keyType) + numOfNestedFields(m.valueType)
    case a: ArrayType => numOfNestedFields(a.elementType)
    case u: UserDefinedType[_] => numOfNestedFields(u.sqlType)
    case _ => 1
  }

  private def supportCodegen(plan: SparkPlan): Boolean = plan match {
    case plan: CodegenSupport if plan.supportCodegen =>
      val willFallback = plan.expressions.exists(_.find(e => !supportCodegen(e)).isDefined)
      // the generated code will be huge if there are too many columns
      val hasTooManyOutputFields =
        numOfNestedFields(plan.schema) > conf.wholeStageMaxNumFields
      val hasTooManyInputFields =
        plan.children.map(p => numOfNestedFields(p.schema)).exists(_ > conf.wholeStageMaxNumFields)
      !willFallback && !hasTooManyOutputFields && !hasTooManyInputFields
    case _ => false
  }

  /**
   * Inserts an InputAdapter on top of those that do not support codegen.
   */
  private def insertInputAdapter(plan: SparkPlan): SparkPlan = plan match {
    case j @ SortMergeJoinExec(_, _, _, _, left, right) if j.supportCodegen =>
      // The children of SortMergeJoin should do codegen separately.
      j.copy(left = InputAdapter(insertWholeStageCodegen(left)),
        right = InputAdapter(insertWholeStageCodegen(right)))
    case p if !supportCodegen(p) => // Branch 4
      // collapse them recursively
      InputAdapter(insertWholeStageCodegen(p))
    case p =>
      p.withNewChildren(p.children.map(insertInputAdapter))
  }

  /**
   * Inserts a WholeStageCodegen on top of those that support codegen.
   */
  private def insertWholeStageCodegen(plan: SparkPlan): SparkPlan = plan match {
    // For operators that will output domain object, do not insert WholeStageCodegen for it as
    // domain object can not be written into unsafe row.
    case plan if plan.output.length == 1 && plan.output.head.dataType.isInstanceOf[ObjectType] => // Branch 1
      plan.withNewChildren(plan.children.map(insertWholeStageCodegen))
    case plan: CodegenSupport if supportCodegen(plan) => // Branch 2
      WholeStageCodegenExec(insertInputAdapter(plan))
    case other => // Branch 3
      other.withNewChildren(other.children.map(insertWholeStageCodegen))
  }

  def apply(plan: SparkPlan): SparkPlan = {
    if (conf.wholeStageEnabled) {
      insertWholeStageCodegen(plan)
    } else {
      plan
    }
  }
}
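To make the hasTooManyOutputFields / hasTooManyInputFields check concrete, here is a small illustration (not Spark internals, just a schema built with Spark's public types API) of what numOfNestedFields counts: leaf fields are counted recursively, and the total is compared against conf.wholeStageMaxNumFields (the spark.sql.codegen.maxFields setting).

import org.apache.spark.sql.types._

// struct<a: int, b: struct<c: string, d: array<double>>>
val schema = StructType(Seq(
  StructField("a", IntegerType),
  StructField("b", StructType(Seq(
    StructField("c", StringType),
    StructField("d", ArrayType(DoubleType)))))))
// numOfNestedFields would count 3 leaf fields here: a, b.c and the element type of b.d.
// A plan whose schema exceeds the maxFields threshold is not collapsed into a codegen stage.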
Before the rule CollapseCodegenStages is applied:
BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- *Project [_2#224 AS str#233]
   +- *Filter isnotnull(_2#224)
      +- LocalTableScan [_1#223, _2#224]
After the rule CollapseCodegenStages is applied:
BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- WholeStageCodegenExec                        // Mark A
   +- *Project [_2#224 AS str#233]
      +- *Filter isnotnull(_2#224)              // Mark B
         +- InputAdapter                        // Mark C
            +- LocalTableScan [_1#223, _2#224]
For Mark A, ProjectExec mixes in CodegenSupport and supports codegen, so a WholeStageCodegenExec is added as its parent. See Branch 2.
For Mark B, FilterExec also supports codegen, but it is reached through the last case of insertInputAdapter, which simply recurses into the children; it therefore stays inside the stage created at Mark A, and no extra WholeStageCodegenExec is inserted for it.
For Mark C, LocalTableScanExec does not support codegen, so Branch 4 wraps it in an InputAdapter; the nested insertWholeStageCodegen call then hits Branch 3 and leaves LocalTableScan itself unchanged.
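A similar Project / Filter / LocalTableScan pipeline is easy to reproduce from a spark-shell session. This is only a minimal sketch, assuming a local SparkSession named spark with import spark.implicits._ in scope; expression IDs such as _2#224 and str#233 will differ from run to run.

import org.apache.spark.sql.execution.debug._

val df = Seq((1, "a"), (2, null.asInstanceOf[String]))
  .toDF("_1", "_2")
  .filter($"_2".isNotNull)
  .select($"_2".as("str"))

// Operators prefixed with '*' run inside a WholeStageCodegen stage.
df.explain()
// Prints the generated Java source for each whole-stage-codegen subtree.
df.debugCodegen()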
Code in WholeStageCodegenExec.scala:
PS: the plan can still execute even when the generated source code cannot be compiled: doExecute falls back to executing the child plan directly.
override def doExecute(): RDD[InternalRow] = {
  val (ctx, cleanedSource) = doCodeGen()
  // try to compile and fallback if it failed
  try {
    CodeGenerator.compile(cleanedSource)
  } catch {
    case e: Exception if !Utils.isTesting && sqlContext.conf.wholeStageFallback =>
      // We should already saw the error message
      logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString")
      return child.execute()
  }

  val references = ctx.references.toArray
  val durationMs = longMetric("pipelineTime")

  val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()
  assert(rdds.size <= 2, "Up to two input RDDs can be supported")
  if (rdds.length == 1) {
    ......
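The behaviour above is governed by two SQLConf flags; a quick, hedged sketch of how to flip them from a spark-shell session (config keys as in Spark 2.x):

// Disable whole-stage codegen entirely: CollapseCodegenStages then leaves the plan unchanged.
spark.conf.set("spark.sql.codegen.wholeStage", false)
// Controls the fallback in the catch block above: when true (the default), a compilation
// failure makes doExecute fall back to child.execute() instead of failing the query.
spark.conf.set("spark.sql.codegen.fallback", true)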
Why is an InputAdapter needed?
When WholeStageCodegenExec.doExecute runs, the compiled Java code gets its input RDDs from the child ("val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()"),
but LocalTableScanExec has no inputRDDs method because it does not mix in CodegenSupport.
So an InputAdapter, which does implement inputRDDs, is wrapped around it.
/**
 * InputAdapter is used to hide a SparkPlan from a subtree that support codegen.
 *
 * This is the leaf node of a tree with WholeStageCodegen that is used to generate code
 * that consumes an RDD iterator of InternalRow.
 */
case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupport {

  override def output: Seq[Attribute] = child.output
  ......

  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    child.execute() :: Nil
  }
  ......
Finally, the call graph is:
 * Here is the call graph of to generate Java source (plan A support codegen, but plan B does not):
 *
 *   WholeStageCodegen        Plan A                          FakeInput        Plan B
 *                            Project [_2#224 AS str#233]     InputAdapter     LocalTableScan [_1#223, _2#224]
 * ==============================================================================================
 *
 * -> execute()
 *     |
 *  doExecute() --------->   inputRDDs() ---------> inputRDDs() ------> execute()
 *     |
 *     +-----------------> produce()
 *                             |
 *                          doProduce() ---------> produce()
 *                                                    |
 *                                                 doProduce()
 *                                                    |
 *                          doConsume() <---------- consume()
 *                             |
 *  doConsume() <-------- consume()
The InputAdapter ends the produce flow and starts the consume flow back up to the parent plan.
The corresponding Java code generated is:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */
/** Codegend pipeline for
 * Project [_2#224 AS str#233]
 * +- Filter isnotnull(_2#224)
 *    +- LocalTableScan [_1#223, _2#224]
 */
/* 006 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private scala.collection.Iterator inputadapter_input;
/* 010 */   private org.apache.spark.sql.execution.metric.SQLMetric filter_numOutputRows;
/* 011 */   private UnsafeRow filter_result;
/* 012 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder filter_holder;
/* 013 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter filter_rowWriter;
/* 014 */   private UnsafeRow project_result;
/* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter project_rowWriter;
/* 017 */
/* 018 */   public GeneratedIterator(Object[] references) {
/* 019 */     this.references = references;
/* 020 */   }
/* 021 */
/* 022 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */     partitionIndex = index;
/* 024 */     this.inputs = inputs;
/* 025 */     inputadapter_input = inputs[0];
/* 026 */     this.filter_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 027 */     filter_result = new UnsafeRow(2);
/* 028 */     this.filter_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(filter_result, 32);
/* 029 */     this.filter_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(filter_holder, 2);
/* 030 */     project_result = new UnsafeRow(1);
/* 031 */     this.project_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result, 32);
/* 032 */     this.project_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder, 1);
/* 033 */
/* 034 */   }
/* 035 */
/* 036 */   protected void processNext() throws java.io.IOException {
/* 037 */     // PRODUCE: Project [_2#224 AS str#233]
/* 038 */     // PRODUCE: Filter isnotnull(_2#224)
/* 039 */     // PRODUCE: InputAdapter
/* 040 */     while (inputadapter_input.hasNext()) {
/* 041 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 042 */       // CONSUME: Filter isnotnull(_2#224)
/* 043 */       // input[1, string, true]
/* 044 */       boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 045 */       UTF8String inputadapter_value1 = inputadapter_isNull1 ? null : (inputadapter_row.getUTF8String(1));
/* 046 */
/* 047 */       if (!(!(inputadapter_isNull1))) continue;
/* 048 */
/* 049 */       filter_numOutputRows.add(1);
/* 050 */
/* 051 */       // CONSUME: Project [_2#224 AS str#233]
/* 052 */       // CONSUME: WholeStageCodegen
/* 053 */       project_holder.reset();
/* 054 */
/* 055 */       project_rowWriter.write(0, inputadapter_value1);
/* 056 */       project_result.setTotalSize(project_holder.totalSize());
/* 057 */       append(project_result);
/* 058 */       if (shouldStop()) return;
/* 059 */     }
/* 060 */   }
/* 061 */ }
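The processNext above is just the fused form of the Project/Filter pipeline: the InputAdapter produces the driving loop, and each parent operator contributes a fragment of the loop body when it consumes. A simplified, non-Spark sketch of the same loop shape (all names below are illustrative, not Spark APIs):

object FusedPipelineSketch {
  final case class Record(id: Int, str: String)

  def processAll(input: Iterator[Record], append: String => Unit): Unit = {
    while (input.hasNext) {        // PRODUCE: InputAdapter drives the loop
      val row = input.next()
      if (row.str != null) {       // CONSUME: Filter isnotnull(str)
        append(row.str)            // CONSUME: Project (str), handed to the parent via append
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val rows = Iterator(Record(1, "a"), Record(2, null), Record(3, "c"))
    processAll(rows, str => println(str))  // prints "a" and "c"
  }
}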
This work is licensed under a Creative Commons Attribution 4.0 International License.