SQL is popular for processing relational data, and Spark has strong support for SQL. This article analyses the basic flow of how a SQL query is processed on Spark.
As before, I will use a unit test, with comments on the code, to explain:
// Joins the temp view "df" with itself under the aliases x and y, groups by x.str,
// and checks the per-key row count.
test("self join with aliases") {
// Build three (i, i.toString) rows, name the columns "int" and "str",
// and register the result as the temp view "df" that the query below reads.
Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str").createOrReplaceTempView("df")
checkAnswer(
sql(
"""
|SELECT x.str, COUNT(*)
|FROM df x JOIN df y ON x.str = y.str
|GROUP BY x.str
""".stripMargin),
// The three str values are distinct, so each key matches exactly one row on the other
// side of the self join: every expected count is 1.
Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
} |
// Joins the temp view "df" with itself under the aliases x and y, groups by x.str,
// and checks the per-key row count.
test("self join with aliases") {
// Build three (i, i.toString) rows, name the columns "int" and "str",
// and register the result as the temp view "df" that the query below reads.
Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str").createOrReplaceTempView("df")
checkAnswer(
sql(
"""
|SELECT x.str, COUNT(*)
|FROM df x JOIN df y ON x.str = y.str
|GROUP BY x.str
""".stripMargin),
// The three str values are distinct, so each key matches exactly one row on the other
// side of the self join: every expected count is 1.
Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
}
Parse SQL with ANTLR.
sql(
"""
|SELECT x.str, COUNT(*)
|FROM df x JOIN df y ON x.str = y.str
|GROUP BY x.str
""".stripMargin), |
sql(
"""
|SELECT x.str, COUNT(*)
|FROM df x JOIN df y ON x.str = y.str
|GROUP BY x.str
""".stripMargin),
Code in SparkSession.scala
/**
* Executes a SQL query using Spark, returning the result as a [[DataFrame]].
* The dialect that is used for SQL parsing can be configured with 'spark.sql.dialect'.
* NOTE(review): confirm that 'spark.sql.dialect' is still a supported setting in this version.
*
* @param sqlText the SQL text handed to the session's SQL parser
* @return the [[DataFrame]] that `Dataset.ofRows` builds from the parsed plan
* @since 2.0.0
*/
def sql(sqlText: String): DataFrame = {
// Two steps: the session's parser turns the text into a plan, then Dataset.ofRows
// wraps that plan, together with this session (`self`), into a DataFrame.
Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
} |
/**
* Executes a SQL query using Spark, returning the result as a [[DataFrame]].
* The dialect that is used for SQL parsing can be configured with 'spark.sql.dialect'.
* NOTE(review): confirm that 'spark.sql.dialect' is still a supported setting in this version.
*
* @param sqlText the SQL text handed to the session's SQL parser
* @return the [[DataFrame]] that `Dataset.ofRows` builds from the parsed plan
* @since 2.0.0
*/
def sql(sqlText: String): DataFrame = {
// Two steps: the session's parser turns the text into a plan, then Dataset.ofRows
// wraps that plan, together with this session (`self`), into a DataFrame.
Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
}
Code in ParseDriver.scala
The call parse(sqlText) produces a SqlBaseParser, a class generated by ANTLR.
The astBuilder then visits the parse tree to generate the logical plan.
/**
* Creates LogicalPlan for a given SQL string.
*
* @param sqlText the SQL statement text to parse
* @return the [[LogicalPlan]] the AST builder produces for the statement
* @throws ParseException if the AST builder's result is not a [[LogicalPlan]]
*/
override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
// Take the single-statement parse tree from the parser and let the AST builder visit it;
// only a LogicalPlan result is accepted.
astBuilder.visitSingleStatement(parser.singleStatement()) match {
case plan: LogicalPlan => plan
case _ =>
// Anything else is rejected. The same empty Origin is passed for both position
// arguments of ParseException.
val position = Origin(None, None)
throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
}
} |
/**
* Creates LogicalPlan for a given SQL string.
*
* @param sqlText the SQL statement text to parse
* @return the [[LogicalPlan]] the AST builder produces for the statement
* @throws ParseException if the AST builder's result is not a [[LogicalPlan]]
*/
override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
// Take the single-statement parse tree from the parser and let the AST builder visit it;
// only a LogicalPlan result is accepted.
astBuilder.visitSingleStatement(parser.singleStatement()) match {
case plan: LogicalPlan => plan
case _ =>
// Anything else is rejected. The same empty Origin is passed for both position
// arguments of ParseException.
val position = Origin(None, None)
throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
}
}
Here, sqlText is “SELECT x.str, COUNT(*) FROM df x JOIN df y ON x.str = y.str GROUP BY x.str”.
Visit the grammar tree and generate the logical plan.
Let us debug how the logical plan for the join relation is generated.
Code in AstBuilder.scala
visitFromClause overrides the corresponding method of SqlBaseVisitor, which is code generated by ANTLR.
This method is called while the parse tree produced by the parser is being visited.
/**
* Create a logical plan for a given 'FROM' clause. Note that we support multiple (comma
* separated) relations here, these get converted into a single plan by condition-less inner join.
*
* @param ctx the parse-tree context of the FROM clause
* @return the plan for all relations of the clause, with its lateral views folded in
*/
override def visitFromClause(ctx: FromClauseContext): LogicalPlan = withOrigin(ctx) {
// Fold the relations left to right. The accumulator starts as null, so `left` is null
// while the first relation is processed.
val from = ctx.relation.asScala.foldLeft(null: LogicalPlan) { (left, relation) =>
val right = plan(relation.relationPrimary)
// Presumably optionalMap yields `right` unchanged when `left` is null and otherwise
// builds the condition-less inner Join(left, right) - confirm against optionalMap.
val join = right.optionalMap(left)(Join(_, _, Inner, None))
// Apply the join clauses attached to this relation on top of the plan built so far.
withJoinRelations(join, relation)
}
// Fold each lateral view of the clause over the plan with withGenerate.
ctx.lateralView.asScala.foldLeft(from)(withGenerate)
} |
/**
* Create a logical plan for a given 'FROM' clause. Note that we support multiple (comma
* separated) relations here, these get converted into a single plan by condition-less inner join.
*
* @param ctx the parse-tree context of the FROM clause
* @return the plan for all relations of the clause, with its lateral views folded in
*/
override def visitFromClause(ctx: FromClauseContext): LogicalPlan = withOrigin(ctx) {
// Fold the relations left to right. The accumulator starts as null, so `left` is null
// while the first relation is processed.
val from = ctx.relation.asScala.foldLeft(null: LogicalPlan) { (left, relation) =>
val right = plan(relation.relationPrimary)
// Presumably optionalMap yields `right` unchanged when `left` is null and otherwise
// builds the condition-less inner Join(left, right) - confirm against optionalMap.
val join = right.optionalMap(left)(Join(_, _, Inner, None))
// Apply the join clauses attached to this relation on top of the plan built so far.
withJoinRelations(join, relation)
}
// Fold each lateral view of the clause over the plan with withGenerate.
ctx.lateralView.asScala.foldLeft(from)(withGenerate)
}
It obtains the join and the relation, then invokes withJoinRelations to produce the Join logical plan.
/**
* Join one more [[LogicalPlan]]s to the current logical plan.
*
* Folds over `ctx.joinRelation`, starting from `base`, and wraps the plan accumulated so far
* in a new [[Join]] for every join clause.
*
* @param base the plan the fold starts from; it is the result when there are no join clauses
* @param ctx the relation context whose `joinRelation` entries are folded in
* @return the plan after all join clauses have been applied
* @throws ParseException for a NATURAL join whose join type resolves to Cross
*/
private def withJoinRelations(base: LogicalPlan, ctx: RelationContext): LogicalPlan = {
ctx.joinRelation.asScala.foldLeft(base) { (left, join) =>
withOrigin(join) {
// Map the join-type tokens to a join type. The guards are tried top to bottom, so the
// first non-null token decides; a missing joinType and the final fall-through case both
// give Inner.
val baseJoinType = join.joinType match {
case null => Inner
case jt if jt.CROSS != null => Cross
case jt if jt.FULL != null => FullOuter
case jt if jt.SEMI != null => LeftSemi
case jt if jt.ANTI != null => LeftAnti
case jt if jt.LEFT != null => LeftOuter
case jt if jt.RIGHT != null => RightOuter
case _ => Inner
}
// Resolve the join type and join condition
// - USING: wrap the base type in UsingJoin with the listed columns, no condition.
// - boolean expression: keep the base type and use the expression as the condition.
// - no criteria + NATURAL: wrap the base type in NaturalJoin, no condition.
// - no criteria otherwise: keep the base type, no condition.
// NOTE(review): a criteria with neither USING nor a boolean expression matches no case
// here - presumably the grammar rules that out; confirm.
val (joinType, condition) = Option(join.joinCriteria) match {
case Some(c) if c.USING != null =>
val columns = c.identifier.asScala.map { column =>
UnresolvedAttribute.quoted(column.getText)
}
(UsingJoin(baseJoinType, columns), None)
case Some(c) if c.booleanExpression != null =>
(baseJoinType, Option(expression(c.booleanExpression)))
case None if join.NATURAL != null =>
if (baseJoinType == Cross) {
// The exception is given the enclosing relation context `ctx`, not `join`.
throw new ParseException("NATURAL CROSS JOIN is not supported", ctx)
}
(NaturalJoin(baseJoinType), None)
case None =>
(baseJoinType, None)
}
// The plan accumulated so far becomes the left side of the new Join.
Join(left, plan(join.right), joinType, condition)
}
}
} |
/**
* Join one more [[LogicalPlan]]s to the current logical plan.
*
* Folds over `ctx.joinRelation`, starting from `base`, and wraps the plan accumulated so far
* in a new [[Join]] for every join clause.
*
* @param base the plan the fold starts from; it is the result when there are no join clauses
* @param ctx the relation context whose `joinRelation` entries are folded in
* @return the plan after all join clauses have been applied
* @throws ParseException for a NATURAL join whose join type resolves to Cross
*/
private def withJoinRelations(base: LogicalPlan, ctx: RelationContext): LogicalPlan = {
ctx.joinRelation.asScala.foldLeft(base) { (left, join) =>
withOrigin(join) {
// Map the join-type tokens to a join type. The guards are tried top to bottom, so the
// first non-null token decides; a missing joinType and the final fall-through case both
// give Inner.
val baseJoinType = join.joinType match {
case null => Inner
case jt if jt.CROSS != null => Cross
case jt if jt.FULL != null => FullOuter
case jt if jt.SEMI != null => LeftSemi
case jt if jt.ANTI != null => LeftAnti
case jt if jt.LEFT != null => LeftOuter
case jt if jt.RIGHT != null => RightOuter
case _ => Inner
}
// Resolve the join type and join condition
// - USING: wrap the base type in UsingJoin with the listed columns, no condition.
// - boolean expression: keep the base type and use the expression as the condition.
// - no criteria + NATURAL: wrap the base type in NaturalJoin, no condition.
// - no criteria otherwise: keep the base type, no condition.
// NOTE(review): a criteria with neither USING nor a boolean expression matches no case
// here - presumably the grammar rules that out; confirm.
val (joinType, condition) = Option(join.joinCriteria) match {
case Some(c) if c.USING != null =>
val columns = c.identifier.asScala.map { column =>
UnresolvedAttribute.quoted(column.getText)
}
(UsingJoin(baseJoinType, columns), None)
case Some(c) if c.booleanExpression != null =>
(baseJoinType, Option(expression(c.booleanExpression)))
case None if join.NATURAL != null =>
if (baseJoinType == Cross) {
// The exception is given the enclosing relation context `ctx`, not `join`.
throw new ParseException("NATURAL CROSS JOIN is not supported", ctx)
}
(NaturalJoin(baseJoinType), None)
case None =>
(baseJoinType, None)
}
// The plan accumulated so far becomes the left side of the new Join.
Join(left, plan(join.right), joinType, condition)
}
}
}
继续阅读“SQL execute on Spark”
本作品采用知识共享署名 4.0 国际许可协议进行许可。