Original article. When reposting, please credit: reposted from 慢慢的回味
Article link: Spark Planner for Converting Logical Plan to Spark Plan
The SparkPlanner is the bridge between the logical plan and the physical plan: it converts a LogicalPlan into a SparkPlan.
SparkPlanner extends SparkStrategies (defined in SparkStrategies.scala).
It has several strategies:
```scala
def strategies: Seq[Strategy] =
  extraStrategies ++ (
    FileSourceStrategy ::
    DataSourceStrategy ::
    DDLStrategy ::
    SpecialLimits ::
    Aggregation ::
    JoinSelection ::
    InMemoryScans ::
    BasicOperators :: Nil)
```
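To see how such a list of strategies is used, here is a minimal, self-contained sketch of the idea. It assumes toy plan types (Logical, Scan, Filter, Physical, ScanExec, FilterExec) and a ToyPlanner object, all hypothetical and not Spark classes. Each strategy returns candidate physical plans, or Nil when it does not apply, and the planner takes the first candidate produced by trying the strategies in order. This is roughly how Spark 2.x uses the planner, where QueryExecution takes `planner.plan(...).next()`.

```scala
// Hypothetical, simplified plan types; these are not Spark's classes.
sealed trait Logical
case class Scan(table: String) extends Logical
case class Filter(cond: String, child: Logical) extends Logical

sealed trait Physical
case class ScanExec(table: String) extends Physical
case class FilterExec(cond: String, child: Physical) extends Physical

object ToyPlanner {
  // A strategy returns candidate physical plans; Nil means "not applicable".
  type Strategy = Logical => Seq[Physical]

  val scanStrategy: Strategy = {
    case Scan(t) => Seq(ScanExec(t))
    case _       => Nil
  }

  // Plans the child recursively, similar in spirit to planLater.
  val basicOperators: Strategy = {
    case Filter(c, child) => Seq(FilterExec(c, plan(child)))
    case _                => Nil
  }

  // Order matters: earlier strategies get the first chance to plan a node.
  val strategies: Seq[Strategy] = Seq(scanStrategy, basicOperators)

  // Lazily try strategies in order and take the first candidate produced.
  def plan(p: Logical): Physical = {
    val candidates = strategies.iterator.flatMap(s => s(p))
    if (candidates.hasNext) candidates.next()
    else throw new IllegalStateException(s"No strategy can plan $p")
  }
}
```

Because the iterator is lazy, later strategies are not consulted once an earlier one returns a plan, which is why extraStrategies placed at the front of the list can take precedence over the built-in ones.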
A few of them are introduced below:
SpecialLimits
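Here a logical Limit sitting directly on a global Sort (optionally with a Project in between) is converted into the physical TakeOrderedAndProjectExec. When the Limit is the root of the query (wrapped in ReturnAnswer) and has no Sort beneath it, it becomes CollectLimitExec instead.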
```scala
/**
 * Plans special cases of limit operators.
 */
object SpecialLimits extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // The root of the query, wrapped in ReturnAnswer when results go back to the driver.
    case logical.ReturnAnswer(rootPlan) => rootPlan match {
      // ORDER BY ... LIMIT n (global sort): top-n without a full sort.
      case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
        execution.TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
      // Same pattern with a projection between the Limit and the Sort.
      case logical.Limit(
          IntegerLiteral(limit),
          logical.Project(projectList, logical.Sort(order, true, child))) =>
        execution.TakeOrderedAndProjectExec(
          limit, order, projectList, planLater(child)) :: Nil
      // A plain root-level LIMIT n.
      case logical.Limit(IntegerLiteral(limit), child) =>
        execution.CollectLimitExec(limit, planLater(child)) :: Nil
      case other => planLater(other) :: Nil
    }
    // The same Limit-over-Sort patterns when they are not at the root.
    case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
      execution.TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
    case logical.Limit(
        IntegerLiteral(limit),
        logical.Project(projectList, logical.Sort(order, true, child))) =>
      execution.TakeOrderedAndProjectExec(
        limit, order, projectList, planLater(child)) :: Nil
    // Not a special limit: leave it to the other strategies.
    case _ => Nil
  }
}
```
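The point of TakeOrderedAndProjectExec is that ORDER BY ... LIMIT k does not need a full global sort. The sketch below illustrates the top-k technique behind it, assuming plain Scala collections stand in for partitions; TopKSketch and its methods are hypothetical names, not Spark code. Each partition keeps only its k best rows in a bounded max-heap, the small per-partition results are merged and sorted, and the projection is applied last. Spark's collect path for this operator does something comparable through RDD.takeOrdered, but this is only a simplified illustration.

```scala
import scala.collection.mutable

object TopKSketch {
  // Keep the k smallest elements (per `ord`) of one partition in a max-heap of size k.
  def topKOfPartition[T](part: Iterator[T], k: Int)(implicit ord: Ordering[T]): Seq[T] = {
    val heap = mutable.PriorityQueue.empty[T](ord) // head is the largest element kept so far
    part.foreach { x =>
      if (heap.size < k) heap.enqueue(x)
      else if (k > 0 && ord.lt(x, heap.head)) { heap.dequeue(); heap.enqueue(x) }
    }
    heap.toList // unordered; the merge step below sorts
  }

  // Merge the per-partition candidates, keep the global top k, then project.
  def takeOrderedAndProject[T, R](partitions: Seq[Seq[T]], k: Int, project: T => R)
                                 (implicit ord: Ordering[T]): Seq[R] =
    partitions
      .flatMap(p => topKOfPartition(p.iterator, k))
      .sorted(ord)
      .take(k)
      .map(project)
}
```

Only at most k rows per partition ever reach the merge step, so the work after the per-partition pass is proportional to k times the number of partitions rather than to the full input size.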
Aggregation
The logical Aggregate is converted into one of the following physical operators, depending on the types of the aggregation buffers and on the kind of aggregate functions involved:
SortAggregateExec
HashAggregateExec
ObjectHashAggregateExec
```scala
/**
 * Used to plan the aggregate operator for expressions based on the AggregateFunction2 interface.
 */
object Aggregation extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case PhysicalAggregation(
        groupingExpressions, aggregateExpressions, resultExpressions, child) =>

      val (functionsWithDistinct, functionsWithoutDistinct) =
        aggregateExpressions.partition(_.isDistinct)
      ......
```
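The choice among SortAggregateExec, HashAggregateExec and ObjectHashAggregateExec is made in AggUtils.createAggregate (Spark 2.2 and later). The following simplified sketch captures that decision, assuming the three hypothetical Boolean fields of AggInfo summarize what Spark actually inspects. HashAggregateExec is preferred when every aggregation-buffer field has a type that UnsafeRow can update in place. Otherwise ObjectHashAggregateExec is used if every aggregate function is a TypedImperativeAggregate and spark.sql.execution.useObjectHashAggregateExec is enabled. SortAggregateExec is the fallback.

```scala
object AggSelectionSketch {
  sealed trait AggOperator
  case object HashAggregate extends AggOperator
  case object ObjectHashAggregate extends AggOperator
  case object SortAggregate extends AggOperator

  // Hypothetical summary of the facts the planner checks; not a Spark type.
  case class AggInfo(
      bufferMutableInUnsafeRow: Boolean, // all buffer fields can be updated in place (e.g. numeric)
      allTypedImperative: Boolean,       // every aggregate function is a TypedImperativeAggregate
      objectHashEnabled: Boolean)        // spark.sql.execution.useObjectHashAggregateExec

  // Mirrors the order of the checks: hash first, then object hash, then sort.
  def choose(info: AggInfo): AggOperator =
    if (info.bufferMutableInUnsafeRow) HashAggregate
    else if (info.objectHashEnabled && info.allTypedImperative) ObjectHashAggregate
    else SortAggregate
}
```

Sort-based aggregation always works because it only needs the input sorted by the grouping keys, which is why it sits at the end of the chain.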
Take the SQL query “SELECT x.str, COUNT(*) FROM df x JOIN df y ON x.str = y.str GROUP BY x.str” as an example.
Here, “COUNT(*)” is the aggregate expression and x.str is the grouping expression.
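For an aggregate without DISTINCT like this one, Spark plans two phases: a partial aggregate on each partition and a final aggregate that merges the partial results for the same key, with the final phase requiring its input to be clustered by x.str (EnsureRequirements adds an Exchange if the child does not already provide that). Since COUNT(*) keeps a single LongType buffer, both phases can use HashAggregateExec. The sketch below mimics those phases for the example query with plain Scala collections; CountByKeySketch and its methods are hypothetical, and a Seq of maps stands in for the partial results of each partition.

```scala
object CountByKeySketch {
  // Self-join on the key, as in "df x JOIN df y ON x.str = y.str".
  def selfJoin(rows: Seq[String]): Seq[(String, String)] =
    for (x <- rows; y <- rows if x == y) yield (x, y)

  // Partial aggregation: each partition counts its own rows per key.
  def partialCount(partition: Seq[(String, String)]): Map[String, Long] =
    partition.groupBy(_._1).map { case (k, vs) => k -> vs.size.toLong }

  // Final aggregation: partial counts for the same key are summed
  // (the shuffle between the two phases is not modeled here).
  def finalCount(partials: Seq[Map[String, Long]]): Map[String, Long] =
    partials.flatten.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).sum }
}
```

With rows Seq("a", "b", "a"), the value "a" appears twice, so the self-join yields 2 × 2 = 4 rows for it and 1 row for "b"; however those joined rows are split across partitions, summing the partial counts gives a -> 4 and b -> 1.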
This work is licensed under the Creative Commons Attribution 4.0 International License.