-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes #35651
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
49e62ff
6a02605
900837f
c44f242
4bc92a6
1659465
8ac519f
a34e2af
62e9cd7
1ef1dea
f0db8ec
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,129 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.catalyst.plans.logical | ||
|
|
||
| import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, ExpressionSet, NamedExpression} | ||
| import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys | ||
| import org.apache.spark.sql.catalyst.plans.{Inner, LeftSemiOrAnti} | ||
|
|
||
| /** | ||
| * A visitor that traverses a [[LogicalPlan]] tree and propagates the distinct attributes. | ||
| */ | ||
| object DistinctKeyVisitor extends LogicalPlanVisitor[Set[ExpressionSet]] { | ||
|
|
||
| /** | ||
| * Translates distinct keys of a child plan into distinct keys over the output of `projectList`. | ||
| * | ||
| * A key is kept as-is when all of its expressions appear in the projected output. When the | ||
| * project list contains aliases, each key is additionally rewritten by replacing | ||
| * sub-expressions that are semantically equal to an alias child with that alias' attribute, | ||
| * and the rewritten key is kept if it is covered by the projected output. | ||
| * | ||
| * Note: empty keys are dropped only on the alias path; without aliases the surviving keys are | ||
| * returned unfiltered. | ||
| * | ||
| * @param keys distinct keys to carry through the projection | ||
| * @param projectList named expressions produced by the operator | ||
| * @return the distinct keys expressed over the projected output | ||
| */ | ||
| private def projectDistinctKeys( | ||
| keys: Set[ExpressionSet], projectList: Seq[NamedExpression]): Set[ExpressionSet] = { | ||
| val projectedOutput = ExpressionSet(projectList.map(_.toAttribute)) | ||
| val passThroughKeys = keys.filter(_.subsetOf(projectedOutput)) | ||
| val aliasList = projectList.collect { case alias: Alias => alias } | ||
| if (aliasList.nonEmpty) { | ||
| // Substitute the alias attribute for any sub-expression matching an alias child. | ||
| val rewrittenKeys = keys.map { key => | ||
| key.map { keyExpr => | ||
| keyExpr.transform { | ||
| case sub: Expression => | ||
| aliasList.find(_.child.semanticEquals(sub)).map(_.toAttribute).getOrElse(sub) | ||
| } | ||
| } | ||
| } | ||
| val coveredRewrittenKeys = rewrittenKeys.collect { | ||
| case es: ExpressionSet if es.subsetOf(projectedOutput) => ExpressionSet(es) | ||
| } | ||
| (coveredRewrittenKeys ++ passThroughKeys).filter(_.nonEmpty) | ||
| } else { | ||
| passThroughKeys | ||
| } | ||
| } | ||
|
|
||
| /** Returns an empty set, i.e. no distinct keys are reported for `p`. */ | ||
| override def default(p: LogicalPlan): Set[ExpressionSet] = { | ||
| Set.empty[ExpressionSet] | ||
| } | ||
|
|
||
| /** | ||
| * Treats the grouping expressions as a single distinct key and projects it through the | ||
| * aggregate expressions. Building an [[ExpressionSet]] collapses duplicates such as | ||
| * `GROUP BY a, a`. | ||
| */ | ||
| override def visitAggregate(p: Aggregate): Set[ExpressionSet] = | ||
| projectDistinctKeys(Set(ExpressionSet(p.groupingExpressions)), p.aggregateExpressions) | ||
|
|
||
| /** Reports the whole output of the [[Distinct]] node as one distinct key. */ | ||
| override def visitDistinct(p: Distinct): Set[ExpressionSet] = { | ||
| val wholeOutput = ExpressionSet(p.output) | ||
| Set(wholeOutput) | ||
| } | ||
|
|
||
| /** | ||
| * Reports the whole output as one distinct key when `isAll` is false and the plan is | ||
| * deterministic; otherwise falls back to `default`. | ||
| */ | ||
| override def visitExcept(p: Except): Set[ExpressionSet] = { | ||
| if (p.isAll || !p.deterministic) default(p) else Set(ExpressionSet(p.output)) | ||
| } | ||
|
|
||
| /** No distinct keys are derived for [[Expand]]; delegates to `default`. */ | ||
| override def visitExpand(p: Expand): Set[ExpressionSet] = { | ||
| default(p) | ||
| } | ||
|
|
||
| /** Passes the child's distinct keys through unchanged. */ | ||
| override def visitFilter(p: Filter): Set[ExpressionSet] = { | ||
| p.child.distinctKeys | ||
| } | ||
|
|
||
| /** No distinct keys are derived for [[Generate]]; delegates to `default`. */ | ||
| override def visitGenerate(p: Generate): Set[ExpressionSet] = { | ||
| default(p) | ||
| } | ||
|
|
||
| /** | ||
| * When `maxRows` is defined and at most 1, the whole output is reported as one distinct key; | ||
| * otherwise the child's distinct keys are passed through. | ||
| */ | ||
| override def visitGlobalLimit(p: GlobalLimit): Set[ExpressionSet] = { | ||
| val atMostOneRow = p.maxRows.exists(_ <= 1) | ||
| if (atMostOneRow) Set(ExpressionSet(p.output)) else p.child.distinctKeys | ||
| } | ||
|
|
||
| /** | ||
| * Reports the whole output as one distinct key when `isAll` is false and the plan is | ||
| * deterministic; otherwise falls back to `default`. | ||
| */ | ||
| override def visitIntersect(p: Intersect): Set[ExpressionSet] = { | ||
| val outputIsDistinct = !p.isAll && p.deterministic | ||
| if (outputIsDistinct) { | ||
| Set(ExpressionSet(p.output)) | ||
| } else { | ||
| default(p) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Derives distinct keys for a join: | ||
| * - joins whose type matches `LeftSemiOrAnti` return the left child's distinct keys; | ||
| * - inner equi-joins where some distinct key of the left child is a subset of the left join | ||
| * keys and some distinct key of the right child is a subset of the right join keys report | ||
| * the left join keys and the right join keys as two distinct keys; | ||
| * - every other join falls back to `default`. | ||
| */ | ||
| override def visitJoin(p: Join): Set[ExpressionSet] = { | ||
| p match { | ||
| case Join(_, _, LeftSemiOrAnti(_), _, _) => | ||
| p.left.distinctKeys | ||
| case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, _, _, _, _, _) | ||
| if p.left.distinctKeys.exists(_.subsetOf(ExpressionSet(leftKeys))) && | ||
| p.right.distinctKeys.exists(_.subsetOf(ExpressionSet(rightKeys))) => | ||
| Set(ExpressionSet(leftKeys), ExpressionSet(rightKeys)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 |
||
| case _ => default(p) | ||
cloud-fan marked this conversation as resolved.
Show resolved
Hide resolved
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for left outer, we can propagate from right side.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. spark.sql("create table t1(a int, b int) using parquet")
spark.sql("create table t2(x int, y int) using parquet")
spark.sql("insert into t1 values(1, 1), (2, 2)")
spark.sql("insert into t2 values(3, 3), (4, 4)")
spark.sql("select * from t1 left join (select distinct * from t2)t2 on t1.a = t2.x and t1.b = t2.y").showThe output is: We can't distinguish the distinct keys.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry my fault. We can propagate the left side distinct keys if
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 |
||
| } | ||
| } | ||
|
|
||
| /** Passes the child's distinct keys through unchanged. */ | ||
| override def visitLocalLimit(p: LocalLimit): Set[ExpressionSet] = { | ||
| p.child.distinctKeys | ||
| } | ||
|
|
||
| /** No distinct keys are derived for [[Pivot]]; delegates to `default`. */ | ||
| override def visitPivot(p: Pivot): Set[ExpressionSet] = { | ||
| default(p) | ||
| } | ||
|
|
||
| /** | ||
| * Projects the child's distinct keys through the project list. When the child reports no | ||
| * distinct keys, falls back to `default`. | ||
| */ | ||
| override def visitProject(p: Project): Set[ExpressionSet] = { | ||
| val childKeys = p.child.distinctKeys | ||
| if (childKeys.isEmpty) { | ||
| default(p) | ||
| } else { | ||
| projectDistinctKeys(childKeys.map(ExpressionSet(_)), p.projectList) | ||
| } | ||
| } | ||
|
|
||
| /** Passes the child's distinct keys through unchanged. */ | ||
| override def visitRepartition(p: Repartition): Set[ExpressionSet] = { | ||
| p.child.distinctKeys | ||
| } | ||
|
|
||
| /** Passes the child's distinct keys through unchanged. */ | ||
| override def visitRepartitionByExpr(p: RepartitionByExpression): Set[ExpressionSet] = { | ||
| p.child.distinctKeys | ||
| } | ||
|
|
||
| /** | ||
| * Sampling with replacement falls back to `default`; sampling without replacement passes | ||
| * the child's distinct keys through. | ||
| */ | ||
| override def visitSample(p: Sample): Set[ExpressionSet] = | ||
| if (p.withReplacement) default(p) else p.child.distinctKeys | ||
|
|
||
| /** No distinct keys are derived for [[ScriptTransformation]]; delegates to `default`. */ | ||
| override def visitScriptTransform(p: ScriptTransformation): Set[ExpressionSet] = { | ||
| default(p) | ||
| } | ||
|
|
||
| /** No distinct keys are derived for [[Union]]; delegates to `default`. */ | ||
| override def visitUnion(p: Union): Set[ExpressionSet] = { | ||
| default(p) | ||
| } | ||
|
|
||
| /** Passes the child's distinct keys through unchanged. */ | ||
| override def visitWindow(p: Window): Set[ExpressionSet] = { | ||
| p.child.distinctKeys | ||
| } | ||
|
|
||
| /** Passes the child's distinct keys through unchanged. */ | ||
| override def visitTail(p: Tail): Set[ExpressionSet] = { | ||
| p.child.distinctKeys | ||
| } | ||
|
|
||
| /** Passes the child's distinct keys through unchanged. */ | ||
| override def visitSort(p: Sort): Set[ExpressionSet] = { | ||
| p.child.distinctKeys | ||
| } | ||
|
|
||
| /** Passes the child's distinct keys through unchanged. */ | ||
| override def visitRebalancePartitions(p: RebalancePartitions): Set[ExpressionSet] = { | ||
| p.child.distinctKeys | ||
| } | ||
|
|
||
| /** Reports the distinct keys of the wrapped `plan` of the [[WithCTE]] node. */ | ||
| override def visitWithCTE(p: WithCTE): Set[ExpressionSet] = { | ||
| p.plan.distinctKeys | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.catalyst.plans.logical | ||
|
|
||
| import org.apache.spark.sql.catalyst.expressions.ExpressionSet | ||
| import org.apache.spark.sql.internal.SQLConf.PROPAGATE_DISTINCT_KEYS_ENABLED | ||
|
|
||
| /** | ||
| * Mixin that exposes the distinct keys of a [[LogicalPlan]]. For example: | ||
| * {{{ | ||
| * SELECT a, b, SUM(c) FROM Tab1 GROUP BY a, b | ||
| * // returns a, b | ||
| * }}} | ||
| * The keys are computed by [[DistinctKeyVisitor]]; the result is empty when the | ||
| * `PROPAGATE_DISTINCT_KEYS_ENABLED` configuration is off. | ||
| */ | ||
| trait LogicalPlanDistinctKeys { self: LogicalPlan => | ||
| lazy val distinctKeys: Set[ExpressionSet] = { | ||
| val propagationEnabled = conf.getConf(PROPAGATE_DISTINCT_KEYS_ENABLED) | ||
| if (!propagationEnabled) Set.empty else DistinctKeyVisitor.visit(self) | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about a mix? e.g.
SELECT a, 1, b FROM ... GROUP BY a, b, c
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We already support this case; a new test case has been added:
https://github.com/apache/spark/pull/35779/files#diff-7cd933ffc7b9ce86d5973bee80f4a5bd4a021c0f0ff81defe1f020bcb55b4b3bR153-R159