From f4b1ae8b30c2a572d4f44cd68b66c224aeee553b Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 11 Mar 2016 14:29:45 +0800 Subject: [PATCH 1/6] tmp --- .../apache/spark/sql/hive/SQLBuilder.scala | 144 ++++++++++++------ .../sql/hive/LogicalPlanToSQLSuite.scala | 8 + 2 files changed, 105 insertions(+), 47 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala index 760335bba5d4a..24f812fefc41e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala @@ -24,6 +24,7 @@ import scala.util.control.NonFatal import org.apache.spark.Logging import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.CollapseProject import org.apache.spark.sql.catalyst.plans.logical._ @@ -53,7 +54,14 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi def this(df: DataFrame) = this(df.queryExecution.analyzed, df.sqlContext) def toSQL: String = { + println(logicalPlan.treeString) val canonicalizedPlan = Canonicalizer.execute(logicalPlan) + println(canonicalizedPlan.treeString) + + val output = canonicalizedPlan.output.zip(logicalPlan.output).map { + case (a1, a2) => Alias(a1, a2.name)() + } + val finalPlan = Project(output, SubqueryAlias("result", canonicalizedPlan)) try { val replaced = canonicalizedPlan.transformAllExpressions { case e: SubqueryExpression => @@ -131,6 +139,9 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi case _: Aggregate => "HAVING" case _ => "WHERE" } + println(condition.treeString) + println(condition.getClass) + println(condition.sql) build(toSQL(child), whereOrHaving, condition.sql) case p @ Distinct(u: Union) if u.children.length > 1 => @@ -147,18 +158,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi case p: Except => build("(" + toSQL(p.left), ") EXCEPT (", toSQL(p.right) + ")") - case p: SubqueryAlias => - p.child match { - // Persisted data source relation - case LogicalRelation(_, _, Some(TableIdentifier(table, Some(database)))) => - s"${quoteIdentifier(database)}.${quoteIdentifier(table)}" - // Parentheses is not used for persisted data source relations - // e.g., select x.c1 from (t1) as x inner join (t1) as y on x.c1 = y.c1 - case SubqueryAlias(_, _: LogicalRelation | _: MetastoreRelation) => - build(toSQL(p.child), "AS", p.alias) - case _ => - build("(" + toSQL(p.child) + ")", "AS", p.alias) - } + case p: SubqueryAlias => build("(" + toSQL(p.child) + ")", "AS", p.alias) case p: Join => build( @@ -168,11 +168,12 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi toSQL(p.right), p.condition.map(" ON " + _.sql).getOrElse("")) - case p: MetastoreRelation => - build( - s"${quoteIdentifier(p.databaseName)}.${quoteIdentifier(p.tableName)}", - p.alias.map(a => s" AS ${quoteIdentifier(a)}").getOrElse("") - ) + case SQLTable(database, table, _, sample) => + val qualifiedName = s"${quoteIdentifier(database)}.${quoteIdentifier(table)}" + sample.map { case (lowerBound, upperBound) => + val fraction = math.min(100, math.max(0, (upperBound - lowerBound) * 100)) + qualifiedName + " TABLESAMPLE(" + fraction + " PERCENT)" + }.getOrElse(qualifiedName) case Sort(orders, _, RepartitionByExpression(partitionExprs, child, _)) if orders.map(_.child) == partitionExprs => @@ -316,6 +317,14 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi // `Aggregate`s. CollapseProject), Batch("Recover Scoping Info", Once, + EliminateSubqueryAliases, + // A logical plan is allowed to have same-name outputs with different qualifiers(e.g. the + // `Join` operator). However, this kind of plan can't be put under a subquery as we will + // erase and assign a new qualifier to all outputs and make it impossible to distinguish + // same-name outputs. This rule renames all attributes, to guarantee different attributes( + // with different exprId) always have different names. + ClearAttributeName, + ResolveSQLTable, // Used to handle other auxiliary `Project`s added by analyzer (e.g. // `ResolveAggregateFunctions` rule) AddSubquery, @@ -341,6 +350,39 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi ) ) + object ClearAttributeName extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions { + case a: Attribute => a.withName(normalizedName(a)) + case a: Alias => Alias(a.child, normalizedName(a))(exprId = a.exprId) + } + } + + object ResolveSQLTable extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown { + case s @ Sample(lowerBound, upperBound, _, _, l @ LogicalRelation(_, _, + Some(TableIdentifier(table, Some(database))))) if s.isTableSample => + restoreColumnNames( + SQLTable(database, table, l.output, Some(lowerBound -> upperBound))) + + case s @ Sample(lowerBound, upperBound, _, _, m: MetastoreRelation) if s.isTableSample => + restoreColumnNames( + SQLTable(m.databaseName, m.tableName, m.output, Some(lowerBound -> upperBound))) + + case l @ LogicalRelation(_, _, Some(TableIdentifier(table, Some(database)))) => + restoreColumnNames(SQLTable(database, table, l.output, None)) + + case m: MetastoreRelation => + restoreColumnNames(SQLTable(m.databaseName, m.tableName, m.output, None)) + } + + private def restoreColumnNames(table: SQLTable): SubqueryAlias = { + val output = table.output.map { attr => + Alias(attr.withQualifiers(Nil), normalizedName(attr))(exprId = attr.exprId) + } + addSubquery(Project(output, table)) + } + } + object AddSubquery extends Rule[LogicalPlan] { override def apply(tree: LogicalPlan): LogicalPlan = tree transformUp { // This branch handles aggregate functions within HAVING clauses. For example: @@ -354,42 +396,19 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi // +- Filter ... // +- Aggregate ... // +- MetastoreRelation default, src, None - case plan @ Project(_, Filter(_, _: Aggregate)) => wrapChildWithSubquery(plan) + case p @ Project(_, f @ Filter(_, _: Aggregate)) => p.copy(child = addSubquery(f)) - case w @ Window(_, _, _, Filter(_, _: Aggregate)) => wrapChildWithSubquery(w) + case w @ Window(_, _, _, f @ Filter(_, _: Aggregate)) => w.copy(child = addSubquery(f)) - case plan @ Project(_, - _: SubqueryAlias - | _: Filter - | _: Join - | _: MetastoreRelation - | OneRowRelation - | _: LocalLimit - | _: GlobalLimit - | _: Sample - ) => plan - - case plan: Project => wrapChildWithSubquery(plan) + case p: Project => p.copy(child = addSubqueryIfNeeded(p.child)) // We will generate "SELECT ... FROM ..." for Window operator, so its child operator should // be able to put in the FROM clause, or we wrap it with a subquery. - case w @ Window(_, _, _, - _: SubqueryAlias - | _: Filter - | _: Join - | _: MetastoreRelation - | OneRowRelation - | _: LocalLimit - | _: GlobalLimit - | _: Sample - ) => w - - case w: Window => wrapChildWithSubquery(w) - } + case w: Window => w.copy(child = addSubqueryIfNeeded(w.child)) - private def wrapChildWithSubquery(plan: UnaryNode): LogicalPlan = { - val newChild = SubqueryAlias(SQLBuilder.newSubqueryName, plan.child) - plan.withNewChildren(Seq(newChild)) + case j: Join => j.copy( + left = addSubqueryIfNeeded(j.left), + right = addSubqueryIfNeeded(j.right)) } } @@ -404,9 +423,40 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi } } } + + private def normalizedName(n: NamedExpression): String = "gen_attr_" + n.exprId.id + + private def needSubquery(plan: LogicalPlan): Boolean = plan match { + case _: SubqueryAlias => false + case _: Filter => false + case _: Join => false + case _: LocalLimit => false + case _: GlobalLimit => false + case _: SQLTable => false + case OneRowRelation => false + case _ => true + } + + private def addSubquery(plan: LogicalPlan): SubqueryAlias = { + SubqueryAlias(SQLBuilder.newSubqueryName, plan) + } + + private def addSubqueryIfNeeded(plan: LogicalPlan): LogicalPlan = { + if (needSubquery(plan)) { + addSubquery(plan) + } else { + plan + } + } } } +case class SQLTable( + database: String, + table: String, + output: Seq[Attribute], + sample: Option[(Double, Double)]) extends LeafNode + object SQLBuilder { private val nextSubqueryId = new AtomicLong(0) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala index 198652b355fe2..3e0e54ff38cad 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala @@ -550,4 +550,12 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { |WINDOW w AS (PARTITION BY key % 5 ORDER BY key) """.stripMargin) } + + test("window with join") { + checkHiveQl( + """ + |SELECT x.key, MAX(y.key) OVER (PARTITION BY x.key % 5 ORDER BY x.key) + |FROM parquet_t1 x JOIN parquet_t1 y ON x.key = y.key + """.stripMargin) + } } From 198b406a643d908483408ec6ccea26ffdc464aa9 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 11 Mar 2016 22:35:50 +0800 Subject: [PATCH 2/6] assign globally unique names to all attributes to avoid ambiguity --- .../apache/spark/sql/hive/SQLBuilder.scala | 113 ++++++++---------- 1 file changed, 47 insertions(+), 66 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala index 24f812fefc41e..76e4a96cfabee 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala @@ -54,16 +54,15 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi def this(df: DataFrame) = this(df.queryExecution.analyzed, df.sqlContext) def toSQL: String = { - println(logicalPlan.treeString) val canonicalizedPlan = Canonicalizer.execute(logicalPlan) - println(canonicalizedPlan.treeString) - - val output = canonicalizedPlan.output.zip(logicalPlan.output).map { - case (a1, a2) => Alias(a1, a2.name)() + // Canonicalizer will remove all naming information, we should add it back by adding an extra + // Project and alias the outputs. + val outputWithName = canonicalizedPlan.output.zip(logicalPlan.output).map { + case (a1, a2) => Alias(a1.withQualifiers(Nil), a2.name)() } - val finalPlan = Project(output, SubqueryAlias("result", canonicalizedPlan)) + val finalPlan = Project(outputWithName, SubqueryAlias("result", canonicalizedPlan)) try { - val replaced = canonicalizedPlan.transformAllExpressions { + val replaced = finalPlan.transformAllExpressions { case e: SubqueryExpression => SubqueryHolder(new SQLBuilder(e.query, sqlContext).toSQL) case e: NonSQLExpression => @@ -139,9 +138,6 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi case _: Aggregate => "HAVING" case _ => "WHERE" } - println(condition.treeString) - println(condition.getClass) - println(condition.sql) build(toSQL(child), whereOrHaving, condition.sql) case p @ Distinct(u: Union) if u.children.length > 1 => @@ -275,17 +271,25 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi val groupingSetSQL = "GROUPING SETS(" + groupingSet.map(e => s"(${e.map(_.sql).mkString(", ")})").mkString(", ") + ")" - val aggExprs = agg.aggregateExpressions.map { case expr => - expr.transformDown { + val aggExprs = agg.aggregateExpressions.map { case aggExpr => + aggExpr.transformDown { // grouping_id() is converted to VirtualColumn.groupingIdName by Analyzer. Revert it back. case ar: AttributeReference if ar == gid => GroupingID(Nil) - case ar: AttributeReference if groupByAttrMap.contains(ar) => groupByAttrMap(ar) + case ar: AttributeReference if groupByAttrMap.contains(ar) => + groupByAttrMap(ar) case a @ Cast(BitwiseAnd( ShiftRight(ar: AttributeReference, Literal(value: Any, IntegerType)), Literal(1, IntegerType)), ByteType) if ar == gid => // for converting an expression to its original SQL format grouping(col) val idx = groupByExprs.length - 1 - value.asInstanceOf[Int] groupByExprs.lift(idx).map(Grouping).getOrElse(a) + } match { + // Ancestor operators may reference the output of this grouping set, and we use exprId to + // generate a unique name for each attribute, so we should make sure the transformed + // aggregate expression won't change the output, i.e. exprId and alias name should remain + // the same. + case ne: NamedExpression if ne.exprId == aggExpr.exprId => ne + case e => Alias(e, normalizedName(aggExpr))(exprId = aggExpr.exprId) } } @@ -309,6 +313,8 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi ) } + private def normalizedName(n: NamedExpression): String = "gen_attr_" + n.exprId.id + object Canonicalizer extends RuleExecutor[LogicalPlan] { override protected def batches: Seq[Batch] = Seq( Batch("Collapse Project", FixedPoint(100), @@ -317,43 +323,28 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi // `Aggregate`s. CollapseProject), Batch("Recover Scoping Info", Once, + // Remove all sub queries, as we will insert new ones when it's necessary. EliminateSubqueryAliases, // A logical plan is allowed to have same-name outputs with different qualifiers(e.g. the - // `Join` operator). However, this kind of plan can't be put under a subquery as we will + // `Join` operator). However, this kind of plan can't be put under a sub query as we will // erase and assign a new qualifier to all outputs and make it impossible to distinguish - // same-name outputs. This rule renames all attributes, to guarantee different attributes( - // with different exprId) always have different names. - ClearAttributeName, + // same-name outputs. This rule renames all attributes, to guarantee different + // attributes(with different exprId) always have different names. It also removes all + // qualifiers, as attributes have unique names now and we don't need qualifiers to resolve + // ambiguity. + NormalizedAttribute, + // Wraps table information with SQLTable, and combine `Sample` operator if there are any. ResolveSQLTable, - // Used to handle other auxiliary `Project`s added by analyzer (e.g. - // `ResolveAggregateFunctions` rule) - AddSubquery, - // Previous rule will add extra sub-queries, this rule is used to re-propagate and update - // the qualifiers bottom up, e.g.: - // - // Sort - // ordering = t1.a - // Project - // projectList = [t1.a, t1.b] - // Subquery gen_subquery - // child ... - // - // will be transformed to: - // - // Sort - // ordering = gen_subquery.a - // Project - // projectList = [gen_subquery.a, gen_subquery.b] - // Subquery gen_subquery - // child ... - UpdateQualifiers + // Insert sub queries on top of operators that need to appear after FROM clause. + AddSubquery ) ) - object ClearAttributeName extends Rule[LogicalPlan] { + object NormalizedAttribute extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions { - case a: Attribute => a.withName(normalizedName(a)) - case a: Alias => Alias(a.child, normalizedName(a))(exprId = a.exprId) + case a: AttributeReference => AttributeReference(normalizedName(a), a.dataType)( + exprId = a.exprId, isGenerated = false, qualifiers = Nil) + case a: Alias => Alias(a.child, normalizedName(a))(exprId = a.exprId, qualifiers = Nil) } } @@ -361,25 +352,29 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi override def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown { case s @ Sample(lowerBound, upperBound, _, _, l @ LogicalRelation(_, _, Some(TableIdentifier(table, Some(database))))) if s.isTableSample => - restoreColumnNames( - SQLTable(database, table, l.output, Some(lowerBound -> upperBound))) + makeTable(database, table, l.output, Some(lowerBound -> upperBound)) case s @ Sample(lowerBound, upperBound, _, _, m: MetastoreRelation) if s.isTableSample => - restoreColumnNames( - SQLTable(m.databaseName, m.tableName, m.output, Some(lowerBound -> upperBound))) + makeTable(m.databaseName, m.tableName, m.output, Some(lowerBound -> upperBound)) case l @ LogicalRelation(_, _, Some(TableIdentifier(table, Some(database)))) => - restoreColumnNames(SQLTable(database, table, l.output, None)) + makeTable(database, table, l.output, None) case m: MetastoreRelation => - restoreColumnNames(SQLTable(m.databaseName, m.tableName, m.output, None)) + makeTable(m.databaseName, m.tableName, m.output, None) } - private def restoreColumnNames(table: SQLTable): SubqueryAlias = { - val output = table.output.map { attr => - Alias(attr.withQualifiers(Nil), normalizedName(attr))(exprId = attr.exprId) + private def makeTable( + database: String, + table: String, + output: Seq[Attribute], + sample: Option[(Double, Double)]): SubqueryAlias = { + val outputWithoutQualifier = output.map(_.withQualifiers(Nil)) + val sqlTable = SQLTable(database, table, outputWithoutQualifier, sample) + val aliasedOutput = outputWithoutQualifier.map { attr => + Alias(attr, normalizedName(attr))(exprId = attr.exprId) } - addSubquery(Project(output, table)) + addSubquery(Project(aliasedOutput, sqlTable)) } } @@ -412,20 +407,6 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi } } - object UpdateQualifiers extends Rule[LogicalPlan] { - override def apply(tree: LogicalPlan): LogicalPlan = tree transformUp { - case plan => - val inputAttributes = plan.children.flatMap(_.output) - plan transformExpressions { - case a: AttributeReference if !plan.producedAttributes.contains(a) => - val qualifier = inputAttributes.find(_ semanticEquals a).map(_.qualifiers) - a.withQualifiers(qualifier.getOrElse(Nil)) - } - } - } - - private def normalizedName(n: NamedExpression): String = "gen_attr_" + n.exprId.id - private def needSubquery(plan: LogicalPlan): Boolean = plan match { case _: SubqueryAlias => false case _: Filter => false From 21a142d775b93f76dc16f2bf25af40f42f61774a Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sat, 12 Mar 2016 10:14:31 +0800 Subject: [PATCH 3/6] one more test and mionr cleanup --- .../org/apache/spark/sql/hive/SQLBuilder.scala | 18 +++++++++++++++--- .../spark/sql/hive/LogicalPlanToSQLSuite.scala | 10 ++++++++++ 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala index 76e4a96cfabee..6ee166b3625d2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala @@ -55,12 +55,24 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi def toSQL: String = { val canonicalizedPlan = Canonicalizer.execute(logicalPlan) + val outputNames = logicalPlan.output.map(_.name) + val qualifiers = logicalPlan.output.flatMap(_.qualifiers).distinct + + // Keep the qualifier information by using it as sub-query name, if there is only one qualifier + // present. + val finalName = if (qualifiers.isEmpty || qualifiers.length > 1) { + SQLBuilder.newSubqueryName + } else { + qualifiers.head + } + // Canonicalizer will remove all naming information, we should add it back by adding an extra // Project and alias the outputs. - val outputWithName = canonicalizedPlan.output.zip(logicalPlan.output).map { - case (a1, a2) => Alias(a1.withQualifiers(Nil), a2.name)() + val aliasedOutput = canonicalizedPlan.output.zip(outputNames).map { + case (attr, name) => Alias(attr.withQualifiers(Nil), name)() } - val finalPlan = Project(outputWithName, SubqueryAlias("result", canonicalizedPlan)) + val finalPlan = Project(aliasedOutput, SubqueryAlias(finalName, canonicalizedPlan)) + try { val replaced = finalPlan.transformAllExpressions { case e: SubqueryExpression => diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala index 3e0e54ff38cad..f02ecb48d53a3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala @@ -558,4 +558,14 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { |FROM parquet_t1 x JOIN parquet_t1 y ON x.key = y.key """.stripMargin) } + + test("join 2 tables and aggregate function in having clause") { + checkHiveQl( + """ + |SELECT COUNT(a.value), b.KEY, a.KEY + |FROM parquet_t1 a, parquet_t1 b + |GROUP BY a.KEY, b.KEY + |HAVING MAX(a.KEY) > 0 + """.stripMargin) + } } From ade17d8b6c1cd31a961ac57597bc3a3ba91711ba Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sat, 12 Mar 2016 10:54:29 +0800 Subject: [PATCH 4/6] cleanup --- .../spark/sql/catalyst/expressions/namedExpressions.scala | 6 ++---- .../main/scala/org/apache/spark/sql/hive/SQLBuilder.scala | 7 ++++--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 1af543764776e..271ef33090980 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -185,8 +185,7 @@ case class Alias(child: Expression, name: String)( override def sql: String = { val qualifiersString = if (qualifiers.isEmpty) "" else qualifiers.map(quoteIdentifier).mkString("", ".", ".") - val aliasName = if (isGenerated) s"$name#${exprId.id}" else s"$name" - s"${child.sql} AS $qualifiersString${quoteIdentifier(aliasName)}" + s"${child.sql} AS $qualifiersString${quoteIdentifier(name)}" } } @@ -302,8 +301,7 @@ case class AttributeReference( override def sql: String = { val qualifiersString = if (qualifiers.isEmpty) "" else qualifiers.map(quoteIdentifier).mkString("", ".", ".") - val attrRefName = if (isGenerated) s"$name#${exprId.id}" else s"$name" - s"$qualifiersString${quoteIdentifier(attrRefName)}" + s"$qualifiersString${quoteIdentifier(name)}" } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala index 6ee166b3625d2..da4700b1c36e1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala @@ -354,9 +354,10 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi object NormalizedAttribute extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions { - case a: AttributeReference => AttributeReference(normalizedName(a), a.dataType)( - exprId = a.exprId, isGenerated = false, qualifiers = Nil) - case a: Alias => Alias(a.child, normalizedName(a))(exprId = a.exprId, qualifiers = Nil) + case a: AttributeReference => + AttributeReference(normalizedName(a), a.dataType)(exprId = a.exprId, qualifiers = Nil) + case a: Alias => + Alias(a.child, normalizedName(a))(exprId = a.exprId, qualifiers = Nil) } } From 5b12aa08add3cf97c938ad6d7dd7de8e0118e55a Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sat, 12 Mar 2016 14:38:47 +0800 Subject: [PATCH 5/6] remove case Sample --- .../org/apache/spark/sql/hive/SQLBuilder.scala | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala index da4700b1c36e1..af32715fd56df 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala @@ -128,23 +128,6 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi case Limit(limitExpr, child) => s"${toSQL(child)} LIMIT ${limitExpr.sql}" - case p: Sample if p.isTableSample => - val fraction = math.min(100, math.max(0, (p.upperBound - p.lowerBound) * 100)) - p.child match { - case m: MetastoreRelation => - val aliasName = m.alias.getOrElse("") - build( - s"`${m.databaseName}`.`${m.tableName}`", - "TABLESAMPLE(" + fraction + " PERCENT)", - aliasName) - case s: SubqueryAlias => - val aliasName = if (s.child.isInstanceOf[SubqueryAlias]) s.alias else "" - val plan = if (s.child.isInstanceOf[SubqueryAlias]) s.child else s - build(toSQL(plan), "TABLESAMPLE(" + fraction + " PERCENT)", aliasName) - case _ => - build(toSQL(p.child), "TABLESAMPLE(" + fraction + " PERCENT)") - } - case Filter(condition, child) => val whereOrHaving = child match { case _: Aggregate => "HAVING" From 5ef9fd4956eb76d7d6988e445fbd6ae61a1d0481 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 15 Mar 2016 16:56:03 +0800 Subject: [PATCH 6/6] address comments --- .../apache/spark/sql/hive/SQLBuilder.scala | 105 +++++++++--------- 1 file changed, 53 insertions(+), 52 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala index af32715fd56df..3bc8e9a5e03c7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala @@ -60,10 +60,10 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi // Keep the qualifier information by using it as sub-query name, if there is only one qualifier // present. - val finalName = if (qualifiers.isEmpty || qualifiers.length > 1) { - SQLBuilder.newSubqueryName - } else { + val finalName = if (qualifiers.length == 1) { qualifiers.head + } else { + SQLBuilder.newSubqueryName } // Canonicalizer will remove all naming information, we should add it back by adding an extra @@ -267,18 +267,19 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi groupingSet.map(e => s"(${e.map(_.sql).mkString(", ")})").mkString(", ") + ")" val aggExprs = agg.aggregateExpressions.map { case aggExpr => - aggExpr.transformDown { + val originalAggExpr = aggExpr.transformDown { // grouping_id() is converted to VirtualColumn.groupingIdName by Analyzer. Revert it back. case ar: AttributeReference if ar == gid => GroupingID(Nil) - case ar: AttributeReference if groupByAttrMap.contains(ar) => - groupByAttrMap(ar) + case ar: AttributeReference if groupByAttrMap.contains(ar) => groupByAttrMap(ar) case a @ Cast(BitwiseAnd( ShiftRight(ar: AttributeReference, Literal(value: Any, IntegerType)), Literal(1, IntegerType)), ByteType) if ar == gid => // for converting an expression to its original SQL format grouping(col) val idx = groupByExprs.length - 1 - value.asInstanceOf[Int] groupByExprs.lift(idx).map(Grouping).getOrElse(a) - } match { + } + + originalAggExpr match { // Ancestor operators may reference the output of this grouping set, and we use exprId to // generate a unique name for each attribute, so we should make sure the transformed // aggregate expression won't change the output, i.e. exprId and alias name should remain @@ -328,7 +329,9 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi // qualifiers, as attributes have unique names now and we don't need qualifiers to resolve // ambiguity. NormalizedAttribute, - // Wraps table information with SQLTable, and combine `Sample` operator if there are any. + // Finds the table relations and wrap them with `SQLTable`s. If there are any `Sample` + // operators on top of a table relation, merge the sample information into `SQLTable` of + // that table relation, as we can only convert table sample to standard SQL string. ResolveSQLTable, // Insert sub queries on top of operators that need to appear after FROM clause. AddSubquery @@ -346,31 +349,22 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi object ResolveSQLTable extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown { - case s @ Sample(lowerBound, upperBound, _, _, l @ LogicalRelation(_, _, - Some(TableIdentifier(table, Some(database))))) if s.isTableSample => - makeTable(database, table, l.output, Some(lowerBound -> upperBound)) - - case s @ Sample(lowerBound, upperBound, _, _, m: MetastoreRelation) if s.isTableSample => - makeTable(m.databaseName, m.tableName, m.output, Some(lowerBound -> upperBound)) - - case l @ LogicalRelation(_, _, Some(TableIdentifier(table, Some(database)))) => - makeTable(database, table, l.output, None) - - case m: MetastoreRelation => - makeTable(m.databaseName, m.tableName, m.output, None) + case Sample(lowerBound, upperBound, _, _, ExtractSQLTable(table)) => + aliasColumns(table.withSample(lowerBound, upperBound)) + case ExtractSQLTable(table) => + aliasColumns(table) } - private def makeTable( - database: String, - table: String, - output: Seq[Attribute], - sample: Option[(Double, Double)]): SubqueryAlias = { - val outputWithoutQualifier = output.map(_.withQualifiers(Nil)) - val sqlTable = SQLTable(database, table, outputWithoutQualifier, sample) - val aliasedOutput = outputWithoutQualifier.map { attr => + /** + * Aliases the table columns to the generated attribute names, as we use exprId to generate + * unique name for each attribute when normalize attributes, and we can't reference table + * columns with their real names. + */ + private def aliasColumns(table: SQLTable): LogicalPlan = { + val aliasedOutput = table.output.map { attr => Alias(attr, normalizedName(attr))(exprId = attr.exprId) } - addSubquery(Project(aliasedOutput, sqlTable)) + addSubquery(Project(aliasedOutput, table)) } } @@ -403,36 +397,43 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi } } - private def needSubquery(plan: LogicalPlan): Boolean = plan match { - case _: SubqueryAlias => false - case _: Filter => false - case _: Join => false - case _: LocalLimit => false - case _: GlobalLimit => false - case _: SQLTable => false - case OneRowRelation => false - case _ => true - } - private def addSubquery(plan: LogicalPlan): SubqueryAlias = { SubqueryAlias(SQLBuilder.newSubqueryName, plan) } - private def addSubqueryIfNeeded(plan: LogicalPlan): LogicalPlan = { - if (needSubquery(plan)) { - addSubquery(plan) - } else { - plan - } + private def addSubqueryIfNeeded(plan: LogicalPlan): LogicalPlan = plan match { + case _: SubqueryAlias => plan + case _: Filter => plan + case _: Join => plan + case _: LocalLimit => plan + case _: GlobalLimit => plan + case _: SQLTable => plan + case OneRowRelation => plan + case _ => addSubquery(plan) } } -} -case class SQLTable( - database: String, - table: String, - output: Seq[Attribute], - sample: Option[(Double, Double)]) extends LeafNode + case class SQLTable( + database: String, + table: String, + output: Seq[Attribute], + sample: Option[(Double, Double)] = None) extends LeafNode { + def withSample(lowerBound: Double, upperBound: Double): SQLTable = + this.copy(sample = Some(lowerBound -> upperBound)) + } + + object ExtractSQLTable { + def unapply(plan: LogicalPlan): Option[SQLTable] = plan match { + case l @ LogicalRelation(_, _, Some(TableIdentifier(table, Some(database)))) => + Some(SQLTable(database, table, l.output.map(_.withQualifiers(Nil)))) + + case m: MetastoreRelation => + Some(SQLTable(m.databaseName, m.tableName, m.output.map(_.withQualifiers(Nil)))) + + case _ => None + } + } +} object SQLBuilder { private val nextSubqueryId = new AtomicLong(0)