Skip to content

Commit c3f46d5

Browse files
Ngone51 authored and
Mridul Muralidharan committed
[SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost
### What changes were proposed in this pull request? This PR mainly proposes to reject the block manager re-registration if the executor has already been considered lost/dead by the scheduler backend. Along with the main proposal, this PR also includes a few other changes: * Only post the `SparkListenerBlockManagerAdded` event when the registration succeeds * Return an "invalid" executor id when the re-registration fails * Do not report all blocks when the re-registration fails ### Why are the changes needed? BlockManager re-registration from a lost executor (terminated/terminating executor or orphan executor) has led to some known issues, e.g., a false-active executor showing up in the UI (SPARK-35011) and block fetching to the dead executor (see the comment on #32114). And since there's no re-registration from the lost executor itself, it's meaningless to have BlockManager re-registration when the executor is already lost. Regarding the corner case where the re-registration event arrives before the lost executor is actually removed from the scheduler backend, I think it is not possible. Because re-registration will only be required when the BlockManager doesn't see the block manager in `blockManagerInfo`. And the block manager will only be removed from `blockManagerInfo` either when the executor is already known to be lost or when it is removed by the driver proactively. So the executor should always be removed from the scheduler backend first, before the re-registration event comes. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test Closes #38876 from Ngone51/fix-blockmanager-reregister. Authored-by: Yi Wu <[email protected]> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
1 parent 7cb8288 commit c3f46d5

File tree

6 files changed

+119
-16
lines changed

6 files changed

+119
-16
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -637,9 +637,14 @@ private[spark] class BlockManager(
637637
def reregister(): Unit = {
638638
// TODO: We might need to rate limit re-registering.
639639
logInfo(s"BlockManager $blockManagerId re-registering with master")
640-
master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString, maxOnHeapMemory,
641-
maxOffHeapMemory, storageEndpoint)
642-
reportAllBlocks()
640+
val id = master.registerBlockManager(blockManagerId, diskBlockManager.localDirsString,
641+
maxOnHeapMemory, maxOffHeapMemory, storageEndpoint, isReRegister = true)
642+
if (id.executorId != BlockManagerId.INVALID_EXECUTOR_ID) {
643+
reportAllBlocks()
644+
} else {
645+
logError("Exiting executor due to block manager re-registration failure")
646+
System.exit(-1)
647+
}
643648
}
644649

645650
/**

core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,4 +147,6 @@ private[spark] object BlockManagerId {
147147
}
148148

149149
private[spark] val SHUFFLE_MERGER_IDENTIFIER = "shuffle-push-merger"
150+
151+
private[spark] val INVALID_EXECUTOR_ID = "invalid"
150152
}

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,25 @@ class BlockManagerMaster(
7474
localDirs: Array[String],
7575
maxOnHeapMemSize: Long,
7676
maxOffHeapMemSize: Long,
77-
storageEndpoint: RpcEndpointRef): BlockManagerId = {
77+
storageEndpoint: RpcEndpointRef,
78+
isReRegister: Boolean = false): BlockManagerId = {
7879
logInfo(s"Registering BlockManager $id")
7980
val updatedId = driverEndpoint.askSync[BlockManagerId](
80-
RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, storageEndpoint))
81-
logInfo(s"Registered BlockManager $updatedId")
81+
RegisterBlockManager(
82+
id,
83+
localDirs,
84+
maxOnHeapMemSize,
85+
maxOffHeapMemSize,
86+
storageEndpoint,
87+
isReRegister
88+
)
89+
)
90+
if (updatedId.executorId == BlockManagerId.INVALID_EXECUTOR_ID) {
91+
assert(isReRegister, "Got invalid executor id from non re-register case")
92+
logInfo(s"Re-register BlockManager $id failed")
93+
} else {
94+
logInfo(s"Registered BlockManager $updatedId")
95+
}
8296
updatedId
8397
}
8498

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,10 @@ class BlockManagerMasterEndpoint(
117117
RpcUtils.makeDriverRef(CoarseGrainedSchedulerBackend.ENDPOINT_NAME, conf, rpcEnv)
118118

119119
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
120-
case RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint) =>
121-
context.reply(register(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint))
120+
case RegisterBlockManager(
121+
id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint, isReRegister) =>
122+
context.reply(
123+
register(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint, isReRegister))
122124

123125
case _updateBlockInfo @
124126
UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
@@ -572,7 +574,8 @@ class BlockManagerMasterEndpoint(
572574
localDirs: Array[String],
573575
maxOnHeapMemSize: Long,
574576
maxOffHeapMemSize: Long,
575-
storageEndpoint: RpcEndpointRef): BlockManagerId = {
577+
storageEndpoint: RpcEndpointRef,
578+
isReRegister: Boolean): BlockManagerId = {
576579
// the dummy id is not expected to contain the topology information.
577580
// we get that info here and respond back with a more fleshed out block manager id
578581
val id = BlockManagerId(
@@ -583,7 +586,12 @@ class BlockManagerMasterEndpoint(
583586

584587
val time = System.currentTimeMillis()
585588
executorIdToLocalDirs.put(id.executorId, localDirs)
586-
if (!blockManagerInfo.contains(id)) {
589+
// SPARK-41360: For the block manager re-registration, we should only allow it when
590+
// the executor is recognized as active by the scheduler backend. Otherwise, this kind
591+
// of re-registration from the terminating/stopped executor is meaningless and harmful.
592+
lazy val isExecutorAlive =
593+
driverEndpoint.askSync[Boolean](CoarseGrainedClusterMessages.IsExecutorAlive(id.executorId))
594+
if (!blockManagerInfo.contains(id) && (!isReRegister || isExecutorAlive)) {
587595
blockManagerIdByExecutor.get(id.executorId) match {
588596
case Some(oldId) =>
589597
// A block manager of the same executor already exists, so remove it (assumed dead)
@@ -616,10 +624,29 @@ class BlockManagerMasterEndpoint(
616624
if (pushBasedShuffleEnabled) {
617625
addMergerLocation(id)
618626
}
627+
listenerBus.post(SparkListenerBlockManagerAdded(time, id,
628+
maxOnHeapMemSize + maxOffHeapMemSize, Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
619629
}
620-
listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
621-
Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
622-
id
630+
val updatedId = if (isReRegister && !isExecutorAlive) {
631+
assert(!blockManagerInfo.contains(id),
632+
"BlockManager re-registration shouldn't succeed when the executor is lost")
633+
634+
logInfo(s"BlockManager ($id) re-registration is rejected since " +
635+
s"the executor (${id.executorId}) has been lost")
636+
637+
// Use "invalid" as the return executor id to indicate the block manager that
638+
// re-registration failed. It's a bit hacky but fine since the returned block
639+
// manager id won't be accessed in the case of re-registration. And we'll use
640+
// this "invalid" executor id to print better logs and avoid blocks reporting.
641+
BlockManagerId(
642+
BlockManagerId.INVALID_EXECUTOR_ID,
643+
id.host,
644+
id.port,
645+
id.topologyInfo)
646+
} else {
647+
id
648+
}
649+
updatedId
623650
}
624651

625652
private def updateShuffleBlockInfo(blockId: BlockId, blockManagerId: BlockManagerId)

core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ private[spark] object BlockManagerMessages {
6363
localDirs: Array[String],
6464
maxOnHeapMemSize: Long,
6565
maxOffHeapMemSize: Long,
66-
sender: RpcEndpointRef)
66+
sender: RpcEndpointRef,
67+
isReRegister: Boolean)
6768
extends ToBlockManagerMaster
6869

6970
case class UpdateBlockInfo(

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
295295
eventually(timeout(5.seconds)) {
296296
// make sure both bm1 and bm2 are registered at driver side BlockManagerMaster
297297
verify(master, times(2))
298-
.registerBlockManager(mc.any(), mc.any(), mc.any(), mc.any(), mc.any())
298+
.registerBlockManager(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any())
299299
assert(driverEndpoint.askSync[Boolean](
300300
CoarseGrainedClusterMessages.IsExecutorAlive(bm1Id.executorId)))
301301
assert(driverEndpoint.askSync[Boolean](
@@ -361,6 +361,44 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
361361
master.removeShuffle(0, true)
362362
}
363363

364+
test("SPARK-41360: Avoid block manager re-registration if the executor has been lost") {
365+
// Set up a DriverEndpoint which always returns isExecutorAlive=false
366+
rpcEnv.setupEndpoint(CoarseGrainedSchedulerBackend.ENDPOINT_NAME,
367+
new RpcEndpoint {
368+
override val rpcEnv: RpcEnv = BlockManagerSuite.this.rpcEnv
369+
370+
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
371+
case CoarseGrainedClusterMessages.RegisterExecutor(executorId, _, _, _, _, _, _, _) =>
372+
context.reply(true)
373+
case CoarseGrainedClusterMessages.IsExecutorAlive(executorId) =>
374+
// always return false
375+
context.reply(false)
376+
}
377+
}
378+
)
379+
380+
// Set up a block manager endpoint and endpoint reference
381+
val bmRef = rpcEnv.setupEndpoint(s"bm-0", new RpcEndpoint {
382+
override val rpcEnv: RpcEnv = BlockManagerSuite.this.rpcEnv
383+
384+
private def reply[T](context: RpcCallContext, response: T): Unit = {
385+
context.reply(response)
386+
}
387+
388+
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
389+
case RemoveRdd(_) => reply(context, 1)
390+
case RemoveBroadcast(_, _) => reply(context, 1)
391+
case RemoveShuffle(_) => reply(context, true)
392+
}
393+
})
394+
val bmId = BlockManagerId(s"exec-0", "localhost", 1234, None)
395+
// Register the block manager with isReRegister = true
396+
val updatedId = master.registerBlockManager(
397+
bmId, Array.empty, 2000, 0, bmRef, isReRegister = true)
398+
// The re-registration should fail since the executor is considered as dead by DriverEndpoint
399+
assert(updatedId.executorId === BlockManagerId.INVALID_EXECUTOR_ID)
400+
}
401+
364402
test("StorageLevel object caching") {
365403
val level1 = StorageLevel(false, false, false, 3)
366404
// this should return the same object as level1
@@ -669,6 +707,22 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
669707
val a1 = new Array[Byte](400)
670708
val a2 = new Array[Byte](400)
671709

710+
// Set up a DriverEndpoint which simulates the executor is alive (required by SPARK-41360)
711+
rpcEnv.setupEndpoint(CoarseGrainedSchedulerBackend.ENDPOINT_NAME,
712+
new RpcEndpoint {
713+
override val rpcEnv: RpcEnv = BlockManagerSuite.this.rpcEnv
714+
715+
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
716+
case CoarseGrainedClusterMessages.IsExecutorAlive(executorId) =>
717+
if (executorId == store.blockManagerId.executorId) {
718+
context.reply(true)
719+
} else {
720+
context.reply(false)
721+
}
722+
}
723+
}
724+
)
725+
672726
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
673727
assert(master.getLocations("a1").size > 0, "master was not told about a1")
674728

@@ -2207,7 +2261,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
22072261
}.getMessage
22082262
assert(e.contains("TimeoutException"))
22092263
verify(master, times(0))
2210-
.registerBlockManager(mc.any(), mc.any(), mc.any(), mc.any(), mc.any())
2264+
.registerBlockManager(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any())
22112265
server.close()
22122266
}
22132267
}

0 commit comments

Comments
 (0)