-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-32917][SHUFFLE][CORE] Adds support for executors to push shuffle blocks after successful map task completion #30312
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
@Victsm @mridulm @tgravescs @jiangxb1987 @attilapiros @Ngone51 Please take a look. |
fa5a778 to
b54589b
Compare
|
ok to test |
|
Kubernetes integration test starting |
|
Kubernetes integration test status success |
|
Test build #130957 has finished for PR 30312 at commit
|
|
retest this please |
|
Kubernetes integration test starting |
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
Outdated
Show resolved
Hide resolved
|
Kubernetes integration test status failure |
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
Outdated
Show resolved
Hide resolved
|
Kubernetes integration test starting |
|
Kubernetes integration test status success |
|
Kubernetes integration test starting |
|
Kubernetes integration test status failure |
|
Test build #131004 has finished for PR 30312 at commit
|
|
Test build #131010 has finished for PR 30312 at commit
|
|
Test build #131013 has finished for PR 30312 at commit
|
common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/internal/config/package.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/internal/config/package.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
Outdated
Show resolved
Hide resolved
…r.scala Co-authored-by: Min Shen <[email protected]>
…r.scala Co-authored-by: Min Shen <[email protected]>
|
Test build #133044 has finished for PR 30312 at commit
|
|
Kubernetes integration test starting |
|
Kubernetes integration test status success |
|
Test build #133049 has finished for PR 30312 at commit
|
|
ok to test |
|
Given no other comments, will merge once tests pass (have retriggered it). |
|
Test build #133803 has started for PR 30312 at commit |
|
Kubernetes integration test starting |
|
Kubernetes integration test status success |
|
test this please |
|
Kubernetes integration test starting |
|
Kubernetes integration test status success |
|
Test build #133807 has finished for PR 30312 at commit
|
|
Merged to master. |
|
Thanks @mridulm for merging and also reviewing. Thanks @Ngone51, @dongjoon-hyun, and @Victsm for the reviews as well. |
…ush.based.enabled in ShuffleBlockPusherSuite ### What changes were proposed in this pull request? It is a trivial change to remove the reference to an incorrect configuration for push-based shuffle from a test suite. Ref: #30312 With SPARK-32917, `ShuffleBlockPusher` and its test suite was introduced. `ShuffleBlockPusher` is created only when push-based shuffle is enabled and the tests in `ShuffleBlockPusherSuite` are just testing the functionality in the pusher. So there is no need to have these configs enabled in these test. ### Why are the changes needed? This change removes an incorrect configuration from the test suite. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? This change just removes an incorrect configuration from the test suite so haven't added any UTs for it. Closes #32992 from otterc/SPARK-35836. Authored-by: Chandni Singh <[email protected]> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
…le blocks after successful map task completion ### What changes were proposed in this pull request? This is the shuffle writer side change where executors can push data to remote shuffle services. This is needed for push-based shuffle - SPIP [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602). Summary of changes: - This adds support for executors to push shuffle blocks after map tasks complete writing shuffle data. - This also introduces a timeout specifically for creating connection to remote shuffle services. ### Why are the changes needed? - These changes are needed for push-based shuffle. Refer to the SPIP in [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602). - The main reason to create a separate connection creation timeout is because the existing `connectionTimeoutMs` is overloaded and is used for connection creation timeouts as well as connection idle timeout. The connection creation timeout should be much lower than the idle timeouts. The default for `connectionTimeoutMs` is 120s. This is quite high for just establishing the connections. If a shuffle server node is bad then the connection creation will fail within few seconds. However, an overloaded shuffle server may take much longer to respond to a request and the channel can stay idle for a much longer time which is expected. Another reason is that with push-based shuffle, an executor may be fetching shuffle data and pushing shuffle data (next stage) simultaneously. Both these tasks will share the same connections with the shuffle service. If there is a bad shuffle server node and the connection creation timeout is very high then both these tasks end up waiting a long time time eventually impacting the performance. ### Does this PR introduce _any_ user-facing change? Yes. This PR introduces client-side configs for push-based shuffle. If push-based shuffle is turned-off then the users will not see any change. ### How was this patch tested? Added unit tests. The reference PR with the consolidated changes covering the complete implementation is also provided in [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602). We have already verified the functionality and the improved performance as documented in the SPIP doc. Lead-authored-by: Min Shen mshenlinkedin.com Co-authored-by: Chandni Singh chsinghlinkedin.com Co-authored-by: Ye Zhou yezhoulinkedin.com Closes apache#30312 from otterc/SPARK-32917. Lead-authored-by: Chandni Singh <[email protected]> Co-authored-by: Chandni Singh <[email protected]> Co-authored-by: Min Shen <[email protected]> Co-authored-by: Ye Zhou <[email protected]> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
…ush.based.enabled in ShuffleBlockPusherSuite ### What changes were proposed in this pull request? It is a trivial change to remove the reference to an incorrect configuration for push-based shuffle from a test suite. Ref: apache#30312 With SPARK-32917, `ShuffleBlockPusher` and its test suite was introduced. `ShuffleBlockPusher` is created only when push-based shuffle is enabled and the tests in `ShuffleBlockPusherSuite` are just testing the functionality in the pusher. So there is no need to have these configs enabled in these test. ### Why are the changes needed? This change removes an incorrect configuration from the test suite. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? This change just removes an incorrect configuration from the test suite so haven't added any UTs for it. Closes apache#32992 from otterc/SPARK-35836. Authored-by: Chandni Singh <[email protected]> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
…le blocks after successful map task completion This is the shuffle writer side change where executors can push data to remote shuffle services. This is needed for push-based shuffle - SPIP [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602). Summary of changes: - This adds support for executors to push shuffle blocks after map tasks complete writing shuffle data. - This also introduces a timeout specifically for creating connection to remote shuffle services. - These changes are needed for push-based shuffle. Refer to the SPIP in [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602). - The main reason to create a separate connection creation timeout is because the existing `connectionTimeoutMs` is overloaded and is used for connection creation timeouts as well as connection idle timeout. The connection creation timeout should be much lower than the idle timeouts. The default for `connectionTimeoutMs` is 120s. This is quite high for just establishing the connections. If a shuffle server node is bad then the connection creation will fail within few seconds. However, an overloaded shuffle server may take much longer to respond to a request and the channel can stay idle for a much longer time which is expected. Another reason is that with push-based shuffle, an executor may be fetching shuffle data and pushing shuffle data (next stage) simultaneously. Both these tasks will share the same connections with the shuffle service. If there is a bad shuffle server node and the connection creation timeout is very high then both these tasks end up waiting a long time time eventually impacting the performance. Yes. This PR introduces client-side configs for push-based shuffle. If push-based shuffle is turned-off then the users will not see any change. Added unit tests. The reference PR with the consolidated changes covering the complete implementation is also provided in [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602). We have already verified the functionality and the improved performance as documented in the SPIP doc. Lead-authored-by: Min Shen mshenlinkedin.com Co-authored-by: Chandni Singh chsinghlinkedin.com Co-authored-by: Ye Zhou yezhoulinkedin.com Closes #30312 from otterc/SPARK-32917. Lead-authored-by: Chandni Singh <[email protected]> Co-authored-by: Chandni Singh <[email protected]> Co-authored-by: Min Shen <[email protected]> Co-authored-by: Ye Zhou <[email protected]> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
…ush.based.enabled in ShuffleBlockPusherSuite ### What changes were proposed in this pull request? It is a trivial change to remove the reference to an incorrect configuration for push-based shuffle from a test suite. Ref: #30312 With SPARK-32917, `ShuffleBlockPusher` and its test suite was introduced. `ShuffleBlockPusher` is created only when push-based shuffle is enabled and the tests in `ShuffleBlockPusherSuite` are just testing the functionality in the pusher. So there is no need to have these configs enabled in these test. ### Why are the changes needed? This change removes an incorrect configuration from the test suite. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? This change just removes an incorrect configuration from the test suite so haven't added any UTs for it. Closes #32992 from otterc/SPARK-35836. Authored-by: Chandni Singh <[email protected]> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
What changes were proposed in this pull request?
This is the shuffle writer side change where executors can push data to remote shuffle services. This is needed for push-based shuffle - SPIP SPARK-30602.
Summary of changes:
Why are the changes needed?
connectionTimeoutMsis overloaded and is used for connection creation timeouts as well as connection idle timeout. The connection creation timeout should be much lower than the idle timeouts. The default forconnectionTimeoutMsis 120s. This is quite high for just establishing the connections. If a shuffle server node is bad then the connection creation will fail within few seconds. However, an overloaded shuffle server may take much longer to respond to a request and the channel can stay idle for a much longer time which is expected. Another reason is that with push-based shuffle, an executor may be fetching shuffle data and pushing shuffle data (next stage) simultaneously. Both these tasks will share the same connections with the shuffle service. If there is a bad shuffle server node and the connection creation timeout is very high then both these tasks end up waiting a long time time eventually impacting the performance.Does this PR introduce any user-facing change?
Yes. This PR introduces client-side configs for push-based shuffle. If push-based shuffle is turned-off then the users will not see any change.
How was this patch tested?
Added unit tests.
The reference PR with the consolidated changes covering the complete implementation is also provided in SPARK-30602.
We have already verified the functionality and the improved performance as documented in the SPIP doc.
Lead-authored-by: Min Shen [email protected]
Co-authored-by: Chandni Singh [email protected]
Co-authored-by: Ye Zhou [email protected]