Spark Plan

Original article. If you repost it, please credit the source: 慢慢的回味

Permalink: Spark Plan

This article lists the class hierarchy of SparkPlan and describes one core plan, WholeStageCodegenExec.

SparkPlan subclass hierarchy
SparkPlan 
    OutputFakerExec 
    EventTimeWatermarkExec 
    CodegenSupport 
        DeserializeToObjectExec 
        GenerateExec 
        HashAggregateExec 
        RangeExec 
        ExpandExec 
        BaseLimitExec 
            GlobalLimitExec 
            LocalLimitExec 
        DataSourceScanExec 
            FileSourceScanExec 
            RowDataSourceScanExec 
        WholeStageCodegenExec 
        SortExec 
        ProjectExec 
        BroadcastHashJoinExec 
        InputAdapter 
        DebugExec in package$ 
        SampleExec 
        SerializeFromObjectExec 
        SortMergeJoinExec 
        MapElementsExec 
        FilterExec 
    StatefulOperator 
        StateStoreSaveExec 
        StateStoreRestoreExec 
    UnionExec 
    ExecutedCommandExec 
    UnaryExecNode 
        DeserializeToObjectExec 
        SortAggregateExec 
        GenerateExec 
        CollectLimitExec 
        FlatMapGroupsInRExec 
        HashAggregateExec 
        ExpandExec 
        BaseLimitExec 
            GlobalLimitExec 
            LocalLimitExec 
        Exchange 
            ShuffleExchange 
            BroadcastExchangeExec 
        WholeStageCodegenExec 
        AppendColumnsExec 
        SortExec 
        MapGroupsExec 
        ProjectExec 
        ObjectConsumerExec 
            SerializeFromObjectExec 
            MapPartitionsExec 
            AppendColumnsWithObjectExec 
            MapElementsExec 
        InsertIntoHiveTable 
        DebugExec in package$ 
        InputAdapter 
        SampleExec 
        CoalesceExec 
        ExceptionInjectingOperator 
        ObjectHashAggregateExec 
        ReferenceSort 
        ScriptTransformation 
        StateStoreSaveExec 
        WindowExec 
        SubqueryExec 
        TakeOrderedAndProjectExec 
        FilterExec 
        StateStoreRestoreExec 
    BinaryExecNode 
        BroadcastNestedLoopJoinExec 
        CoGroupExec 
        ShuffledHashJoinExec 
        CartesianProductExec 
        BroadcastHashJoinExec 
        SortMergeJoinExec 
    BatchEvalPythonExec 
    FastOperator 
    LeafExecNode 
        ReusedExchangeExec 
        InMemoryTableScanExec 
        PlanLater 
        RangeExec 
        LocalTableScanExec 
        DataSourceScanExec 
            FileSourceScanExec 
            RowDataSourceScanExec 
        RDDScanExec 
        MyPlan 
        ExternalRDDScanExec 
        HiveTableScanExec 
        StreamingRelationExec 
    DummySparkPlan 
    ObjectProducerExec 
        DeserializeToObjectExec 
        FlatMapGroupsInRExec 
        CoGroupExec 
        MapGroupsExec 
        ExternalRDDScanExec 
        MapPartitionsExec 
        MapElementsExec
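
Several operators appear more than once in the tree because a Scala class can mix in several traits. The sketch below uses simplified stand-in types (the names mirror Spark's, but these are placeholders, not Spark's real definitions) to show how one operator can be listed under both UnaryExecNode and CodegenSupport.

```scala
object PlanHierarchySketch {
  // Simplified stand-ins; the real Spark traits carry many more members.
  trait SparkPlan { def children: Seq[SparkPlan] }
  trait UnaryExecNode extends SparkPlan {
    def child: SparkPlan
    def children: Seq[SparkPlan] = Seq(child)
  }
  trait LeafExecNode extends SparkPlan {
    def children: Seq[SparkPlan] = Nil
  }
  trait CodegenSupport extends SparkPlan

  // One class, two parents: this is why a name shows up in two branches of the tree.
  case class RangeExec(start: Long, end: Long) extends LeafExecNode with CodegenSupport
  case class FilterExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport

  // Names of the listed traits that a plan node implements.
  def categories(p: SparkPlan): Seq[String] = Seq(
    "UnaryExecNode" -> p.isInstanceOf[UnaryExecNode],
    "LeafExecNode" -> p.isInstanceOf[LeafExecNode],
    "CodegenSupport" -> p.isInstanceOf[CodegenSupport]
  ).collect { case (name, true) => name }
}
```

Calling `PlanHierarchySketch.categories` on a `FilterExec` reports both `UnaryExecNode` and `CodegenSupport`, which matches the two places where FilterExec appears in the listing.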

Continue reading “Spark Plan”. This work is licensed under the Creative Commons Attribution 4.0 International License.

Analyzer in Spark

The Analyzer extends RuleExecutor, which executes rules on the logical plan.

Execute the rule

RuleExecutor's execute method applies the rules batch by batch, and rule by rule within each batch.

  /**
   * Executes the batches of rules defined by the subclass. The batches are executed serially
   * using the defined execution strategy. Within each batch, rules are also executed serially.
   */
  def execute(plan: TreeType): TreeType = {
    var curPlan = plan
 
    batches.foreach { batch =>
      val batchStartPlan = curPlan
      var iteration = 1
      var lastPlan = curPlan
      var continue = true
 
      // Run until fix point (or the max number of iterations as specified in the strategy.
      while (continue) {
        curPlan = batch.rules.foldLeft(curPlan) {
          case (plan, rule) =>
            val startTime = System.nanoTime()
            val result = rule(plan)
            val runTime = System.nanoTime() - startTime
            RuleExecutor.timeMap.addAndGet(rule.ruleName, runTime)
 
            if (!result.fastEquals(plan)) {
              logTrace(
                s"""
                  |=== Applying Rule ${rule.ruleName} ===
                  |${sideBySide(plan.treeString, result.treeString).mkString("\n")}
                """.stripMargin)
            }
 
            result
        }
......
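
Since the excerpt above is truncated, here is a minimal, self-contained sketch of the same fixed-point idea. It is not Spark's implementation: the "plan" is just a List[Int], and `Rule`, `dropZeros` and `halveEvens` are hypothetical names made up for illustration.

```scala
object FixedPointSketch {
  // The "plan" is a plain List[Int]; a rule is a named function on it.
  type Plan = List[Int]
  final case class Rule(name: String, transform: Plan => Plan)

  // Apply all rules in order, and repeat until a full pass changes nothing
  // or maxIterations passes have run.
  def execute(plan: Plan, rules: Seq[Rule], maxIterations: Int = 100): Plan = {
    var cur = plan
    var iteration = 1
    var continue = true
    while (continue) {
      val next = rules.foldLeft(cur) { (p, rule) => rule.transform(p) }
      if (next == cur || iteration >= maxIterations) continue = false
      cur = next
      iteration += 1
    }
    cur
  }

  val dropZeros: Rule = Rule("DropZeros", _.filter(_ != 0))
  val halveEvens: Rule = Rule("HalveEvens", _.map(n => if (n % 2 == 0) n / 2 else n))
}
```

With the two toy rules, `execute(List(8, 0, 3), Seq(dropZeros, halveEvens))` needs several passes before the plan stops changing, which is the situation the fixed-point strategy is meant for.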
Rules in Analyzer

The Analyzer defines many rules, grouped into batches:

lazy val batches: Seq[Batch] = Seq(
    Batch("Substitution", fixedPoint,
      CTESubstitution,
      WindowsSubstitution,
      EliminateUnions,
      new SubstituteUnresolvedOrdinals(conf)),
    Batch("Resolution", fixedPoint,
      ResolveTableValuedFunctions ::
      ResolveRelations ::
      ResolveReferences ::
      ResolveCreateNamedStruct ::
      ResolveDeserializer ::
      ResolveNewInstance ::
      ResolveUpCast ::
      ResolveGroupingAnalytics ::
      ResolvePivot ::
      ResolveOrdinalInOrderByAndGroupBy ::
      ResolveMissingReferences ::
      ExtractGenerator ::
      ResolveGenerate ::
      ResolveFunctions ::
      ResolveAliases ::
      ResolveSubquery ::
      ResolveWindowOrder ::
      ResolveWindowFrame ::
      ResolveNaturalAndUsingJoin ::
      ExtractWindowExpressions ::
      GlobalAggregates ::
      ResolveAggregateFunctions ::
      TimeWindowing ::
      ResolveInlineTables ::
      TypeCoercion.typeCoercionRules ++
      extendedResolutionRules : _*),
    Batch("Nondeterministic", Once,
      PullOutNondeterministic),
    Batch("UDF", Once,
      HandleNullInputsForUDF),
    Batch("FixNullability", Once,
      FixNullability),
    Batch("Cleanup", fixedPoint,
      CleanupAliases)
  )
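
The "Resolution" batch mixes `::`, `++` and `: _*`, which can be hard to read. The sketch below reproduces only that syntax with a hypothetical `Batch` that takes rule names as strings; the rule lists are placeholders, not the full Spark lists.

```scala
object BatchSyntaxSketch {
  // Hypothetical stand-in for Spark's Batch: a name plus a variable number of rules.
  final case class Batch(name: String, rules: String*)

  // Placeholder contents, assumed for illustration only.
  val typeCoercionRules: List[String] = List("InConversion", "PromoteStrings")
  val extendedResolutionRules: Seq[String] = Seq("MyCustomRule")

  // `++` binds tighter than `::`, and `::` is right-associative, so this reads as
  //   "ResolveRelations" :: ("ResolveReferences" :: (typeCoercionRules ++ extendedResolutionRules))
  // and `: _*` then expands the resulting list into the varargs parameter.
  val resolution: Batch = Batch("Resolution",
    "ResolveRelations" ::
    "ResolveReferences" ::
    typeCoercionRules ++
    extendedResolutionRules : _*)
}
```

The order of the rules in the resulting batch is the order in which they are written, with the extension rules last.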

TreeNode in Spark analyze

TreeNode (defined in TreeNode.scala) plays a very important role in Spark: it is the parent of QueryPlan, which in turn is the parent of LogicalPlan and SparkPlan.
So understanding TreeNode helps a lot in understanding the Spark SQL processing flow.

Methods
mapProductIterator method
  /**
   * Efficient alternative to `productIterator.map(f).toArray`.
   */
  protected def mapProductIterator[B: ClassTag](f: Any => B): Array[B] = {
    val arr = Array.ofDim[B](productArity)
    var i = 0
    while (i < arr.length) {
      arr(i) = f(productElement(i))
      i += 1
    }
    arr
  }

The method iterates over all elements of the product and applies the function “f” to each one.
Here, productArity is the size of the product. For a case class, it is the number of constructor parameters.
For example, for the case class Join below, productArity is 4.
The productElement() method returns the element at the given index.
So productElement(0) is “left”, productElement(1) is “right”, productElement(2) is “joinType” and productElement(3) is “condition”.

case class Join(
    left: LogicalPlan,
    right: LogicalPlan,
    joinType: JoinType,
    condition: Option[Expression])
  extends BinaryNode with PredicateHelper {

So the Join logical plan passes each of its constructor parameters to the function “f”.
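
To check productArity and productElement by hand, the following sketch uses a simplified `Join` whose fields are plain strings (an assumption made so the example runs without Spark) and a standalone copy of mapProductIterator that takes the product as a parameter.

```scala
import scala.reflect.ClassTag

object ProductSketch {
  // Simplified stand-in for Spark's Join: strings instead of plans and expressions.
  final case class Join(left: String, right: String, joinType: String, condition: Option[String])

  // Same loop as TreeNode.mapProductIterator, written against any Product.
  def mapProductIterator[B: ClassTag](p: Product)(f: Any => B): Array[B] = {
    val arr = Array.ofDim[B](p.productArity)
    var i = 0
    while (i < arr.length) {
      arr(i) = f(p.productElement(i))
      i += 1
    }
    arr
  }

  val join: Join = Join("df x", "df y", "Inner", Some("x.str = y.str"))

  // Apply toString to every constructor parameter, in declaration order.
  val described: Array[String] = mapProductIterator[String](join)(_.toString)
}
```

Here `join.productArity` is 4 and `join.productElement(3)` is the `condition` field, so `described` has one entry per constructor parameter.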

SQL execute on Spark

SQL is popular for processing relational data, and Spark supports SQL well. This article analyses the basic flow of how a SQL statement is processed on Spark.
As before, I will use a unit test with comments on the code to explain:

  test("self join with aliases") {
    Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str").createOrReplaceTempView("df")
 
    checkAnswer(
      sql(
        """
          |SELECT x.str, COUNT(*)
          |FROM df x JOIN df y ON x.str = y.str
          |GROUP BY x.str
        """.stripMargin),
      Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
  }

Parse SQL with ANTLR.
      sql(
        """
          |SELECT x.str, COUNT(*)
          |FROM df x JOIN df y ON x.str = y.str
          |GROUP BY x.str
        """.stripMargin),

Code in SparkSession.scala

  /**
   * Executes a SQL query using Spark, returning the result as a [[DataFrame]].
   * The dialect that is used for SQL parsing can be configured with 'spark.sql.dialect'.
   *
   * @since 2.0.0
   */
  def sql(sqlText: String): DataFrame = {
    Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
  }

Code in ParseDriver.scala
parse(sqlText) creates a SqlBaseParser, a class generated by ANTLR, and passes it to the block.
Then astBuilder visits the parse tree to generate the logical plan.

  /** Creates LogicalPlan for a given SQL string. */
  override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
    astBuilder.visitSingleStatement(parser.singleStatement()) match {
      case plan: LogicalPlan => plan
      case _ =>
        val position = Origin(None, None)
        throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
    }
  }

The sqlText here is “SELECT x.str, COUNT(*) FROM df x JOIN df y ON x.str = y.str GROUP BY x.str”.
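
The `parse(sqlText) { parser => ... }` call is a method with two parameter lists, where the second one is a function that receives the parser. The toy below shows only that shape. `ToyParser` and the string "plan" it returns are hypothetical and have nothing to do with ANTLR or Spark's real classes.

```scala
object ParseShapeSketch {
  // Hypothetical stand-in for the generated parser: it only keeps a token list.
  final class ToyParser(val tokens: List[String])

  // Builds the parser, then hands it to the caller-supplied block.
  def parse[T](sqlText: String)(toResult: ToyParser => T): T = {
    val tokens = sqlText.trim.split("\\s+").toList
    toResult(new ToyParser(tokens))
  }

  // Same structure as parsePlan: match on what was produced, fail otherwise.
  def parsePlan(sqlText: String): String = parse(sqlText) { parser =>
    parser.tokens.headOption.map(_.toUpperCase) match {
      case Some("SELECT") => s"Query(${parser.tokens.size} tokens)"
      case _ => throw new IllegalArgumentException("Unsupported SQL statement")
    }
  }
}
```

The benefit of this shape is that setup and error handling around the parser live in one place (`parse`), while each caller only supplies what to do with the parser.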

Visit the grammar tree and generate the Logic Plan.

Let us debug how the Join logical plan is generated.

Code in AstBuilder.scala
visitFromClause overrides the corresponding method of SqlBaseVisitor, which is code generated by ANTLR.
It is called when the visitor reaches the FROM clause of the parse tree.

  /**
   * Create a logical plan for a given 'FROM' clause. Note that we support multiple (comma
   * separated) relations here, these get converted into a single plan by condition-less inner join.
   */
  override def visitFromClause(ctx: FromClauseContext): LogicalPlan = withOrigin(ctx) {
    val from = ctx.relation.asScala.foldLeft(null: LogicalPlan) { (left, relation) =>
      val right = plan(relation.relationPrimary)
      val join = right.optionalMap(left)(Join(_, _, Inner, None))
      withJoinRelations(join, relation)
    }
    ctx.lateralView.asScala.foldLeft(from)(withGenerate)
  }
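
The foldLeft in visitFromClause starts from null and joins each new relation onto what has been accumulated so far. A minimal sketch of that folding, using Option instead of null and simplified stand-in plan types (both are assumptions for illustration), could look like this:

```scala
object FromClauseSketch {
  // Simplified stand-ins for logical plan nodes.
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Join(left: Plan, right: Plan, joinType: String) extends Plan

  // Fold comma-separated relations into a single left-deep plan.
  def fromClause(relations: Seq[String]): Option[Plan] =
    relations.foldLeft(Option.empty[Plan]) { (left, name) =>
      val right: Plan = Relation(name)
      // Plays the role of right.optionalMap(left)(Join(_, _, Inner, None)):
      // join only when an accumulated left side already exists.
      Some(left.fold(right)(l => Join(l, right, "Inner")))
    }
}
```

For a single relation the result is that relation itself; for "a, b, c" the result is a join of (a join b) with c, both without a condition.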

It takes the accumulated join and the current relation, then invokes withJoinRelations to produce the Join logical plan.

  /**
   * Join one more [[LogicalPlan]]s to the current logical plan.
   */
  private def withJoinRelations(base: LogicalPlan, ctx: RelationContext): LogicalPlan = {
    ctx.joinRelation.asScala.foldLeft(base) { (left, join) =>
      withOrigin(join) {
        val baseJoinType = join.joinType match {
          case null => Inner
          case jt if jt.CROSS != null => Cross
          case jt if jt.FULL != null => FullOuter
          case jt if jt.SEMI != null => LeftSemi
          case jt if jt.ANTI != null => LeftAnti
          case jt if jt.LEFT != null => LeftOuter
          case jt if jt.RIGHT != null => RightOuter
          case _ => Inner
        }
 
        // Resolve the join type and join condition
        val (joinType, condition) = Option(join.joinCriteria) match {
          case Some(c) if c.USING != null =>
            val columns = c.identifier.asScala.map { column =>
              UnresolvedAttribute.quoted(column.getText)
            }
            (UsingJoin(baseJoinType, columns), None)
          case Some(c) if c.booleanExpression != null =>
            (baseJoinType, Option(expression(c.booleanExpression)))
          case None if join.NATURAL != null =>
            if (baseJoinType == Cross) {
              throw new ParseException("NATURAL CROSS JOIN is not supported", ctx)
            }
            (NaturalJoin(baseJoinType), None)
          case None =>
            (baseJoinType, None)
        }
        Join(left, plan(join.right), joinType, condition)
      }
    }
  }
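
The second match in withJoinRelations decides the final join type and condition from the join criteria. The sketch below isolates that decision with hypothetical, simplified types: `Criteria`, `Using` and `On` are made up here, and conditions are plain strings rather than expressions.

```scala
object JoinCriteriaSketch {
  sealed trait JoinType
  case object Inner extends JoinType
  case object Cross extends JoinType
  case object LeftOuter extends JoinType
  final case class UsingJoin(tpe: JoinType, columns: Seq[String]) extends JoinType
  final case class NaturalJoin(tpe: JoinType) extends JoinType

  // Hypothetical, simplified view of the parsed join criteria.
  sealed trait Criteria
  final case class Using(columns: Seq[String]) extends Criteria
  final case class On(expression: String) extends Criteria

  // Returns the final join type and the optional join condition.
  def resolve(base: JoinType, criteria: Option[Criteria], natural: Boolean): (JoinType, Option[String]) =
    criteria match {
      case Some(Using(cols)) => (UsingJoin(base, cols), None)
      case Some(On(expr)) => (base, Some(expr))
      case None if natural =>
        require(base != Cross, "NATURAL CROSS JOIN is not supported")
        (NaturalJoin(base), None)
      case None => (base, None)
    }
}
```

For the test query, the join has an ON clause and no explicit join type, so the result is an Inner join carrying the condition `x.str = y.str`.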

RDD execute flow

This article explains how an RDD is executed, with comments on the code.
The Spark git version is SHA-1: 8b325b17ecdf013b7a6edcb7ee3773546bd914df

1 Use a unit test as an example
class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
 
  test("Using TorrentBroadcast locally") {
    sc = new SparkContext("local", "test")
    val list = List[Int](1, 2, 3, 4)
    val broadcast = sc.broadcast(list)
    val results = sc.parallelize(1 to 2).map(x => (x, broadcast.value.sum))
    assert(results.collect().toSet === Set((1, 10), (2, 10)))
  }
}
2 Broadcast the data
  val broadcast = sc.broadcast(list)
 
  def broadcast[T: ClassTag](value: T): Broadcast[T] = {
    assertNotStopped()
    require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
      "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
    val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
    val callSite = getCallSite
    logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
    cleaner.foreach(_.registerBroadcastForCleanup(bc))
    bc
  }
BroadcastManager.scala
  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
    broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
  }
TorrentBroadcastFactory.scala
  override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] = {
    new TorrentBroadcast[T](value_, id)
  }
TorrentBroadcast.scala
  // lazy is useful here: the value is read from the block store only on first access
  @transient private lazy val _value: T = readBroadcastBlock()
  // write the data to the block store when the broadcast is created
  private val numBlocks: Int = writeBlocks(obj)
 
  override protected def getValue() = {
    _value
  }
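
The difference between the eager `val` and the `lazy val` can be seen in a small toy. `ToyBroadcast` below is hypothetical: it keeps its "block store" in a local map and records events in a buffer, which the real TorrentBroadcast does not do.

```scala
import scala.collection.mutable

object LazyBroadcastSketch {
  final class ToyBroadcast[T](obj: T) {
    // Records what happened and when, for demonstration only.
    val events: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]
    private val store = mutable.Map.empty[Int, Any]

    // Eager: runs in the constructor, like writeBlocks(obj).
    val numBlocks: Int = { store(0) = obj; events += "write"; 1 }

    // Lazy: runs on first access only, and the result is cached afterwards.
    private lazy val cached: T = { events += "read"; store(0).asInstanceOf[T] }

    def value: T = cached
  }
}
```

Creating a `ToyBroadcast` records only a "write" event. The first call to `value` adds one "read" event, and later calls reuse the cached result without reading again.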
3 Create RDD

This creates two RDDs: first parallelize wraps the data, then map creates a second RDD
that references the function. Neither is executed immediately.

  val results = sc.parallelize(1 to 2).map(x => (x, broadcast.value.sum))
 
  def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }
 
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }
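
The laziness can be illustrated without Spark. In the toy below, `ToyRdd` is a hypothetical single-partition stand-in: `map` only builds a new node that remembers its parent and the function, and nothing runs until `collect` is called.

```scala
object LazyRddSketch {
  // Toy single-partition "RDD": each node only knows how to compute its iterator.
  abstract class ToyRdd[T] { self =>
    def compute(): Iterator[T]

    // Builds a new node that wraps the parent; f is not called here.
    def map[U](f: T => U): ToyRdd[U] = new ToyRdd[U] {
      def compute(): Iterator[U] = self.compute().map(f)
    }

    // Only here is the chain of iterators actually consumed.
    def collect(): List[T] = compute().toList
  }

  def parallelize[T](seq: Seq[T]): ToyRdd[T] = new ToyRdd[T] {
    def compute(): Iterator[T] = seq.iterator
  }

  // Counts how many times the mapping function has been invoked.
  var calls = 0
  val mapped: ToyRdd[(Int, Int)] = parallelize(1 to 2).map { x => calls += 1; (x, 10) }
}
```

After `mapped` is built, `calls` is still 0. It becomes 2 only once `mapped.collect()` has run.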
4 Run RDD to get the result
  results.collect()
 
  // This is where the RDD is actually executed
  def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }
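
collect produces one array per partition and then concatenates them. The sketch below shows that shape with a hypothetical local `runJob` that simply loops over in-memory partitions. The real sc.runJob schedules tasks on executors instead.

```scala
import scala.reflect.ClassTag

object CollectSketch {
  // Hypothetical stand-in for sc.runJob: run func on every partition's iterator.
  def runJob[T, U: ClassTag](partitions: Seq[Seq[T]], func: Iterator[T] => U): Array[U] =
    partitions.map(p => func(p.iterator)).toArray

  def collect(partitions: Seq[Seq[Int]]): Array[Int] = {
    // One Array[Int] per partition, in partition order.
    val results: Array[Array[Int]] =
      runJob[Int, Array[Int]](partitions, (iter: Iterator[Int]) => iter.toArray)
    // Flatten the per-partition arrays into a single array.
    Array.concat(results: _*)
  }
}
```

Because the per-partition results are concatenated in partition order, the collected array preserves the order of the elements across partitions.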