[SPARK-49386][CORE][SQL] Add memory based thresholds for shuffle spill #47856
Conversation
Let's probably file a new JIRA.
I am a bit swamped unfortunately, and I don't think I will be able to ensure this gets merged before next Monday @dongjoon-hyun - sorry about that :-( @cxzl25, will try to get around to reviewing this soon - apologies for the delay.
+CC @Ngone51 as well.
Thank you for letting me know, @mridulm ~ No problem at all.
Kindly ping @mridulm, do you have a chance to take another look? I also found this PR helpful for the stability of jobs that spill huge amounts of data.
mridulm left a comment:
Just a few comments, mostly looks good to me.
Thanks for working on this @cxzl25, and apologies for the delay in getting to this!
+CC @HyukjinKwon, @cloud-fan as well for review.
core/src/main/scala/org/apache/spark/internal/config/package.scala (outdated; resolved)
By moving the `_elementsRead > numElementsForceSpillThreshold` check here, we would actually avoid some unnecessary allocations... nice!
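For illustration, a minimal sketch of the check ordering being discussed, with simplified field and method names; this is not the exact Spillable code, and `maxSizeForceSpillThreshold` is a stand-in for the size-based threshold this PR adds. The point is that the cheap threshold comparisons run before any attempt to grow the memory threshold, so the memory request is skipped entirely once a force-spill condition already holds.

```scala
// Simplified sketch; names are illustrative, not the exact Spark implementation.
abstract class SpillableSketch {
  protected var _elementsRead: Long = 0L
  protected var myMemoryThreshold: Long = 5L * 1024 * 1024
  protected val numElementsForceSpillThreshold: Long = Long.MaxValue
  protected val maxSizeForceSpillThreshold: Long = Long.MaxValue // hypothetical size-based threshold

  // Ask the memory manager for more execution memory; returns the bytes actually granted.
  protected def acquireMemory(numBytes: Long): Long

  protected def maybeSpill(currentMemory: Long): Boolean = {
    // Cheap force-spill checks first: if the record count (or, with this PR,
    // the records' in-memory size) already exceeds its threshold, spill right
    // away and skip the memory request below.
    var shouldSpill =
      _elementsRead > numElementsForceSpillThreshold ||
      currentMemory > maxSizeForceSpillThreshold

    if (!shouldSpill && _elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
      // Only now try to acquire more execution memory; spill if we could not
      // grow the threshold enough to keep the collection in memory.
      val amountToRequest = 2 * currentMemory - myMemoryThreshold
      myMemoryThreshold += acquireMemory(amountToRequest)
      shouldSpill = currentMemory >= myMemoryThreshold
    }
    shouldSpill
  }
}
```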
core/src/main/scala/org/apache/spark/util/collection/Spillable.scala (outdated; resolved)
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala (outdated; resolved)
The config name is a bit confusing.
spark.sql.windowExec.buffer.spill.threshold vs spark.sql.windowExec.buffer.spill.size.threshold.
Same for the others introduced.
I will let @HyukjinKwon or @cloud-fan comment on that, though.
I am not super used to this area. I would rather follow the suggestions from you / others.
Thanks @HyukjinKwon!
+CC @dongjoon-hyun as well.
I am planning to merge this next week if there are no concerns @cloud-fan, @dongjoon-hyun. I am not super keen on the naming of some of the SQL configs; I would appreciate your thoughts on that (as well as the rest of the PR). Also, +CC @attilapiros for feedback as well.
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java (outdated; resolved)
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java (outdated; resolved)
core/src/main/scala/org/apache/spark/util/collection/Spillable.scala (outdated; resolved)
@mridulm @cxzl25 @attilapiros @HyukjinKwon @pan3793 Hi all, I was just curious whether there were any issues with this PR or if it will be merged into OSS Spark sometime soon? Thanks again for making this change!
I did not merge it given @attilapiros was actively reviewing it.
Checking.
attilapiros left a comment:
LGTM once the code duplication is resolved.
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactory.scala (outdated; resolved)
When running large shuffles (700 TB of input data, 200k map tasks, 50k reducers on a 300-node cluster), the job regularly OOMs in both the map and reduce phases. IIUC, ShuffleExternalSorter (map side) and ExternalAppendOnlyMap and ExternalSorter (reduce side) try to max out the available execution memory. This in turn doesn't play nicely with the garbage collector, and executors fail with OutOfMemoryError when the memory allocated by these in-memory structures maxes out the available heap size (in our case we run with 9 cores/executor and 32G per executor).

To mitigate this, one can set spark.shuffle.spill.numElementsForceSpillThreshold to force the spill to disk. While this config works, it is not flexible enough: it is expressed in number of elements, and in our case we run multiple shuffles in a single job where element size differs from one stage to another.

This patch extends the spill threshold behaviour and adds two new parameters that control spilling based on memory usage (see the sketch below):
- spark.shuffle.spill.map.maxRecordsSizeForSpillThreshold
- spark.shuffle.spill.reduce.maxRecordsSizeForSpillThreshold
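For illustration, a hedged sketch of how these thresholds might be set. The property names are taken verbatim from the description above; the exact names, defaults, and accepted value formats in the merged patch may differ.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: property names follow the description above and may not
// match the final merged configuration names.
val spark = SparkSession.builder()
  .appName("size-based-shuffle-spill")
  // Force a map-side spill once the records buffered by the shuffle writer
  // exceed roughly 256 MiB in memory (value given in bytes here).
  .config("spark.shuffle.spill.map.maxRecordsSizeForSpillThreshold", (256L * 1024 * 1024).toString)
  // Same idea on the reduce side (ExternalAppendOnlyMap / ExternalSorter).
  .config("spark.shuffle.spill.reduce.maxRecordsSizeForSpillThreshold", (256L * 1024 * 1024).toString)
  // The existing element-count threshold can still be used alongside the size-based ones.
  .config("spark.shuffle.spill.numElementsForceSpillThreshold", "5000000")
  .getOrCreate()
```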
Original author: @amuraru
What changes were proposed in this pull request?
This PR aims to add memory-based thresholds for shuffle spill.
It introduces new configurations that force a spill based on the in-memory size of the buffered records, in addition to the existing element-count threshold.
Why are the changes needed?
#24618
Currently we can only force spills by configuring spark.shuffle.spill.numElementsForceSpillThreshold, which is expressed as a number of elements. In some scenarios a single row may be very large in memory, so the same element count can correspond to wildly different memory footprints (for example, at 10 MB per row, a threshold of one million elements would allow roughly 10 TB to accumulate before forcing a spill).
Does this PR introduce any user-facing change?
No
How was this patch tested?
GA (GitHub Actions CI).
Verified in a production environment: task time is shortened, the number of disk spills is reduced, the shuffle data has a better chance of being compressed, and the amount of data spilled to disk is also significantly reduced.
[Screenshots comparing metrics for the current behaviour ("Current") and with this patch ("PR") were attached here.]
Was this patch authored or co-authored using generative AI tooling?
No