Spark Planner for Converting Logical Plan to Spark Plan

The SparkPlanner is the bridge from the Logical Plan to the Spark Plan: it converts a Logical Plan into a Spark (physical) Plan.
SparkPlanner extends SparkStrategies (code in SparkStrategies.scala).
It has several strategies:

  def strategies: Seq[Strategy] =
      extraStrategies ++ (
      FileSourceStrategy ::
      DataSourceStrategy ::
      DDLStrategy ::
      SpecialLimits ::
      Aggregation ::
      JoinSelection ::
      InMemoryScans ::
      BasicOperators :: Nil)
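
A quick way to see these strategies in action is to inspect the physical plan that the planner produces for a simple query. A minimal sketch (assuming a SparkSession named spark; the table and column names are illustrative only):

  val df = spark.range(10).toDF("id")
  df.createOrReplaceTempView("t")
  val qe = spark.sql("SELECT id FROM t WHERE id > 5").queryExecution
  qe.optimizedPlan   // Logical Plan after the optimizer
  qe.sparkPlan       // Spark Plan produced by SparkPlanner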

Let me introduce a few of these strategies here:

SpecialLimits

Here the Logical Plan Limit is converted to the Spark Plan TakeOrderedAndProjectExec (when it sits on top of a Sort, optionally with a Project in between), or to CollectLimitExec for a plain limit at the top of the plan.

 /**
   * Plans special cases of limit operators.
   */
  object SpecialLimits extends Strategy {
    override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
      case logical.ReturnAnswer(rootPlan) => rootPlan match {
        case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
          execution.TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
        case logical.Limit(
            IntegerLiteral(limit),
            logical.Project(projectList, logical.Sort(order, true, child))) =>
          execution.TakeOrderedAndProjectExec(
            limit, order, projectList, planLater(child)) :: Nil
        case logical.Limit(IntegerLiteral(limit), child) =>
          execution.CollectLimitExec(limit, planLater(child)) :: Nil
        case other => planLater(other) :: Nil
      }
      case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
        execution.TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
      case logical.Limit(
          IntegerLiteral(limit), logical.Project(projectList, logical.Sort(order, true, child))) =>
        execution.TakeOrderedAndProjectExec(
          limit, order, projectList, planLater(child)) :: Nil
      case _ => Nil
    }
  }
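
For example, an ORDER BY ... LIMIT query matches the first case above. A minimal sketch (assuming a SparkSession named spark and a temp view "t" with a column "c"):

  spark.sql("SELECT c FROM t ORDER BY c LIMIT 10").explain()
  // The physical plan should contain a TakeOrderedAndProject node,
  // whereas a plain "SELECT c FROM t LIMIT 10" is planned with CollectLimit.
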
Aggregation

The Logical Plan Aggregate will be converted to one of the following Spark Plans, depending on conditions:
SortAggregateExec
HashAggregateExec
ObjectHashAggregateExec

 /**
   * Used to plan the aggregate operator for expressions based on the AggregateFunction2 interface.
   */
  object Aggregation extends Strategy {
    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
      case PhysicalAggregation(
          groupingExpressions, aggregateExpressions, resultExpressions, child) =>
 
        val (functionsWithDistinct, functionsWithoutDistinct) =
          aggregateExpressions.partition(_.isDistinct)
......

Let me use the SQL query “SELECT x.str, COUNT(*) FROM df x JOIN df y ON x.str = y.str GROUP BY x.str” as an example.
Here, “COUNT(*)” is an aggregate expression.
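
A minimal sketch that reproduces this example (assuming a SparkSession named spark; the data is illustrative only):

  import spark.implicits._
  val df = Seq("a", "b", "a").toDF("str")
  df.createOrReplaceTempView("df")
  spark.sql(
    "SELECT x.str, COUNT(*) FROM df x JOIN df y ON x.str = y.str GROUP BY x.str"
  ).explain()
  // Since COUNT(*) has a fixed-width (Long) aggregation buffer, the physical plan
  // should contain HashAggregateExec nodes (a partial and a final aggregation).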

Spark Plan

This article lists the class hierarchy of Spark Plan and describes a core plan, WholeStageCodegenExec; see the example after the hierarchy below.

Spark Plan subclass hierarchy
SparkPlan 
    OutputFakerExec 
    EventTimeWatermarkExec 
    CodegenSupport 
        DeserializeToObjectExec 
        GenerateExec 
        HashAggregateExec 
        RangeExec 
        ExpandExec 
        BaseLimitExec 
            GlobalLimitExec 
            LocalLimitExec 
        DataSourceScanExec 
            FileSourceScanExec 
            RowDataSourceScanExec 
        WholeStageCodegenExec 
        SortExec 
        ProjectExec 
        BroadcastHashJoinExec 
        InputAdapter 
        DebugExec in package$ 
        SampleExec 
        SerializeFromObjectExec 
        SortMergeJoinExec 
        MapElementsExec 
        FilterExec 
    StatefulOperator 
        StateStoreSaveExec 
        StateStoreRestoreExec 
    UnionExec 
    ExecutedCommandExec 
    UnaryExecNode 
        DeserializeToObjectExec 
        SortAggregateExec 
        GenerateExec 
        CollectLimitExec 
        FlatMapGroupsInRExec 
        HashAggregateExec 
        ExpandExec 
        BaseLimitExec 
            GlobalLimitExec 
            LocalLimitExec 
        Exchange 
            ShuffleExchange 
            BroadcastExchangeExec 
        WholeStageCodegenExec 
        AppendColumnsExec 
        SortExec 
        MapGroupsExec 
        ProjectExec 
        ObjectConsumerExec 
            SerializeFromObjectExec 
            MapPartitionsExec 
            AppendColumnsWithObjectExec 
            MapElementsExec 
        InsertIntoHiveTable 
        DebugExec in package$ 
        InputAdapter 
        SampleExec 
        CoalesceExec 
        ExceptionInjectingOperator 
        ObjectHashAggregateExec 
        ReferenceSort 
        ScriptTransformation 
        StateStoreSaveExec 
        WindowExec 
        SubqueryExec 
        TakeOrderedAndProjectExec 
        FilterExec 
        StateStoreRestoreExec 
    BinaryExecNode 
        BroadcastNestedLoopJoinExec 
        CoGroupExec 
        ShuffledHashJoinExec 
        CartesianProductExec 
        BroadcastHashJoinExec 
        SortMergeJoinExec 
    BatchEvalPythonExec 
    FastOperator 
    LeafExecNode 
        ReusedExchangeExec 
        InMemoryTableScanExec 
        PlanLater 
        RangeExec 
        LocalTableScanExec 
        DataSourceScanExec 
            FileSourceScanExec 
            RowDataSourceScanExec 
        RDDScanExec 
        MyPlan 
        ExternalRDDScanExec 
        HiveTableScanExec 
        StreamingRelationExec 
    DummySparkPlan 
    ObjectProducerExec 
        DeserializeToObjectExec 
        FlatMapGroupsInRExec 
        CoGroupExec 
        MapGroupsExec 
        ExternalRDDScanExec 
        MapPartitionsExec 
        MapElementsExec
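
WholeStageCodegenExec fuses adjacent operators that implement CodegenSupport (see the list above) into a single generated function. A minimal sketch to observe it (assuming a SparkSession named spark); operators covered by whole-stage code generation are marked with a leading "*" in the explain output:

  spark.range(100).filter("id > 50").selectExpr("id * 2 AS doubled").explain()
  // Physical plan, roughly:
  // *Project [(id * 2) AS doubled ...]
  // +- *Filter (id > 50)
  //    +- *Range (0, 100, ...)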

Logical Plan

LogicalPlan class hierarchy
LogicalPlan 
    InsertIntoTable : Inserts data into a table (a Logical Plan).
    LeafNode : A node that has no children.
        Command : A Logical Plan that has no output.
            CreateTable 
            RunnableCommand 
                ExplainCommand 
                DropDatabaseCommand 
                AlterTableSetPropertiesCommand 
                AlterTableUnsetPropertiesCommand 
                SetCommand 
                RefreshResource 
                ShowTablesCommand 
                CacheTableCommand 
                ShowDatabasesCommand 
                AddFileCommand 
                ListFilesCommand 
                RefreshTable 
                DescribeDatabaseCommand 
                CreateHiveTableAsSelectCommand 
                AddJarCommand 
                DropTableCommand 
                CreateDatabaseCommand 
                CreateDataSourceTableCommand 
                CreateTableCommand 
                AnalyzeColumnCommand 
                CreateViewCommand 
                AnalyzeTableCommand 
                SetDatabaseCommand 
                UncacheTableCommand 
                ClearCacheCommand$ 
                DropFunctionCommand 
                AlterTableAddPartitionCommand 
                CreateDataSourceTableAsSelectCommand 
                ShowFunctionsCommand 
                DescribeTableCommand 
                ShowPartitionsCommand 
                LoadDataCommand 
                ShowColumnsCommand 
                ListJarsCommand 
                ShowCreateTableCommand 
                AlterTableDropPartitionCommand 
                AlterTableRenameCommand 
                InsertIntoDataSourceCommand 
                CreateTableLikeCommand 
                AlterTableSetLocationCommand 
                AlterDatabasePropertiesCommand 
                AlterTableRecoverPartitionsCommand 
                DescribeFunctionCommand 
                TruncateTableCommand 
                AlterViewAsCommand 
                ShowTablePropertiesCommand 
                CreateFunctionCommand 
                InsertIntoHadoopFsRelationCommand 
                AlterTableSerDePropertiesCommand 
                ResetCommand$ 
                CreateTempViewUsing 
                AlterTableRenamePartitionCommand 
        StreamingRelation 
        StreamingExecutionRelation 
        Range 
        ExternalRDD 
        UnresolvedTableValuedFunction 
        MetastoreRelation 
        MemoryPlan 
        SQLTable in SQLBuilder 
        UnresolvedRelation 
        InMemoryRelation 
        SimpleCatalogRelation 
        LogicalRelation 
        UnresolvedInlineTable 
        LogicalRDD 
        OneRowRelation$ 
    Union 
    EventTimeWatermark 
    UnaryNode : Has only one child Logical Plan
        DeserializeToObject 
        Filter 
        ScriptTransformation 
        Distinct 
        BroadcastHint 
        Project 
        With 
        Pivot 
        MapGroups 
        Window 
        GlobalLimit 
        WithWindowDefinition 
        ReturnAnswer 
        ObjectConsumer 
            AppendColumnsWithObject 
            SerializeFromObject 
            MapElements 
            MapPartitions 
            MapPartitionsInR 
        Aggregate 
        Generate 
        FlatMapGroupsInR 
        TypedFilter 
        GroupingSets 
        RedistributeData 
            RepartitionByExpression 
            SortPartitions 
        SubqueryAlias 
        Expand 
        LocalLimit 
        Sort 
        Repartition 
        Sample 
        AppendColumns 
    BinaryNode : Has two child Logical Plans
        CoGroup 
        SetOperation 
            Except 
            Intersect 
        Join 
    ObjectProducer 
        DeserializeToObject 
        MapGroups 
        CoGroup 
        ExternalRDD 
        FlatMapGroupsInR 
        MapElements 
        MapPartitions 
        MapPartitionsInR
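
To see which of these Logical Plan nodes a query builds, one can inspect the logical plan of a Dataset before analysis and optimization. A minimal sketch (assuming a SparkSession named spark):

  val df = spark.range(10).toDF("id")
  df.filter("id > 5").select("id").queryExecution.logical
  // Roughly a stack of UnaryNode operators (Project, Filter, ...) on top of a LeafNode.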

Analyzer in Spark

The Analyzer extends RuleExecutor, which executes rules on the Logical Plan.

Execute the rule

The RuleExecutor invokes its execute method to apply the rules batch by batch, rule by rule.

  /**
   * Executes the batches of rules defined by the subclass. The batches are executed serially
   * using the defined execution strategy. Within each batch, rules are also executed serially.
   */
  def execute(plan: TreeType): TreeType = {
    var curPlan = plan
 
    batches.foreach { batch =>
      val batchStartPlan = curPlan
      var iteration = 1
      var lastPlan = curPlan
      var continue = true
 
      // Run until fix point (or the max number of iterations as specified in the strategy).
      while (continue) {
        curPlan = batch.rules.foldLeft(curPlan) {
          case (plan, rule) =>
            val startTime = System.nanoTime()
            val result = rule(plan)
            val runTime = System.nanoTime() - startTime
            RuleExecutor.timeMap.addAndGet(rule.ruleName, runTime)
 
            if (!result.fastEquals(plan)) {
              logTrace(
                s"""
                  |=== Applying Rule ${rule.ruleName} ===
                  |${sideBySide(plan.treeString, result.treeString).mkString("\n")}
                """.stripMargin)
            }
 
            result
        }
......
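
Each rule transforms one plan tree into another, and execute keeps re-applying the rules of a batch until the plan stops changing (the fixed point) or the iteration limit is reached. A minimal sketch of what such a rule looks like (illustrative only, not one of the Analyzer's actual rules):

  import org.apache.spark.sql.catalyst.expressions.And
  import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
  import org.apache.spark.sql.catalyst.rules.Rule

  // Hypothetical rule: merge two adjacent Filter nodes into one.
  object CollapseAdjacentFilters extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      case Filter(outer, Filter(inner, child)) => Filter(And(inner, outer), child)
    }
  }
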
Rules in Analyzer

There are many rules in the Spark Analyzer:

lazy val batches: Seq[Batch] = Seq(
    Batch("Substitution", fixedPoint,
      CTESubstitution,
      WindowsSubstitution,
      EliminateUnions,
      new SubstituteUnresolvedOrdinals(conf)),
    Batch("Resolution", fixedPoint,
      ResolveTableValuedFunctions ::
      ResolveRelations ::
      ResolveReferences ::
      ResolveCreateNamedStruct ::
      ResolveDeserializer ::
      ResolveNewInstance ::
      ResolveUpCast ::
      ResolveGroupingAnalytics ::
      ResolvePivot ::
      ResolveOrdinalInOrderByAndGroupBy ::
      ResolveMissingReferences ::
      ExtractGenerator ::
      ResolveGenerate ::
      ResolveFunctions ::
      ResolveAliases ::
      ResolveSubquery ::
      ResolveWindowOrder ::
      ResolveWindowFrame ::
      ResolveNaturalAndUsingJoin ::
      ExtractWindowExpressions ::
      GlobalAggregates ::
      ResolveAggregateFunctions ::
      TimeWindowing ::
      ResolveInlineTables ::
      TypeCoercion.typeCoercionRules ++
      extendedResolutionRules : _*),
    Batch("Nondeterministic", Once,
      PullOutNondeterministic),
    Batch("UDF", Once,
      HandleNullInputsForUDF),
    Batch("FixNullability", Once,
      FixNullability),
    Batch("Cleanup", fixedPoint,
      CleanupAliases)
  )
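
The effect of the Resolution batch can be seen by comparing a plan before and after analysis. A minimal sketch (assuming a SparkSession named spark and a temp view "t" with a column "str"):

  val qe = spark.sql("SELECT str FROM t").queryExecution
  qe.logical    // still contains unresolved nodes such as 'UnresolvedRelation `t`
  qe.analyzed   // after the Analyzer: references and the relation are resolved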

TreeNode in Spark analyze

TreeNode (code in TreeNode.scala) holds a very important position in Spark: it is the parent of QueryPlan, which in turn is the parent of LogicalPlan and SparkPlan.
So understanding TreeNode helps a lot in understanding the Spark SQL processing flow.

Methods
mapProductIterator method
  /**
   * Efficient alternative to `productIterator.map(f).toArray`.
   */
  protected def mapProductIterator[B: ClassTag](f: Any => B): Array[B] = {
    val arr = Array.ofDim[B](productArity)
    var i = 0
    while (i < arr.length) {
      arr(i) = f(productElement(i))
      i += 1
    }
    arr
  }

The method iterates over all the elements and applies the function “f” to each one.
Here, productArity is the size of the product; for a case class, it is the number of constructor parameters.
For example, for the case class Join below, the productArity value is 4.
The productElement() method returns the element at the given index.
So productElement(0) is “left”, productElement(1) is “right”, productElement(2) is “joinType” and productElement(3) is “condition”.

case class Join(
    left: LogicalPlan,
    right: LogicalPlan,
    joinType: JoinType,
    condition: Option[Expression])
  extends BinaryNode with PredicateHelper {

So for the Join Logical Plan, mapProductIterator applies the function “f” to each of these four parameters.
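
productArity and productElement come from Scala's standard Product trait, which every case class implements. A minimal sketch with a plain case class (illustrative only, not Spark code):

  case class Point(x: Int, y: Int, label: String)
  val p = Point(1, 2, "a")
  p.productArity          // 3
  p.productElement(0)     // 1
  p.productElement(2)     // "a"
  // mapProductIterator(f) is conceptually:
  // (0 until p.productArity).map(i => f(p.productElement(i))).toArray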