[SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file #32401
Test build #138086 has finished for PR 32401 at commit
Kubernetes integration test starting
Kubernetes integration test status failure
Force-pushed from b346876 to 3267fba.
I have resolved the issue (the regression; making it a built-in feature) mentioned by @tgravescs at #32385 (comment), so I think it's ready for review.
Kubernetes integration test starting
Test build #139498 has finished for PR 32401 at commit
Kubernetes integration test status success
How do we write the shuffle index file right now? In this method, we also calculate the partition lengths but we don't write the index file immediately.
Oh actually we did, but it's done by ShuffleMapOutputWriter.commitAllPartitions. Does this checksum file work with custom shuffle extensions?
Force-pushed from 3267fba to 2ca071f.
Test build #139649 has finished for PR 32401 at commit
Kubernetes integration test unable to build dist. exiting with code: 1
I will try to complete the review of this today or tomorrow @Ngone51
TBH I don't think the current shuffle API provides enough abstraction to support checksums. I'm OK with this change since the shuffle API is still private, but we should revisit the shuffle API later so that checksums can be handled on the shuffle implementation side.
The current issue I see is that Spark writes local spill files and then asks the shuffle implementation to "transfer" the spill files. So Spark has to compute the checksum itself while writing the spill files, to reduce the perf overhead.
We can discuss it later.
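The point about checksumming during spill-file writing can be sketched with the JDK's own `CheckedOutputStream`. This is illustrative only: the class and variable names below are mine, not Spark's actual writer code; the idea is that the checksum accumulates as bytes are written, so no second read pass over the spill file is needed.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.Adler32;
import java.util.zip.CheckedOutputStream;

public class SpillChecksumSketch {
    public static void main(String[] args) throws IOException {
        byte[] record = "some shuffle partition bytes".getBytes("UTF-8");

        // Stands in for the spill file's FileOutputStream.
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // The wrapper updates the Adler32 state on every write.
        CheckedOutputStream out = new CheckedOutputStream(sink, new Adler32());
        out.write(record);
        out.close();

        long checksumDuringWrite = out.getChecksum().getValue();

        // The same bytes checksummed after the fact must agree.
        Adler32 after = new Adler32();
        after.update(record, 0, record.length);

        System.out.println(checksumDuringWrite == after.getValue()); // prints "true"
    }
}
```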
mridulm left a comment:
Some of my comments from the other PR are relevant here as well; I've added a few more comments on this PR.
Thanks for working on this @Ngone51!
Btw, do you have any details on the overhead of introducing this? On both checksum generation time and (in case of failures) validation time?
Given that the validation happens in the shuffle service, we have to allow the checksum algorithm to be configurable with future evolution in mind.
Also, this will need to be conveyed to the ESS (filename extension?) and be configurable in Spark.
The requirement that the checksum itself is a long and that the checksum algo extends java.util.zip.Checksum sounds fine to me.
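A minimal sketch of that contract, with class and method names that are mine (not Spark's actual `MutableCheckedOutputStream` API): because every calculator implements `java.util.zip.Checksum` and reduces to a long, the stream can treat it as a pluggable, swappable field, which is what keeps the algorithm configurable.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.Adler32;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

// Output stream whose Checksum calculator can be replaced at runtime.
class SwappableCheckedOutputStream extends FilterOutputStream {
    private Checksum checksum;

    SwappableCheckedOutputStream(OutputStream out) {
        super(out);
    }

    // Swap calculators, e.g. when the writer moves to the next partition.
    void setChecksum(Checksum checksum) {
        this.checksum = checksum;
    }

    @Override
    public void write(int b) throws IOException {
        if (checksum != null) checksum.update(b);
        out.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        if (checksum != null) checksum.update(b, off, len);
        out.write(b, off, len);
    }
}

public class MutableChecksumDemo {
    public static void main(String[] args) throws IOException {
        SwappableCheckedOutputStream out =
            new SwappableCheckedOutputStream(new ByteArrayOutputStream());

        Checksum adler = new Adler32();
        out.setChecksum(adler);
        out.write("partition-0".getBytes("UTF-8"));

        Checksum crc = new CRC32();  // switch algorithm mid-stream
        out.setChecksum(crc);
        out.write("partition-1".getBytes("UTF-8"));
        out.close();

        // The Adler32 instance only ever saw partition-0's bytes.
        Adler32 check = new Adler32();
        check.update("partition-0".getBytes("UTF-8"));
        System.out.println(adler.getValue() == check.getValue()); // prints "true"
    }
}
```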
Also, this will need to be conveyed to ESS (filename extension ?) and allow to be configurable in spark.
Hmm, I'm not sure I get this concern. Could you elaborate?
The checksum algorithm is currently hard-coded as Adler32 in both the ESS and the Spark executor. From an evolution point of view, if we have to change or support other checksum algos in the future, this becomes an incompatible change: it would require the ESS and the client to be upgraded together.
Thoughts?
Yeah, I agree it would be nice to have it configurable and recorded, perhaps either as an extension on the checksum file or as metadata inside the checksum file. I would expect the ESS to indicate whether it's supported, i.e. perhaps error with "unknown or unsupported checksum type" when trying to diagnose. I think the other part might be push-based shuffle: I assume if it's merging files it may have to recalculate these, so it needs to know which algorithm to use?
+CC @otterc for @tgravescs's point on push-based shuffle.
Yes, for push-based shuffle the server would need to know which algorithm to use. Just thinking out loud: the server can use this checksum to verify whether a block pushed to it was corrupted, by calculating and comparing the checksum.
- This verification may be expensive while the blocks are getting merged, so maybe we can think of doing it asynchronously.
- It may also require adding the checksum to the block-push message, which would be backward incompatible, so maybe we create a new message for it.
Anyways, if we do want the algorithm to be configurable, can we leverage the RegisterExecutor message for it?
We can send the additional information about which algorithm is being used, and if the shuffle server doesn't support it, the checksum is not calculated. This would be similar to how we send the attemptId to the shuffle server for push-based shuffle.
Just a thought.
Anyways, so if we do want the algorithm to be configurable, can we leverage the RegisterExecutor message for it?
The reason I initially proposed adding the checksum algo to the file name itself as a suffix is to minimize the state required to reason about which algorithm is being used. We won't need to pass it from the container to the ESS, persist it across ESS restarts, etc.
Tom's additional suggestion of including it in the checksum file itself as metadata also works; given the current indexing into the file for a given partition (8 * partition_id), metadata at the end of the file might be a more convenient place to record it.
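The 8 * partition_id indexing can be illustrated with a toy layout. Everything below is hypothetical (the file-name suffix, helper names, and checksum values are mine): one 8-byte long per partition means the entry for partition p sits at byte offset 8 * p, and trailing metadata appended after the last entry would not disturb that indexing.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;

public class ChecksumFileLayout {
    public static void main(String[] args) throws IOException {
        long[] checksums = {111L, 222L, 333L};  // fake per-partition checksums

        // Hypothetical suffix carrying the algorithm name, per the discussion.
        Path file = Files.createTempFile("shuffle", ".checksum.ADLER32");
        try (DataOutputStream out =
                 new DataOutputStream(Files.newOutputStream(file))) {
            for (long c : checksums) out.writeLong(c);  // 8 bytes each
        }

        // Random access: seek straight to partition 2's entry at offset 8 * 2.
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            raf.seek(8L * 2);
            System.out.println(raf.readLong()); // prints 333
        }
        Files.delete(file);
    }
}
```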
The reason I suggested leveraging the RegisterExecutor message to communicate the checksum algo to the ESS is that, for push-based shuffle, communicating the algorithm via the checksum file (which is generated along with the shuffle data) will not work. This matters if, in the future, we want to validate at the remote ESS whether a pushed block is corrupted.
However, leveraging this for push-based shuffle is not part of this change; since it was mentioned, I thought about how push-based shuffle could utilize it.
Excellent point @otterc. This would handle the case of a merger not having any local shuffle files (from executors on that node), but only data pushed to it.
Review threads (outdated, resolved) on:
- core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
- core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
- core/src/main/scala/org/apache/spark/internal/config/package.scala
- core/src/main/scala/org/apache/spark/io/MutableCheckedOutputStream.scala
- core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
- core/src/main/scala/org/apache/spark/shuffle/ShufflePartitionPairsWriter.scala
- core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
Force-pushed from 2ca071f to 47983ca.
@mridulm I have run the TPC-DS benchmark with 3TB datasets internally and there's no regression. I didn't measure the exact generation time, but it's surely trivial according to the benchmark results. For the validation time, I didn't pay attention to it previously since the major issue at the time was checksum calculation; I can take a look later. However, validation is part of error handling, so I don't think it counts toward the performance impact.
Kubernetes integration test unable to build dist. exiting with code: 1
Test build #139937 has finished for PR 32401 at commit
So what did the benchmarking numbers look like? Was there an average hit across the board, or mostly just noise?
Review threads (outdated, resolved) on:
- core/src/main/java/org/apache/spark/shuffle/api/ShuffleMapOutputWriter.java
- core/src/main/scala/org/apache/spark/internal/config/package.scala
- core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
Force-pushed from 47983ca to b5a2235.
Test build #140652 has finished for PR 32401 at commit
@tgravescs Our internal benchmark runs between the baseline (master) and the target (master + checksum changes). It measures the end-to-end execution time of TPC-DS queries, so the numbers are the execution times of the queries. And from the results, it's mostly just noise.
Kubernetes integration test starting
Kubernetes integration test status success
Kubernetes integration test unable to build dist. exiting with code: 1
Test build #141051 has finished for PR 32401 at commit
Kubernetes integration test starting
Kubernetes integration test status success
Test build #141061 has finished for PR 32401 at commit
Test build #141141 has finished for PR 32401 at commit
Kubernetes integration test starting
Kubernetes integration test status success
I plan to merge this later today. In case there are ongoing reviews, please do comment so that I can hold off.
…checksum file

### What changes were proposed in this pull request?

This is the initial work of adding checksum support for shuffle. It is a piece of #32385, and this PR only adds checksum functionality on the shuffle writer side.

Basically, the idea is to wrap a `MutableCheckedOutputStream`* around the `FileOutputStream` while the shuffle writer generates the shuffle data. But the specific wrapping places differ among the shuffle writers due to their different implementations:

* `BypassMergeSortShuffleWriter` - wraps each partition file
* `UnsafeShuffleWriter` - wraps each spill file directly, since they don't require aggregation or sorting
* `SortShuffleWriter` - wraps the `ShufflePartitionPairsWriter` after merging spill files, since they might require aggregation or sorting

\* `MutableCheckedOutputStream` is a variant of `java.util.zip.CheckedOutputStream` which can change the checksum calculator at runtime. And we use `Adler32`, which is similar to CRC-32 but much faster, to calculate the checksum, the same as `Broadcast`'s checksum.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

Yes, added a new conf: `spark.shuffle.checksum`.

### How was this patch tested?

Added unit tests.

Closes #32401 from Ngone51/add-checksum-files.

Authored-by: yi.wu <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 4783fb7)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
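A quick sketch of why the calculator is interchangeable: `Adler32` and `CRC32` both implement `java.util.zip.Checksum` and both reduce a byte stream to a long. The helper name below is mine; the point is only that either algorithm is deterministic for a fixed input, so checksums computed at write time and at verify time can be compared.

```java
import java.util.zip.Adler32;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

public class AlgoDemo {
    // Works for any java.util.zip.Checksum implementation.
    static long digest(Checksum c, byte[] data) {
        c.reset();
        c.update(data, 0, data.length);
        return c.getValue();
    }

    public static void main(String[] args) throws Exception {
        byte[] data = "shuffle bytes".getBytes("UTF-8");
        // Deterministic for fixed input: two passes must agree.
        System.out.println(digest(new Adler32(), data) == digest(new Adler32(), data)); // prints "true"
        System.out.println(digest(new CRC32(), data) == digest(new CRC32(), data));     // prints "true"
    }
}
```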
Merged to master and branch-3.2. Thanks for the reviews @cloud-fan, @otterc, @tgravescs, @HeartSaVioR
+CC @gengliangwang
@mridulm Thanks for the merge! I saw you also merged this into 3.2, but you may notice that the new confs in this PR are versioned as 3.3.0. My original thought was to merge this feature only to the master branch, given that more pieces are required for this feature and 3.2.0 is going to be released soon. So I'm afraid a partially completed feature won't help in 3.2.
I haven't looked into the code deeply (I just helped fix the RAT issue), but IMHO the value of this PR (whether it's worth shipping in 3.2 or not) depends on whether we "verify" the checksum. If we only write the checksum and don't yet verify it, nothing changes from the end users' point of view, and we should wait for the next PR(s) to be completed. If this PR introduced the checksum verification as well (and proper error messages), personally it would seem worth shipping without waiting for the other PRs.
Thanks @HeartSaVioR. That's actually my concern: this PR only writes the checksum, without verification. Verification is planned to be implemented in a separate PR, and I worry we can't complete it before the 3.2 release.
@Ngone51 I was assuming we will be completing the verification for 3.2, given the earlier WIP PR which had the full impl :-)
@gengliangwang what sort of timeline do we have before we go into RC?
I think there are around 2 weeks left, and it seems promising to merge the verification PR before the RC. +1 to having this feature in 3.2 to improve stability.
Thanks, @mridulm @cloud-fan. I'll try my best to push the validation PR first (I'm working on it right now). We could revert this later if we can't get the validation PR in.
@Ngone51 Yes, let's see if we can make it before 3.2. Thanks for the work!
Thanks for the clarifications! This sounds good.