@CodingCat
Contributor

What changes were proposed in this pull request?

Why are the changes needed?

Does this PR resolve a correctness bug?

Does this PR introduce any user-facing change?

How was this patch tested?

PbReportMissingShuffleIdResponse.newBuilder().setSuccess(ret).build()
context.reply(pbReportMissingShuffleIdResponse)
/*
val latestUpstreamShuffleId = shuffleIds.maxBy(_._2._1)
Contributor Author

The original dedup logic may suffer from a race condition, described as follows:

Stage A depends on shuffle 1. Due to "too early deletion", a missing report is sent and handled for shuffle 1. At this point a new shuffle id is generated, so latestUpstreamShuffleId._2._1 is no longer UNKNOWN_MISSING_CELEBORN_SHUFFLE_ID. When the missing report for shuffle 1 arrives again, the dedup check no longer recognizes it as a duplicate, so it is handled a second time and messes everything up.
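To make the sequence concrete, here is a minimal sketch of a dedup check that relies only on the latest shuffle id being the sentinel. It is a toy model, not the Celeborn code: `StateBasedDedup`, `reportMissing`, `generateNewShuffleId`, and the value of `UnknownMissing` are all hypothetical names and values chosen for illustration.

```scala
object DedupRaceSketch {
  // Placeholder for UNKNOWN_MISSING_CELEBORN_SHUFFLE_ID; the real value is an assumption here.
  val UnknownMissing: Int = -1

  // Toy model: tracks only the latest Celeborn shuffle id of one app shuffle.
  final class StateBasedDedup(initialShuffleId: Int) {
    private var latestShuffleId: Int = initialShuffleId
    private var handled: Int = 0

    // Handles a report unless the latest id is still the "missing" sentinel.
    def reportMissing(): Boolean = synchronized {
      if (latestShuffleId == UnknownMissing) {
        false // treated as a duplicate and skipped
      } else {
        latestShuffleId = UnknownMissing
        handled += 1
        true
      }
    }

    // A stage rerun obtains a fresh id, which overwrites the sentinel.
    def generateNewShuffleId(newId: Int): Unit = synchronized {
      latestShuffleId = newId
    }

    def handledCount: Int = synchronized { handled }
  }

  // Replays the ordering from the comment above and returns how many reports were handled.
  def run(): Int = {
    val dedup = new StateBasedDedup(initialShuffleId = 1)
    dedup.reportMissing()         // first report for shuffle 1: handled
    dedup.generateNewShuffleId(2) // rerun generates a new id
    dedup.reportMissing()         // late duplicate for shuffle 1: handled again
    dedup.handledCount
  }
}
```

In this model the duplicate is only skipped if it arrives before the new id is generated, which is why the check cannot be relied on once reruns are in flight.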

// be cleaned up as it is entirely unusable
  if (determinate && !isBarrierStage && !isCelebornSkewShuffleOrChildShuffle(
-     appShuffleId)) {
+     appShuffleId) && !conf.clientShuffleEarlyDeletion) {
Contributor Author

We cannot reuse the shuffle id when this feature is turned on. Consider the following:

Stage B.0 depends on shuffle 1, which was written by stage A.0.

Due to "too early deletion", shuffle 1 is lost and we need to run A.1. By now shuffle 1 has been deleted from "registered shuffle", so if we reuse 1 as the id and send it to the tasks of A.1, we will run into errors like "shuffle not registered".
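A rough sketch of why reuse fails, under the assumption that an id removed from the registered shuffles is never registered again. `ToyRegistry`, `idForRerun`, and `rerunWrite` are hypothetical names for illustration and do not correspond to Celeborn's API.

```scala
import scala.collection.mutable

object ShuffleIdReuseSketch {
  // Toy stand-in for the "registered shuffle" bookkeeping.
  final class ToyRegistry {
    private val registered = mutable.Set.empty[Int]
    private val deleted = mutable.Set.empty[Int]

    // Assumption: a deleted id is refused on any later registration attempt.
    def register(id: Int): Boolean =
      if (deleted.contains(id)) false
      else { registered += id; true }

    def delete(id: Int): Unit = {
      registered -= id
      deleted += id
    }

    def write(id: Int): Either[String, Int] =
      if (registered.contains(id)) Right(id)
      else Left(s"shuffle $id not registered")
  }

  // Hypothetical id choice for a stage rerun: reuse the old id or take a fresh one.
  def idForRerun(previousId: Int, freshId: Int, reuse: Boolean): Int =
    if (reuse) previousId else freshId

  // Replays A.0 writing shuffle 1, the early deletion, and A.1 writing again.
  def rerunWrite(reuse: Boolean): Either[String, Int] = {
    val registry = new ToyRegistry
    registry.register(1)  // stage A.0 writes shuffle 1
    registry.delete(1)    // "too early deletion" removes it
    val id = idForRerun(previousId = 1, freshId = 2, reuse = reuse)
    registry.register(id) // stage A.1 tries to register before writing
    registry.write(id)
  }
}
```

With `reuse = true` the rerun's write fails in this model, while a fresh id goes through, which is the behavior the added `!conf.clientShuffleEarlyDeletion` condition is meant to select.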
