@@ -24,6 +24,7 @@ import scala.util.control.NonFatal
2424import org .apache .spark .Logging
2525import org .apache .spark .sql .{DataFrame , SQLContext }
2626import org .apache .spark .sql .catalyst .TableIdentifier
27+ import org .apache .spark .sql .catalyst .analysis .EliminateSubqueryAliases
2728import org .apache .spark .sql .catalyst .expressions ._
2829import org .apache .spark .sql .catalyst .optimizer .CollapseProject
2930import org .apache .spark .sql .catalyst .plans .logical ._
@@ -54,8 +55,26 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
5455
5556 def toSQL : String = {
5657 val canonicalizedPlan = Canonicalizer .execute(logicalPlan)
58+ val outputNames = logicalPlan.output.map(_.name)
59+ val qualifiers = logicalPlan.output.flatMap(_.qualifiers).distinct
60+
61+ // Keep the qualifier information by using it as sub-query name, if there is only one qualifier
62+ // present.
63+ val finalName = if (qualifiers.length == 1 ) {
64+ qualifiers.head
65+ } else {
66+ SQLBuilder .newSubqueryName
67+ }
68+
69+ // Canonicalizer will remove all naming information, we should add it back by adding an extra
70+ // Project and alias the outputs.
71+ val aliasedOutput = canonicalizedPlan.output.zip(outputNames).map {
72+ case (attr, name) => Alias (attr.withQualifiers(Nil ), name)()
73+ }
74+ val finalPlan = Project (aliasedOutput, SubqueryAlias (finalName, canonicalizedPlan))
75+
5776 try {
58- val replaced = canonicalizedPlan .transformAllExpressions {
77+ val replaced = finalPlan .transformAllExpressions {
5978 case e : SubqueryExpression =>
6079 SubqueryHolder (new SQLBuilder (e.query, sqlContext).toSQL)
6180 case e : NonSQLExpression =>
@@ -109,23 +128,6 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
109128 case Limit (limitExpr, child) =>
110129 s " ${toSQL(child)} LIMIT ${limitExpr.sql}"
111130
112- case p : Sample if p.isTableSample =>
113- val fraction = math.min(100 , math.max(0 , (p.upperBound - p.lowerBound) * 100 ))
114- p.child match {
115- case m : MetastoreRelation =>
116- val aliasName = m.alias.getOrElse(" " )
117- build(
118- s " ` ${m.databaseName}`.` ${m.tableName}` " ,
119- " TABLESAMPLE(" + fraction + " PERCENT)" ,
120- aliasName)
121- case s : SubqueryAlias =>
122- val aliasName = if (s.child.isInstanceOf [SubqueryAlias ]) s.alias else " "
123- val plan = if (s.child.isInstanceOf [SubqueryAlias ]) s.child else s
124- build(toSQL(plan), " TABLESAMPLE(" + fraction + " PERCENT)" , aliasName)
125- case _ =>
126- build(toSQL(p.child), " TABLESAMPLE(" + fraction + " PERCENT)" )
127- }
128-
129131 case Filter (condition, child) =>
130132 val whereOrHaving = child match {
131133 case _ : Aggregate => " HAVING"
@@ -147,18 +149,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
147149 case p : Except =>
148150 build(" (" + toSQL(p.left), " ) EXCEPT (" , toSQL(p.right) + " )" )
149151
150- case p : SubqueryAlias =>
151- p.child match {
152- // Persisted data source relation
153- case LogicalRelation (_, _, Some (TableIdentifier (table, Some (database)))) =>
154- s " ${quoteIdentifier(database)}. ${quoteIdentifier(table)}"
155- // Parentheses is not used for persisted data source relations
156- // e.g., select x.c1 from (t1) as x inner join (t1) as y on x.c1 = y.c1
157- case SubqueryAlias (_, _ : LogicalRelation | _ : MetastoreRelation ) =>
158- build(toSQL(p.child), " AS" , p.alias)
159- case _ =>
160- build(" (" + toSQL(p.child) + " )" , " AS" , p.alias)
161- }
152+ case p : SubqueryAlias => build(" (" + toSQL(p.child) + " )" , " AS" , p.alias)
162153
163154 case p : Join =>
164155 build(
@@ -168,11 +159,12 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
168159 toSQL(p.right),
169160 p.condition.map(" ON " + _.sql).getOrElse(" " ))
170161
171- case p : MetastoreRelation =>
172- build(
173- s " ${quoteIdentifier(p.databaseName)}. ${quoteIdentifier(p.tableName)}" ,
174- p.alias.map(a => s " AS ${quoteIdentifier(a)}" ).getOrElse(" " )
175- )
162+ case SQLTable (database, table, _, sample) =>
163+ val qualifiedName = s " ${quoteIdentifier(database)}. ${quoteIdentifier(table)}"
164+ sample.map { case (lowerBound, upperBound) =>
165+ val fraction = math.min(100 , math.max(0 , (upperBound - lowerBound) * 100 ))
166+ qualifiedName + " TABLESAMPLE(" + fraction + " PERCENT)"
167+ }.getOrElse(qualifiedName)
176168
177169 case Sort (orders, _, RepartitionByExpression (partitionExprs, child, _))
178170 if orders.map(_.child) == partitionExprs =>
@@ -274,8 +266,8 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
274266 val groupingSetSQL = " GROUPING SETS(" +
275267 groupingSet.map(e => s " ( ${e.map(_.sql).mkString(" , " )}) " ).mkString(" , " ) + " )"
276268
277- val aggExprs = agg.aggregateExpressions.map { case expr =>
278- expr .transformDown {
269+ val aggExprs = agg.aggregateExpressions.map { case aggExpr =>
270+ val originalAggExpr = aggExpr .transformDown {
279271 // grouping_id() is converted to VirtualColumn.groupingIdName by Analyzer. Revert it back.
280272 case ar : AttributeReference if ar == gid => GroupingID (Nil )
281273 case ar : AttributeReference if groupByAttrMap.contains(ar) => groupByAttrMap(ar)
@@ -286,6 +278,15 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
286278 val idx = groupByExprs.length - 1 - value.asInstanceOf [Int ]
287279 groupByExprs.lift(idx).map(Grouping ).getOrElse(a)
288280 }
281+
282+ originalAggExpr match {
283+ // Ancestor operators may reference the output of this grouping set, and we use exprId to
284+ // generate a unique name for each attribute, so we should make sure the transformed
285+ // aggregate expression won't change the output, i.e. exprId and alias name should remain
286+ // the same.
287+ case ne : NamedExpression if ne.exprId == aggExpr.exprId => ne
288+ case e => Alias (e, normalizedName(aggExpr))(exprId = aggExpr.exprId)
289+ }
289290 }
290291
291292 build(
@@ -308,6 +309,8 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
308309 )
309310 }
310311
312+ private def normalizedName (n : NamedExpression ): String = " gen_attr_" + n.exprId.id
313+
311314 object Canonicalizer extends RuleExecutor [LogicalPlan ] {
312315 override protected def batches : Seq [Batch ] = Seq (
313316 Batch (" Collapse Project" , FixedPoint (100 ),
@@ -316,31 +319,55 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
316319 // `Aggregate`s.
317320 CollapseProject ),
318321 Batch (" Recover Scoping Info" , Once ,
319- // Used to handle other auxiliary `Project`s added by analyzer (e.g.
320- // `ResolveAggregateFunctions` rule)
321- AddSubquery ,
322- // Previous rule will add extra sub-queries, this rule is used to re-propagate and update
323- // the qualifiers bottom up, e.g.:
324- //
325- // Sort
326- // ordering = t1.a
327- // Project
328- // projectList = [t1.a, t1.b]
329- // Subquery gen_subquery
330- // child ...
331- //
332- // will be transformed to:
333- //
334- // Sort
335- // ordering = gen_subquery.a
336- // Project
337- // projectList = [gen_subquery.a, gen_subquery.b]
338- // Subquery gen_subquery
339- // child ...
340- UpdateQualifiers
322+ // Remove all sub queries, as we will insert new ones when it's necessary.
323+ EliminateSubqueryAliases ,
324+ // A logical plan is allowed to have same-name outputs with different qualifiers(e.g. the
325+ // `Join` operator). However, this kind of plan can't be put under a sub query as we will
326+ // erase and assign a new qualifier to all outputs and make it impossible to distinguish
327+ // same-name outputs. This rule renames all attributes, to guarantee different
328+ // attributes(with different exprId) always have different names. It also removes all
329+ // qualifiers, as attributes have unique names now and we don't need qualifiers to resolve
330+ // ambiguity.
331+ NormalizedAttribute ,
332+ // Finds the table relations and wrap them with `SQLTable`s. If there are any `Sample`
333+ // operators on top of a table relation, merge the sample information into `SQLTable` of
334+ // that table relation, as we can only convert table sample to standard SQL string.
335+ ResolveSQLTable ,
336+ // Insert sub queries on top of operators that need to appear after FROM clause.
337+ AddSubquery
341338 )
342339 )
343340
341+ object NormalizedAttribute extends Rule [LogicalPlan ] {
342+ override def apply (plan : LogicalPlan ): LogicalPlan = plan.transformAllExpressions {
343+ case a : AttributeReference =>
344+ AttributeReference (normalizedName(a), a.dataType)(exprId = a.exprId, qualifiers = Nil )
345+ case a : Alias =>
346+ Alias (a.child, normalizedName(a))(exprId = a.exprId, qualifiers = Nil )
347+ }
348+ }
349+
350+ object ResolveSQLTable extends Rule [LogicalPlan ] {
351+ override def apply (plan : LogicalPlan ): LogicalPlan = plan.transformDown {
352+ case Sample (lowerBound, upperBound, _, _, ExtractSQLTable (table)) =>
353+ aliasColumns(table.withSample(lowerBound, upperBound))
354+ case ExtractSQLTable (table) =>
355+ aliasColumns(table)
356+ }
357+
358+ /**
359+ * Aliases the table columns to the generated attribute names, as we use exprId to generate
360+ * unique name for each attribute when normalize attributes, and we can't reference table
361+ * columns with their real names.
362+ */
363+ private def aliasColumns (table : SQLTable ): LogicalPlan = {
364+ val aliasedOutput = table.output.map { attr =>
365+ Alias (attr, normalizedName(attr))(exprId = attr.exprId)
366+ }
367+ addSubquery(Project (aliasedOutput, table))
368+ }
369+ }
370+
344371 object AddSubquery extends Rule [LogicalPlan ] {
345372 override def apply (tree : LogicalPlan ): LogicalPlan = tree transformUp {
346373 // This branch handles aggregate functions within HAVING clauses. For example:
@@ -354,55 +381,56 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
354381 // +- Filter ...
355382 // +- Aggregate ...
356383 // +- MetastoreRelation default, src, None
357- case plan @ Project (_, Filter (_, _ : Aggregate )) => wrapChildWithSubquery(plan )
384+ case p @ Project (_, f @ Filter (_, _ : Aggregate )) => p.copy(child = addSubquery(f) )
358385
359- case w @ Window (_, _, _, Filter (_, _ : Aggregate )) => wrapChildWithSubquery(w )
386+ case w @ Window (_, _, _, f @ Filter (_, _ : Aggregate )) => w.copy(child = addSubquery(f) )
360387
361- case plan @ Project (_,
362- _ : SubqueryAlias
363- | _ : Filter
364- | _ : Join
365- | _ : MetastoreRelation
366- | OneRowRelation
367- | _ : LocalLimit
368- | _ : GlobalLimit
369- | _ : Sample
370- ) => plan
371-
372- case plan : Project => wrapChildWithSubquery(plan)
388+ case p : Project => p.copy(child = addSubqueryIfNeeded(p.child))
373389
374390 // We will generate "SELECT ... FROM ..." for Window operator, so its child operator should
375391 // be able to put in the FROM clause, or we wrap it with a subquery.
376- case w @ Window (_, _, _,
377- _ : SubqueryAlias
378- | _ : Filter
379- | _ : Join
380- | _ : MetastoreRelation
381- | OneRowRelation
382- | _ : LocalLimit
383- | _ : GlobalLimit
384- | _ : Sample
385- ) => w
386-
387- case w : Window => wrapChildWithSubquery(w)
388- }
392+ case w : Window => w.copy(child = addSubqueryIfNeeded(w.child))
389393
390- private def wrapChildWithSubquery ( plan : UnaryNode ) : LogicalPlan = {
391- val newChild = SubqueryAlias ( SQLBuilder .newSubqueryName, plan.child)
392- plan.withNewChildren( Seq (newChild ))
394+ case j : Join => j.copy(
395+ left = addSubqueryIfNeeded(j.left),
396+ right = addSubqueryIfNeeded(j.right ))
393397 }
394398 }
395399
396- object UpdateQualifiers extends Rule [LogicalPlan ] {
397- override def apply (tree : LogicalPlan ): LogicalPlan = tree transformUp {
398- case plan =>
399- val inputAttributes = plan.children.flatMap(_.output)
400- plan transformExpressions {
401- case a : AttributeReference if ! plan.producedAttributes.contains(a) =>
402- val qualifier = inputAttributes.find(_ semanticEquals a).map(_.qualifiers)
403- a.withQualifiers(qualifier.getOrElse(Nil ))
404- }
405- }
400+ private def addSubquery (plan : LogicalPlan ): SubqueryAlias = {
401+ SubqueryAlias (SQLBuilder .newSubqueryName, plan)
402+ }
403+
404+ private def addSubqueryIfNeeded (plan : LogicalPlan ): LogicalPlan = plan match {
405+ case _ : SubqueryAlias => plan
406+ case _ : Filter => plan
407+ case _ : Join => plan
408+ case _ : LocalLimit => plan
409+ case _ : GlobalLimit => plan
410+ case _ : SQLTable => plan
411+ case OneRowRelation => plan
412+ case _ => addSubquery(plan)
413+ }
414+ }
415+
416+ case class SQLTable (
417+ database : String ,
418+ table : String ,
419+ output : Seq [Attribute ],
420+ sample : Option [(Double , Double )] = None ) extends LeafNode {
421+ def withSample (lowerBound : Double , upperBound : Double ): SQLTable =
422+ this .copy(sample = Some (lowerBound -> upperBound))
423+ }
424+
425+ object ExtractSQLTable {
426+ def unapply (plan : LogicalPlan ): Option [SQLTable ] = plan match {
427+ case l @ LogicalRelation (_, _, Some (TableIdentifier (table, Some (database)))) =>
428+ Some (SQLTable (database, table, l.output.map(_.withQualifiers(Nil ))))
429+
430+ case m : MetastoreRelation =>
431+ Some (SQLTable (m.databaseName, m.tableName, m.output.map(_.withQualifiers(Nil ))))
432+
433+ case _ => None
406434 }
407435 }
408436}
0 commit comments