Analyzer in Spark

The Analyzer extends RuleExecutor, which executes rules on the logical plan.

Execute the rules

RuleExecutor's execute method applies the rules batch by batch and, within each batch, rule by rule.

  /**
   * Executes the batches of rules defined by the subclass. The batches are executed serially
   * using the defined execution strategy. Within each batch, rules are also executed serially.
   */
  def execute(plan: TreeType): TreeType = {
    var curPlan = plan
 
    batches.foreach { batch =>
      val batchStartPlan = curPlan
      var iteration = 1
      var lastPlan = curPlan
      var continue = true
 
      // Run until fix point (or the max number of iterations as specified in the strategy).
      while (continue) {
        curPlan = batch.rules.foldLeft(curPlan) {
          case (plan, rule) =>
            val startTime = System.nanoTime()
            val result = rule(plan)
            val runTime = System.nanoTime() - startTime
            RuleExecutor.timeMap.addAndGet(rule.ruleName, runTime)
 
            if (!result.fastEquals(plan)) {
              logTrace(
                s"""
                  |=== Applying Rule ${rule.ruleName} ===
                  |${sideBySide(plan.treeString, result.treeString).mkString("\n")}
                """.stripMargin)
            }
 
            result
        }
......
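To make the loop above concrete, here is a minimal sketch (not Spark source; the rule, the object names and the FixedPoint limit are my own) of a custom Rule and a RuleExecutor that drives it, using Catalyst's classes:

import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}

// Hypothetical rule: drop Filter nodes whose condition is the literal `true`.
object RemoveTrueFilter extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(Literal(true, _), child) => child
  }
}

// A tiny RuleExecutor: one batch, run to a fixed point (at most 100 iterations),
// which is exactly the loop shown in execute() above.
object SimpleExecutor extends RuleExecutor[LogicalPlan] {
  override protected def batches: Seq[Batch] = Seq(
    Batch("Remove true filters", FixedPoint(100), RemoveTrueFilter))
}

Calling SimpleExecutor.execute(somePlan) then returns the transformed logical plan.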
Rules in Analyzer

There are many rules in the Analyzer, organized into batches:

lazy val batches: Seq[Batch] = Seq(
    Batch("Substitution", fixedPoint,
      CTESubstitution,
      WindowsSubstitution,
      EliminateUnions,
      new SubstituteUnresolvedOrdinals(conf)),
    Batch("Resolution", fixedPoint,
      ResolveTableValuedFunctions ::
      ResolveRelations ::
      ResolveReferences ::
      ResolveCreateNamedStruct ::
      ResolveDeserializer ::
      ResolveNewInstance ::
      ResolveUpCast ::
      ResolveGroupingAnalytics ::
      ResolvePivot ::
      ResolveOrdinalInOrderByAndGroupBy ::
      ResolveMissingReferences ::
      ExtractGenerator ::
      ResolveGenerate ::
      ResolveFunctions ::
      ResolveAliases ::
      ResolveSubquery ::
      ResolveWindowOrder ::
      ResolveWindowFrame ::
      ResolveNaturalAndUsingJoin ::
      ExtractWindowExpressions ::
      GlobalAggregates ::
      ResolveAggregateFunctions ::
      TimeWindowing ::
      ResolveInlineTables ::
      TypeCoercion.typeCoercionRules ++
      extendedResolutionRules : _*),
    Batch("Nondeterministic", Once,
      PullOutNondeterministic),
    Batch("UDF", Once,
      HandleNullInputsForUDF),
    Batch("FixNullability", Once,
      FixNullability),
    Batch("Cleanup", fixedPoint,
      CleanupAliases)
  )


TreeNode in Spark analysis


TreeNode (code in TreeNode.scala) is a very important base class in Spark: it is the parent of QueryPlan, which in turn is the parent of LogicalPlan and SparkPlan.
Understanding TreeNode therefore helps a lot in understanding the Spark SQL processing flow.

Methods
mapProductIterator method
  /**
   * Efficient alternative to `productIterator.map(f).toArray`.
   */
  protected def mapProductIterator[B: ClassTag](f: Any => B): Array[B] = {
    val arr = Array.ofDim[B](productArity)
    var i = 0
    while (i < arr.length) {
      arr(i) = f(productElement(i))
      i += 1
    }
    arr
  }

This method iterates over all the elements of the product and applies the function "f" to each.
Here, productArity is the size of the product; for a case class it is the number of constructor parameters.
For example, for the Join case class below, productArity is 4.
The productElement(i) method returns the element at index i,
so productElement(0) is "left", productElement(1) is "right", productElement(2) is "joinType" and productElement(3) is "condition".

case class Join(
    left: LogicalPlan,
    right: LogicalPlan,
    joinType: JoinType,
    condition: Option[Expression])
  extends BinaryNode with PredicateHelper {

So for the Join logical plan, mapProductIterator applies the function "f" to each of these constructor parameters.
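To make productArity and productElement concrete, here is a small plain-Scala sketch (the Point case class is only an illustration, not Spark code):

// Hypothetical case class used only to demonstrate the Product methods.
case class Point(x: Int, y: Int, label: String)

val p = Point(1, 2, "origin")
p.productArity        // 3: the number of constructor parameters
p.productElement(0)   // 1 (x)
p.productElement(1)   // 2 (y)
p.productElement(2)   // "origin" (label)

// The same idea as mapProductIterator: apply a function to every element.
val asStrings: Array[String] =
  Array.tabulate(p.productArity)(i => p.productElement(i).toString)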

SQL execution on Spark


SQL is popular for relational data processing, and Spark has good SQL support. This article analyses the basic flow of how SQL is processed on Spark.
As before, I will use a unit test, with comments on the code, to explain:

  test("self join with aliases") {
    Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str").createOrReplaceTempView("df")
 
    checkAnswer(
      sql(
        """
          |SELECT x.str, COUNT(*)
          |FROM df x JOIN df y ON x.str = y.str
          |GROUP BY x.str
        """.stripMargin),
      Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
  }

Parse SQL with ANTLR.
      sql(
        """
          |SELECT x.str, COUNT(*)
          |FROM df x JOIN df y ON x.str = y.str
          |GROUP BY x.str
        """.stripMargin),

Code in SparkSession.scala

  /**
   * Executes a SQL query using Spark, returning the result as a [[DataFrame]].
   * The dialect that is used for SQL parsing can be configured with 'spark.sql.dialect'.
   *
   * @since 2.0.0
   */
  def sql(sqlText: String): DataFrame = {
    Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
  }

Code in ParseDriver.scala
parse(sqlText) produces a SqlBaseParser, a class generated by ANTLR.
Then astBuilder visits the parse tree to generate the logical plan.

  /** Creates LogicalPlan for a given SQL string. */
  override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
    astBuilder.visitSingleStatement(parser.singleStatement()) match {
      case plan: LogicalPlan => plan
      case _ =>
        val position = Origin(None, None)
        throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
    }
  }

The sqlText here is "SELECT x.str, COUNT(*) FROM df x JOIN df y ON x.str = y.str GROUP BY x.str".
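As a quick experiment (a sketch assuming a local SparkSession; the object name is my own), the parsed plan and the analyzed plan of this query can be printed through the public queryExecution API:

import org.apache.spark.sql.SparkSession

object SqlFlowDemo extends App {
  val spark = SparkSession.builder().master("local[*]").appName("sql-flow-demo").getOrCreate()
  import spark.implicits._

  // Same data as the unit test, registered as the temp view "df".
  Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str").createOrReplaceTempView("df")

  val df = spark.sql(
    "SELECT x.str, COUNT(*) FROM df x JOIN df y ON x.str = y.str GROUP BY x.str")

  // Plan produced by the parser (ANTLR + AstBuilder), before analysis ...
  println(df.queryExecution.logical.treeString)
  // ... and the plan after the Analyzer has resolved relations and attributes.
  println(df.queryExecution.analyzed.treeString)

  spark.stop()
}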

Visit the grammar tree and generate the logical plan.

Let us debug how the Join relation logical plan is generated.

Code in AstBuilder.scala
visitFromClause overrides a method of SqlBaseVisitor, the visitor class generated by ANTLR; the parser calls it when visiting the FROM clause.

  /**
   * Create a logical plan for a given 'FROM' clause. Note that we support multiple (comma
   * separated) relations here, these get converted into a single plan by condition-less inner join.
   */
  override def visitFromClause(ctx: FromClauseContext): LogicalPlan = withOrigin(ctx) {
    val from = ctx.relation.asScala.foldLeft(null: LogicalPlan) { (left, relation) =>
      val right = plan(relation.relationPrimary)
      val join = right.optionalMap(left)(Join(_, _, Inner, None))
      withJoinRelations(join, relation)
    }
    ctx.lateralView.asScala.foldLeft(from)(withGenerate)
  }

It builds the base plan from each relation, then invokes withJoinRelations to produce the Join logical plan.

  /**
   * Join one more [[LogicalPlan]]s to the current logical plan.
   */
  private def withJoinRelations(base: LogicalPlan, ctx: RelationContext): LogicalPlan = {
    ctx.joinRelation.asScala.foldLeft(base) { (left, join) =>
      withOrigin(join) {
        val baseJoinType = join.joinType match {
          case null => Inner
          case jt if jt.CROSS != null => Cross
          case jt if jt.FULL != null => FullOuter
          case jt if jt.SEMI != null => LeftSemi
          case jt if jt.ANTI != null => LeftAnti
          case jt if jt.LEFT != null => LeftOuter
          case jt if jt.RIGHT != null => RightOuter
          case _ => Inner
        }
 
        // Resolve the join type and join condition
        val (joinType, condition) = Option(join.joinCriteria) match {
          case Some(c) if c.USING != null =>
            val columns = c.identifier.asScala.map { column =>
              UnresolvedAttribute.quoted(column.getText)
            }
            (UsingJoin(baseJoinType, columns), None)
          case Some(c) if c.booleanExpression != null =>
            (baseJoinType, Option(expression(c.booleanExpression)))
          case None if join.NATURAL != null =>
            if (baseJoinType == Cross) {
              throw new ParseException("NATURAL CROSS JOIN is not supported", ctx)
            }
            (NaturalJoin(baseJoinType), None)
          case None =>
            (baseJoinType, None)
        }
        Join(left, plan(join.right), joinType, condition)
      }
    }
  }
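The three joinCriteria branches above can be observed directly in the parsed plan. Here is a sketch (its own local session and the same "df" view as before; the commented tree shapes are approximate):

import org.apache.spark.sql.SparkSession

object JoinTypeDemo extends App {
  val spark = SparkSession.builder().master("local[*]").appName("join-type-demo").getOrCreate()
  import spark.implicits._
  Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str").createOrReplaceTempView("df")

  // Returns the unresolved plan the parser built for a SQL string.
  def parsed(sql: String): String = spark.sql(sql).queryExecution.logical.treeString

  println(parsed("SELECT * FROM df x JOIN df y ON x.str = y.str")) // Join ... Inner with a condition
  println(parsed("SELECT * FROM df x JOIN df y USING (str)"))      // Join ... UsingJoin(Inner, ...)
  println(parsed("SELECT * FROM df x NATURAL JOIN df y"))          // Join ... NaturalJoin(Inner)

  spark.stop()
}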


RDD execution flow


This article explains how an RDD is executed, with comments on the code.
The Spark git version is: SHA-1: 8b325b17ecdf013b7a6edcb7ee3773546bd914df

1 Using a unit test as an example
  class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
 
  test("Using TorrentBroadcast locally") {
    sc = new SparkContext("local", "test")
    val list = List[Int](1, 2, 3, 4)
    val broadcast = sc.broadcast(list)
    val results = sc.parallelize(1 to 2).map(x => (x, broadcast.value.sum))
    assert(results.collect().toSet === Set((1, 10), (2, 10)))
  }
2 Broadcast the data
  val broadcast = sc.broadcast(list)
 
  def broadcast[T: ClassTag](value: T): Broadcast[T] = {
    assertNotStopped()
    require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
      "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
    val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
    val callSite = getCallSite
    logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
    cleaner.foreach(_.registerBroadcastForCleanup(bc))
    bc
  }
BroadcastManager.scala
  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
    broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
  }
TorrentBroadcastFactory.scala
  override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] = {
    new TorrentBroadcast[T](value_, id)
  }
TorrentBroadcast.scala
  // lazy is useful here: the broadcast value is read from the block store only on first access
  @transient private lazy val _value: T = readBroadcastBlock()
  // write the broadcast data to the block store as blocks when the broadcast is created
  private val numBlocks: Int = writeBlocks(obj)
 
  override protected def getValue() = {
    _value
  }
3 Create RDD

It creates two RDDs: parallelize builds a ParallelCollectionRDD over the data, and map builds a MapPartitionsRDD that references the mapping function.
Neither is executed immediately; RDD transformations are lazy.

  val results = sc.parallelize(1 to 2).map(x => (x, broadcast.value.sum))
 
  def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }
 
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }
4 Run RDD to get the result
  results.collect()
 
  // This is where the RDD is actually executed
  def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }
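Putting the pieces together, a self-contained sketch of the same flow outside the test suite looks like this (the local master and names are my own choices):

import org.apache.spark.{SparkConf, SparkContext}

object RddFlowDemo extends App {
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("rdd-flow-demo"))

  val broadcast = sc.broadcast(List(1, 2, 3, 4))   // shipped to executors via TorrentBroadcast
  val results = sc.parallelize(1 to 2)             // ParallelCollectionRDD, not executed yet
    .map(x => (x, broadcast.value.sum))            // MapPartitionsRDD, still lazy

  // collect() is the action: it calls sc.runJob and actually executes the RDD chain.
  println(results.collect().toSet)                 // Set((1,10), (2,10))

  sc.stop()
}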
