From 74c42aecb7bd793061ebeb8750054f6d91c5f6f1 Mon Sep 17 00:00:00 2001
From: Derek Sabry
Date: Mon, 30 Nov 2015 16:53:42 -0800
Subject: [PATCH 1/7] Use number to refer to columns in group by clause

---
 .../main/scala/org/apache/spark/sql/hive/HiveQl.scala | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 091caab921fe..ce5f44d2b598 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -1103,7 +1103,16 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       groupByClause.map(e => e match {
         case Token("TOK_GROUPBY", children) =>
           // Not a transformation so must be either project or aggregation.
-          Aggregate(children.map(nodeToExpr), selectExpressions, withLateralView)
+          var newChildren = Seq[ASTNode]()
+          children.foreach(child =>
+            newChildren = newChildren :+ (child.getText match {
+              case clauseName if clauseName forall Character.isDigit =>
+                val Token("TOK_SELEXPR", columns) = selectClause.get.getChildren
+                  .get(clauseName.toInt - 1)
+                columns(0)
+              case _ => child
+            }))
+          Aggregate(newChildren.map(nodeToExpr), selectExpressions, withLateralView)
         case _ => sys.error("Expect GROUP BY")
       }),
       groupingSetsClause.map(e => e match {

From a179e6f83849fa894efc501b1e4a1940ff67a543 Mon Sep 17 00:00:00 2001
From: Derek Sabry
Date: Thu, 3 Dec 2015 11:49:10 -0800
Subject: [PATCH 2/7] Group by Column Number for Spark SQL

---
 .../sql/catalyst/analysis/Analyzer.scala     | 26 +++++++++++++++++++
 .../org/apache/spark/sql/SQLQuerySuite.scala |  6 +++++
 .../org/apache/spark/sql/hive/HiveQl.scala   | 11 +-------
 3 files changed, 33 insertions(+), 10 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 47962ebe6ef8..f202f184805b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -70,6 +70,7 @@ class Analyzer(
     Batch("Resolution", fixedPoint,
       ResolveRelations ::
       ResolveReferences ::
+      ResolveGroupByReferences ::
       ResolveGroupingAnalytics ::
       ResolvePivot ::
       ResolveSortReferences ::
@@ -178,6 +179,31 @@ class Analyzer(
     }
   }

+  /**
+   * Replaces queries of the form "SELECT expr FROM A GROUP BY 1"
+   * with a query of the form "SELECT expr FROM A GROUP BY expr"
+   */
+  object ResolveGroupByReferences extends Rule[LogicalPlan] {
+
+    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+      case Aggregate(groups, aggs, child) =>
+        var newGroups = Seq[Expression]()
+        groups.foreach(group =>
+          newGroups = newGroups :+ (group match {
+            case clauseName if clauseName.prettyString forall Character.isDigit =>
+              aggs(group.prettyString.toInt - 1) match {
+                case u : UnresolvedAlias =>
+                  u.child
+                case a : Alias =>
+                  a.child
+              }
+            case _ => group
+          }))
+        Aggregate(newGroups, aggs, child)
+
+    }
+  }
+
   object ResolveGroupingAnalytics extends Rule[LogicalPlan] {
     /*
      *  GROUP BY a, b, c WITH ROLLUP
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index bb82b562aaaa..3ffd071a0a17 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2028,4 +2028,10 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       Row(false) :: Row(true) :: Nil)
   }

+  test("SPARK-12063: Group by Column Number identifier") {
+    checkAnswer(
+      sql("SELECT a, SUM(b) FROM testData2 GROUP BY 1"),
+      Seq(Row(1, 3), Row(2, 3), Row(3, 3)))
+  }
+
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index ce5f44d2b598..091caab921fe 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -1103,16 +1103,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       groupByClause.map(e => e match {
         case Token("TOK_GROUPBY", children) =>
           // Not a transformation so must be either project or aggregation.
-          var newChildren = Seq[ASTNode]()
-          children.foreach(child =>
-            newChildren = newChildren :+ (child.getText match {
-              case clauseName if clauseName forall Character.isDigit =>
-                val Token("TOK_SELEXPR", columns) = selectClause.get.getChildren
-                  .get(clauseName.toInt - 1)
-                columns(0)
-              case _ => child
-            }))
-          Aggregate(newChildren.map(nodeToExpr), selectExpressions, withLateralView)
+          Aggregate(children.map(nodeToExpr), selectExpressions, withLateralView)
         case _ => sys.error("Expect GROUP BY")
       }),
       groupingSetsClause.map(e => e match {

From b4cfcbf645ad3576851ae6dc2f02fc9443aa5b33 Mon Sep 17 00:00:00 2001
From: Derek Sabry
Date: Mon, 7 Dec 2015 12:19:29 -0800
Subject: [PATCH 3/7] minor style change

---
 .../sql/catalyst/analysis/Analyzer.scala | 24 ++++++++----------
 1 file changed, 10 insertions(+), 14 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index f202f184805b..2da2df873088 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -187,20 +187,16 @@ class Analyzer(

     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
       case Aggregate(groups, aggs, child) =>
-        var newGroups = Seq[Expression]()
-        groups.foreach(group =>
-          newGroups = newGroups :+ (group match {
-            case clauseName if clauseName.prettyString forall Character.isDigit =>
-              aggs(group.prettyString.toInt - 1) match {
-                case u : UnresolvedAlias =>
-                  u.child
-                case a : Alias =>
-                  a.child
-              }
-            case _ => group
-          }))
-        Aggregate(newGroups, aggs, child)
-
+        Aggregate(groups.map(group => group match {
+          case g if g.prettyString forall Character.isDigit =>
+            aggs(g.prettyString.toInt - 1) match {
+              case u : UnresolvedAlias =>
+                u.child
+              case a : Alias =>
+                a.child
+            }
+          case _ => group
+        }), aggs, child)
     }
   }

From 8a5a4f63fc66792e924f4f3355df357815aae13b Mon Sep 17 00:00:00 2001
From: Derek Sabry
Date: Wed, 9 Dec 2015 20:07:44 -0800
Subject: [PATCH 4/7] Add Sort() case

---
 .../sql/catalyst/analysis/Analyzer.scala     | 45 +++++++++++++------
 .../org/apache/spark/sql/SQLQuerySuite.scala |  9 +++-
 2 files changed, 40 insertions(+), 14 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2da2df873088..700207bbc149 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -70,7 +70,7 @@ class Analyzer(
     Batch("Resolution", fixedPoint,
       ResolveRelations ::
       ResolveReferences ::
-      ResolveGroupByReferences ::
+      ResolveNumericReferences ::
       ResolveGroupingAnalytics ::
       ResolvePivot ::
       ResolveSortReferences ::
@@ -180,23 +180,42 @@ class Analyzer(
   }

   /**
-   * Replaces queries of the form "SELECT expr FROM A GROUP BY 1"
-   * with a query of the form "SELECT expr FROM A GROUP BY expr"
+   * Replaces queries of the form "SELECT expr FROM A GROUP BY 1 ORDER BY 1"
+   * with a query of the form "SELECT expr FROM A GROUP BY expr ORDER BY expr"
    */
-  object ResolveGroupByReferences extends Rule[LogicalPlan] {
+  object ResolveNumericReferences extends Rule[LogicalPlan] {

     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
       case Aggregate(groups, aggs, child) =>
-        Aggregate(groups.map(group => group match {
-          case g if g.prettyString forall Character.isDigit =>
-            aggs(g.prettyString.toInt - 1) match {
-              case u : UnresolvedAlias =>
-                u.child
-              case a : Alias =>
-                a.child
+        val newGroups = groups.map {
+          case group if group.isInstanceOf[Literal] && group.dataType.isInstanceOf[IntegralType] =>
+            aggs(group.toString.toInt - 1) match {
+              case u: UnresolvedAlias =>
+                u.child match {
+                  case UnresolvedStar(_) => // Can't replace literal with column yet
+                    group
+                  case _ => u.child
+                }
+              case a: Alias => a.child
+              case a: AttributeReference => a
             }
-          case _ => group
-        }), aggs, child)
+          case group => group
+        }
+        Aggregate(newGroups, aggs, child)
+      case Sort(ordering, global, child) =>
+        val newOrdering = ordering.map {
+          case o if o.child.isInstanceOf[Literal] && o.dataType.isInstanceOf[IntegralType] =>
+            val newExpr = child.asInstanceOf[Project].projectList(o.child.toString.toInt - 1)
+            match {
+              case u: UnresolvedAlias =>
+                u.child
+              case a: Alias =>
+                a.child
+            }
+            SortOrder(newExpr, o.direction)
+          case other => other
+        }
+        Sort(newOrdering, global, child)
     }
   }

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 3ffd071a0a17..66db8407b77e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2028,10 +2028,17 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       Row(false) :: Row(true) :: Nil)
   }

-  test("SPARK-12063: Group by Column Number identifier") {
+  test("SPARK-12063: Group by Columns Number") {
     checkAnswer(
       sql("SELECT a, SUM(b) FROM testData2 GROUP BY 1"),
       Seq(Row(1, 3), Row(2, 3), Row(3, 3)))
   }
+
+  test("SPARK-12063: Order by Column Number") {
+    Seq(("one", 1), ("two", 2), ("three", 3), ("one", 5)).toDF("k", "v").registerTempTable("ord")
+    checkAnswer(
+      sql("SELECT v from ord order by 1 desc"),
+      Row(5) :: Row(3) :: Row(2) :: Row(1) :: Nil)
+  }

 }

From bd453d5f6744aa8fdd03b5ee2ecd44b471165eb4 Mon Sep 17 00:00:00 2001
From: Derek Sabry
Date: Thu, 10 Dec 2015 11:30:13 -0800
Subject: [PATCH 5/7] scala style

---
 .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 66db8407b77e..27c51b6c80ca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2033,7 +2033,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       sql("SELECT a, SUM(b) FROM testData2 GROUP BY 1"),
       Seq(Row(1, 3), Row(2, 3), Row(3, 3)))
   }
-
+
   test("SPARK-12063: Order by Column Number") {
     Seq(("one", 1), ("two", 2), ("three", 3), ("one", 5)).toDF("k", "v").registerTempTable("ord")
     checkAnswer(

From e4edc31426788f167be81bdeb72aea64227c17bf Mon Sep 17 00:00:00 2001
From: Derek Sabry
Date: Thu, 10 Dec 2015 19:22:18 -0800
Subject: [PATCH 6/7] Remove literal in group by clause test, as this functionality has changed

---
 .../org/apache/spark/sql/SQLQuerySuite.scala | 19 -------------------
 1 file changed, 19 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 27c51b6c80ca..863a214078f8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -472,25 +472,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       Seq(Row(1, 3), Row(2, 3), Row(3, 3)))
   }

-  test("literal in agg grouping expressions") {
-    checkAnswer(
-      sql("SELECT a, count(1) FROM testData2 GROUP BY a, 1"),
-      Seq(Row(1, 2), Row(2, 2), Row(3, 2)))
-    checkAnswer(
-      sql("SELECT a, count(2) FROM testData2 GROUP BY a, 2"),
-      Seq(Row(1, 2), Row(2, 2), Row(3, 2)))
-
-    checkAnswer(
-      sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a, 1"),
-      sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a"))
-    checkAnswer(
-      sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a, 1 + 2"),
-      sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a"))
-    checkAnswer(
-      sql("SELECT 1, 2, sum(b) FROM testData2 GROUP BY 1, 2"),
-      sql("SELECT 1, 2, sum(b) FROM testData2"))
-  }
-
   test("aggregates with nulls") {
     checkAnswer(
       sql("SELECT SKEWNESS(a), KURTOSIS(a), MIN(a), MAX(a)," +

From 09b3f77630be93c87abc6b64c4754b7cb32495e7 Mon Sep 17 00:00:00 2001
From: Derek Sabry
Date: Thu, 10 Dec 2015 19:44:36 -0800
Subject: [PATCH 7/7] aggregate in group by clause causes failure in execution

---
 .../apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 7b2c93d63d67..aeccedb5f214 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -146,6 +146,12 @@ trait CheckAnalysis {
             s"data type.")
         }

+        if (expr.isInstanceOf[AggregateExpression] || expr.isInstanceOf[AggregateFunction]) {
+          // Aggregate function in group by clause; this fails to execute
+          failAnalysis(s"aggregate expression ${expr.prettyString} should not " +
+            s"appear in grouping expression.")
+        }
+
         if (!expr.deterministic) {
           // This is just a sanity check, our analysis rule PullOutNondeterministic should
           // already pull out those nondeterministic expressions and evaluate them in
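
For reference, the behavior this series adds can be exercised as sketched below. The sketch is illustrative only and is not part of the patches: it assumes a Spark 1.6-era SQLContext named sqlContext, and the table names and data mirror the tests added in patches 2 and 4.

// Illustrative sketch only, not part of the patch series.
// Assumes an existing SQLContext `sqlContext`; tables and data mirror the added tests.
import sqlContext.implicits._

// GROUP BY 1 is resolved to the first select expression (column a).
Seq((1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2)).toDF("a", "b").registerTempTable("testData2")
sqlContext.sql("SELECT a, SUM(b) FROM testData2 GROUP BY 1").show()

// ORDER BY 1 is resolved to the first projected column (v).
Seq(("one", 1), ("two", 2), ("three", 3), ("one", 5)).toDF("k", "v").registerTempTable("ord")
sqlContext.sql("SELECT v FROM ord ORDER BY 1 DESC").show()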