[SPARK-38832][SQL] Remove unnecessary distinct in aggregate expression by distinctKeys #36117
Conversation
```scala
.rebalance().groupBy()(countDistinct($"a") as "x", sumDistinct($"a") as "y").analyze
comparePlans(Optimize.execute(q2), q2)
```

```scala
// avoid remove double data type attr
```
what will go wrong if we optimize this case as well?
Physical Aggregate will wrap NormalizeNaNAndZero for float/double to handle NaN and -0.0, so its result value might differ from the original expression?
Never mind, the child will also do the same thing, so we do not need to consider this here.
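To illustrate why the concern cancels out, here is a hedged sketch (it assumes a SparkSession named `spark` with implicits available; none of this is from the diff): Spark normalizes -0.0 and treats NaN values as equal both in the child's distinct and in the distinct aggregate, so the two forms agree.

```scala
import org.apache.spark.sql.functions.countDistinct
import spark.implicits._  // assumes `spark` is an existing SparkSession

// 0.0 / -0.0 collapse to one value and the two NaNs collapse to one value,
// both in the distinct aggregate and in the child distinct.
val df = Seq(0.0, -0.0, Double.NaN, Double.NaN).toDF("d")

df.agg(countDistinct($"d")).show()             // 2
df.distinct().agg(countDistinct($"d")).show()  // also 2
```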
wangyum left a comment:
It doesn't work because EliminateDistinct is only executed once, and it runs before EliminateSubqueryAliases:
```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases ===
 Aggregate [count(distinct id#3L) AS count(DISTINCT id)#5L]   Aggregate [count(distinct id#3L) AS count(DISTINCT id)#5L]
!+- SubqueryAlias __auto_generated_subquery_name              +- Aggregate [id#3L], [id#3L]
!   +- Aggregate [id#3L], [id#3L]                                 +- Relation default.t[id#3L] parquet
!      +- SubqueryAlias spark_catalog.default.t
!         +- Relation default.t[id#3L] parquet
```
@wangyum good catch, seems …
```scala
case ae: AggregateExpression if ae.isDistinct &&
    agg.child.distinctKeys.exists(
      _.subsetOf(ExpressionSet(ae.aggregateFunction.children.filterNot(_.foldable)))) =>
```
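For context, here is a hedged sketch of a complete rule built around this case branch; the object name, the surrounding `transform`, and the rewrite to `isDistinct = false` are my reading of the snippet rather than a verbatim quote of the patch.

```scala
import org.apache.spark.sql.catalyst.expressions.ExpressionSet
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Hedged sketch of a rule shaped like the snippet above; the real rule in the
// patch may differ in structure and tree-pattern pruning.
object RemoveRedundantDistinctSketch extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case agg: Aggregate =>
      agg.transformExpressions {
        // When the child's distinct keys already cover the aggregate function's
        // non-foldable children, the DISTINCT flag carries no extra information.
        case ae: AggregateExpression if ae.isDistinct &&
            agg.child.distinctKeys.exists(
              _.subsetOf(ExpressionSet(ae.aggregateFunction.children.filterNot(_.foldable)))) =>
          ae.copy(isDistinct = false)
      }
  }
}
```

Matching on the enclosing `Aggregate` is what makes `agg.child.distinctKeys` available to the guard.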
Is this correct? If the input plan to this rule is:

```sql
SELECT a, count(distinct c) FROM (
  SELECT distinct a, b, c
  FROM t
)
GROUP BY a
```

Will the added case branch rewrite the plan to

```sql
SELECT a, count(c) FROM (
  SELECT distinct a, b, c
  FROM t
)
GROUP BY a
```

`agg.child.distinctKeys` is {a, b, c}, and `ExpressionSet(ae.aggregateFunction.children.filterNot(_.foldable))` is {c}.
The distinctKeys of `distinct a, b, c` is `ExpressionSet(a, b, c)`, not `ExpressionSet(a)`, `ExpressionSet(b)`, `ExpressionSet(c)`.
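To make the set-of-sets point concrete, here is a minimal sketch with plain Scala sets standing in for `ExpressionSet` (the names are illustrative, not from the patch):

```scala
// distinctKeys is a set of key sets; the rewrite only fires when some whole
// key set is covered by the aggregate function's non-foldable children.
val distinctKeys: Set[Set[String]] = Set(Set("a", "b", "c")) // from SELECT DISTINCT a, b, c
val aggChildren: Set[String] = Set("c")                      // from count(distinct c)

val canRemoveDistinct = distinctKeys.exists(_.subsetOf(aggChildren))
assert(!canRemoveDistinct) // {a, b, c} is not a subset of {c}, so the distinct is kept
```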
You're right. I forgot distinctKeys is a set of sets.
How about:
```scala
agg.child.distinctKeys.exists(
  key => key.nonEmpty &&
    key.subsetOf(ExpressionSet(ae.aggregateFunction.children.filterNot(_.foldable))))
```

Alternatively, we can add a `require` here to make sure that we never return an empty key:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanDistinctKeys.scala#L32
Makes sense, I added a `require` in LogicalPlanDistinctKeys.
thanks, merging to master!
thank you @cloud-fan @wangyum @sigmod |
```diff
-  if (conf.getConf(PROPAGATE_DISTINCT_KEYS_ENABLED)) DistinctKeyVisitor.visit(self) else Set.empty
+  if (conf.getConf(PROPAGATE_DISTINCT_KEYS_ENABLED)) {
+    val keys = DistinctKeyVisitor.visit(self)
+    require(keys.forall(_.nonEmpty))
```
Do we really need this require? It looks fine to have an empty set as the distinct keys, e.g. a global aggregate without grouping keys. It means that the entire data set is distinct (has at most one row), and EliminateDistinct is OK with an empty set in distinct keys.
I think it's more about avoiding unexpected behavior. It would be a correctness issue if other operators returned an empty distinct key. And as you mentioned, the global aggregate is already optimized by EliminateDistinct and OptimizeOneRowPlan, so it's fine?
My point is that DistinctKeyVisitor does not work with global aggregates now, and an empty expression set is still a valid distinct key, so why do we forbid it?
We have already forbidden it inside DistinctKeyVisitor. Do you think we should support that case?

Line 50 in a67acba:

```scala
}.filter(_.nonEmpty)
```
This is only done in the `else` branch, not the `if` branch. I think we have two options:
- keep the requirement, and add the `filter` in the `if` branch as well
- remove the requirement, and remove the `filter` from the `else` branch.
I prefer option 2, as I think an empty expression set does mean something as a distinct key and we should not ignore this information. It also works the same as other distinct keys (see the sketch after this list):
- It can replace all other distinct keys as it's a subset of any expression set
- It can satisfy any distinct key requirement, e.g. remove unnecessary distinct in aggregate functions.
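A small sketch of these two properties, again with plain Scala sets standing in for `ExpressionSet` (names are illustrative):

```scala
val emptyKey: Set[String] = Set.empty      // distinct key of a global aggregate: at most one row
val otherKey: Set[String] = Set("a", "b")  // some other distinct key
val aggChildren: Set[String] = Set("c")    // children of a distinct aggregate function

assert(emptyKey.subsetOf(otherKey))    // it can replace any other distinct key
assert(emptyKey.subsetOf(aggChildren)) // and it satisfies any distinct-key requirement
```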
> This is only done in the `else` branch, not the `if` branch.
It's a good point. I will do a follow-up soon.
…or distinct key

### What changes were proposed in this pull request?

- Improve `DistinctKeyVisitor` to support propagating an empty set
- Small improvement for matching aliases

### Why are the changes needed?

Make distinct keys usable to optimize more cases, see comment #36117 (comment)

### Does this PR introduce _any_ user-facing change?

Improve performance

### How was this patch tested?

add test

Closes #36281 from ulysses-you/SPARK-38832-followup.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
Make `EliminateDistinct` support eliminating distinct by the child's distinct keys.

Why are the changes needed?
We can remove the distinct in an aggregate expression if the distinct semantics are guaranteed by the child.
For example:
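A minimal illustration of the idea (not necessarily the PR's original example; it assumes a SparkSession `spark` and a table `t` with a column `a`):

```scala
// The inner SELECT DISTINCT guarantees `a` is already distinct, so the DISTINCT
// inside count() adds no information: the first query can be answered the same
// way as the second, without a separate distinct aggregation.
spark.sql("SELECT count(DISTINCT a) FROM (SELECT DISTINCT a FROM t) AS tmp").show()
spark.sql("SELECT count(a) FROM (SELECT DISTINCT a FROM t) AS tmp").show()
```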
Does this PR introduce any user-facing change?
improve performance
How was this patch tested?
add test in `EliminateDistinctSuite`
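A hedged sketch of what such a test could look like, following the catalyst DSL style visible in the snippets above (the relation, names, and expected plan are illustrative, not taken verbatim from the patch):

```scala
// Illustrative only: the child aggregate groups by `a`, so its distinct keys
// contain {a}; countDistinct(a) on top of it can then drop the DISTINCT.
val relation = LocalRelation($"a".int, $"b".int)

val query = relation
  .groupBy($"a")($"a")
  .groupBy()(countDistinct($"a") as "x")
  .analyze

val expected = relation
  .groupBy($"a")($"a")
  .groupBy()(count($"a") as "x")
  .analyze

comparePlans(Optimize.execute(query), expected)
```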