Original article. When reposting, please credit: reposted from 慢慢的回味
Permalink: TreeNode in Spark analyze
TreeNode (defined in TreeNode.scala) plays a very important role in Spark: it is the parent of QueryPlan, which in turn is the parent of LogicalPlan and SparkPlan.
Understanding TreeNode therefore helps a great deal in understanding the Spark SQL processing flow.
Methods
mapProductIterator method
/**
 * Efficient alternative to `productIterator.map(f).toArray`.
 */
protected def mapProductIterator[B: ClassTag](f: Any => B): Array[B] = {
  val arr = Array.ofDim[B](productArity)
  var i = 0
  while (i < arr.length) {
    arr(i) = f(productElement(i))
    i += 1
  }
  arr
}
The method iterates over all the elements of the product and applies the function “f” to each of them.
Here, productArity is the size of the product. For a case class, it is the number of constructor parameters.
For example, for the case class Join shown here, the productArity value is 4.
The productElement() method returns the element at the given index.
So productElement(0) is “left”, productElement(1) is “right”, productElement(2) is “joinType” and productElement(3) is “condition”.
case class Join(
    left: LogicalPlan,
    right: LogicalPlan,
    joinType: JoinType,
    condition: Option[Expression])
  extends BinaryNode with PredicateHelper {
So for the Join logical plan, mapProductIterator passes each of these four parameters to the function “f”.
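The behaviour of productArity and productElement can be tried out without Spark. The following is a minimal sketch, assuming hypothetical stand-in classes (`Plan`, `Relation`, `ToyJoin`) and a free function `mapProduct` that copies the loop of mapProductIterator; none of these names exist in Spark.

```scala
import scala.reflect.ClassTag

// Hypothetical stand-ins for Spark's plan classes; not the real Catalyst types.
sealed trait Plan extends Product
case class Relation(name: String) extends Plan
case class ToyJoin(
    left: Plan,
    right: Plan,
    joinType: String,
    condition: Option[String]) extends Plan

// Same loop as TreeNode.mapProductIterator, written as a free function over any Product.
def mapProduct[B: ClassTag](p: Product)(f: Any => B): Array[B] = {
  val arr = Array.ofDim[B](p.productArity)
  var i = 0
  while (i < arr.length) {
    arr(i) = f(p.productElement(i))
    i += 1
  }
  arr
}

val join = ToyJoin(Relation("a"), Relation("b"), "inner", None)

// One entry per constructor parameter, in declaration order.
val described: Array[String] = mapProduct(join)(_.toString)
```

With four constructor parameters, `join.productArity` is 4 and `described` holds the string form of “left”, “right”, “joinType” and “condition”, in that order.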
transformChildren method
/**
 * Returns a copy of this node where `rule` has been recursively applied to all the children of
 * this node. When `rule` does not apply to a given node it is left unchanged.
 * @param rule the function used to transform this nodes children
 */
protected def transformChildren(
    rule: PartialFunction[BaseType, BaseType],
    nextOperation: (BaseType, PartialFunction[BaseType, BaseType]) => BaseType): BaseType = {
  if (children.nonEmpty) {
    var changed = false
    val newArgs = mapProductIterator {
      case arg: TreeNode[_] if containsChild(arg) =>
        val newChild = nextOperation(arg.asInstanceOf[BaseType], rule)
        if (!(newChild fastEquals arg)) {
          changed = true
          newChild
        } else {
          arg
        }
      ......
Each element returned by productElement(i) is bound to “arg” here,
and the function “f” passed as the argument of mapProductIterator is:
case arg: TreeNode[_] if containsChild(arg) =>
  val newChild = nextOperation(arg.asInstanceOf[BaseType], rule)
  if (!(newChild fastEquals arg)) {
    changed = true
    newChild
  } else {
    arg
  }
......
For Join, each “arg” is one of the case class's constructor parameters, so “arg” takes the values “left”, “right”, “joinType” and “condition” one by one.
The guard “if containsChild(arg)” keeps only the “arg” values that belong to the children of the node.
Here, the Join class extends BinaryNode, so its children are “left” and “right”.
/**
 * A logical plan node with a left and right child.
 */
abstract class BinaryNode extends LogicalPlan {
  def left: LogicalPlan
  def right: LogicalPlan

  override final def children: Seq[LogicalPlan] = Seq(left, right)
}
So only the “left” and “right” values of “arg” go through the body of the case statement.
val newChild = nextOperation(arg.asInstanceOf[BaseType], rule)
if (!(newChild fastEquals arg)) {
  changed = true
  newChild
} else {
  arg
}
The rule argument is handed on to the function “nextOperation”, which applies it to the child.
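The child-filtering step can be sketched on a toy tree. This is only an illustration under stated assumptions: `Node`, `Leaf`, `Pair` and `rewriteArgs` are hypothetical names, `containsChild` is modelled as plain membership in `children`, and the change tracking (`changed`, fastEquals) of the real method is left out.

```scala
// Hypothetical toy tree, not Spark's TreeNode; only the child-filtering idea is reproduced.
sealed trait Node extends Product {
  def children: Seq[Node]
  // Assumption: membership test by equality against the children sequence.
  def containsChild(n: Node): Boolean = children.contains(n)
}
case class Leaf(name: String) extends Node {
  def children: Seq[Node] = Nil
}
case class Pair(left: Node, right: Node, label: String) extends Node {
  def children: Seq[Node] = Seq(left, right)
}

// Apply `f` only to constructor arguments that are children; pass the rest through untouched.
def rewriteArgs(node: Node)(f: Node => Node): List[Any] =
  node.productIterator.map {
    case arg: Node if node.containsChild(arg) => f(arg)
    case other => other
  }.toList

val pair = Pair(Leaf("a"), Leaf("b"), "inner")

// Upper-case every leaf name; the non-child argument "inner" is not touched.
val rewritten = rewriteArgs(pair) {
  case Leaf(n) => Leaf(n.toUpperCase)
  case other => other
}
```

Only “left” and “right” reach `f`; the `label` argument falls through to the second case, just as “joinType” and “condition” do for Join.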
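transform, transformDown and transformUp methods
The difference between them is the order of application: transformDown applies the rule to the node itself first and then to its children (pre-order), while transformUp applies the rule to the children first and to the node itself last (post-order). transform simply delegates to transformDown.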
/**
 * Returns a copy of this node where `rule` has been recursively applied to the tree.
 * When `rule` does not apply to a given node it is left unchanged.
 * Users should not expect a specific directionality. If a specific directionality is needed,
 * transformDown or transformUp should be used.
 *
 * @param rule the function use to transform this nodes children
 */
def transform(rule: PartialFunction[BaseType, BaseType]): BaseType = {
  transformDown(rule)
}

/**
 * Returns a copy of this node where `rule` has been recursively applied to it and all of its
 * children (pre-order). When `rule` does not apply to a given node it is left unchanged.
 *
 * @param rule the function used to transform this nodes children
 */
def transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = {
  val afterRule = CurrentOrigin.withOrigin(origin) {
    rule.applyOrElse(this, identity[BaseType])
  }

  // Check if unchanged and then possibly return old copy to avoid gc churn.
  if (this fastEquals afterRule) {
    transformChildren(rule, (t, r) => t.transformDown(r))
  } else {
    afterRule.transformChildren(rule, (t, r) => t.transformDown(r))
  }
}

/**
 * Returns a copy of this node where `rule` has been recursively applied first to all of its
 * children and then itself (post-order). When `rule` does not apply to a given node, it is left
 * unchanged.
 *
 * @param rule the function use to transform this nodes children
 */
def transformUp(rule: PartialFunction[BaseType, BaseType]): BaseType = {
  val afterRuleOnChildren = transformChildren(rule, (t, r) => t.transformUp(r))
  if (this fastEquals afterRuleOnChildren) {
    CurrentOrigin.withOrigin(origin) {
      rule.applyOrElse(this, identity[BaseType])
    }
  } else {
    CurrentOrigin.withOrigin(origin) {
      rule.applyOrElse(afterRuleOnChildren, identity[BaseType])
    }
  }
}
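The effect of the traversal order is easiest to see with a rule whose match depends on the children. The sketch below assumes a hypothetical toy tree (`Expr`, `Lit`, `Add`, `withChildren`) and a constant-folding rule `fold`; it leaves out the origin tracking and fastEquals checks of the real methods.

```scala
// Hypothetical toy tree; Spark's TreeNode adds origin tracking and fastEquals short-cuts.
sealed trait Expr {
  def children: Seq[Expr]
  def withChildren(cs: Seq[Expr]): Expr

  // Pre-order: rule on this node first, then on the children of the result.
  def transformDown(rule: PartialFunction[Expr, Expr]): Expr = {
    val afterRule = rule.applyOrElse(this, identity[Expr])
    afterRule.withChildren(afterRule.children.map(_.transformDown(rule)))
  }

  // Post-order: children first, then the rule on the rebuilt node.
  def transformUp(rule: PartialFunction[Expr, Expr]): Expr = {
    val afterChildren = withChildren(children.map(_.transformUp(rule)))
    rule.applyOrElse(afterChildren, identity[Expr])
  }
}
case class Lit(value: Int) extends Expr {
  def children: Seq[Expr] = Nil
  def withChildren(cs: Seq[Expr]): Expr = this
}
case class Add(left: Expr, right: Expr) extends Expr {
  def children: Seq[Expr] = Seq(left, right)
  def withChildren(cs: Seq[Expr]): Expr = Add(cs(0), cs(1))
}

// Constant folding: only matches an Add whose children are already literals.
val fold: PartialFunction[Expr, Expr] = {
  case Add(Lit(a), Lit(b)) => Lit(a + b)
}

val tree = Add(Add(Lit(1), Lit(2)), Lit(3))
val up = tree.transformUp(fold)     // Lit(6): the inner Add is folded before the outer one is visited
val down = tree.transformDown(fold) // Add(Lit(3), Lit(3)): the outer Add is visited before its child is folded
```

With transformUp the whole tree folds in a single pass, because each node sees its children already rewritten. With transformDown the root is tested while its left child is still an Add, so one pass leaves the outer Add in place.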
Sub Classes
TreeNode
:- QueryPlan : Has the transformExpressionsDown and transformExpressionsUp methods; compared to the TreeNode transforms, their rule only accepts the Expression type.
   :- SparkPlan : Produced from a LogicalPlan; can produce the RDDs.
      :- CodegenSupport: These Spark plans generate Java code to implement their logic.
         :- ......
      :- UnaryExecNode: Has one child Spark plan.
         :- ......
      :- BinaryExecNode: Has two child Spark plans.
         :- ......
      :- LeafExecNode: Has no child Spark plan.
         :- ......
      :- ObjectProducerExec
         :- ......
   :- LogicalPlan : Created after parsing the SQL.
      :- LeafNode: Has no child logical plan.
         :- ......
      :- UnaryNode: Has one child logical plan.
         :- ......
      :- BinaryNode: Has two child logical plans.
         :- ......
      :- ObjectProducer
         :- ......
:- Expression
   :- [[LeafExpression]]: an expression that has no child.
      :- ......
   :- [[UnaryExpression]]: an expression that has one child.
      :- ......
   :- [[BinaryExpression]]: an expression that has two children.
      :- ......
   :- [[TernaryExpression]]: an expression that has three children.
      :- ......
   :- [[BinaryOperator]]: a special case of [[BinaryExpression]] that requires two children to have the same output data type.
      :- ......
This work is licensed under a Creative Commons Attribution 4.0 International License.