[SPARK-28356][SHUFFLE][FOLLOWUP] Fix case with different pre-shuffle partition numbers #25479
I opened this PR to fix #25121 (comment)

ok to test
```scala
// we should skip it when calculating the `partitionStartIndices`.
val validMetrics = shuffleMetrics.filter(_ != null)
if (validMetrics.nonEmpty) {
  // We may have different pre-shuffle partition numbers, don't reduce shuffle partition number
```
Let's also give an example of when we would have different pre-shuffle partition numbers.
Ok, added. Please let me know if it should be more detailed.
```scala
val resultDf = df1.union(df2)
```
```scala
checkAnswer(resultDf, Seq((0), (1), (2), (3)).map(i => Row(i)))
```
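For reference, the expected rows follow from the plan quoted in the reply below. The test's `df1` and `df2` definitions are not part of this excerpt, so this minimal sketch assumes they match that plan: `df1` is an inner join of `Range (0, 3)` with itself on `id`, and `df2` is a global `sum` over `Range (0, 3)`. It uses plain Scala collections rather than Spark, and `ExpectedRows` is a hypothetical name.

```scala
// Plain-collections model of the test query (not Spark code). Assumes df1/df2
// match the plan in the reply: a self-join of Range(0, 3) and a global sum.
object ExpectedRows {
  def resultRows: Seq[Long] = {
    val left: Seq[Long] = 0L until 3L   // Range (0, 3, ...)
    val right: Seq[Long] = 0L until 3L  // Range (0, 3, ...)
    // Inner join on id, projecting the left id: 0, 1, 2 (multi-partition side).
    val df1 = for (l <- left; r <- right if l == r) yield l
    // Global aggregate: 0 + 1 + 2 = 3 (single-partition side).
    val df2 = Seq(left.sum)
    // Dataset.union keeps duplicates (UNION ALL), so it behaves like ++ here.
    // Row order is not guaranteed in Spark; checkAnswer compares sorted rows.
    df1 ++ df2
  }

  def main(args: Array[String]): Unit =
    println(resultRows.mkString(","))  // prints 0,1,2,3
}
```

The point of the query is that one union branch comes out of a multi-partition `SortMergeJoin` while the other comes out of a `SinglePartition` exchange.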
Does this fail without the fix?
It does. The plan is:

```
AdaptiveSparkPlan(isFinalPlan=false)
+- Union
   :- Project [id#0L]
   :  +- SortMergeJoin [id#0L], [id#2L], Inner
   :     :- Sort [id#0L ASC NULLS FIRST], false, 0
   :     :  +- Exchange hashpartitioning(id#0L, 5), true
   :     :     +- Range (0, 3, step=1, splits=12)
   :     +- Sort [id#2L ASC NULLS FIRST], false, 0
   :        +- Exchange hashpartitioning(id#2L, 5), true
   :           +- Range (0, 3, step=1, splits=12)
   +- HashAggregate(keys=[], functions=[sum(id#6L)], output=[sum(id)#10L])
      +- Exchange SinglePartition, true
         +- HashAggregate(keys=[], functions=[partial_sum(id#6L)], output=[sum#14L])
            +- Range (0, 3, step=1, splits=12)
```

and the error comes from this assert: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala#L136
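To see why that assert fires for this plan, here is a minimal, self-contained Scala model of the invariant it protects. It is not Spark's implementation: `StageStats` is a hypothetical stand-in for Spark's `MapOutputStatistics`, keeping only its `bytesByPartitionId` array, `PreFixModel` and `bytesPerPrePartition` are illustrative names, and the byte counts are made up. Coalescing adds up, for each pre-shuffle partition index, that index's bytes across all stages, which only makes sense if every stage has the same number of partitions. In the plan above, the two `hashpartitioning(..., 5)` exchanges report 5 partitions while the `SinglePartition` exchange reports 1.

```scala
// Simplified model of the pre-fix invariant; not Spark's implementation.
// StageStats is a hypothetical stand-in for Spark's MapOutputStatistics,
// keeping only its bytesByPartitionId array.
final case class StageStats(bytesByPartitionId: Array[Long])

object PreFixModel {
  // Sums the bytes of each pre-shuffle partition index across all stages.
  // Index i is only meaningful if every stage has the same partition count,
  // which is what the assert checks.
  def bytesPerPrePartition(stats: Seq[StageStats]): Array[Long] = {
    val numPartitions = stats.map(_.bytesByPartitionId.length).distinct
    assert(numPartitions.length == 1,
      s"expected one distinct pre-shuffle partition count, got $numPartitions")
    Array.tabulate(numPartitions.head) { i =>
      stats.map(_.bytesByPartitionId(i)).sum
    }
  }

  def main(args: Array[String]): Unit = {
    // The two SortMergeJoin inputs: hashpartitioning(id, 5) => 5 partitions.
    val joinLeft = StageStats(Array(10L, 0L, 20L, 0L, 30L))
    val joinRight = StageStats(Array(10L, 0L, 20L, 0L, 30L))
    // The aggregate input: Exchange SinglePartition => 1 partition.
    val agg = StageStats(Array(24L))

    println(bytesPerPrePartition(Seq(joinLeft, joinRight)).mkString(","))  // prints 20,0,40,0,60
    // Mixing both kinds of stage trips the assert, as in the failing query.
    try bytesPerPrePartition(Seq(joinLeft, joinRight, agg))
    catch { case e: AssertionError => println(s"AssertionError: ${e.getMessage}") }
  }
}
```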
Can you fill in the "Does this PR introduce any user-facing change?" section? Changing a query from failing to runnable is a user-facing change.
Oh ok, sure, filled.
Test build #109215 has finished for PR 25479 at commit
```scala
// partition) and a result of a SortMergeJoin (multiple partitions).
val distinctNumPreShufflePartitions =
  validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
```
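The effect of this condition can be sketched as a simplified, hypothetical model rather than the PR's actual code. If all stages report the same number of pre-shuffle partitions, the model greedily packs consecutive partitions into post-shuffle partitions of at most `targetSize` bytes (similar in spirit to how the rule estimates `partitionStartIndices`) and returns their start indices. If the counts differ, it returns `None`, meaning the shuffle partition number is left unchanged instead of failing. `StageStats` (a stand-in for `MapOutputStatistics`), `PostFixModel`, `partitionStartIndices` and `targetSize` are illustrative names, and the byte counts are made up.

```scala
// Simplified model of the fixed behaviour; not Spark's implementation.
// StageStats is a hypothetical stand-in for Spark's MapOutputStatistics.
import scala.collection.mutable.ArrayBuffer

final case class StageStats(bytesByPartitionId: Array[Long])

object PostFixModel {
  /** Start index of each coalesced post-shuffle partition, or None when the
    * stages disagree on the number of pre-shuffle partitions (in which case
    * the shuffle partition number is not reduced). */
  def partitionStartIndices(stats: Seq[StageStats], targetSize: Long): Option[Array[Int]] = {
    val distinctNumPreShufflePartitions = stats.map(_.bytesByPartitionId.length).distinct
    if (stats.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
      val numPartitions = distinctNumPreShufflePartitions.head
      val starts = ArrayBuffer(0)
      var currentSize = 0L
      for (i <- 0 until numPartitions) {
        // Bytes of pre-shuffle partition i, summed over all stages.
        val size = stats.map(_.bytesByPartitionId(i)).sum
        if (i > 0 && currentSize + size > targetSize) {
          starts += i          // start a new post-shuffle partition at i
          currentSize = size
        } else {
          currentSize += size  // keep packing into the current one
        }
      }
      Some(starts.toArray)
    } else {
      None
    }
  }

  def main(args: Array[String]): Unit = {
    val joinLeft = StageStats(Array(10L, 0L, 20L, 0L, 30L))
    val joinRight = StageStats(Array(10L, 0L, 20L, 0L, 30L))
    val agg = StageStats(Array(24L))
    // Same partition count: coalesce.
    println(partitionStartIndices(Seq(joinLeft, joinRight), 40L).map(_.mkString(",")))  // prints Some(0,2,4)
    // Different partition counts: skip coalescing instead of failing.
    println(partitionStartIndices(Seq(joinLeft, joinRight, agg), 40L).map(_.mkString(",")))  // prints None
  }
}
```

With the sample data, the per-index sizes are 20, 0, 40, 0 and 60 bytes, so a 40-byte target yields the post-shuffle partitions {0, 1}, {2, 3} and {4}; adding the single-partition stage makes the counts disagree, so the model returns `None`.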
Now that we have the condition `distinctNumPreShufflePartitions.length == 1`, do we still need the assert at L136? Shall we remove it?
Yes, we could remove it, but the assert has been there since the original version of `ReduceNumShufflePartitions`, which also included the `distinctNumPreShufflePartitions.length == 1` check. I'm not sure what the plan is for `ReduceNumShufflePartitions`. @carsonwang, @maryannxue, do you want to improve Union/SinglePartition handling in this rule? Shall we remove the assert?
I think it is fine to remove it. We can improve the handling of Union/SinglePartition in future and it probably needs more changes and a new function to estimate the partition start indices.
Test build #109233 has finished for PR 25479 at commit
Thanks, merging to master!
…partition numbers

Closes apache#25479 from peter-toth/SPARK-28356-followup.

Authored-by: Peter Toth <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
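### What changes were proposed in this pull request?
This PR reverts some of the latest changes in `ReduceNumShufflePartitions` to fix the case when there are different pre-shuffle partition numbers in the plan. Please see the new UT for an example.

### Why are the changes needed?
To fix a bug: a query whose plan contains shuffles with different pre-shuffle partition numbers (for example, a union of a `SortMergeJoin` result and an aggregate over a `SinglePartition` exchange) failed on an assert in `ReduceNumShufflePartitions`.

### Does this PR introduce any user-facing change?
Yes, some queries that previously failed now succeed.

### How was this patch tested?
Added a new UT.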