SQL execute on Spark

SQL is the most popular way to work with relational data, and Spark has strong SQL support. This article analyses the basic flow of how SQL is executed on Spark.
As before, I will walk through a unit test, with comments on the code, to explain:

  test("self join with aliases") {
    Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str").createOrReplaceTempView("df")
 
    checkAnswer(
      sql(
        """
          |SELECT x.str, COUNT(*)
          |FROM df x JOIN df y ON x.str = y.str
          |GROUP BY x.str
        """.stripMargin),
      Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
  }

Parse SQL with ANTLR.
      sql(
        """
          |SELECT x.str, COUNT(*)
          |FROM df x JOIN df y ON x.str = y.str
          |GROUP BY x.str
        """.stripMargin),

Code in SparkSession.scala

  /**
   * Executes a SQL query using Spark, returning the result as a [[DataFrame]].
   * The dialect that is used for SQL parsing can be configured with 'spark.sql.dialect'.
   *
   * @since 2.0.0
   */
  def sql(sqlText: String): DataFrame = {
    Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
  }
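
The returned DataFrame wraps the plan coming out of the parser. A minimal sketch (assuming a SparkSession named spark with the df temp view registered as in the test above) to watch the plan move through the query execution stages:

  val df = spark.sql(
    """
      |SELECT x.str, COUNT(*)
      |FROM df x JOIN df y ON x.str = y.str
      |GROUP BY x.str
    """.stripMargin)
 
  println(df.queryExecution.logical)       // unresolved plan produced by the parser
  println(df.queryExecution.analyzed)      // after the analyzer resolves relations and attributes
  println(df.queryExecution.optimizedPlan) // after the Catalyst optimizer rules
  println(df.queryExecution.executedPlan)  // the physical SparkPlan that will run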

Code in ParseDriver.scala
parse(sqlText) builds a SqlBaseParser, a class generated by ANTLR from the SQL grammar.
astBuilder then visits the resulting parse tree to generate the logical plan.

  /** Creates LogicalPlan for a given SQL string. */
  override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
    astBuilder.visitSingleStatement(parser.singleStatement()) match {
      case plan: LogicalPlan => plan
      case _ =>
        val position = Origin(None, None)
        throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
    }
  }

The sqlText here is “SELECT x.str, COUNT(*) FROM df x JOIN df y ON x.str = y.str GROUP BY x.str”.
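
To see what parsePlan returns for this query before any analysis, here is a minimal sketch that calls the Catalyst parser directly (CatalystSqlParser is the session-independent parser object; the tree shape in the last comment is indicative, not exact):

  import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 
  // Parse the SQL text into an unresolved logical plan; no catalog lookup happens here,
  // so the relations remain UnresolvedRelation nodes.
  val parsed = CatalystSqlParser.parsePlan(
    """
      |SELECT x.str, COUNT(*)
      |FROM df x JOIN df y ON x.str = y.str
      |GROUP BY x.str
    """.stripMargin)
 
  println(parsed.treeString)
  // Roughly: an Aggregate on top of Join(Inner, Some(condition)), whose children are
  // SubqueryAlias x and SubqueryAlias y wrapping UnresolvedRelation `df`.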

Visit the parse tree and generate the logical plan.

Let us step through how the Join logical plan is generated.

Code in AstBuilder.scala
visitFromClause overrides a method of SqlBaseVisitor, which is also code generated by ANTLR; the parser calls it while walking the FROM clause.

  /**
   * Create a logical plan for a given 'FROM' clause. Note that we support multiple (comma
   * separated) relations here, these get converted into a single plan by condition-less inner join.
   */
  override def visitFromClause(ctx: FromClauseContext): LogicalPlan = withOrigin(ctx) {
    val from = ctx.relation.asScala.foldLeft(null: LogicalPlan) { (left, relation) =>
      val right = plan(relation.relationPrimary)
      val join = right.optionalMap(left)(Join(_, _, Inner, None))
      withJoinRelations(join, relation)
    }
    ctx.lateralView.asScala.foldLeft(from)(withGenerate)
  }

For each relation in the FROM clause it builds the left and right sides, then invokes withJoinRelations to produce the Join logical plan.

  /**
   * Join one more [[LogicalPlan]]s to the current logical plan.
   */
  private def withJoinRelations(base: LogicalPlan, ctx: RelationContext): LogicalPlan = {
    ctx.joinRelation.asScala.foldLeft(base) { (left, join) =>
      withOrigin(join) {
        val baseJoinType = join.joinType match {
          case null => Inner
          case jt if jt.CROSS != null => Cross
          case jt if jt.FULL != null => FullOuter
          case jt if jt.SEMI != null => LeftSemi
          case jt if jt.ANTI != null => LeftAnti
          case jt if jt.LEFT != null => LeftOuter
          case jt if jt.RIGHT != null => RightOuter
          case _ => Inner
        }
 
        // Resolve the join type and join condition
        val (joinType, condition) = Option(join.joinCriteria) match {
          case Some(c) if c.USING != null =>
            val columns = c.identifier.asScala.map { column =>
              UnresolvedAttribute.quoted(column.getText)
            }
            (UsingJoin(baseJoinType, columns), None)
          case Some(c) if c.booleanExpression != null =>
            (baseJoinType, Option(expression(c.booleanExpression)))
          case None if join.NATURAL != null =>
            if (baseJoinType == Cross) {
              throw new ParseException("NATURAL CROSS JOIN is not supported", ctx)
            }
            (NaturalJoin(baseJoinType), None)
          case None =>
            (baseJoinType, None)
        }
        Join(left, plan(join.right), joinType, condition)
      }
    }
  }
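
The joinCriteria match above decides the final join type. A minimal sketch (the relation names a and b are hypothetical) showing how ON, USING and NATURAL map to different Join nodes when parsed:

  import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 
  // ON condition: the join type stays Inner and the condition becomes Some(...)
  println(CatalystSqlParser.parsePlan("SELECT * FROM a JOIN b ON a.id = b.id").treeString)
  // USING columns: the join type becomes UsingJoin(Inner, ...) and the condition stays None
  println(CatalystSqlParser.parsePlan("SELECT * FROM a JOIN b USING (id)").treeString)
  // NATURAL: the join type becomes NaturalJoin(Inner) and the condition stays None
  println(CatalystSqlParser.parsePlan("SELECT * FROM a NATURAL JOIN b").treeString)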


RDD execute flow

This article explains how an RDD is executed, with comments on the code.
The Spark git version is: SHA-1: 8b325b17ecdf013b7a6edcb7ee3773546bd914df

1 Use a unit test as an example
  class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
 
    test("Using TorrentBroadcast locally") {
      sc = new SparkContext("local", "test")
      val list = List[Int](1, 2, 3, 4)
      val broadcast = sc.broadcast(list)
      val results = sc.parallelize(1 to 2).map(x => (x, broadcast.value.sum))
      assert(results.collect().toSet === Set((1, 10), (2, 10)))
    }
  }
2 Broadcast the data
  val broadcast = sc.broadcast(list)
 
  def broadcast[T: ClassTag](value: T): Broadcast[T] = {
    assertNotStopped()
    require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
      "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
    val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
    val callSite = getCallSite
    logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
    cleaner.foreach(_.registerBroadcastForCleanup(bc))
    bc
  }
BroadcastManager.scala
  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
    broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
  }
TorrentBroadcastFactory.scala
  override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] = {
    new TorrentBroadcast[T](value_, id)
  }
TorrentBroadcast.scala
  // Lazily read the broadcast data back from the block store; this only runs
  // on the first access of .value
  @transient private lazy val _value: T = readBroadcastBlock()
  // Eagerly split the broadcast data into blocks and write them to the block store
  private val numBlocks: Int = writeBlocks(obj)
 
  override protected def getValue() = {
    _value
  }
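
Putting these pieces together: writeBlocks runs eagerly when the broadcast is created, while readBroadcastBlock runs lazily on the first access of value. A minimal sketch of that lifecycle (assuming an existing SparkContext named sc):

  val data = List[Int](1, 2, 3, 4)
  val bc = sc.broadcast(data)    // TorrentBroadcast constructor -> writeBlocks(obj)
  val total = bc.value.sum       // first access -> readBroadcastBlock() from the block store
  println(s"broadcast ${bc.id} sum = $total")
  bc.destroy()                   // remove the broadcast blocks from driver and executors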
3 Create the RDDs

Two RDDs are created: parallelize wraps the data in a ParallelCollectionRDD, and map builds a MapPartitionsRDD that references the mapping function. Neither is executed immediately (a sketch of the recorded lineage follows the snippet below).

  val results = sc.parallelize(1 to 2).map(x => (x, broadcast.value.sum))
 
  def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }
 
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }
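
Because both calls are lazy, only the lineage is recorded at this point. A minimal sketch (assuming an existing SparkContext named sc) that inspects the recorded lineage without launching any job:

  val rdd = sc.parallelize(1 to 2).map(x => (x, x * 10))
  // toDebugString prints the lineage; no Spark job has run yet
  println(rdd.toDebugString)
  // Typically something like:
  //   (N) MapPartitionsRDD[1] at map ...
  //    |  ParallelCollectionRDD[0] at parallelize ...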
4 Run RDD to get the result
  results.collect()
 
  // This is where the RDD actually gets executed
  def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }
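
collect is a thin wrapper over sc.runJob, which submits one task per partition and returns one result per partition. A minimal sketch (assuming an existing SparkContext named sc) that does the same thing by hand:

  val rdd = sc.parallelize(1 to 2, numSlices = 2)
  // runJob applies the function to every partition and returns one result per partition
  val perPartition: Array[Array[Int]] = sc.runJob(rdd, (iter: Iterator[Int]) => iter.toArray)
  val collected: Array[Int] = Array.concat(perPartition: _*)
  assert(collected.sameElements(rdd.collect()))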
