[SPARK-36194][SQL] Remove the aggregation from left semi/anti join if the same aggregation has already been done on left side #33404
Conversation
Kubernetes integration test unable to build dist. exiting with code: 1
...c/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveAggsThroughLeftSemiAntiJoin.scala
Thank you, @wangyum !
Test build #141202 has finished for PR 33404 at commit

Also, cc @cloud-fan , @maropu , @viirya
Thank you for adding more test cases, @wangyum .
...la/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregatesInLeftSemiAntiJoin.scala
Kubernetes integration test starting

Kubernetes integration test status success

Test build #141230 has finished for PR 33404 at commit

Kubernetes integration test unable to build dist. exiting with code: 1

Test build #141562 has finished for PR 33404 at commit

Kubernetes integration test starting

Kubernetes integration test status success

Test build #141593 has finished for PR 33404 at commit
We should use Set[ExpressionSet]. If we group by a, b and then select a, b, a as c, then the distinct keys should be Set([a, b], [c, b]).
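A toy model in plain Scala may make this concrete. It uses Set[String] for key sets and a simple alias map in place of Spark's ExpressionSet and Alias; all names here are hypothetical, not the actual Catalyst API:

```scala
// Toy model: distinct keys as sets of column names, aliases as (aliasName -> sourceColumn).
// Illustrates why "group by a, b" followed by "select a, b, a as c"
// yields both {a, b} and {c, b} as valid distinct key sets.
object DistinctKeyModel {
  def expandAliases(key: Set[String], aliases: Map[String, String]): Set[Set[String]] = {
    // For each column, the column itself plus every alias pointing at it is a
    // valid substitute; take the cross product over all columns in the key.
    key.foldLeft(Set(Set.empty[String])) { (acc, col) =>
      val candidates = col +: aliases.collect { case (alias, src) if src == col => alias }.toSeq
      for (partial <- acc; c <- candidates) yield partial + c
    }
  }
}
```

With key = Set("a", "b") and aliases = Map("c" -> "a"), this produces exactly the two key sets from the comment above.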
Kubernetes integration test starting

Kubernetes integration test status failure

Test build #141760 has finished for PR 33404 at commit

Kubernetes integration test starting

Kubernetes integration test status success

Test build #141808 has finished for PR 33404 at commit
Another similar query: select distinct STATUS, RecordTypeId, count(oracle_id) as cnt_id from t group by 1,2
Does it have to be join-specific? It looks like it should be able to handle any nodes. Ideally we could remove the entire case upper @ Aggregate(_, _, lower: Aggregate) branch.
Ah, I now noticed that it was already discussed.
I think we can generalize it. We can leverage the propagated distinct keys and remove group-only aggregate (or turn it into project) if the group cols are already distinct.
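The generalized check being suggested can be sketched in a few lines of plain Scala, again modeling key sets as Set[String] rather than the real ExpressionSet (illustrative only; the names are made up):

```scala
// Toy check for the proposed generalization: a group-only aggregate is
// redundant (and can become a Project) when some known distinct key of
// the child is a subset of the grouping columns.
object GroupOnlyAggCheck {
  def isRedundant(childDistinctKeys: Set[Set[String]], groupingCols: Set[String]): Boolean =
    childDistinctKeys.exists(_.subsetOf(groupingCols))
}
```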
...t/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctAttributesVisitor.scala
I'd prefer this
/**
 * Add a new ExpressionSet S into the distinctKeys D.
 * To minimize the size of D:
 * 1. If there is a subset of S in D, return D.
 * 2. Otherwise, remove all the ExpressionSets containing S from D, and add the new one.
 */
private def addDistinctKey(
    keys: DistinctKeys,
    newExpressionSet: ExpressionSet): DistinctKeys = {
  if (keys.exists(_.subsetOf(newExpressionSet))) {
    keys
  } else {
    keys.filterNot(s => newExpressionSet.subsetOf(s)) + newExpressionSet
  }
}

/**
 * Propagate distinct keys through a project list.
 * For each alias in the project list, replace the corresponding expression in distinctKeys.
 * To minimize the size of distinctKeys, remove every ExpressionSet that is not a subset of
 * the project list's output.
 */
private def projectDistinctKeys(
    keys: DistinctKeys,
    projectList: Seq[NamedExpression]): DistinctKeys = {
  val outputSet = ExpressionSet(projectList.map(_.toAttribute))
  val aliases = projectList.filter(_.isInstanceOf[Alias])
  if (aliases.isEmpty) return keys.filter(_.subsetOf(outputSet))

  val aliasedDistinctKeys = keys.map { expressionSet =>
    expressionSet.map { expression =>
      expression transform {
        case expr: Expression =>
          aliases
            .collectFirst { case a: Alias if a.child.semanticEquals(expr) => a.toAttribute }
            .getOrElse(expr)
      }
    }
  }
  aliasedDistinctKeys.collect {
    case es: ExpressionSet if es.subsetOf(outputSet) => ExpressionSet(es)
  }
}

override def visitAggregate(p: Aggregate): Set[ExpressionSet] = {
  val distinctKeysWithGrouping =
    addDistinctKey(p.child.distinctKeys, ExpressionSet(p.groupingExpressions))
  projectDistinctKeys(distinctKeysWithGrouping, p.aggregateExpressions)
}
spark.sql("create table t1 (a int, b int, c int) using parquet")
spark.sql("select a, b, a as e, b as f from t1 group by a, b").queryExecution.analyzed.distinctKeys
For such a query, which distinct keys do you prefer?
Set(ExpressionSet(e#0, f#1))
or
Set(ExpressionSet(a#2, b#3), ExpressionSet(a#2, f#1), ExpressionSet(b#3, e#0), ExpressionSet(e#0, f#1))
The latter.
I updated it to the following code to support it.
override def visitAggregate(p: Aggregate): Set[ExpressionSet] = {
  val groupingExps = ExpressionSet(p.groupingExpressions) // handle group by a, a
  val aggExpressions = p.aggregateExpressions.filter {
    case _: Attribute | _: Alias => true
    case _ => false
  }
  aggExpressions.toSet.subsets(groupingExps.size).filter { s =>
    groupingExps.subsetOf(ExpressionSet(s.map {
      case a: Alias => a.child
      case o => o
    }))
  }.map(s => ExpressionSet(s.map(_.toAttribute))).toSet
}
Sample can also propagate the distinct keys from its child.
Fixed.
why not define this in DistinctKeyVisitor?
Moved it to DistinctKeyVisitor.
Kubernetes integration test starting

Kubernetes integration test status failure

Kubernetes integration test starting

Test build #144812 has finished for PR 33404 at commit

Kubernetes integration test status failure

Test build #144814 has finished for PR 33404 at commit

Kubernetes integration test starting

Kubernetes integration test status failure

Test build #144823 has finished for PR 33404 at commit
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.

@wangyum do you have time to revisit this and pass all tests?
nit: DistinctKeyVisitor seems a simpler and more general name
BTW, I don't think the order of ExpressionSet matters, and Set[ExpressionSet] is better
and shall we make it a trait so that LogicalPlan can extend it directly? then we don't need LogicalPlanDistinctKeys. I think it's simpler if we only have one visitor implementation for distinct keys, which should be true.
+1 to make it a trait.
I think we should propagate the distinct keys from the child as well. This should be done in other places as well so we need to add a method for it, e.g.
def addDistinctKey(keys: Set[AttributeSet], newExpressionSet: ExpressionSet): Set[AttributeSet]
The idea is: if keys already indicate the newExpressionSet, e.g. we have [a, b] in keys, we can ignore newExpressionSet if it's [a, b, c]. Else, we should clean up keys and add newExpressionSet. e.g. we have [a, b, c] in keys and the newExpressionSet is [a, b], then we should remove [a, b, c] from keys.
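That minimization can be sketched with plain sets standing in for ExpressionSet (a hypothetical model, not the real signature):

```scala
// Toy model of the proposed addDistinctKey: keep only minimal key sets.
// If an existing key is already a subset of the new one, the new one adds
// nothing; otherwise drop every existing key the new one is a subset of.
object MinimalDistinctKeys {
  def addDistinctKey(keys: Set[Set[String]], newKey: Set[String]): Set[Set[String]] =
    if (keys.exists(_.subsetOf(newKey))) keys
    else keys.filterNot(k => newKey.subsetOf(k)) + newKey
}
```

Both examples in the comment behave as described: adding [a, b, c] to {[a, b]} leaves the keys unchanged, while adding [a, b] to {[a, b, c]} replaces the larger set.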
finally we can simply write val distinctKeys = addDistinctKey(p.child.distinctKeys, ExpressionSet(p.groupingExpressions)) here.
Are the distinct attributes related to the child's distinct attributes? For example:
create table t(a int, b int, c int, d int, e int) using parquet;
select a, b, c, sum(d) from (select distinct * from t) t1 group by a, b, c;
This is mostly for project list so maybe projectDistinctKeys is a better name: def projectDistinctKeys(keys: Set[ExpressionSet], projectList: Seq[NamedExpression]): Set[ExpressionSet]
+1
This looks quite inefficient: it generates all the subsets. I think the logic here is:
- produce correct distinct keys w.r.t. the alias mapping in the project list
- prune invalid distinct keys that are not output by the project list.
Add a filter before building the subsets:
val expressions = keys.flatMap(_.toSet)
projectList.filter {
  case a: Alias => expressions.exists(_.semanticEquals(a.child))
  case ne: NamedExpression => expressions.exists(_.semanticEquals(ne))
}.toSet.subsets(keys.map(_.size).min).filter { s =>
  val references = s.map {
    case a: Alias => a.child
    case ne => ne
  }
  keys.exists(_.equals(ExpressionSet(references)))
}.map(s => AttributeSet(s.map(_.toAttribute))).toSet
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeSet, ExpressionSet, NamedExpression}

trait QueryPlanDistinctKeys {
I prefer something like this
trait DistinctKeyVisitor extends LogicalPlanVisitor[Set[ExpressionSet]] { self: LogicalPlan =>
  lazy val distinctKeys: DistinctKeys = {
    if (check conf) {
      visit(self)
    } else {
      default(self)
    }
  }

  def visitXXX
}
The benefit is that we can centralize the distinct key logic in this file, not in many LogicalPlan classes.
New pull request: #35651
What changes were proposed in this pull request?
1. This PR adds a new logical plan visitor named DistinctAttributesVisitor to find out all the distinct attributes in the current logical plan. For example, the output for the sample query is: {a#1, b#2}, {b#2, aliased_a#0}.
2. It enhances RemoveRedundantAggregates to remove the aggregation from a left semi/anti join if the same aggregation has already been done on the left side.
Before this PR:
After this PR:
Why are the changes needed?
Improve query performance.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unit test and TPC-DS benchmark test.