SQL execute on Spark

SQL is popular for relational data processing, and Spark provides good SQL support. This article analyses the basic flow of how SQL is processed on Spark.
As before, I will use a unit test, with comments on the code, to explain:

  test("self join with aliases") {
    Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str").createOrReplaceTempView("df")
 
    checkAnswer(
      sql(
        """
          |SELECT x.str, COUNT(*)
          |FROM df x JOIN df y ON x.str = y.str
          |GROUP BY x.str
        """.stripMargin),
      Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
  }

Parse SQL with ANTLR.
      sql(
        """
          |SELECT x.str, COUNT(*)
          |FROM df x JOIN df y ON x.str = y.str
          |GROUP BY x.str
        """.stripMargin),

Code in SparkSession.scala

  /**
   * Executes a SQL query using Spark, returning the result as a [[DataFrame]].
   * The dialect that is used for SQL parsing can be configured with 'spark.sql.dialect'.
   *
   * @since 2.0.0
   */
  def sql(sqlText: String): DataFrame = {
    Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
  }
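
For a quick look at what sql() produces, here is a minimal sketch (assuming a SparkSession named spark and the temp view “df” created in the test above); explain(true) prints the parsed, analyzed and optimized logical plans plus the physical plan, and the same plans are also available programmatically through queryExecution:

  // Minimal sketch; assumes `spark` is a SparkSession and the temp view "df" exists.
  val df = spark.sql(
    "SELECT x.str, COUNT(*) FROM df x JOIN df y ON x.str = y.str GROUP BY x.str")
  df.explain(true)                       // parsed / analyzed / optimized / physical plans
  println(df.queryExecution.logical)     // the plan returned by sqlParser.parsePlan
  println(df.queryExecution.analyzed)    // the plan after the Analyzer has run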

Code in ParseDriver.scala
parse(sqlText) produces a SqlBaseParser, a class generated by ANTLR.
Then astBuilder visits the parse tree to generate the Logical Plan.

  /** Creates LogicalPlan for a given SQL string. */
  override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
    astBuilder.visitSingleStatement(parser.singleStatement()) match {
      case plan: LogicalPlan => plan
      case _ =>
        val position = Origin(None, None)
        throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
    }
  }

The sqlText here is “SELECT x.str, COUNT(*) FROM df x JOIN df y ON x.str = y.str GROUP BY x.str”.

Visit the grammar tree and generate the Logical Plan.

Let us debug how the Logical Plan for the Join relation is generated.

Code in AstBuilder.scala
visitFromClause overrides the corresponding visitor method in SqlBaseVisitor, which is code generated by ANTLR, and is invoked by the parser while the parse tree is visited.

  /**
   * Create a logical plan for a given 'FROM' clause. Note that we support multiple (comma
   * separated) relations here, these get converted into a single plan by condition-less inner join.
   */
  override def visitFromClause(ctx: FromClauseContext): LogicalPlan = withOrigin(ctx) {
    val from = ctx.relation.asScala.foldLeft(null: LogicalPlan) { (left, relation) =>
      val right = plan(relation.relationPrimary)
      val join = right.optionalMap(left)(Join(_, _, Inner, None))
      withJoinRelations(join, relation)
    }
    ctx.lateralView.asScala.foldLeft(from)(withGenerate)
  }

Get the join and relation, then invoke withJoinRelations to produce the Join Logical Plan.

  /**
   * Join one more [[LogicalPlan]]s to the current logical plan.
   */
  private def withJoinRelations(base: LogicalPlan, ctx: RelationContext): LogicalPlan = {
    ctx.joinRelation.asScala.foldLeft(base) { (left, join) =>
      withOrigin(join) {
        val baseJoinType = join.joinType match {
          case null => Inner
          case jt if jt.CROSS != null => Cross
          case jt if jt.FULL != null => FullOuter
          case jt if jt.SEMI != null => LeftSemi
          case jt if jt.ANTI != null => LeftAnti
          case jt if jt.LEFT != null => LeftOuter
          case jt if jt.RIGHT != null => RightOuter
          case _ => Inner
        }
 
        // Resolve the join type and join condition
        val (joinType, condition) = Option(join.joinCriteria) match {
          case Some(c) if c.USING != null =>
            val columns = c.identifier.asScala.map { column =>
              UnresolvedAttribute.quoted(column.getText)
            }
            (UsingJoin(baseJoinType, columns), None)
          case Some(c) if c.booleanExpression != null =>
            (baseJoinType, Option(expression(c.booleanExpression)))
          case None if join.NATURAL != null =>
            if (baseJoinType == Cross) {
              throw new ParseException("NATURAL CROSS JOIN is not supported", ctx)
            }
            (NaturalJoin(baseJoinType), None)
          case None =>
            (baseJoinType, None)
        }
        Join(left, plan(join.right), joinType, condition)
      }
    }
  }
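
A small sketch (assuming the temp view “df” from the test above) showing how different join syntaxes surface in the parsed plan; the join type and condition chosen by withJoinRelations show up in the Join node:

  // Each join syntax maps to a different Catalyst join type in the unresolved plan.
  println(spark.sql("SELECT * FROM df x JOIN df y ON x.str = y.str").queryExecution.logical)      // Join Inner, condition
  println(spark.sql("SELECT * FROM df x LEFT JOIN df y ON x.str = y.str").queryExecution.logical) // Join LeftOuter
  println(spark.sql("SELECT * FROM df x JOIN df y USING (str)").queryExecution.logical)           // UsingJoin(Inner, ...)
  println(spark.sql("SELECT * FROM df x NATURAL JOIN df y").queryExecution.logical)               // NaturalJoin(Inner)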

Analyse the Logical Plan.

See Analyzer In Spark for more detail.
See TreeNode in Spark analyze for more detail.
Code in Dataset.scala

  def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
    val qe = sparkSession.sessionState.executePlan(logicalPlan)
    qe.assertAnalyzed()
    new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
  }

Code in QueryExecution.scala

  def assertAnalyzed(): Unit = {
    try sparkSession.sessionState.analyzer.checkAnalysis(analyzed) catch {
      case e: AnalysisException =>
        val ae = new AnalysisException(e.message, e.line, e.startPosition, Some(analyzed))
        ae.setStackTrace(e.getStackTrace)
        throw ae
    }
  }
  lazy val analyzed: LogicalPlan = {
    SparkSession.setActiveSession(sparkSession)
    sparkSession.sessionState.analyzer.execute(logical)
  }
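
Because ofRows calls assertAnalyzed eagerly, an unresolvable query fails inside sql() itself rather than at execution time. A quick sketch (assuming the temp view “df” from the test):

  import org.apache.spark.sql.AnalysisException

  try {
    spark.sql("SELECT no_such_column FROM df")   // analysis fails here, before any job runs
  } catch {
    case e: AnalysisException => println(s"Analysis failed: ${e.getMessage}")
  }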

Code in RuleExecutor.scala (the parent class of Analyzer.scala)

  /**
   * Executes the batches of rules defined by the subclass. The batches are executed serially
   * using the defined execution strategy. Within each batch, rules are also executed serially.
   */
  def execute(plan: TreeType): TreeType = {
    var curPlan = plan
 
    batches.foreach { batch =>
      val batchStartPlan = curPlan
      var iteration = 1
      var lastPlan = curPlan
      var continue = true
 
      // Run until fix point (or the max number of iterations as specified in the strategy.
      while (continue) {
        curPlan = batch.rules.foldLeft(curPlan) {
          case (plan, rule) =>
            val startTime = System.nanoTime()
            val result = rule(plan)
            val runTime = System.nanoTime() - startTime
            RuleExecutor.timeMap.addAndGet(rule.ruleName, runTime)
 
            if (!result.fastEquals(plan)) {
              logTrace(
                s"""
                  |=== Applying Rule ${rule.ruleName} ===
                  |${sideBySide(plan.treeString, result.treeString).mkString("\n")}
                """.stripMargin)
            }
 
            result
        }
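
The loop above is simply a fixed-point iteration over the rules of a batch. A language-level sketch of the same pattern (plain Scala, not Spark code):

  // Apply the rules repeatedly until the value stops changing or the budget is used up.
  def runToFixedPoint[A](start: A, rules: Seq[A => A], maxIterations: Int): A = {
    var current = start
    var changed = true
    var iteration = 0
    while (changed && iteration < maxIterations) {
      val next = rules.foldLeft(current)((acc, rule) => rule(acc))
      changed = next != current
      current = next
      iteration += 1
    }
    current
  }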

And here is the list of batches. Each rule is applied to the Logical Plan.

  lazy val batches: Seq[Batch] = Seq(
    Batch("Substitution", fixedPoint,
      CTESubstitution,
      WindowsSubstitution,
      EliminateUnions,
      new SubstituteUnresolvedOrdinals(conf)),
    Batch("Resolution", fixedPoint,
      ResolveTableValuedFunctions ::
      ResolveRelations ::
      ResolveReferences ::
      ResolveCreateNamedStruct ::
      ResolveDeserializer ::
      ResolveNewInstance ::
      ResolveUpCast ::
      ResolveGroupingAnalytics ::
      ResolvePivot ::
      ResolveOrdinalInOrderByAndGroupBy ::
      ResolveMissingReferences ::
      ExtractGenerator ::
      ResolveGenerate ::
      ResolveFunctions ::
      ResolveAliases ::
      ResolveSubquery ::
      ResolveWindowOrder ::
      ResolveWindowFrame ::
      ResolveNaturalAndUsingJoin ::
      ExtractWindowExpressions ::
      GlobalAggregates ::
      ResolveAggregateFunctions ::
      TimeWindowing ::
      ResolveInlineTables ::
      TypeCoercion.typeCoercionRules ++
      extendedResolutionRules : _*),
    Batch("Nondeterministic", Once,
      PullOutNondeterministic),
    Batch("UDF", Once,
      HandleNullInputsForUDF),
    Batch("FixNullability", Once,
      FixNullability),
    Batch("Cleanup", fixedPoint,
      CleanupAliases)
  )
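
Every entry above follows the same Rule[LogicalPlan] pattern: a transformation over the plan tree that RuleExecutor applies batch by batch. For experimentation, a custom rule can be plugged into the optimizer through the public (experimental) spark.experimental.extraOptimizations hook; a hedged sketch with an illustrative rule name:

  import org.apache.spark.sql.catalyst.expressions.{Literal, Multiply}
  import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
  import org.apache.spark.sql.catalyst.rules.Rule

  // Illustrative rule: rewrite `expr * 1` (and `1 * expr`) to `expr`.
  object SimplifyMultiplyByOne extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
      case Multiply(left, Literal(1, _))  => left
      case Multiply(Literal(1, _), right) => right
    }
  }

  // Register the rule so the optimizer's RuleExecutor applies it like a built-in rule.
  spark.experimental.extraOptimizations ++= Seq(SimplifyMultiplyByOne)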

Code in object ResolveReferences.
When resolving operators, it first makes sure the children of the Logical Plan have been resolved.

    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
      case p: LogicalPlan if !p.childrenResolved => p

Code in LogicalPlan.scala

  /**
   * Returns true if all its children of this query plan have been resolved.
   */
  def childrenResolved: Boolean = children.forall(_.resolved)
 
  // if not a natural join, use `resolvedExceptNatural`. if it is a natural join or
  // using join, we still need to eliminate natural or using before we mark it resolved.
  override lazy val resolved: Boolean = joinType match {
    case NaturalJoin(_) => false
    case UsingJoin(_, _) => false
    case _ => resolvedExceptNatural
  }
 
  // Joins are only resolved if they don't introduce ambiguous expression ids.
  // NaturalJoin should be ready for resolution only if everything else is resolved here
  lazy val resolvedExceptNatural: Boolean = {
    childrenResolved &&
      expressions.forall(_.resolved) &&
      duplicateResolved &&
      condition.forall(_.dataType == BooleanType)
  }

Logical Plan before analysis:

'Aggregate ['x.str], ['x.str, unresolvedalias('COUNT(1), None)]
+- 'Join Inner, ('x.str = 'y.str)
   :- 'UnresolvedRelation `df`, x
   +- 'UnresolvedRelation `df`, y

Logical Plan after analysis; all attributes now reference resolved objects:

Aggregate [str#227], [str#227, count(1) AS count(1)#241L]
+- Join Inner, (str#227 = str#233)
   :- SubqueryAlias x, `df`
   :  +- Project [_1#223 AS int#226, _2#224 AS str#227]
   :     +- LocalRelation [_1#223, _2#224]
   +- SubqueryAlias y, `df`
      +- Project [_1#223 AS int#232, _2#224 AS str#233]
         +- LocalRelation [_1#223, _2#224]
Translate the Logical Plan to a Spark Plan.

Code in Dataset.scala that translates the Logical Plan into RDDs.

  /**
   * Represents the content of the Dataset as an [[RDD]] of [[T]].
   *
   * @group basic
   * @since 1.6.0
   */
  lazy val rdd: RDD[T] = {
    val objectType = exprEnc.deserializer.dataType
    val deserialized = CatalystSerde.deserialize[T](logicalPlan)
    sparkSession.sessionState.executePlan(deserialized).toRdd.mapPartitions { rows =>
      rows.map(_.get(0, objectType).asInstanceOf[T])
    }
  }

Code in QueryExecution.scala
The planner (currently SparkPlanner) uses a list of strategies to convert Logical Plans into the corresponding Spark Plans.

  lazy val sparkPlan: SparkPlan = {
    SparkSession.setActiveSession(sparkSession)
    // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
    //       but we will implement to choose the best plan.
    planner.plan(ReturnAnswer(optimizedPlan)).next()
  }
 
  // executedPlan should not be used to initialize any SparkPlan. It should be
  // only used for execution.
  lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
 
  /** Internal version of the RDD. Avoids copies and has no schema */
  lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
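
All of these stages can be inspected from QueryExecution before anything actually runs; a short sketch (assuming the temp view “df” from the test):

  val qe = spark.sql(
    "SELECT x.str, COUNT(*) FROM df x JOIN df y ON x.str = y.str GROUP BY x.str").queryExecution
  println(qe.optimizedPlan)   // optimized Logical Plan fed into the planner
  println(qe.sparkPlan)       // the first physical plan returned by planner.plan(...)
  println(qe.executedPlan)    // the physical plan after the preparation rules
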
Plan

Using the code “planner.plan(ReturnAnswer(optimizedPlan)).next()” to explain:
Code in SparkPlanner.scala

  def strategies: Seq[Strategy] =
      extraStrategies ++ (
      FileSourceStrategy ::
      DataSourceStrategy ::
      DDLStrategy ::
      SpecialLimits ::
      Aggregation ::
      JoinSelection ::
      InMemoryScans ::
      BasicOperators :: Nil)

For example, this is how a Join Logical Plan becomes a BroadcastHashJoinExec Spark Plan in the JoinSelection strategy.

  object JoinSelection extends Strategy with PredicateHelper {
 
    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
 
      // --- BroadcastHashJoin --------------------------------------------------------------------
 
      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
        if canBuildRight(joinType) && canBroadcast(right) =>
        Seq(joins.BroadcastHashJoinExec(
          leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))
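
Whether JoinSelection can pick BroadcastHashJoinExec depends, among other things, on the estimated size of the build side compared with spark.sql.autoBroadcastJoinThreshold. A hedged sketch (assuming the temp view “df” from the test) showing the effect of the threshold:

  // With broadcasting disabled the same query falls back to a non-broadcast join.
  spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)
  spark.sql("SELECT x.str FROM df x JOIN df y ON x.str = y.str").explain()
  // Restore the usual 10 MB default (value assumed here).
  spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024)
  spark.sql("SELECT x.str FROM df x JOIN df y ON x.str = y.str").explain()
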
Prepare for Execution

Using the code “lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)” to explain:
When the BroadcastHashJoinExec Spark Plan is created for the Join Logical Plan, its buildSide parameter is BuildRight here.
The following method is invoked during prepareForExecution(sparkPlan).

case class BroadcastHashJoinExec(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    joinType: JoinType,
    buildSide: BuildSide,
    condition: Option[Expression],
    left: SparkPlan,
    right: SparkPlan)
  extends BinaryExecNode with HashJoin with CodegenSupport {
 
  override def requiredChildDistribution: Seq[Distribution] = {
    val mode = HashedRelationBroadcastMode(buildKeys)
    buildSide match {
      case BuildLeft =>
        BroadcastDistribution(mode) :: UnspecifiedDistribution :: Nil
      case BuildRight =>
        UnspecifiedDistribution :: BroadcastDistribution(mode) :: Nil
    }
  }

During prepareForExecution, all the preparation rules are applied to the Spark Plan.

  /**
   * Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal
   * row format conversions as needed.
   */
  protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
    preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
  }
 
  /** A sequence of rules that will be applied in order to the physical plan before execution. */
  protected def preparations: Seq[Rule[SparkPlan]] = Seq(
    python.ExtractPythonUDFs,
    PlanSubqueries(sparkSession),
    EnsureRequirements(sparkSession.sessionState.conf),
    CollapseCodegenStages(sparkSession.sessionState.conf),
    ReuseExchange(sparkSession.sessionState.conf),
    ReuseSubquery(sparkSession.sessionState.conf))

The EnsureRequirements rule invokes the requiredChildDistribution method to get the required distribution for each child of the Join.
Here, the second child is wrapped in a BroadcastExchangeExec.
The BroadcastExchangeExec executes during the prepare phase, before the query phase; see the explanation later.
Code in EnsureRequirements.scala

  def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
    case operator @ ShuffleExchange(partitioning, child, _) =>
      child.children match {
        case ShuffleExchange(childPartitioning, baseChild, _)::Nil =>
          if (childPartitioning.guarantees(partitioning)) child else operator
        case _ => operator
      }
    case operator: SparkPlan => ensureDistributionAndOrdering(operator)
  }
 
  private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {
    val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
    val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering
    var children: Seq[SparkPlan] = operator.children
    assert(requiredChildDistributions.length == children.length)
    assert(requiredChildOrderings.length == children.length)
 
    // Ensure that the operator's children satisfy their output distribution requirements:
    children = children.zip(requiredChildDistributions).map {
      case (child, distribution) if child.outputPartitioning.satisfies(distribution) =>
        child
      case (child, BroadcastDistribution(mode)) =>
        BroadcastExchangeExec(mode, child)
      case (child, distribution) =>
        ShuffleExchange(createPartitioning(distribution, defaultNumPreShufflePartitions), child)
    }
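
The same BroadcastDistribution path can also be requested explicitly from the DataFrame API with the broadcast hint, which steers the planner toward a broadcast join so that EnsureRequirements ends up inserting a BroadcastExchangeExec for that side; a small sketch reusing the test data:

  import org.apache.spark.sql.functions.broadcast
  import spark.implicits._

  val small = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
  // Hint that the right side is small enough to broadcast to every executor.
  small.join(broadcast(small), "str").explain()
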
Logical Plan to Spark Plan result

The full plan comparison is as below:

== Parsed Logical Plan ==
'DeserializeToObject unresolveddeserializer(createexternalrow(getcolumnbyordinal(0, StringType).toString, getcolumnbyordinal(1, LongType), StructField(str,StringType,true), StructField(count(1),LongType,false))), obj#241: org.apache.spark.sql.Row
+- Aggregate [str#227], [str#227, count(1) AS count(1)#235L]
   +- Join Inner, (str#227 = str#233)
      :- SubqueryAlias x, `df`
      :  +- Project [_1#223 AS int#226, _2#224 AS str#227]
      :     +- LocalRelation [_1#223, _2#224]
      +- SubqueryAlias y, `df`
         +- Project [_1#223 AS int#232, _2#224 AS str#233]
            +- LocalRelation [_1#223, _2#224]
 
== Analyzed Logical Plan ==
obj: org.apache.spark.sql.Row
DeserializeToObject createexternalrow(str#227.toString, count(1)#235L, StructField(str,StringType,true), StructField(count(1),LongType,false)), obj#241: org.apache.spark.sql.Row
+- Aggregate [str#227], [str#227, count(1) AS count(1)#235L]
   +- Join Inner, (str#227 = str#233)
      :- SubqueryAlias x, `df`
      :  +- Project [_1#223 AS int#226, _2#224 AS str#227]
      :     +- LocalRelation [_1#223, _2#224]
      +- SubqueryAlias y, `df`
         +- Project [_1#223 AS int#232, _2#224 AS str#233]
            +- LocalRelation [_1#223, _2#224]
 
== Optimized Logical Plan ==
DeserializeToObject createexternalrow(str#227.toString, count(1)#235L, StructField(str,StringType,true), StructField(count(1),LongType,false)), obj#241: org.apache.spark.sql.Row
+- Aggregate [str#227], [str#227, count(1) AS count(1)#235L]
   +- Project [str#227]
      +- Join Inner, (str#227 = str#233)
         :- Project [_2#224 AS str#227]
         :  +- Filter isnotnull(_2#224)
         :     +- LocalRelation [_1#223, _2#224]
         +- Project [_2#224 AS str#233]
            +- Filter isnotnull(_2#224)
               +- LocalRelation [_1#223, _2#224]
 
== Physical Plan ==
DeserializeToObject createexternalrow(str#227.toString, count(1)#235L, StructField(str,StringType,true), StructField(count(1),LongType,false)), obj#241: org.apache.spark.sql.Row
+- *HashAggregate(keys=[str#227], functions=[count(1)], output=[str#227, count(1)#235L])
   +- Exchange hashpartitioning(str#227, 5)
      +- *HashAggregate(keys=[str#227], functions=[partial_count(1)], output=[str#227, count#240L])
         +- *Project [str#227]
            +- *BroadcastHashJoin [str#227], [str#233], Inner, BuildRight
               :- *Project [_2#224 AS str#227]
               :  +- *Filter isnotnull(_2#224)
               :     +- LocalTableScan [_1#223, _2#224]
               +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
                  +- *Project [_2#224 AS str#233]
                     +- *Filter isnotnull(_2#224)
                        +- LocalTableScan [_1#223, _2#224]
Execute the Spark Plan and generate the RDDs.

Code in SparkPlan.scala. SparkPlans form a chain of dependencies; when the toRdd value is accessed, the root SparkPlan is executed.
Notice that the execute method is a wrapper around doExecute with extra steps: it prepares the plan and waits for subqueries before calling doExecute.

  /**
   * Returns the result of this query as an RDD[InternalRow] by delegating to `doExecute` after
   * preparations.
   *
   * Concrete implementations of SparkPlan should override `doExecute`.
   */
  final def execute(): RDD[InternalRow] = executeQuery {
    doExecute()
  }
 
  /**
   * Execute a query after preparing the query and adding query plan information to created RDDs
   * for visualization.
   */
  protected final def executeQuery[T](query: => T): T = {
    RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
      prepare()
      waitForSubqueries()
      query
    }
  }

Code in SparkPlan.scala. The prepare method prepares (and triggers) the subqueries and calls doPrepare on each node.

  /**
   * Prepare a SparkPlan for execution. It's idempotent.
   */
  final def prepare(): Unit = {
    // doPrepare() may depend on it's children, we should call prepare() on all the children first.
    children.foreach(_.prepare())
    synchronized {
      if (!prepared) {
        prepareSubqueries()
        doPrepare()
        prepared = true
      }
    }
  }
Prepare

For example, BroadcastExchangeExec overrides doPrepare (code in BroadcastExchangeExec.scala), because it collects its child's result and broadcasts it during the prepare phase.

  override protected def doPrepare(): Unit = {
    // Materialize the future.
    relationFuture
  }
 
  private lazy val relationFuture: Future[broadcast.Broadcast[Any]] = {
    // broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
    Future {
      // This will run in another thread. Set the execution id so that we can connect these jobs
      // with the correct execution.
      SQLExecution.withExecutionId(sparkContext, executionId) {
        try {
          val beforeCollect = System.nanoTime()
          // Note that we use .executeCollect() because we don't want to convert data to Scala types
          val input: Array[InternalRow] = child.executeCollect()
          if (input.length >= 512000000) {
            throw new SparkException(
              s"Cannot broadcast the table with more than 512 millions rows: ${input.length} rows")
          }
          val beforeBuild = System.nanoTime()
          longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000
          val dataSize = input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
          longMetric("dataSize") += dataSize
          if (dataSize >= (8L << 30)) {
            throw new SparkException(
              s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB")
          }
 
          // Construct and broadcast the relation.
          val relation = mode.transform(input)
          val beforeBroadcast = System.nanoTime()
          longMetric("buildTime") += (beforeBroadcast - beforeBuild) / 1000000
 
          val broadcasted = sparkContext.broadcast(relation)
          longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000
 
          // There are some cases we don't care about the metrics and call `SparkPlan.doExecute`
          // directly without setting an execution id. We should be tolerant to it.
          if (executionId != null) {
            sparkContext.listenerBus.post(SparkListenerDriverAccumUpdates(
              executionId.toLong, metrics.values.map(m => m.id -> m.value).toSeq))
          }
 
          broadcasted
        } catch {
          case oe: OutOfMemoryError =>
            throw new OutOfMemoryError(s"Not enough memory to build and broadcast the table to " +
              s"all worker nodes. As a workaround, you can either disable broadcast by setting " +
              s"${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1 or increase the spark driver " +
              s"memory by setting ${SparkLauncher.DRIVER_MEMORY} to a higher value")
              .initCause(oe.getCause)
        }
      }
    }(BroadcastExchangeExec.executionContext)
  }
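
The core mechanism used above is the ordinary Spark broadcast: the build side is collected on the driver, turned into a relation, and shipped to every executor. A simplified sketch with the public API (not the exact HashedRelation that BroadcastExchangeExec builds):

  // Collect-and-broadcast pattern, simplified.
  val lookup: Map[String, Int] = Map("1" -> 1, "2" -> 2, "3" -> 3)
  val broadcasted = spark.sparkContext.broadcast(lookup)

  // Tasks read the broadcast value locally on each executor instead of shuffling it.
  val joined = spark.sparkContext.parallelize(Seq("1", "2", "4"))
    .flatMap(key => broadcasted.value.get(key).map(value => (key, value)))
  joined.collect().foreach(println)
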
Query (doExecute)

Here, take “val input: Array[InternalRow] = child.executeCollect()” as an example.
The child here is a WholeStageCodegenExec, which represents the plan:

*Project [_2#224 AS str#233]
+- *Filter isnotnull(_2#224)
   +- LocalTableScan [_1#223, _2#224]

Let us see how WholeStageCodegenExec generates the RDDs.

  /**
   * Generates code for this subtree.
   *
   * @return the tuple of the codegen context and the actual generated source.
   */
  def doCodeGen(): (CodegenContext, CodeAndComment) = {
    val ctx = new CodegenContext
    val code = child.asInstanceOf[CodegenSupport].produce(ctx, this)
    val source = s"""
      public Object generate(Object[] references) {
        return new GeneratedIterator(references);
      }
 
      ${ctx.registerComment(s"""Codegend pipeline for\n${child.treeString.trim}""")}
      final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
 
        private Object[] references;
        private scala.collection.Iterator[] inputs;
        ${ctx.declareMutableStates()}
 
        public GeneratedIterator(Object[] references) {
          this.references = references;
        }
 
        public void init(int index, scala.collection.Iterator[] inputs) {
          partitionIndex = index;
          this.inputs = inputs;
          ${ctx.initMutableStates()}
          ${ctx.initPartition()}
        }
 
        ${ctx.declareAddedFunctions()}
 
        protected void processNext() throws java.io.IOException {
          ${code.trim}
        }
      }
      """.trim
 
    // try to compile, helpful for debug
    val cleanedSource = CodeFormatter.stripOverlappingComments(
      new CodeAndComment(CodeFormatter.stripExtraNewLines(source), ctx.getPlaceHolderToComments()))
 
    logDebug(s"\n${CodeFormatter.format(cleanedSource)}")
    (ctx, cleanedSource)
  }
 
  override def doExecute(): RDD[InternalRow] = {
    val (ctx, cleanedSource) = doCodeGen()
    // try to compile and fallback if it failed
    try {
      CodeGenerator.compile(cleanedSource)
    } catch {
      case e: Exception if !Utils.isTesting && sqlContext.conf.wholeStageFallback =>
        // We should already saw the error message
        logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString")
        return child.execute()
    }
    val references = ctx.references.toArray
 
    val durationMs = longMetric("pipelineTime")
 
    val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()
    assert(rdds.size <= 2, "Up to two input RDDs can be supported")
    if (rdds.length == 1) {
      rdds.head.mapPartitionsWithIndex { (index, iter) =>
        val clazz = CodeGenerator.compile(cleanedSource)
        val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
        buffer.init(index, Array(iter))
        new Iterator[InternalRow] {
          override def hasNext: Boolean = {
            val v = buffer.hasNext
            if (!v) durationMs += buffer.durationMs()
            v
          }
          override def next: InternalRow = buffer.next()
        }
      }
    } else {
      // Right now, we support up to two input RDDs.
      rdds.head.zipPartitions(rdds(1)) { (leftIter, rightIter) =>
        Iterator((leftIter, rightIter))
        // a small hack to obtain the correct partition index
      }.mapPartitionsWithIndex { (index, zippedIter) =>
        val (leftIter, rightIter) = zippedIter.next()
        val clazz = CodeGenerator.compile(cleanedSource)
        val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
        buffer.init(index, Array(leftIter, rightIter))
        new Iterator[InternalRow] {
          override def hasNext: Boolean = {
            val v = buffer.hasNext
            if (!v) durationMs += buffer.durationMs()
            v
          }
          override def next: InternalRow = buffer.next()
        }
      }
    }
  }

The doExecute method invokes doCodeGen to generate the Java class that represents the child plan.
Here we have:

/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /**
 * Codegend pipeline for
 * Project [_2#224 AS str#233]
 * +- Filter isnotnull(_2#224)
 *    +- LocalTableScan [_1#223, _2#224]
 */
/* 006 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private scala.collection.Iterator inputadapter_input;
/* 010 */   private org.apache.spark.sql.execution.metric.SQLMetric filter_numOutputRows;
/* 011 */   private UnsafeRow filter_result;
/* 012 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder filter_holder;
/* 013 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter filter_rowWriter;
/* 014 */   private UnsafeRow project_result;
/* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter project_rowWriter;
/* 017 */
/* 018 */   public GeneratedIterator(Object[] references) {
/* 019 */     this.references = references;
/* 020 */   }
/* 021 */
/* 022 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */     partitionIndex = index;
/* 024 */     this.inputs = inputs;
/* 025 */     inputadapter_input = inputs[0];
/* 026 */     this.filter_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 027 */     filter_result = new UnsafeRow(2);
/* 028 */     this.filter_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(filter_result, 32);
/* 029 */     this.filter_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(filter_holder, 2);
/* 030 */     project_result = new UnsafeRow(1);
/* 031 */     this.project_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result, 32);
/* 032 */     this.project_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder, 1);
/* 033 */
/* 034 */   }
/* 035 */
/* 036 */   protected void processNext() throws java.io.IOException {
/* 037 */     // PRODUCE: Project [_2#224 AS str#233]
/* 038 */     // PRODUCE: Filter isnotnull(_2#224)
/* 039 */     // PRODUCE: InputAdapter
/* 040 */     while (inputadapter_input.hasNext()) {
/* 041 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 042 */       // CONSUME: Filter isnotnull(_2#224)
/* 043 */       // input[1, string, true]
/* 044 */       boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 045 */       UTF8String inputadapter_value1 = inputadapter_isNull1 ? null : (inputadapter_row.getUTF8String(1));
/* 046 */
/* 047 */       if (!(!(inputadapter_isNull1))) continue;
/* 048 */
/* 049 */       filter_numOutputRows.add(1);
/* 050 */
/* 051 */       // CONSUME: Project [_2#224 AS str#233]
/* 052 */       // CONSUME: WholeStageCodegen
/* 053 */       project_holder.reset();
/* 054 */
/* 055 */       project_rowWriter.write(0, inputadapter_value1);
/* 056 */       project_result.setTotalSize(project_holder.totalSize());
/* 057 */       append(project_result);
/* 058 */       if (shouldStop()) return;
/* 059 */     }
/* 060 */   }
/* 061 */ }
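
The generated code above can also be reproduced without a debugger by using the codegen debug helper on the DataFrame (a quick sketch, assuming the temp view “df” from the test):

  import org.apache.spark.sql.execution.debug._

  val df = spark.sql(
    "SELECT x.str, COUNT(*) FROM df x JOIN df y ON x.str = y.str GROUP BY x.str")
  df.debugCodegen()   // prints the whole-stage generated Java source for each codegen subtree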

Continuing with the key logic: the generated source is compiled, Java reflection creates the buffer (iterator) object,
and it is then wrapped as an RDD.

    val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()
    assert(rdds.size <= 2, "Up to two input RDDs can be supported")
    if (rdds.length == 1) {
      rdds.head.mapPartitionsWithIndex { (index, iter) =>
        val clazz = CodeGenerator.compile(cleanedSource)
        val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
        buffer.init(index, Array(iter))
        new Iterator[InternalRow] {
          override def hasNext: Boolean = {
            val v = buffer.hasNext
            if (!v) durationMs += buffer.durationMs()
            v
          }
          override def next: InternalRow = buffer.next()
        }
      }
    } else {

For the complete SQL query, the generated Java code is:

/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /**
 * Codegend pipeline for
 * HashAggregate(keys=[str#227], functions=[partial_count(1)], output=[str#227, count#240L])
 * +- Project [str#227]
 *    +- BroadcastHashJoin [str#227], [str#233], Inner, BuildRight
 *       :- Project [_2#224 AS str#227]
 *       :  +- Filter isnotnull(_2#224)
 *       :     +- LocalTableScan [_1#223, _2#224]
 *       +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
 *          +- *Project [_2#224 AS str#233]
 *             +- *Filter isnotnull(_2#224)
 *                +- LocalTableScan [_1#223, _2#224]
 */
/* 006 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean agg_initAgg;
/* 010 */   private boolean agg_bufIsNull;
/* 011 */   private long agg_bufValue;
/* 012 */   private org.apache.spark.sql.execution.aggregate.HashAggregateExec agg_plan;
/* 013 */   private agg_FastHashMap agg_fastHashMap;
/* 014 */   private org.apache.spark.unsafe.KVIterator agg_fastHashMapIter;
/* 015 */   private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
/* 016 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter;
/* 017 */   private org.apache.spark.unsafe.KVIterator agg_mapIter;
/* 018 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_peakMemory;
/* 019 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_spillSize;
/* 020 */   private scala.collection.Iterator inputadapter_input;
/* 021 */   private org.apache.spark.sql.execution.metric.SQLMetric filter_numOutputRows;
/* 022 */   private UnsafeRow filter_result;
/* 023 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder filter_holder;
/* 024 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter filter_rowWriter;
/* 025 */   private UnsafeRow project_result;
/* 026 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder;
/* 027 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter project_rowWriter;
/* 028 */   private org.apache.spark.broadcast.TorrentBroadcast bhj_broadcast;
/* 029 */   private org.apache.spark.sql.execution.joins.UnsafeHashedRelation bhj_relation;
/* 030 */   private UnsafeRow bhj_result;
/* 031 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder bhj_holder;
/* 032 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter bhj_rowWriter;
/* 033 */   private org.apache.spark.sql.execution.metric.SQLMetric bhj_numOutputRows;
/* 034 */   private UnsafeRow bhj_result1;
/* 035 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder bhj_holder1;
/* 036 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter bhj_rowWriter1;
/* 037 */   private UnsafeRow project_result1;
/* 038 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder1;
/* 039 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter project_rowWriter1;
/* 040 */   private UnsafeRow agg_result1;
/* 041 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 042 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 043 */   private int agg_value4;
/* 044 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowJoiner agg_unsafeRowJoiner;
/* 045 */   private org.apache.spark.sql.execution.metric.SQLMetric wholestagecodegen_numOutputRows;
/* 046 */   private org.apache.spark.sql.execution.metric.SQLMetric wholestagecodegen_aggTime;
/* 047 */
/* 048 */   public GeneratedIterator(Object[] references) {
/* 049 */     this.references = references;
/* 050 */   }
/* 051 */
/* 052 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 053 */     partitionIndex = index;
/* 054 */     this.inputs = inputs;
/* 055 */     wholestagecodegen_init_0();
/* 056 */     wholestagecodegen_init_1();
/* 057 */     wholestagecodegen_init_2();
/* 058 */
/* 059 */   }
/* 060 */
/* 061 */   private void wholestagecodegen_init_0() {
/* 062 */     agg_initAgg = false;
/* 063 */
/* 064 */     this.agg_plan = (org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0];
/* 065 */     agg_fastHashMap = new agg_FastHashMap(agg_plan.getTaskMemoryManager(), agg_plan.getEmptyAggregationBuffer());
/* 066 */
/* 067 */     this.agg_peakMemory = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 068 */     this.agg_spillSize = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 069 */     inputadapter_input = inputs[0];
/* 070 */     this.filter_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 071 */     filter_result = new UnsafeRow(2);
/* 072 */     this.filter_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(filter_result, 32);
/* 073 */     this.filter_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(filter_holder, 2);
/* 074 */     project_result = new UnsafeRow(1);
/* 075 */     this.project_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result, 32);
/* 076 */     this.project_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder, 1);
/* 077 */
/* 078 */   }
/* 079 */
/* 080 */   public class agg_FastHashMap {
/* 081 */     private org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch batch;
/* 082 */     private int[] buckets;
/* 083 */     private int capacity = 1 << 16;
/* 084 */     private double loadFactor = 0.5;
/* 085 */     private int numBuckets = (int) (capacity / loadFactor);
/* 086 */     private int maxSteps = 2;
/* 087 */     private int numRows = 0;
/* 088 */     private org.apache.spark.sql.types.StructType keySchema = new org.apache.spark.sql.types.StructType().add("str", org.apache.spark.sql.types.DataTypes.StringType);
/* 089 */     private org.apache.spark.sql.types.StructType valueSchema = new org.apache.spark.sql.types.StructType().add("count", org.apache.spark.sql.types.DataTypes.LongType);
/* 090 */     private Object emptyVBase;
/* 091 */     private long emptyVOff;
/* 092 */     private int emptyVLen;
/* 093 */     private boolean isBatchFull = false;
/* 094 */
/* 095 */     public agg_FastHashMap(
/* 096 */       org.apache.spark.memory.TaskMemoryManager taskMemoryManager,
/* 097 */       InternalRow emptyAggregationBuffer) {
/* 098 */       batch = org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch
/* 099 */       .allocate(keySchema, valueSchema, taskMemoryManager, capacity);
/* 100 */
/* 101 */       final UnsafeProjection valueProjection = UnsafeProjection.create(valueSchema);
/* 102 */       final byte[] emptyBuffer = valueProjection.apply(emptyAggregationBuffer).getBytes();
/* 103 */
/* 104 */       emptyVBase = emptyBuffer;
/* 105 */       emptyVOff = Platform.BYTE_ARRAY_OFFSET;
/* 106 */       emptyVLen = emptyBuffer.length;
/* 107 */
/* 108 */       buckets = new int[numBuckets];
/* 109 */       java.util.Arrays.fill(buckets, -1);
/* 110 */     }
/* 111 */
/* 112 */     public org.apache.spark.sql.catalyst.expressions.UnsafeRow findOrInsert(UTF8String agg_key) {
/* 113 */       long h = hash(agg_key);
/* 114 */       int step = 0;
/* 115 */       int idx = (int) h & (numBuckets - 1);
/* 116 */       while (step < maxSteps) {
/* 117 */         // Return bucket index if it's either an empty slot or already contains the key
/* 118 */         if (buckets[idx] == -1) {
/* 119 */           if (numRows < capacity && !isBatchFull) {
/* 120 */             // creating the unsafe for new entry
/* 121 */             UnsafeRow agg_result = new UnsafeRow(1);
/* 122 */             org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder
/* 123 */             = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result,
/* 124 */               32);
/* 125 */             org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter
/* 126 */             = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(
/* 127 */               agg_holder,
/* 128 */               1);
/* 129 */             agg_holder.reset(); //TODO: investigate if reset or zeroout are actually needed
/* 130 */             agg_rowWriter.zeroOutNullBytes();
/* 131 */             agg_rowWriter.write(0, agg_key);
/* 132 */             agg_result.setTotalSize(agg_holder.totalSize());
/* 133 */             Object kbase = agg_result.getBaseObject();
/* 134 */             long koff = agg_result.getBaseOffset();
/* 135 */             int klen = agg_result.getSizeInBytes();
/* 136 */
/* 137 */             UnsafeRow vRow
/* 138 */             = batch.appendRow(kbase, koff, klen, emptyVBase, emptyVOff, emptyVLen);
/* 139 */             if (vRow == null) {
/* 140 */               isBatchFull = true;
/* 141 */             } else {
/* 142 */               buckets[idx] = numRows++;
/* 143 */             }
/* 144 */             return vRow;
/* 145 */           } else {
/* 146 */             // No more space
/* 147 */             return null;
/* 148 */           }
/* 149 */         } else if (equals(idx, agg_key)) {
/* 150 */           return batch.getValueRow(buckets[idx]);
/* 151 */         }
/* 152 */         idx = (idx + 1) & (numBuckets - 1);
/* 153 */         step++;
/* 154 */       }
/* 155 */       // Didn't find it
/* 156 */       return null;
/* 157 */     }
/* 158 */
/* 159 */     private boolean equals(int idx, UTF8String agg_key) {
/* 160 */       UnsafeRow row = batch.getKeyRow(buckets[idx]);
/* 161 */       return (row.getUTF8String(0).equals(agg_key));
/* 162 */     }
/* 163 */
/* 164 */     private long hash(UTF8String agg_key) {
/* 165 */       long agg_hash = 0;
/* 166 */
/* 167 */       int agg_result = 0;
/* 168 */       byte[] agg_bytes = agg_key.getBytes();
/* 169 */       for (int i = 0; i < agg_bytes.length; i++) {
/* 170 */         int agg_hash1 = agg_bytes[i];
/* 171 */         agg_result = (agg_result ^ (0x9e3779b9)) + agg_hash1 + (agg_result << 6) + (agg_result >>> 2);
/* 172 */       }
/* 173 */
/* 174 */       agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result + (agg_hash << 6) + (agg_hash >>> 2);
/* 175 */
/* 176 */       return agg_hash;
/* 177 */     }
/* 178 */
/* 179 */     public org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> rowIterator() {
/* 180 */       return batch.rowIterator();
/* 181 */     }
/* 182 */
/* 183 */     public void close() {
/* 184 */       batch.close();
/* 185 */     }
/* 186 */
/* 187 */   }
/* 188 */
/* 189 */   private void agg_doAggregateWithKeys() throws java.io.IOException {
/* 190 */     agg_hashMap = agg_plan.createHashMap();
/* 191 */
/* 192 */     // PRODUCE: Project [str#227]
/* 193 */     // PRODUCE: BroadcastHashJoin [str#227], [str#233], Inner, BuildRight
/* 194 */     // PRODUCE: Project [_2#224 AS str#227]
/* 195 */     // PRODUCE: Filter isnotnull(_2#224)
/* 196 */     // PRODUCE: InputAdapter
/* 197 */     while (inputadapter_input.hasNext()) {
/* 198 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 199 */       // CONSUME: Filter isnotnull(_2#224)
/* 200 */       // input[1, string, true]
/* 201 */       boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 202 */       UTF8String inputadapter_value1 = inputadapter_isNull1 ? null : (inputadapter_row.getUTF8String(1));
/* 203 */
/* 204 */       if (!(!(inputadapter_isNull1))) continue;
/* 205 */
/* 206 */       filter_numOutputRows.add(1);
/* 207 */
/* 208 */       // CONSUME: Project [_2#224 AS str#227]
/* 209 */       // CONSUME: BroadcastHashJoin [str#227], [str#233], Inner, BuildRight
/* 210 */       // generate join key for stream side
/* 211 */
/* 212 */       bhj_holder.reset();
/* 213 */
/* 214 */       bhj_rowWriter.write(0, inputadapter_value1);
/* 215 */       bhj_result.setTotalSize(bhj_holder.totalSize());
/* 216 */
/* 217 */       // find matches from HashedRelation
/* 218 */       UnsafeRow bhj_matched = bhj_result.anyNull() ? null: (UnsafeRow)bhj_relation.getValue(bhj_result);
/* 219 */       if (bhj_matched == null) continue;
/* 220 */
/* 221 */       bhj_numOutputRows.add(1);
/* 222 */
/* 223 */       // CONSUME: Project [str#227]
/* 224 */       // CONSUME: HashAggregate(keys=[str#227], functions=[partial_count(1)])
/* 225 */       UnsafeRow agg_unsafeRowAggBuffer = null;
/* 226 */
/* 227 */       UnsafeRow agg_fastAggBuffer = null;
/* 228 */
/* 229 */       if (true) {
/* 230 */         if (!false) {
/* 231 */           agg_fastAggBuffer = agg_fastHashMap.findOrInsert(
/* 232 */             inputadapter_value1);
/* 233 */         }
/* 234 */       }
/* 235 */
/* 236 */       if (agg_fastAggBuffer == null) {
/* 237 */         // generate grouping key
/* 238 */         agg_holder.reset();
/* 239 */
/* 240 */         agg_rowWriter.write(0, inputadapter_value1);
/* 241 */         agg_result1.setTotalSize(agg_holder.totalSize());
/* 242 */         // hash(input[0, string, true], 42)
/* 243 */         agg_value4 = 42;
/* 244 */
/* 245 */         if (!false) {
/* 246 */           agg_value4 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(inputadapter_value1.getBaseObject(), inputadapter_value1.getBaseOffset(), inputadapter_value1.numBytes(), agg_value4);
/* 247 */         }
/* 248 */         if (true) {
/* 249 */           // try to get the buffer from hash map
/* 250 */           agg_unsafeRowAggBuffer =
/* 251 */           agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result1, agg_value4);
/* 252 */         }
/* 253 */         if (agg_unsafeRowAggBuffer == null) {
/* 254 */           if (agg_sorter == null) {
/* 255 */             agg_sorter = agg_hashMap.destructAndCreateExternalSorter();
/* 256 */           } else {
/* 257 */             agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
/* 258 */           }
/* 259 */
/* 260 */           // the hash map had be spilled, it should have enough memory now,
/* 261 */           // try  to allocate buffer again.
/* 262 */           agg_unsafeRowAggBuffer =
/* 263 */           agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result1, agg_value4);
/* 264 */           if (agg_unsafeRowAggBuffer == null) {
/* 265 */             // failed to allocate the first page
/* 266 */             throw new OutOfMemoryError("No enough memory for aggregation");
/* 267 */           }
/* 268 */         }
/* 269 */       }
/* 270 */
/* 271 */       if (agg_fastAggBuffer != null) {
/* 272 */         // update fast row
/* 273 */
/* 274 */         // common sub-expressions
/* 275 */
/* 276 */         // evaluate aggregate function
/* 277 */         // (input[0, bigint, false] + 1)
/* 278 */         boolean agg_isNull8 = false;
/* 279 */         // input[0, bigint, false]
/* 280 */         long agg_value10 = agg_fastAggBuffer.getLong(0);
/* 281 */
/* 282 */         long agg_value9 = -1L;
/* 283 */         agg_value9 = agg_value10 + 1L;
/* 284 */         // update fast row
/* 285 */         agg_fastAggBuffer.setLong(0, agg_value9);
/* 286 */
/* 287 */       } else {
/* 288 */         // update unsafe row
/* 289 */
/* 290 */         // common sub-expressions
/* 291 */
/* 292 */         // evaluate aggregate function
/* 293 */         // (input[0, bigint, false] + 1)
/* 294 */         boolean agg_isNull5 = false;
/* 295 */         // input[0, bigint, false]
/* 296 */         long agg_value7 = agg_unsafeRowAggBuffer.getLong(0);
/* 297 */
/* 298 */         long agg_value6 = -1L;
/* 299 */         agg_value6 = agg_value7 + 1L;
/* 300 */         // update unsafe row buffer
/* 301 */         agg_unsafeRowAggBuffer.setLong(0, agg_value6);
/* 302 */
/* 303 */       }
/* 304 */       if (shouldStop()) return;
/* 305 */     }
/* 306 */
/* 307 */     agg_fastHashMapIter = agg_fastHashMap.rowIterator();
/* 308 */
/* 309 */     agg_mapIter = agg_plan.finishAggregate(agg_hashMap, agg_sorter, agg_peakMemory, agg_spillSize);
/* 310 */   }
/* 311 */
/* 312 */   private void wholestagecodegen_init_2() {
/* 313 */     agg_result1 = new UnsafeRow(1);
/* 314 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result1, 32);
/* 315 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
/* 316 */
/* 317 */     agg_unsafeRowJoiner = agg_plan.createUnsafeJoiner();
/* 318 */     this.wholestagecodegen_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[6];
/* 319 */     this.wholestagecodegen_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[7];
/* 320 */
/* 321 */   }
/* 322 */
/* 323 */   private void wholestagecodegen_init_1() {
/* 324 */     this.bhj_broadcast = (org.apache.spark.broadcast.TorrentBroadcast) references[4];
/* 325 */
/* 326 */     bhj_relation = ((org.apache.spark.sql.execution.joins.UnsafeHashedRelation) bhj_broadcast.value()).asReadOnlyCopy();
/* 327 */     incPeakExecutionMemory(bhj_relation.estimatedSize());
/* 328 */
/* 329 */     bhj_result = new UnsafeRow(1);
/* 330 */     this.bhj_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(bhj_result, 32);
/* 331 */     this.bhj_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(bhj_holder, 1);
/* 332 */     this.bhj_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[5];
/* 333 */     bhj_result1 = new UnsafeRow(2);
/* 334 */     this.bhj_holder1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(bhj_result1, 64);
/* 335 */     this.bhj_rowWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(bhj_holder1, 2);
/* 336 */     project_result1 = new UnsafeRow(1);
/* 337 */     this.project_holder1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result1, 32);
/* 338 */     this.project_rowWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder1, 1);
/* 339 */
/* 340 */   }
/* 341 */
/* 342 */   protected void processNext() throws java.io.IOException {
/* 343 */     // PRODUCE: HashAggregate(keys=[str#227], functions=[partial_count(1)])
/* 344 */     if (!agg_initAgg) {
/* 345 */       agg_initAgg = true;
/* 346 */       long wholestagecodegen_beforeAgg = System.nanoTime();
/* 347 */       agg_doAggregateWithKeys();
/* 348 */       wholestagecodegen_aggTime.add((System.nanoTime() - wholestagecodegen_beforeAgg) / 1000000);
/* 349 */     }
/* 350 */
/* 351 */     // output the result
/* 352 */
/* 353 */     while (agg_fastHashMapIter.next()) {
/* 354 */       wholestagecodegen_numOutputRows.add(1);
/* 355 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_fastHashMapIter.getKey();
/* 356 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_fastHashMapIter.getValue();
/* 357 */
/* 358 */       UnsafeRow agg_resultRow = agg_unsafeRowJoiner.join(agg_aggKey, agg_aggBuffer);
/* 359 */
/* 360 */       // CONSUME: WholeStageCodegen
/* 361 */       append(agg_resultRow);
/* 362 */
/* 363 */       if (shouldStop()) return;
/* 364 */     }
/* 365 */     agg_fastHashMap.close();
/* 366 */
/* 367 */     while (agg_mapIter.next()) {
/* 368 */       wholestagecodegen_numOutputRows.add(1);
/* 369 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 370 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
/* 371 */
/* 372 */       UnsafeRow agg_resultRow = agg_unsafeRowJoiner.join(agg_aggKey, agg_aggBuffer);
/* 373 */
/* 374 */       // CONSUME: WholeStageCodegen
/* 375 */       append(agg_resultRow);
/* 376 */
/* 377 */       if (shouldStop()) return;
/* 378 */     }
/* 379 */
/* 380 */     agg_mapIter.close();
/* 381 */     if (agg_sorter == null) {
/* 382 */       agg_hashMap.free();
/* 383 */     }
/* 384 */   }
/* 385 */ }
Execute the RDDs on Spark and get the result.

The steps above have generated the RDDs; now the RDDs should be executed.
See RDD execute flow for more information.
