diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 4cda4b180d97..8ef0c3719856 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -413,6 +413,34 @@ package object config { .intConf .createWithDefault(1) + private[spark] val STORAGE_DECOMMISSION_ENABLED = + ConfigBuilder("spark.storage.decommission.enabled") + .doc("Whether to decommission the block manager when decommissioning executor") + .version("3.1.0") + .booleanConf + .createWithDefault(false) + + private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK = + ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock") + .internal() + .doc("Maximum number of failures which can be handled for the replication of " + + "one RDD block when block manager is decommissioning and trying to move its " + + "existing blocks.") + .version("3.1.0") + .intConf + .createWithDefault(3) + + private[spark] val STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL = + ConfigBuilder("spark.storage.decommission.replicationReattemptInterval") + .internal() + .doc("The interval of time between consecutive cache block replication reattempts " + + "happening on each decommissioning executor (due to storage decommissioning).") + .version("3.1.0") + .timeConf(TimeUnit.MILLISECONDS) + .checkValue(_ > 0, "Time interval between two consecutive attempts of " + + "cache block replication should be positive.") + .createWithDefaultString("30s") + private[spark] val STORAGE_REPLICATION_TOPOLOGY_FILE = ConfigBuilder("spark.storage.replication.topologyFile") .version("2.1.0") diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 701d69ba4349..67638a5f9593 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -438,6 +438,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp logError(s"Unexpected error during decommissioning ${e.toString}", e) } logInfo(s"Finished decommissioning executor $executorId.") + + if (conf.get(STORAGE_DECOMMISSION_ENABLED)) { + try { + logInfo("Starting decommissioning block manager corresponding to " + + s"executor $executorId.") + scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId)) + } catch { + case e: Exception => + logError("Unexpected error during block manager " + + s"decommissioning for executor $executorId: ${e.toString}", e) + } + logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.") + } } else { logInfo(s"Skipping decommissioning of executor $executorId.") } @@ -574,7 +587,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp */ private[spark] def decommissionExecutor(executorId: String): Unit = { if (driverEndpoint != null) { - logInfo("Propegating executor decommission to driver.") + logInfo("Propagating executor decommission to driver.") driverEndpoint.send(DecommissionExecutor(executorId)) } } @@ -658,7 +671,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp /** * Update the cluster manager on our scheduling needs. 
Three bits of information are included * to help it make decisions. - * @param resourceProfileToNumExecutors The total number of executors we'd like to have per + * @param resourceProfileIdToNumExecutors The total number of executors we'd like to have per * ResourceProfile. The cluster manager shouldn't kill any * running executor to reach this number, but, if all * existing executors were to die, this is the number diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index e7f8de5ab7e4..e0478ad09601 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -54,6 +54,7 @@ import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.serializer.{SerializerInstance, SerializerManager} import org.apache.spark.shuffle.{ShuffleManager, ShuffleWriteMetricsReporter} +import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock import org.apache.spark.storage.memory._ import org.apache.spark.unsafe.Platform import org.apache.spark.util._ @@ -241,6 +242,9 @@ private[spark] class BlockManager( private var blockReplicationPolicy: BlockReplicationPolicy = _ + private var blockManagerDecommissioning: Boolean = false + private var decommissionManager: Option[BlockManagerDecommissionManager] = None + // A DownloadFileManager used to track all the files of remote blocks which are above the // specified memory threshold. Files will be deleted automatically based on weak reference. // Exposed for test @@ -1551,18 +1555,22 @@ private[spark] class BlockManager( } /** - * Called for pro-active replenishment of blocks lost due to executor failures + * Replicates a block to peer block managers based on existingReplicas and maxReplicas * * @param blockId blockId being replicate * @param existingReplicas existing block managers that have a replica * @param maxReplicas maximum replicas needed + * @param maxReplicationFailures number of replication failures to tolerate before + * giving up. + * @return whether block was successfully replicated or not */ def replicateBlock( blockId: BlockId, existingReplicas: Set[BlockManagerId], - maxReplicas: Int): Unit = { + maxReplicas: Int, + maxReplicationFailures: Option[Int] = None): Boolean = { logInfo(s"Using $blockManagerId to pro-actively replicate $blockId") - blockInfoManager.lockForReading(blockId).foreach { info => + blockInfoManager.lockForReading(blockId).forall { info => val data = doGetLocalBytes(blockId, info) val storageLevel = StorageLevel( useDisk = info.level.useDisk, @@ -1570,11 +1578,13 @@ private[spark] class BlockManager( useOffHeap = info.level.useOffHeap, deserialized = info.level.deserialized, replication = maxReplicas) - // we know we are called as a result of an executor removal, so we refresh peer cache - // this way, we won't try to replicate to a missing executor with a stale reference + // we know we are called as a result of an executor removal or because the current executor + // is getting decommissioned. 
so we refresh peer cache before trying replication, we won't + // try to replicate to a missing executor/another decommissioning executor getPeers(forceFetch = true) try { - replicate(blockId, data, storageLevel, info.classTag, existingReplicas) + replicate( + blockId, data, storageLevel, info.classTag, existingReplicas, maxReplicationFailures) } finally { logDebug(s"Releasing lock for $blockId") releaseLockAndDispose(blockId, data) @@ -1591,9 +1601,11 @@ private[spark] class BlockManager( data: BlockData, level: StorageLevel, classTag: ClassTag[_], - existingReplicas: Set[BlockManagerId] = Set.empty): Unit = { + existingReplicas: Set[BlockManagerId] = Set.empty, + maxReplicationFailures: Option[Int] = None): Boolean = { - val maxReplicationFailures = conf.get(config.STORAGE_MAX_REPLICATION_FAILURE) + val maxReplicationFailureCount = maxReplicationFailures.getOrElse( + conf.get(config.STORAGE_MAX_REPLICATION_FAILURE)) val tLevel = StorageLevel( useDisk = level.useDisk, useMemory = level.useMemory, @@ -1617,7 +1629,7 @@ private[spark] class BlockManager( blockId, numPeersToReplicateTo) - while(numFailures <= maxReplicationFailures && + while(numFailures <= maxReplicationFailureCount && !peersForReplication.isEmpty && peersReplicatedTo.size < numPeersToReplicateTo) { val peer = peersForReplication.head @@ -1641,6 +1653,10 @@ private[spark] class BlockManager( peersForReplication = peersForReplication.tail peersReplicatedTo += peer } catch { + // Rethrow interrupt exception + case e: InterruptedException => + throw e + // Everything else we may retry case NonFatal(e) => logWarning(s"Failed to replicate $blockId to $peer, failure #$numFailures", e) peersFailedToReplicateTo += peer @@ -1665,9 +1681,11 @@ private[spark] class BlockManager( if (peersReplicatedTo.size < numPeersToReplicateTo) { logWarning(s"Block $blockId replicated to only " + s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers") + return false } logDebug(s"block $blockId replicated to ${peersReplicatedTo.mkString(", ")}") + return true } /** @@ -1761,6 +1779,60 @@ private[spark] class BlockManager( blocksToRemove.size } + def decommissionBlockManager(): Unit = { + if (!blockManagerDecommissioning) { + logInfo("Starting block manager decommissioning process") + blockManagerDecommissioning = true + decommissionManager = Some(new BlockManagerDecommissionManager(conf)) + decommissionManager.foreach(_.start()) + } else { + logDebug("Block manager already in decommissioning state") + } + } + + /** + * Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers + * Visible for testing + */ + def decommissionRddCacheBlocks(): Unit = { + val replicateBlocksInfo = master.getReplicateInfoForRDDBlocks(blockManagerId) + + if (replicateBlocksInfo.nonEmpty) { + logInfo(s"Need to replicate ${replicateBlocksInfo.size} blocks " + + "for block manager decommissioning") + } else { + logWarning(s"Asked to decommission RDD cache blocks, but no blocks to migrate") + return + } + + // Maximum number of storage replication failure which replicateBlock can handle + val maxReplicationFailures = conf.get( + config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK) + + // TODO: We can sort these blocks based on some policy (LRU/blockSize etc) + // so that we end up prioritize them over each other + val blocksFailedReplication = replicateBlocksInfo.map { + case ReplicateBlock(blockId, existingReplicas, maxReplicas) => + val replicatedSuccessfully = replicateBlock( + blockId, + existingReplicas.toSet, + 
maxReplicas, + maxReplicationFailures = Some(maxReplicationFailures)) + if (replicatedSuccessfully) { + logInfo(s"Block $blockId offloaded successfully, Removing block now") + removeBlock(blockId) + logInfo(s"Block $blockId removed") + } else { + logWarning(s"Failed to offload block $blockId") + } + (blockId, replicatedSuccessfully) + }.filterNot(_._2).map(_._1) + if (blocksFailedReplication.nonEmpty) { + logWarning("Blocks failed replication in cache decommissioning " + + s"process: ${blocksFailedReplication.mkString(",")}") + } + } + /** * Remove all blocks belonging to the given broadcast. */ @@ -1829,7 +1901,58 @@ private[spark] class BlockManager( data.dispose() } + /** + * Class to handle block manager decommissioning retries + * It creates a Thread to retry offloading all RDD cache blocks + */ + private class BlockManagerDecommissionManager(conf: SparkConf) { + @volatile private var stopped = false + private val sleepInterval = conf.get( + config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL) + + private val blockReplicationThread = new Thread { + override def run(): Unit = { + var failures = 0 + while (blockManagerDecommissioning + && !stopped + && !Thread.interrupted() + && failures < 20) { + try { + logDebug("Attempting to replicate all cached RDD blocks") + decommissionRddCacheBlocks() + logInfo("Attempt to replicate all cached blocks done") + Thread.sleep(sleepInterval) + } catch { + case _: InterruptedException => + logInfo("Interrupted during migration, will not refresh migrations.") + stopped = true + case NonFatal(e) => + failures += 1 + logError("Error occurred while trying to replicate cached RDD blocks" + + s" for block manager decommissioning (failure count: $failures)", e) + } + } + } + } + blockReplicationThread.setDaemon(true) + blockReplicationThread.setName("block-replication-thread") + + def start(): Unit = { + logInfo("Starting block replication thread") + blockReplicationThread.start() + } + + def stop(): Unit = { + if (!stopped) { + stopped = true + logInfo("Stopping block replication thread") + blockReplicationThread.interrupt() + } + } + } + def stop(): Unit = { + decommissionManager.foreach(_.stop()) blockTransferService.close() if (blockStoreClient ne blockTransferService) { // Closing should be idempotent, but maybe not for the NioBlockTransferService. diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index e440c1ab7bcd..3cfa5d2a2581 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -43,6 +43,16 @@ class BlockManagerMaster( logInfo("Removed " + execId + " successfully in removeExecutor") } + /** Decommission block managers corresponding to given set of executors */ + def decommissionBlockManagers(executorIds: Seq[String]): Unit = { + driverEndpoint.ask[Unit](DecommissionBlockManagers(executorIds)) + } + + /** Get Replication Info for all the RDD blocks stored in given blockManagerId */ + def getReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId): Seq[ReplicateBlock] = { + driverEndpoint.askSync[Seq[ReplicateBlock]](GetReplicateInfoForRDDBlocks(blockManagerId)) + } + /** Request removal of a dead executor from the driver endpoint. * This is only called on the driver side. 
Non-blocking */ diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index d7f7eedc7f33..d936420a9927 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -65,6 +65,9 @@ class BlockManagerMasterEndpoint( // Mapping from executor ID to block manager ID. private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId] + // Set of block managers which are decommissioning + private val decommissioningBlockManagerSet = new mutable.HashSet[BlockManagerId] + // Mapping from block id to the set of block managers that have the block. private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]] @@ -153,6 +156,13 @@ class BlockManagerMasterEndpoint( removeExecutor(execId) context.reply(true) + case DecommissionBlockManagers(executorIds) => + decommissionBlockManagers(executorIds.flatMap(blockManagerIdByExecutor.get)) + context.reply(true) + + case GetReplicateInfoForRDDBlocks(blockManagerId) => + context.reply(getReplicateInfoForRDDBlocks(blockManagerId)) + case StopBlockManagerMaster => context.reply(true) stop() @@ -257,6 +267,7 @@ class BlockManagerMasterEndpoint( // Remove the block manager from blockManagerIdByExecutor. blockManagerIdByExecutor -= blockManagerId.executorId + decommissioningBlockManagerSet.remove(blockManagerId) // Remove it from blockManagerInfo and remove all the blocks. blockManagerInfo.remove(blockManagerId) @@ -299,6 +310,39 @@ class BlockManagerMasterEndpoint( blockManagerIdByExecutor.get(execId).foreach(removeBlockManager) } + /** + * Decommission the given Seq of blockmanagers + * - Adds these block managers to decommissioningBlockManagerSet Set + * - Sends the DecommissionBlockManager message to each of the [[BlockManagerSlaveEndpoint]] + */ + def decommissionBlockManagers(blockManagerIds: Seq[BlockManagerId]): Future[Seq[Unit]] = { + val newBlockManagersToDecommission = blockManagerIds.toSet.diff(decommissioningBlockManagerSet) + val futures = newBlockManagersToDecommission.map { blockManagerId => + decommissioningBlockManagerSet.add(blockManagerId) + val info = blockManagerInfo(blockManagerId) + info.slaveEndpoint.ask[Unit](DecommissionBlockManager) + } + Future.sequence{ futures.toSeq } + } + + /** + * Returns a Seq of ReplicateBlock for each RDD block stored by given blockManagerId + * @param blockManagerId - block manager id for which ReplicateBlock info is needed + * @return Seq of ReplicateBlock + */ + private def getReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId): Seq[ReplicateBlock] = { + val info = blockManagerInfo(blockManagerId) + + val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD) + rddBlocks.map { blockId => + val currentBlockLocations = blockLocations.get(blockId) + val maxReplicas = currentBlockLocations.size + 1 + val remainingLocations = currentBlockLocations.toSeq.filter(bm => bm != blockManagerId) + val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas) + replicateMsg + }.toSeq + } + // Remove a block from the slaves that have it. This can only be used to remove // blocks that the master knows about. 
private def removeBlockFromWorkers(blockId: BlockId): Unit = { @@ -536,7 +580,11 @@ class BlockManagerMasterEndpoint( private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = { val blockManagerIds = blockManagerInfo.keySet if (blockManagerIds.contains(blockManagerId)) { - blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq + blockManagerIds + .filterNot { _.isDriver } + .filterNot { _ == blockManagerId } + .diff(decommissioningBlockManagerSet) + .toSeq } else { Seq.empty } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index 895f48d0709f..7d4f2fff5c34 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -36,6 +36,8 @@ private[spark] object BlockManagerMessages { case class ReplicateBlock(blockId: BlockId, replicas: Seq[BlockManagerId], maxReplicas: Int) extends ToBlockManagerSlave + case object DecommissionBlockManager extends ToBlockManagerSlave + // Remove all blocks belonging to a specific RDD. case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave @@ -125,6 +127,11 @@ private[spark] object BlockManagerMessages { case object GetStorageStatus extends ToBlockManagerMaster + case class DecommissionBlockManagers(executorIds: Seq[String]) extends ToBlockManagerMaster + + case class GetReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId) + extends ToBlockManagerMaster + case class GetBlockStatus(blockId: BlockId, askSlaves: Boolean = true) extends ToBlockManagerMaster diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala index 29e21142ce44..a3a714910349 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala @@ -61,6 +61,9 @@ class BlockManagerSlaveEndpoint( SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId) } + case DecommissionBlockManager => + context.reply(blockManager.decommissionBlockManager()) + case RemoveBroadcast(broadcastId, _) => doAsync[Int]("removing broadcast " + broadcastId, context) { blockManager.removeBroadcast(broadcastId, tellMaster = true) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala new file mode 100644 index 000000000000..7456ca7f02a2 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.util.concurrent.Semaphore + +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.duration._ + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, Success} +import org.apache.spark.internal.config +import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart} +import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend +import org.apache.spark.util.{ResetSystemProperties, ThreadUtils} + +class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext + with ResetSystemProperties { + + override def beforeEach(): Unit = { + val conf = new SparkConf().setAppName("test") + .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true) + .set(config.STORAGE_DECOMMISSION_ENABLED, true) + + sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf) + } + + test(s"verify that an already running task which is going to cache data succeeds " + + s"on a decommissioned executor") { + // Create input RDD with 10 partitions + val input = sc.parallelize(1 to 10, 10) + val accum = sc.longAccumulator("mapperRunAccumulator") + // Do a count to wait for the executors to be registered. + input.count() + + // Create a new RDD where we have sleep in each partition, we are also increasing + // the value of accumulator in each partition + val sleepyRdd = input.mapPartitions { x => + Thread.sleep(500) + accum.add(1) + x + } + + // Listen for the job + val sem = new Semaphore(0) + val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd] + sc.addSparkListener(new SparkListener { + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { + sem.release() + } + + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { + taskEndEvents.append(taskEnd) + } + }) + + // Cache the RDD lazily + sleepyRdd.persist() + + // Start the computation of RDD - this step will also cache the RDD + val asyncCount = sleepyRdd.countAsync() + + // Wait for the job to have started + sem.acquire(1) + + // Give Spark a tiny bit to start the tasks after the listener says hello + Thread.sleep(100) + // Decommission one of the executor + val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] + val execs = sched.getExecutorIds() + assert(execs.size == 2, s"Expected 2 executors but found ${execs.size}") + val execToDecommission = execs.head + sched.decommissionExecutor(execToDecommission) + + // Wait for job to finish + val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 6.seconds) + assert(asyncCountResult === 10) + // All 10 tasks finished, so accum should have been increased 10 times + assert(accum.value === 10) + + // All tasks should be successful, nothing should have failed + sc.listenerBus.waitUntilEmpty() + assert(taskEndEvents.size === 10) // 10 mappers + assert(taskEndEvents.map(_.reason).toSet === Set(Success)) + + // Since the RDD is cached, so further usage of same RDD should use the + // cached data. Original RDD partitions should not be recomputed i.e. 
accum + // should have same value like before + assert(sleepyRdd.count() === 10) + assert(accum.value === 10) + } +} diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 8d06768a2b28..bfef8f1ab29d 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1706,6 +1706,64 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE verify(liveListenerBus, never()).post(SparkListenerBlockUpdated(BlockUpdatedInfo(updateInfo))) } + test("test decommission block manager should not be part of peers") { + val exec1 = "exec1" + val exec2 = "exec2" + val exec3 = "exec3" + val store1 = makeBlockManager(800, exec1) + val store2 = makeBlockManager(800, exec2) + val store3 = makeBlockManager(800, exec3) + + assert(master.getPeers(store3.blockManagerId).map(_.executorId).toSet === Set(exec1, exec2)) + + val data = new Array[Byte](4) + val blockId = rdd(0, 0) + store1.putSingle(blockId, data, StorageLevel.MEMORY_ONLY_2) + assert(master.getLocations(blockId).size === 2) + + master.decommissionBlockManagers(Seq(exec1)) + // store1 is decommissioned, so it should not be part of peer list for store3 + assert(master.getPeers(store3.blockManagerId).map(_.executorId).toSet === Set(exec2)) + } + + test("test decommissionRddCacheBlocks should offload all cached blocks") { + val store1 = makeBlockManager(800, "exec1") + val store2 = makeBlockManager(800, "exec2") + val store3 = makeBlockManager(800, "exec3") + + val data = new Array[Byte](4) + val blockId = rdd(0, 0) + store1.putSingle(blockId, data, StorageLevel.MEMORY_ONLY_2) + assert(master.getLocations(blockId).size === 2) + assert(master.getLocations(blockId).contains(store1.blockManagerId)) + + store1.decommissionRddCacheBlocks() + assert(master.getLocations(blockId).size === 2) + assert(master.getLocations(blockId).toSet === Set(store2.blockManagerId, + store3.blockManagerId)) + } + + test("test decommissionRddCacheBlocks should keep the block if it is not able to offload") { + val store1 = makeBlockManager(3500, "exec1") + val store2 = makeBlockManager(1000, "exec2") + + val dataLarge = new Array[Byte](1500) + val blockIdLarge = rdd(0, 0) + val dataSmall = new Array[Byte](1) + val blockIdSmall = rdd(0, 1) + + store1.putSingle(blockIdLarge, dataLarge, StorageLevel.MEMORY_ONLY) + store1.putSingle(blockIdSmall, dataSmall, StorageLevel.MEMORY_ONLY) + assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId)) + assert(master.getLocations(blockIdSmall) === Seq(store1.blockManagerId)) + + store1.decommissionRddCacheBlocks() + // Smaller block offloaded to store2 + assert(master.getLocations(blockIdSmall) === Seq(store2.blockManagerId)) + // Larger block still present in store1 as it can't be offloaded + assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId)) + } + class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService { var numCalls = 0 var tempFileManager: DownloadFileManager = null diff --git a/dev/test-dependencies.sh b/dev/test-dependencies.sh index 936ac00f6b9e..e7efba59db92 100755 --- a/dev/test-dependencies.sh +++ b/dev/test-dependencies.sh @@ -17,7 +17,7 @@ # limitations under the License. # -set -e +set -ex FWDIR="$(cd "`dirname $0`"/..; pwd)" cd "$FWDIR"
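For readers who want to try the change end to end, a minimal, hypothetical driver program that opts into the new behavior might look like the sketch below. The `spark.storage.decommission.*` keys come verbatim from the config additions in this patch; the worker-side flag name, the `local-cluster` master URL, and the cached workload are assumptions borrowed from the new `BlockManagerDecommissionSuite`, not part of the patch itself.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object StorageDecommissionExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      // Existing worker-side decommission flag; assumed to be the key behind
      // config.Worker.WORKER_DECOMMISSION_ENABLED, which the new suite also sets.
      .set("spark.worker.decommission.enabled", "true")
      // Added by this patch: also decommission the block manager of a decommissioned executor.
      .set("spark.storage.decommission.enabled", "true")
      // Internal knobs added by this patch, shown here with their default values.
      .set("spark.storage.decommission.maxReplicationFailuresPerBlock", "3")
      .set("spark.storage.decommission.replicationReattemptInterval", "30s")

    val sc = new SparkContext("local-cluster[2, 1, 1024]", "storage-decommission-example", conf)
    // Cached RDD blocks are what decommissionRddCacheBlocks() offloads to peer executors
    // once an executor (and its block manager) starts decommissioning.
    val cached = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_2)
    cached.count()
    sc.stop()
  }
}
```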
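On the master side, `getReplicateInfoForRDDBlocks` builds one `ReplicateBlock` per cached RDD block held by the decommissioning block manager, and bumps the target replica count by one so that a fresh peer receives the block before the original copy is removed. A self-contained sketch of that calculation, using hypothetical stand-in types for `BlockManagerId` and `BlockId` rather than the real bookkeeping:

```scala
// Sketch of the replica-count logic in getReplicateInfoForRDDBlocks; names are illustrative.
case class BmId(executorId: String)
case class ReplicateInfo(block: String, remainingLocations: Seq[BmId], maxReplicas: Int)

def replicateInfoFor(
    decommissioning: BmId,
    blockLocations: Map[String, Set[BmId]]): Seq[ReplicateInfo] =
  blockLocations.toSeq.collect {
    case (block, locations) if locations.contains(decommissioning) =>
      // Ask for one replica more than currently exists, so the block still has a live
      // copy after the decommissioning executor's copy is dropped post-offload.
      ReplicateInfo(block, (locations - decommissioning).toSeq, locations.size + 1)
  }

// rdd_0_0 lives only on exec1 (the executor being decommissioned), so the master
// requests 2 replicas: the existing one plus a fresh copy on some peer.
val info = replicateInfoFor(BmId("exec1"), Map("rdd_0_0" -> Set(BmId("exec1"))))
assert(info == Seq(ReplicateInfo("rdd_0_0", Seq.empty, 2)))
```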
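The offloading itself is driven by a daemon thread that keeps sweeping until it is stopped or accumulates too many failures. A stripped-down, hypothetical sketch of that retry pattern is shown below; the real `BlockManagerDecommissionManager` additionally checks the `blockManagerDecommissioning` flag and logs each attempt, and `offloadOnce` here merely stands in for `decommissionRddCacheBlocks()`.

```scala
import scala.util.control.NonFatal

// Stripped-down sketch of the retry loop in BlockManagerDecommissionManager.
class RetryingOffloader(offloadOnce: () => Unit, sleepIntervalMs: Long, maxFailures: Int = 20) {
  @volatile private var stopped = false

  private val worker = new Thread("block-replication-thread") {
    override def run(): Unit = {
      var failures = 0
      while (!stopped && !Thread.interrupted() && failures < maxFailures) {
        try {
          offloadOnce()                  // try to replicate all cached RDD blocks once
          Thread.sleep(sleepIntervalMs)  // then wait before the next sweep
        } catch {
          case _: InterruptedException => stopped = true // stop() interrupts the sleep
          case NonFatal(_) => failures += 1              // tolerate a bounded number of errors
        }
      }
    }
  }
  worker.setDaemon(true)

  def start(): Unit = worker.start()
  def stop(): Unit = { stopped = true; worker.interrupt() }
}
```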