Conversation


@otterc otterc commented Mar 22, 2021

What changes were proposed in this pull request?

This PR fixes bugs that cause corruption of push-merged blocks when a client terminates while pushing a block. RemoteBlockPushResolver was introduced in #30062 (SPARK-32916).

There are 2 scenarios where the merged blocks get corrupted:

  1. StreamCallback.onFailure() is called more than once. Initially we assumed that the onFailure callback would be called just once per stream. However, we observed that it is called twice when a client connection is reset. When the client connection is reset, 2 events get triggered in this order:
  • exceptionCaught. This event is propagated to StreamInterceptor. StreamInterceptor.exceptionCaught() invokes callback.onFailure(streamId, cause). This is the first time StreamCallback.onFailure() is invoked.
  • channelInactive. Since the channel closes, the channelInactive event gets triggered, which is again propagated to StreamInterceptor. StreamInterceptor.channelInactive() invokes callback.onFailure(streamId, new ClosedChannelException()). This is the second time StreamCallback.onFailure() is invoked.
  2. The flag isWriting is set prematurely to true. This introduces an edge case where a stream that is trying to merge a duplicate block (created because of a speculative task) may interfere with an active stream if the duplicate stream fails.
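
As a rough illustration of the first fix, a failure path can be made idempotent with a compare-and-set guard, so the second callback (from channelInactive, after exceptionCaught already fired) becomes a no-op. This is a hypothetical sketch, not the actual Spark code; class and method names are illustrative:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch (not the actual RemoteBlockPushResolver code):
// making the failure path idempotent so the second onFailure call,
// triggered by channelInactive after exceptionCaught, cannot clobber
// partition state that a later stream now owns.
class IdempotentStreamCallback {
    private final AtomicBoolean failed = new AtomicBoolean(false);
    private int currentMapIndex;

    IdempotentStreamCallback(int activeMapIndex) {
        this.currentMapIndex = activeMapIndex;
    }

    void setCurrentMapIndex(int mapIndex) { currentMapIndex = mapIndex; }
    int getCurrentMapIndex() { return currentMapIndex; }

    void onFailure(String streamId, Throwable cause) {
        // compareAndSet ensures only the FIRST failure resets the
        // partition; any duplicate callback is a no-op.
        if (failed.compareAndSet(false, true)) {
            currentMapIndex = -1;
        }
    }
}
```

Without the guard, the second onFailure would reset currentMapIndex to -1 even after another stream had taken over the partition.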

This PR also includes additional changes that improve the code:

  1. Using positional writes everywhere, because this simplifies the code, and microbenchmarking showed no performance impact.
  2. Additional minor changes suggested by @mridulm during an internal review.
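
For context on item 1, java.nio's FileChannel supports positional writes directly via write(ByteBuffer, long): each write names its own absolute offset and never consults or advances the channel's implicit position, so no seek bookkeeping is needed. A small standalone example (not the PR's code):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class PositionalWrite {
    static String demo() throws IOException {
        Path file = Files.createTempFile("push-merged", ".data");
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.WRITE)) {
            // Each write names its own absolute offset; the channel's
            // implicit position is never consulted or advanced.
            ch.write(ByteBuffer.wrap("AAAA".getBytes()), 0);
            ch.write(ByteBuffer.wrap("BBBB".getBytes()), 4);
        }
        return new String(Files.readAllBytes(file));
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo()); // prints AAAABBBB
    }
}
```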

Why are the changes needed?

These are bug fixes and simplify the code.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added unit tests. I have also tested these changes in LinkedIn's internal fork on a cluster.

Co-authored-by: Chandni Singh [email protected]
Co-authored-by: Min Shen [email protected]

@github-actions github-actions bot added the CORE label Mar 22, 2021

otterc commented Mar 22, 2021

@tgravescs @Ngone51 @attilapiros @mridulm @Victsm
Please help review these bug fixes

@tgravescs

Does this cause data corruption if people use it with the Spark 3.1.1 release? Or are these blocks somehow caught, so the shuffle ends up failing?


otterc commented Mar 22, 2021

Does this cause data corruption if people use it with the Spark 3.1.1 release? Or are these blocks somehow caught, so the shuffle ends up failing?

  • In 3.1.1, push-based shuffle is not complete. As in, the changes needed to fetch merged shuffle blocks are not there, so users on 3.1.1 can't use push-based shuffle.

  • In our implementation of fetching merged shuffle blocks (which is not in 3.1.1), if the client encounters any issues with the merged blocks, it falls back to fetching the original unmerged blocks that make up that merged block.

@AmplabJenkins

Can one of the admins verify this patch?


otterc commented Mar 23, 2021

This test failure is unrelated:

[info] *** 1 TEST FAILED ***
[error] Failed: Total 2998, Failed 1, Errors 0, Passed 2997, Ignored 7, Canceled 1
[error] Failed tests:
[error] 	org.apache.spark.storage.FallbackStorageSuite


@dongjoon-hyun dongjoon-hyun left a comment


Hi, @otterc .
SPARK-32916 is already released with Fix Version: 3.1.0. To have an independent Fix Version, this should have a new JIRA issue.


@Ngone51 Ngone51 left a comment


Hmm... I'm trying to understand how those 2 scenarios cause the merged block corruption.

  1. Do you mean that calling StreamCallback.onFailure() twice causes the block corruption? It seems the only things onFailure does are setCurrentMapIndex(-1) and setEncounteredFailure(true), and they don't touch files, e.g., reset position or truncate.

  2. I can see how the duplicate stream may interfere with an active stream, e.g., the active stream may see getCurrentMapIndex < 0 and isEncounteredFailure=true while writing normally itself. But it seems like the active stream is able to heal itself within the current framework.

I probably missed some details. Could you elaborate more on how corruption happens? Thanks.

@otterc otterc force-pushed the SPARK-32916-followup branch from 5aaaa6f to c33f961 Compare March 23, 2021 16:09
@otterc otterc changed the title [SPARK-32916][FOLLOW-UP][SHUFFLE] Fixes cases of corruption in merged shuffle … [SPARK-34840][SHUFFLE] Fixes cases of corruption in merged shuffle … Mar 23, 2021

otterc commented Mar 23, 2021

Hi, @otterc .
SPARK-32916 is already released with Fix Version: 3.1.0. To have an independent Fix Version, this should have a new JIRA issue.

I created SPARK-34840 to address this. cc. @dongjoon-hyun

@dongjoon-hyun

Thank you, @otterc !

@dongjoon-hyun dongjoon-hyun dismissed their stale review March 23, 2021 16:28

New JIRA is created.


otterc commented Mar 23, 2021

Hmm... I'm trying to understand how those 2 scenarios cause the merged block corruption.

  1. Do you mean that calling StreamCallback.onFailure() twice causes the block corruption? It seems the only things onFailure does are setCurrentMapIndex(-1) and setEncounteredFailure(true), and they don't touch files, e.g., reset position or truncate.
  2. I can see how the duplicate stream may interfere with an active stream, e.g., the active stream may see getCurrentMapIndex < 0 and isEncounteredFailure=true while writing normally itself. But it seems like the active stream is able to heal itself within the current framework.

I probably missed some details. Could you elaborate more on how corruption happens? Thanks.

In both scenarios, the currentMapId of the shuffle partition is modified to -1, which can interfere with an active stream (a stream that is writing). By interfering, I mean it gives another stream that is waiting to merge to the same shuffle partition a chance to start writing before the active stream has completed (successfully or with failure).

Providing examples for both of these:

  1. When onFailure is called twice
  • Say stream1, merging shufflePush_0_1_2, wrote some data and has isWriting=true. Now it failed, so it sets currentMapId of partition_0_2 to -1.
  • Another stream2, which wants to merge shufflePush_0_2_2, can now start merging its bufs to partition_0_2, and it sets currentMapId of partition_0_2 to 2.
  • Another stream3, which wants to merge shufflePush_0_3_2, will defer its buffers because stream2 is the active one right now (currentMapId is 2).
  • stream2 has only merged a few bufs, but then stream1.onFailure() is invoked again, and that changes the currentMapId of partition_0_2 to -1. This becomes a problem because stream2 hasn't completed (successfully or with failure) and now stream3 is allowedToWrite. If stream3 starts writing buffers when stream2 has not appended all its buffers, then the data of shufflePush_0_2_2 will be corrupted.
  2. Duplicate stream
  • Say stream1, merging shufflePush_0_1_2, wrote some data and has isWriting=true. It completed successfully and then sets currentMapId of partition_0_2 to -1.
  • Now stream1Duplicate, which is also trying to merge shufflePush_0_1_2, will be allowedToWrite because the currentMapId of partition_0_2 is -1, and it sets isWriting=true. However, we identify that it is a duplicate stream and just return without modifying currentMapId.
  • stream2, which tries to merge shufflePush_0_2_2, will be allowedToWrite because currentMapId=-1. It sets currentMapId=2 and starts writing.
  • If stream1Duplicate encounters a failure now, it has isWriting on and so can reset currentMapId of partition_0_2. This again gives another stream, say stream3, a chance to be allowedToWrite without stream2 completing.
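
The guarded failure path sketched above can be modeled minimally: a stream releases the partition on failure only if it actually wrote data AND is still the partition's active writer. Class and field names here are illustrative, not the actual RemoteBlockPushResolver internals:

```java
// Minimal hypothetical model of the guarded failure path; names are
// illustrative and not the actual Spark code.
class PartitionInfo {
    int currentMapId = -1; // -1 means no active writer
}

class PushStream {
    final int mapId;
    final PartitionInfo partition;
    boolean isWriting = false; // set only once data is really written

    PushStream(int mapId, PartitionInfo partition) {
        this.mapId = mapId;
        this.partition = partition;
    }

    boolean allowedToWrite() {
        return partition.currentMapId == -1 || partition.currentMapId == mapId;
    }

    void writeChunk() {
        // Claim the partition only when data is actually written.
        partition.currentMapId = mapId;
        isWriting = true;
    }

    void onFailure() {
        // Guard: a failed duplicate/stale stream must not release a
        // partition that another stream currently owns.
        if (isWriting && partition.currentMapId == mapId) {
            partition.currentMapId = -1;
        }
    }
}
```

With this guard, a second (spurious) failure callback from stream1 after stream2 has claimed the partition leaves currentMapId untouched, so stream3 stays deferred.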

I have added UTs for both these cases as well with similar examples.
@Ngone51


Ngone51 commented Mar 24, 2021

@otterc Thanks for the explanation. Now I understand the cause.

To confirm, for the example 2, I think the first 2 steps are not necessary, right?


otterc commented Mar 24, 2021

To confirm, for the example 2, I think the first 2 steps are not necessary, right?

@Ngone51 I think the first 2 steps are necessary, because this edge case can only happen when a stream that is trying to merge a duplicate block (stream1Duplicate in my example) fails. The problem is that we were setting isWriting=true early in such cases, so when the stream fails it can unset currentMapId.

Let me know if I am missing some other cases. I can add UTs for them as well.


Ngone51 commented Mar 24, 2021

  • If stream2Duplicate encounters a failure now, it has isWriting on and so can reset currentMapId of partition_0_2. This again gives a chance to another stream say stream3 to allowedToWrite without stream2 to complete.

So, it should be stream1Duplicate instead of stream2Duplicate here?


otterc commented Mar 24, 2021

  • If stream2Duplicate encounters a failure now, it has isWriting on and so can reset currentMapId of partition_0_2. This again gives a chance to another stream say stream3 to allowedToWrite without stream2 to complete.

So, it should be stream1Duplicate instead of stream2Duplicate here?

Right, that was a typo. Yes, it should be stream1Duplicate. Thanks for pointing it out.
I have edited the example as well, for others who go through it.


Ngone51 commented Mar 24, 2021

Ok, I get it now.


@Ngone51 Ngone51 left a comment


LGTM, except one minor comment.

}
}
}
isWriting = false;

Move this into the if condition scope?


@otterc otterc Mar 24, 2021


I can move this into the if scope, and that would not change the behavior or cause any issues. The only reason I had it outside was to be consistent with where this flag is unset in onComplete. I understand that is a very trivial cosmetic reason, so I can move it.


Ok, keeping it consistent sounds fine. We can leave it as it is since it's trivial.


mridulm commented Mar 25, 2021

LGTM, thanks @otterc.
Merging to master and 3.1

@asfgit asfgit closed this in 6d88212 Mar 25, 2021
asfgit pushed a commit that referenced this pull request Mar 25, 2021
Closes #31934 from otterc/SPARK-32916-followup.

Lead-authored-by: Chandni Singh <[email protected]>
Co-authored-by: Min Shen <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 6d88212)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>

mridulm commented Mar 25, 2021

Thanks for the reviews @Ngone51, @dongjoon-hyun !

flyrain pushed a commit to flyrain/spark that referenced this pull request Sep 21, 2021
fishcus pushed a commit to fishcus/spark that referenced this pull request Jan 12, 2022
domybest11 pushed a commit to domybest11/spark that referenced this pull request Jun 15, 2022
wangyum pushed a commit that referenced this pull request May 26, 2023