Spark Plan

原创文章,转载请注明: 转载自慢慢的回味

本文链接地址: Spark Plan

This article lists the class hierarchy of SparkPlan and describes one core plan, WholeStageCodegenExec.

Spark Plan sub classes hierarchy
SparkPlan 
    OutputFakerExec 
    EventTimeWatermarkExec 
    CodegenSupport 
        DeserializeToObjectExec 
        GenerateExec 
        HashAggregateExec 
        RangeExec 
        ExpandExec 
        BaseLimitExec 
            GlobalLimitExec 
            LocalLimitExec 
        DataSourceScanExec 
            FileSourceScanExec 
            RowDataSourceScanExec 
        WholeStageCodegenExec 
        SortExec 
        ProjectExec 
        BroadcastHashJoinExec 
        InputAdapter 
        DebugExec in package$ 
        SampleExec 
        SerializeFromObjectExec 
        SortMergeJoinExec 
        MapElementsExec 
        FilterExec 
    StatefulOperator 
        StateStoreSaveExec 
        StateStoreRestoreExec 
    UnionExec 
    ExecutedCommandExec 
    UnaryExecNode 
        DeserializeToObjectExec 
        SortAggregateExec 
        GenerateExec 
        CollectLimitExec 
        FlatMapGroupsInRExec 
        HashAggregateExec 
        ExpandExec 
        BaseLimitExec 
            GlobalLimitExec 
            LocalLimitExec 
        Exchange 
            ShuffleExchange 
            BroadcastExchangeExec 
        WholeStageCodegenExec 
        AppendColumnsExec 
        SortExec 
        MapGroupsExec 
        ProjectExec 
        ObjectConsumerExec 
            SerializeFromObjectExec 
            MapPartitionsExec 
            AppendColumnsWithObjectExec 
            MapElementsExec 
        InsertIntoHiveTable 
        DebugExec in package$ 
        InputAdapter 
        SampleExec 
        CoalesceExec 
        ExceptionInjectingOperator 
        ObjectHashAggregateExec 
        ReferenceSort 
        ScriptTransformation 
        StateStoreSaveExec 
        WindowExec 
        SubqueryExec 
        TakeOrderedAndProjectExec 
        FilterExec 
        StateStoreRestoreExec 
    BinaryExecNode 
        BroadcastNestedLoopJoinExec 
        CoGroupExec 
        ShuffledHashJoinExec 
        CartesianProductExec 
        BroadcastHashJoinExec 
        SortMergeJoinExec 
    BatchEvalPythonExec 
    FastOperator 
    LeafExecNode 
        ReusedExchangeExec 
        InMemoryTableScanExec 
        PlanLater 
        RangeExec 
        LocalTableScanExec 
        DataSourceScanExec 
            FileSourceScanExec 
            RowDataSourceScanExec 
        RDDScanExec 
        MyPlan 
        ExternalRDDScanExec 
        HiveTableScanExec 
        StreamingRelationExec 
    DummySparkPlan 
    ObjectProducerExec 
        DeserializeToObjectExec 
        FlatMapGroupsInRExec 
        CoGroupExec 
        MapGroupsExec 
        ExternalRDDScanExec 
        MapPartitionsExec 
        MapElementsExec

CodegenSupport
WholeStageCodegenExec

Code in QueryExecution.scala.

  /**
   * Turns a planned [[SparkPlan]] into an executable one by threading it through every
   * preparation rule in sequence; the rules insert shuffle operations and internal row format
   * conversions as needed.
   *
   * @param plan the planned physical plan
   * @return the plan produced after the last preparation rule has been applied
   */
  protected def prepareForExecution(plan: SparkPlan): SparkPlan =
    preparations.foldLeft(plan)((current, rule) => rule(current))

When preparing for execution, the rule CollapseCodegenStages will find the chained plans that support codegen,
and then collapse them together as WholeStageCodegen.
Code in WholeStageCodegen.scala.

/**
 * Rule that looks for chains of physical plans supporting codegen and fuses each chain under a
 * single WholeStageCodegenExec node.
 *
 * @param conf SQL configuration; read for `wholeStageEnabled` and `wholeStageMaxNumFields`
 */
case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {

  /**
   * Tells whether a single expression node can take part in whole-stage codegen: leaf
   * expressions always can, any other CodegenFallback cannot, everything else can.
   */
  private def supportCodegen(e: Expression): Boolean = e match {
    case _: LeafExpression => true
    // CodegenFallback requires the input to be an InternalRow
    case _: CodegenFallback => false
    case _ => true
  }

  /**
   * Counts the leaf fields of a data type, descending into structs, maps, arrays and
   * user-defined types; any other type counts as one field.
   */
  private def numOfNestedFields(dataType: DataType): Int = dataType match {
    case struct: StructType =>
      struct.fields.map(field => numOfNestedFields(field.dataType)).sum
    case map: MapType =>
      numOfNestedFields(map.keyType) + numOfNestedFields(map.valueType)
    case array: ArrayType =>
      numOfNestedFields(array.elementType)
    case udt: UserDefinedType[_] =>
      numOfNestedFields(udt.sqlType)
    case _ =>
      1
  }

  /**
   * Tells whether a plan can be compiled as part of a whole stage: it must mix in
   * CodegenSupport with `supportCodegen` enabled, contain no expression that falls back, and
   * keep both its output and each child's output within `conf.wholeStageMaxNumFields`.
   */
  private def supportCodegen(plan: SparkPlan): Boolean = plan match {
    case codegenPlan: CodegenSupport if codegenPlan.supportCodegen =>
      val maxFields = conf.wholeStageMaxNumFields
      val willFallback =
        codegenPlan.expressions.exists(_.find(e => !supportCodegen(e)).isDefined)
      // the generated code will be huge if there are too many columns
      val hasTooManyOutputFields = numOfNestedFields(codegenPlan.schema) > maxFields
      val hasTooManyInputFields =
        codegenPlan.children.exists(child => numOfNestedFields(child.schema) > maxFields)
      !(willFallback || hasTooManyOutputFields || hasTooManyInputFields)
    case _ =>
      false
  }

  /**
   * Walks down a codegen-capable subtree and places an InputAdapter above every plan that
   * cannot join the stage; codegen-capable plans are kept and only their children are visited.
   */
  private def insertInputAdapter(plan: SparkPlan): SparkPlan = plan match {
    case join @ SortMergeJoinExec(_, _, _, _, left, right) if join.supportCodegen =>
      // The children of SortMergeJoin should do codegen separately.
      val adaptedLeft = InputAdapter(insertWholeStageCodegen(left))
      val adaptedRight = InputAdapter(insertWholeStageCodegen(right))
      join.copy(left = adaptedLeft, right = adaptedRight)
    case unsupported if !supportCodegen(unsupported) =>   //Branch 4
      // collapse them recursively
      InputAdapter(insertWholeStageCodegen(unsupported))
    case supported =>
      supported.withNewChildren(supported.children.map(insertInputAdapter))
  }

  /** True when the plan outputs exactly one attribute and that attribute is an ObjectType. */
  private def outputsSingleObject(plan: SparkPlan): Boolean =
    plan.output.length == 1 && plan.output.head.dataType.isInstanceOf[ObjectType]

  /**
   * Places a WholeStageCodegenExec above every plan that supports codegen; other plans are
   * kept as they are and the search continues in their children.
   */
  private def insertWholeStageCodegen(plan: SparkPlan): SparkPlan = plan match {
    // For operators that will output domain object, do not insert WholeStageCodegen for it as
    // domain object can not be written into unsafe row.
    case objectPlan if outputsSingleObject(objectPlan) => //Branch 1
      objectPlan.withNewChildren(objectPlan.children.map(insertWholeStageCodegen))
    case codegenPlan: CodegenSupport if supportCodegen(codegenPlan) =>   //Branch 2
      WholeStageCodegenExec(insertInputAdapter(codegenPlan))
    case other =>   //Branch 3
      other.withNewChildren(other.children.map(insertWholeStageCodegen))
  }

  /**
   * Applies the rule: collapses codegen stages when `conf.wholeStageEnabled` is set, otherwise
   * returns the plan unchanged.
   */
  def apply(plan: SparkPlan): SparkPlan =
    if (conf.wholeStageEnabled) insertWholeStageCodegen(plan) else plan
}

Before the rule CollapseCodegenStages is applied:

BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- *Project [_2#224 AS str#233]
   +- *Filter isnotnull(_2#224)
      +- LocalTableScan [_1#223, _2#224]

After the rule CollapseCodegenStages is applied:

BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- WholeStageCodegenExec   //Mark A
   +- *Project [_2#224 AS str#233]
      +- *Filter isnotnull(_2#224)    //Mark B
         +- InputAdapter    //Mark C
            +- LocalTableScan [_1#223, _2#224]

For Mark A, ProjectExec supports CodegenSupport, so a WholeStageCodegenExec is added as its parent. See Branch 2.
For Mark B, FilterExec also supports CodegenSupport, but it is reached through insertInputAdapter (its last case, which simply recurses into the children) rather than through insertWholeStageCodegen, so no extra WholeStageCodegenExec is added for it.
For Mark C, LocalTableScanExec doesn’t support CodegenSupport, so an InputAdapter is added as its parent. See Branch 3 and Branch 4.
Code in WholeStageCodegenExec.scala.
PS: If the generated source code fails to compile, WholeStageCodegenExec can fall back to executing its child plan directly (when wholeStageFallback is enabled).

  override def doExecute(): RDD[InternalRow] = {
    val (ctx, cleanedSource) = doCodeGen()
    // try to compile and fallback if it failed
    try {
      CodeGenerator.compile(cleanedSource)
    } catch {
      case e: Exception if !Utils.isTesting && sqlContext.conf.wholeStageFallback =>
        // We should already saw the error message
        logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString")
        return child.execute()
    }
    val references = ctx.references.toArray
 
    val durationMs = longMetric("pipelineTime")
 
    val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()
    assert(rdds.size <= 2, "Up to two input RDDs can be supported")
    if (rdds.length == 1) {
......

Why is an InputAdapter needed?
When doExecute runs in WholeStageCodegenExec, it needs to get the input RDDs for the compiled Java code from its child (“val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()”),
but LocalTableScanExec doesn’t have the method “inputRDDs” because it doesn’t support CodegenSupport.
So an InputAdapter is wrapped around it, as InputAdapter does have the method “inputRDDs”.

/**
 * InputAdapter is used to hide a SparkPlan from a subtree that support codegen.
 *
 * This is the leaf node of a tree with WholeStageCodegen that is used to generate code
 * that consumes an RDD iterator of InternalRow.
 */
case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
 
  override def output: Seq[Attribute] = child.output
......
 
  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    child.execute() :: Nil
  }
......

Finally, the call graph is:

 * Here is the call graph to generate Java source (plan A supports codegen, but plan B does not):
 *
 *   WholeStageCodegen       Plan A                 FakeInput        Plan B
 *                   Project [_2#224 AS str#233]   InputAdapter   LocalTableScan [_1#223, _2#224]
 * ==============================================================================================
 *
 * -> execute()
 *     |
 *  doExecute() --------->   inputRDDs() ---------> inputRDDs() ------> execute()
 *     |
 *     +----------------->   produce()
 *                             |
 *                          doProduce()  ---------> produce()
 *                                                     |
 *                                                  doProduce()
 *                                                     |
 *                          doConsume() <---------- consume()
 *                             |
 *  doConsume()  <--------  consume()

The InputAdapter ends the produce flow and starts the consume flow back up to the parent plans.
The corresponding Java code generated is:

/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /**
 * Codegend pipeline for
 * Project [_2#224 AS str#233]
 * +- Filter isnotnull(_2#224)
 *    +- LocalTableScan [_1#223, _2#224]
 */
/* 006 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private scala.collection.Iterator inputadapter_input;
/* 010 */   private org.apache.spark.sql.execution.metric.SQLMetric filter_numOutputRows;
/* 011 */   private UnsafeRow filter_result;
/* 012 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder filter_holder;
/* 013 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter filter_rowWriter;
/* 014 */   private UnsafeRow project_result;
/* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter project_rowWriter;
/* 017 */
/* 018 */   public GeneratedIterator(Object[] references) {
/* 019 */     this.references = references;
/* 020 */   }
/* 021 */
/* 022 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */     partitionIndex = index;
/* 024 */     this.inputs = inputs;
/* 025 */     inputadapter_input = inputs[0];
/* 026 */     this.filter_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 027 */     filter_result = new UnsafeRow(2);
/* 028 */     this.filter_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(filter_result, 32);
/* 029 */     this.filter_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(filter_holder, 2);
/* 030 */     project_result = new UnsafeRow(1);
/* 031 */     this.project_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result, 32);
/* 032 */     this.project_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder, 1);
/* 033 */
/* 034 */   }
/* 035 */
/* 036 */   protected void processNext() throws java.io.IOException {
/* 037 */     // PRODUCE: Project [_2#224 AS str#233]
/* 038 */     // PRODUCE: Filter isnotnull(_2#224)
/* 039 */     // PRODUCE: InputAdapter
/* 040 */     while (inputadapter_input.hasNext()) {
/* 041 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 042 */       // CONSUME: Filter isnotnull(_2#224)
/* 043 */       // input[1, string, true]
/* 044 */       boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 045 */       UTF8String inputadapter_value1 = inputadapter_isNull1 ? null : (inputadapter_row.getUTF8String(1));
/* 046 */
/* 047 */       if (!(!(inputadapter_isNull1))) continue;
/* 048 */
/* 049 */       filter_numOutputRows.add(1);
/* 050 */
/* 051 */       // CONSUME: Project [_2#224 AS str#233]
/* 052 */       // CONSUME: WholeStageCodegen
/* 053 */       project_holder.reset();
/* 054 */
/* 055 */       project_rowWriter.write(0, inputadapter_value1);
/* 056 */       project_result.setTotalSize(project_holder.totalSize());
/* 057 */       append(project_result);
/* 058 */       if (shouldStop()) return;
/* 059 */     }
/* 060 */   }
/* 061 */ }

本作品采用知识共享署名 4.0 国际许可协议进行许可。

发表回复