diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index bceb26cfd4f8a..5114cf70e3f26 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -94,6 +94,7 @@ private[spark] class ExecutorAllocationManager( client: ExecutorAllocationClient, listenerBus: LiveListenerBus, conf: SparkConf, + cleaner: Option[ContextCleaner] = None, clock: Clock = new SystemClock()) extends Logging { @@ -148,7 +149,7 @@ private[spark] class ExecutorAllocationManager( // Listener for Spark events that impact the allocation policy val listener = new ExecutorAllocationListener - val executorMonitor = new ExecutorMonitor(conf, client, clock) + val executorMonitor = new ExecutorMonitor(conf, client, listenerBus, clock) // Executor that handles the scheduling task. private val executor = @@ -194,11 +195,13 @@ private[spark] class ExecutorAllocationManager( throw new SparkException( s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!") } - // Require external shuffle service for dynamic allocation - // Otherwise, we may lose shuffle files when killing executors - if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) && !testing) { - throw new SparkException("Dynamic allocation of executors requires the external " + - "shuffle service. You may enable this through spark.shuffle.service.enabled.") + if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) { + if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING)) { + logWarning("Dynamic allocation without a shuffle service is an experimental feature.") + } else if (!testing) { + throw new SparkException("Dynamic allocation of executors requires the external " + + "shuffle service. You may enable this through spark.shuffle.service.enabled.") + } } if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) { @@ -214,6 +217,7 @@ private[spark] class ExecutorAllocationManager( def start(): Unit = { listenerBus.addToManagementQueue(listener) listenerBus.addToManagementQueue(executorMonitor) + cleaner.foreach(_.attachListener(executorMonitor)) val scheduleTask = new Runnable() { override def run(): Unit = { diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index a0d7aa743223c..7af9067145212 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -553,14 +553,22 @@ class SparkContext(config: SparkConf) extends Logging { None } - // Optionally scale number of executors dynamically based on workload. Exposed for testing. 
+ _cleaner = + if (_conf.get(CLEANER_REFERENCE_TRACKING)) { + Some(new ContextCleaner(this)) + } else { + None + } + _cleaner.foreach(_.start()) + val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf) _executorAllocationManager = if (dynamicAllocationEnabled) { schedulerBackend match { case b: ExecutorAllocationClient => Some(new ExecutorAllocationManager( - schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf)) + schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf, + cleaner = cleaner)) case _ => None } @@ -569,14 +577,6 @@ class SparkContext(config: SparkConf) extends Logging { } _executorAllocationManager.foreach(_.start()) - _cleaner = - if (_conf.get(CLEANER_REFERENCE_TRACKING)) { - Some(new ContextCleaner(this)) - } else { - None - } - _cleaner.foreach(_.start()) - setupAndStartListenerBus() postEnvironmentUpdate() postApplicationStart() diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 76d3d6ee3d8f8..f2b88fe00cdf9 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -369,6 +369,17 @@ package object config { .checkValue(_ >= 0L, "Timeout must be >= 0.") .createWithDefault(60) + private[spark] val DYN_ALLOCATION_SHUFFLE_TRACKING = + ConfigBuilder("spark.dynamicAllocation.shuffleTracking.enabled") + .booleanConf + .createWithDefault(false) + + private[spark] val DYN_ALLOCATION_SHUFFLE_TIMEOUT = + ConfigBuilder("spark.dynamicAllocation.shuffleTimeout") + .timeConf(TimeUnit.MILLISECONDS) + .checkValue(_ >= 0L, "Timeout must be >= 0.") + .createWithDefault(Long.MaxValue) + private[spark] val DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT = ConfigBuilder("spark.dynamicAllocation.schedulerBacklogTimeout") .timeConf(TimeUnit.SECONDS).createWithDefault(1) diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala index 33a68f24bd53a..e3216151462bd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala @@ -37,7 +37,8 @@ class StageInfo( val parentIds: Seq[Int], val details: String, val taskMetrics: TaskMetrics = null, - private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty) { + private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty, + private[spark] val shuffleDepId: Option[Int] = None) { /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */ var submissionTime: Option[Long] = None /** Time when all tasks in the stage completed or when the stage was cancelled. 
*/ @@ -90,6 +91,10 @@ private[spark] object StageInfo { ): StageInfo = { val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd) val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos + val shuffleDepId = stage match { + case sms: ShuffleMapStage => Option(sms.shuffleDep).map(_.shuffleId) + case _ => None + } new StageInfo( stage.id, attemptId, @@ -99,6 +104,7 @@ private[spark] object StageInfo { stage.parents.map(_.id), stage.details, taskMetrics, - taskLocalityPreferences) + taskLocalityPreferences, + shuffleDepId) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala index 9aac4d2281ecf..f5beb403555e9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala @@ -36,14 +36,19 @@ import org.apache.spark.util.Clock private[spark] class ExecutorMonitor( conf: SparkConf, client: ExecutorAllocationClient, - clock: Clock) extends SparkListener with Logging { + listenerBus: LiveListenerBus, + clock: Clock) extends SparkListener with CleanerListener with Logging { private val idleTimeoutMs = TimeUnit.SECONDS.toMillis( conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT)) private val storageTimeoutMs = TimeUnit.SECONDS.toMillis( conf.get(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT)) + private val shuffleTimeoutMs = conf.get(DYN_ALLOCATION_SHUFFLE_TIMEOUT) + private val fetchFromShuffleSvcEnabled = conf.get(SHUFFLE_SERVICE_ENABLED) && conf.get(SHUFFLE_SERVICE_FETCH_RDD_ENABLED) + private val shuffleTrackingEnabled = !conf.get(SHUFFLE_SERVICE_ENABLED) && + conf.get(DYN_ALLOCATION_SHUFFLE_TRACKING) private val executors = new ConcurrentHashMap[String, Tracker]() @@ -64,6 +69,26 @@ private[spark] class ExecutorMonitor( private val nextTimeout = new AtomicLong(Long.MaxValue) private var timedOutExecs = Seq.empty[String] + // Active job tracking. + // + // The following state is used when an external shuffle service is not in use, and allows Spark + // to scale down based on whether the shuffle data stored in executors is in use. + // + // The algorithm works as following: when jobs start, some state is kept that tracks which stages + // are part of that job, and which shuffle ID is attached to those stages. As tasks finish, the + // executor tracking code is updated to include the list of shuffles for which it's storing + // shuffle data. + // + // If executors hold shuffle data that is related to an active job, then the executor is + // considered to be in "shuffle busy" state; meaning that the executor is not allowed to be + // removed. If the executor has shuffle data but it doesn't relate to any active job, then it + // may be removed when idle, following the shuffle-specific timeout configuration. + // + // The following fields are not thread-safe and should be only used from the event thread. 
+ private val shuffleToActiveJobs = new mutable.HashMap[Int, mutable.ArrayBuffer[Int]]() + private val stageToShuffleID = new mutable.HashMap[Int, Int]() + private val jobToStageIDs = new mutable.HashMap[Int, Seq[Int]]() + def reset(): Unit = { executors.clear() nextTimeout.set(Long.MaxValue) @@ -85,7 +110,7 @@ private[spark] class ExecutorMonitor( var newNextTimeout = Long.MaxValue timedOutExecs = executors.asScala - .filter { case (_, exec) => !exec.pendingRemoval } + .filter { case (_, exec) => !exec.pendingRemoval && !exec.hasActiveShuffle } .filter { case (_, exec) => val deadline = exec.timeoutAt if (deadline > now) { @@ -124,6 +149,109 @@ private[spark] class ExecutorMonitor( def pendingRemovalCount: Int = executors.asScala.count { case (_, exec) => exec.pendingRemoval } + override def onJobStart(event: SparkListenerJobStart): Unit = { + if (!shuffleTrackingEnabled) { + return + } + + val shuffleStages = event.stageInfos.flatMap { s => + s.shuffleDepId.toSeq.map { shuffleId => + s.stageId -> shuffleId + } + } + + var updateExecutors = false + shuffleStages.foreach { case (stageId, shuffle) => + val jobIDs = shuffleToActiveJobs.get(shuffle) match { + case Some(jobs) => + // If a shuffle is being re-used, we need to re-scan the executors and update their + // tracker with the information that the shuffle data they're storing is in use. + logDebug(s"Reusing shuffle $shuffle in job ${event.jobId}.") + updateExecutors = true + jobs + + case _ => + logDebug(s"Registered new shuffle $shuffle (from stage $stageId).") + val jobs = new mutable.ArrayBuffer[Int]() + shuffleToActiveJobs(shuffle) = jobs + jobs + } + jobIDs += event.jobId + } + + if (updateExecutors) { + val activeShuffleIds = shuffleStages.map(_._2).toSeq + var needTimeoutUpdate = false + val activatedExecs = new mutable.ArrayBuffer[String]() + executors.asScala.foreach { case (id, exec) => + if (!exec.hasActiveShuffle) { + exec.updateActiveShuffles(activeShuffleIds) + if (exec.hasActiveShuffle) { + needTimeoutUpdate = true + activatedExecs += id + } + } + } + + logDebug(s"Activated executors ${activatedExecs.mkString(",")} due to shuffle data " + + s"needed by new job ${event.jobId}.") + + if (needTimeoutUpdate) { + nextTimeout.set(Long.MinValue) + } + } + + stageToShuffleID ++= shuffleStages + jobToStageIDs(event.jobId) = shuffleStages.map(_._1).toSeq + } + + override def onJobEnd(event: SparkListenerJobEnd): Unit = { + if (!shuffleTrackingEnabled) { + return + } + + var updateExecutors = false + val activeShuffles = new mutable.ArrayBuffer[Int]() + shuffleToActiveJobs.foreach { case (shuffleId, jobs) => + jobs -= event.jobId + if (jobs.nonEmpty) { + activeShuffles += shuffleId + } else { + // If a shuffle went idle we need to update all executors to make sure they're correctly + // tracking active shuffles. 
+ updateExecutors = true + } + } + + if (updateExecutors) { + if (log.isDebugEnabled()) { + if (activeShuffles.nonEmpty) { + logDebug( + s"Job ${event.jobId} ended, shuffles ${activeShuffles.mkString(",")} still active.") + } else { + logDebug(s"Job ${event.jobId} ended, no active shuffles remain.") + } + } + + val deactivatedExecs = new mutable.ArrayBuffer[String]() + executors.asScala.foreach { case (id, exec) => + if (exec.hasActiveShuffle) { + exec.updateActiveShuffles(activeShuffles) + if (!exec.hasActiveShuffle) { + deactivatedExecs += id + } + } + } + + logDebug(s"Executors ${deactivatedExecs.mkString(",")} do not have active shuffle data " + + s"after job ${event.jobId} finished.") + } + + jobToStageIDs.remove(event.jobId).foreach { stages => + stages.foreach { id => stageToShuffleID -= id } + } + } + override def onTaskStart(event: SparkListenerTaskStart): Unit = { val executorId = event.taskInfo.executorId // Guard against a late arriving task start event (SPARK-26927). @@ -137,6 +265,21 @@ private[spark] class ExecutorMonitor( val executorId = event.taskInfo.executorId val exec = executors.get(executorId) if (exec != null) { + // If the task succeeded and the stage generates shuffle data, record that this executor + // holds data for the shuffle. This code will track all executors that generate shuffle + // for the stage, even if speculative tasks generate duplicate shuffle data and end up + // being ignored by the map output tracker. + // + // This means that an executor may be marked as having shuffle data, and thus prevented + // from being removed, even though the data may not be used. + if (shuffleTrackingEnabled && event.reason == Success) { + stageToShuffleID.get(event.stageId).foreach { shuffleId => + exec.addShuffle(shuffleId) + } + } + + // Update the number of running tasks after checking for shuffle data, so that the shuffle + // information is up-to-date in case the executor is going idle. exec.updateRunningTasks(-1) } } @@ -171,7 +314,6 @@ private[spark] class ExecutorMonitor( // available. So don't count blocks that can be served by the external service. if (storageLevel.isValid && (!fetchFromShuffleSvcEnabled || !storageLevel.useDisk)) { val hadCachedBlocks = exec.cachedBlocks.nonEmpty - val blocks = exec.cachedBlocks.getOrElseUpdate(blockId.rddId, new mutable.BitSet(blockId.splitIndex)) blocks += blockId.splitIndex @@ -201,6 +343,25 @@ private[spark] class ExecutorMonitor( } } + override def onOtherEvent(event: SparkListenerEvent): Unit = event match { + case ShuffleCleanedEvent(id) => cleanupShuffle(id) + case _ => + } + + override def rddCleaned(rddId: Int): Unit = { } + + override def shuffleCleaned(shuffleId: Int): Unit = { + // Because this is called in a completely separate thread, we post a custom event to the + // listener bus so that the internal state is safely updated. + listenerBus.post(ShuffleCleanedEvent(shuffleId)) + } + + override def broadcastCleaned(broadcastId: Long): Unit = { } + + override def accumCleaned(accId: Long): Unit = { } + + override def checkpointCleaned(rddId: Long): Unit = { } + // Visible for testing. 
private[dynalloc] def isExecutorIdle(id: String): Boolean = { Option(executors.get(id)).map(_.isIdle).getOrElse(throw new NoSuchElementException(id)) @@ -209,7 +370,7 @@ private[spark] class ExecutorMonitor( // Visible for testing private[dynalloc] def timedOutExecutors(when: Long): Seq[String] = { executors.asScala.flatMap { case (id, tracker) => - if (tracker.timeoutAt <= when) Some(id) else None + if (tracker.isIdle && tracker.timeoutAt <= when) Some(id) else None }.toSeq } @@ -236,6 +397,14 @@ private[spark] class ExecutorMonitor( } } + private def cleanupShuffle(id: Int): Unit = { + logDebug(s"Cleaning up state related to shuffle $id.") + shuffleToActiveJobs -= id + executors.asScala.foreach { case (_, exec) => + exec.removeShuffle(id) + } + } + private class Tracker { @volatile var timeoutAt: Long = Long.MaxValue @@ -244,6 +413,7 @@ private[spark] class ExecutorMonitor( @volatile var timedOut: Boolean = false var pendingRemoval: Boolean = false + var hasActiveShuffle: Boolean = false private var idleStart: Long = -1 private var runningTasks: Int = 0 @@ -252,8 +422,11 @@ private[spark] class ExecutorMonitor( // This should only be used in the event thread. val cachedBlocks = new mutable.HashMap[Int, mutable.BitSet]() - // For testing. - def isIdle: Boolean = idleStart >= 0 + // The set of shuffles for which shuffle data is held by the executor. + // This should only be used in the event thread. + private val shuffleIds = if (shuffleTrackingEnabled) new mutable.HashSet[Int]() else null + + def isIdle: Boolean = idleStart >= 0 && !hasActiveShuffle def updateRunningTasks(delta: Int): Unit = { runningTasks = math.max(0, runningTasks + delta) @@ -264,7 +437,18 @@ private[spark] class ExecutorMonitor( def updateTimeout(): Unit = { val oldDeadline = timeoutAt val newDeadline = if (idleStart >= 0) { - idleStart + (if (cachedBlocks.nonEmpty) storageTimeoutMs else idleTimeoutMs) + val timeout = if (cachedBlocks.nonEmpty || (shuffleIds != null && shuffleIds.nonEmpty)) { + val _cacheTimeout = if (cachedBlocks.nonEmpty) storageTimeoutMs else Long.MaxValue + val _shuffleTimeout = if (shuffleIds != null && shuffleIds.nonEmpty) { + shuffleTimeoutMs + } else { + Long.MaxValue + } + math.min(_cacheTimeout, _shuffleTimeout) + } else { + idleTimeoutMs + } + idleStart + timeout } else { Long.MaxValue } @@ -279,5 +463,32 @@ private[spark] class ExecutorMonitor( updateNextTimeout(newDeadline) } } + + def addShuffle(id: Int): Unit = { + if (shuffleIds.add(id)) { + hasActiveShuffle = true + } + } + + def removeShuffle(id: Int): Unit = { + if (shuffleIds.remove(id) && shuffleIds.isEmpty) { + hasActiveShuffle = false + if (isIdle) { + updateTimeout() + } + } + } + + def updateActiveShuffles(ids: Iterable[Int]): Unit = { + val hadActiveShuffle = hasActiveShuffle + hasActiveShuffle = ids.exists(shuffleIds.contains) + if (hadActiveShuffle && isIdle) { + updateTimeout() + } + } + } + + private case class ShuffleCleanedEvent(id: Int) extends SparkListenerEvent { + override protected[spark] def logEvent: Boolean = false } } diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 3ba33e358ef03..191b516661e49 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -1008,7 +1008,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { private def createManager( conf: SparkConf, clock: 
Clock = new SystemClock()): ExecutorAllocationManager = { - val manager = new ExecutorAllocationManager(client, listenerBus, conf, clock) + val manager = new ExecutorAllocationManager(client, listenerBus, conf, clock = clock) managers += manager manager.start() manager diff --git a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala index 8d1577e835d27..e11ee97469b00 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit import scala.collection.mutable import org.mockito.ArgumentMatchers.any -import org.mockito.Mockito.{mock, when} +import org.mockito.Mockito.{doAnswer, mock, when} import org.apache.spark._ import org.apache.spark.internal.config._ @@ -34,10 +34,13 @@ class ExecutorMonitorSuite extends SparkFunSuite { private val idleTimeoutMs = TimeUnit.SECONDS.toMillis(60L) private val storageTimeoutMs = TimeUnit.SECONDS.toMillis(120L) + private val shuffleTimeoutMs = TimeUnit.SECONDS.toMillis(240L) private val conf = new SparkConf() .set(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "60s") .set(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT.key, "120s") + .set(DYN_ALLOCATION_SHUFFLE_TIMEOUT.key, "240s") + .set(SHUFFLE_SERVICE_ENABLED, true) private var monitor: ExecutorMonitor = _ private var client: ExecutorAllocationClient = _ @@ -55,7 +58,7 @@ class ExecutorMonitorSuite extends SparkFunSuite { when(client.isExecutorActive(any())).thenAnswer { invocation => knownExecs.contains(invocation.getArguments()(0).asInstanceOf[String]) } - monitor = new ExecutorMonitor(conf, client, clock) + monitor = new ExecutorMonitor(conf, client, null, clock) } test("basic executor timeout") { @@ -205,7 +208,7 @@ class ExecutorMonitorSuite extends SparkFunSuite { assert(monitor.timedOutExecutors(storageDeadline) === Seq("1")) conf.set(SHUFFLE_SERVICE_ENABLED, true).set(SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true) - monitor = new ExecutorMonitor(conf, client, clock) + monitor = new ExecutorMonitor(conf, client, null, clock) monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null)) monitor.onBlockUpdated(rddUpdate(1, 0, "1", level = StorageLevel.MEMORY_ONLY)) @@ -259,8 +262,119 @@ class ExecutorMonitorSuite extends SparkFunSuite { assert(monitor.timedOutExecutors().toSet === Set("2")) } + test("shuffle block tracking") { + val bus = mockListenerBus() + conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING, true).set(SHUFFLE_SERVICE_ENABLED, false) + monitor = new ExecutorMonitor(conf, client, bus, clock) + + // 3 jobs: 2 and 3 share a shuffle, 1 has a separate shuffle. + val stage1 = stageInfo(1, shuffleId = 0) + val stage2 = stageInfo(2) + + val stage3 = stageInfo(3, shuffleId = 1) + val stage4 = stageInfo(4) + + val stage5 = stageInfo(5, shuffleId = 1) + val stage6 = stageInfo(6) + + // Start jobs 1 and 2. Finish a task on each, but don't finish the jobs. This should prevent the + // executor from going idle since there are active shuffles. 
+ monitor.onJobStart(SparkListenerJobStart(1, clock.getTimeMillis(), Seq(stage1, stage2))) + monitor.onJobStart(SparkListenerJobStart(2, clock.getTimeMillis(), Seq(stage3, stage4))) + + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null)) + assert(monitor.timedOutExecutors(idleDeadline) === Seq("1")) + + // First a failed task, to make sure it does not count. + monitor.onTaskStart(SparkListenerTaskStart(1, 0, taskInfo("1", 1))) + monitor.onTaskEnd(SparkListenerTaskEnd(1, 0, "foo", TaskResultLost, taskInfo("1", 1), null)) + assert(monitor.timedOutExecutors(idleDeadline) === Seq("1")) + + monitor.onTaskStart(SparkListenerTaskStart(1, 0, taskInfo("1", 1))) + monitor.onTaskEnd(SparkListenerTaskEnd(1, 0, "foo", Success, taskInfo("1", 1), null)) + assert(monitor.timedOutExecutors(idleDeadline).isEmpty) + + monitor.onTaskStart(SparkListenerTaskStart(3, 0, taskInfo("1", 1))) + monitor.onTaskEnd(SparkListenerTaskEnd(3, 0, "foo", Success, taskInfo("1", 1), null)) + assert(monitor.timedOutExecutors(idleDeadline).isEmpty) + + // Finish the jobs, now the executor should be idle, but with the shuffle timeout, since the + // shuffles are not active. + monitor.onJobEnd(SparkListenerJobEnd(1, clock.getTimeMillis(), JobSucceeded)) + assert(!monitor.isExecutorIdle("1")) + + monitor.onJobEnd(SparkListenerJobEnd(2, clock.getTimeMillis(), JobSucceeded)) + assert(monitor.isExecutorIdle("1")) + assert(monitor.timedOutExecutors(idleDeadline).isEmpty) + assert(monitor.timedOutExecutors(storageDeadline).isEmpty) + assert(monitor.timedOutExecutors(shuffleDeadline) === Seq("1")) + + // Start job 3. Since it shares a shuffle with job 2, the executor should not be considered + // idle anymore, even if no tasks are run. + monitor.onJobStart(SparkListenerJobStart(3, clock.getTimeMillis(), Seq(stage5, stage6))) + assert(!monitor.isExecutorIdle("1")) + assert(monitor.timedOutExecutors(shuffleDeadline).isEmpty) + + monitor.onJobEnd(SparkListenerJobEnd(3, clock.getTimeMillis(), JobSucceeded)) + assert(monitor.timedOutExecutors(idleDeadline).isEmpty) + assert(monitor.timedOutExecutors(shuffleDeadline) === Seq("1")) + + // Clean up the shuffles, executor now should now time out at the idle deadline. + monitor.shuffleCleaned(0) + assert(monitor.timedOutExecutors(idleDeadline).isEmpty) + monitor.shuffleCleaned(1) + assert(monitor.timedOutExecutors(idleDeadline) === Seq("1")) + } + + test("shuffle tracking with multiple executors and concurrent jobs") { + val bus = mockListenerBus() + conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING, true).set(SHUFFLE_SERVICE_ENABLED, false) + monitor = new ExecutorMonitor(conf, client, bus, clock) + + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null)) + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", null)) + + // Two separate jobs with separate shuffles. The first job will only run tasks on + // executor 1, the second on executor 2. Ensures that jobs finishing don't affect + // executors that are active in other jobs. 
+ + val stage1 = stageInfo(1, shuffleId = 0) + val stage2 = stageInfo(2) + monitor.onJobStart(SparkListenerJobStart(1, clock.getTimeMillis(), Seq(stage1, stage2))) + + val stage3 = stageInfo(3, shuffleId = 1) + val stage4 = stageInfo(4) + monitor.onJobStart(SparkListenerJobStart(2, clock.getTimeMillis(), Seq(stage3, stage4))) + + monitor.onTaskStart(SparkListenerTaskStart(1, 0, taskInfo("1", 1))) + monitor.onTaskEnd(SparkListenerTaskEnd(1, 0, "foo", Success, taskInfo("1", 1), null)) + assert(monitor.timedOutExecutors(idleDeadline) === Seq("2")) + + monitor.onTaskStart(SparkListenerTaskStart(3, 0, taskInfo("2", 1))) + monitor.onTaskEnd(SparkListenerTaskEnd(3, 0, "foo", Success, taskInfo("2", 1), null)) + assert(monitor.timedOutExecutors(idleDeadline).isEmpty) + + monitor.onJobEnd(SparkListenerJobEnd(1, clock.getTimeMillis(), JobSucceeded)) + assert(monitor.isExecutorIdle("1")) + assert(!monitor.isExecutorIdle("2")) + + monitor.onJobEnd(SparkListenerJobEnd(2, clock.getTimeMillis(), JobSucceeded)) + assert(monitor.isExecutorIdle("2")) + assert(monitor.timedOutExecutors(idleDeadline).isEmpty) + + monitor.shuffleCleaned(0) + monitor.shuffleCleaned(1) + assert(monitor.timedOutExecutors(idleDeadline).toSet === Set("1", "2")) + } + private def idleDeadline: Long = clock.getTimeMillis() + idleTimeoutMs + 1 private def storageDeadline: Long = clock.getTimeMillis() + storageTimeoutMs + 1 + private def shuffleDeadline: Long = clock.getTimeMillis() + shuffleTimeoutMs + 1 + + private def stageInfo(id: Int, shuffleId: Int = -1): StageInfo = { + new StageInfo(id, 0, s"stage$id", 1, Nil, Nil, "", + shuffleDepId = if (shuffleId >= 0) Some(shuffleId) else None) + } private def taskInfo( execId: String, @@ -286,4 +400,16 @@ class ExecutorMonitorSuite extends SparkFunSuite { RDDBlockId(rddId, splitIndex), level, 1L, 0L)) } + /** + * Mock the listener bus *only* for the functionality needed by the shuffle tracking code. + * Any other event sent through the mock bus will fail. + */ + private def mockListenerBus(): LiveListenerBus = { + val bus = mock(classOf[LiveListenerBus]) + doAnswer { invocation => + monitor.onOtherEvent(invocation.getArguments()(0).asInstanceOf[SparkListenerEvent]) + }.when(bus).post(any()) + bus + } + } diff --git a/docs/configuration.md b/docs/configuration.md index 06b040866f13d..108862416f8da 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -2114,6 +2114,26 @@ Apart from these, the following properties are also available, and may be useful description. +
<tr>
  <td><code>spark.dynamicAllocation.shuffleTracking.enabled</code></td>
  <td><code>false</code></td>
  <td>
    Experimental. Enables shuffle file tracking for executors, which allows dynamic allocation
    without the need for an external shuffle service. This option will try to keep alive executors
    that are storing shuffle data for active jobs.
  </td>
</tr>
<tr>
  <td><code>spark.dynamicAllocation.shuffleTimeout</code></td>
  <td><code>infinity</code></td>
  <td>
    When shuffle tracking is enabled, controls the timeout for executors that are holding shuffle
    data. The default value means that Spark relies on the shuffles being garbage collected (via
    the ContextCleaner) before such executors can be released. If garbage collection does not
    clean up shuffles quickly enough, this option can be used to time out executors even while
    they are still storing shuffle data.
  </td>
</tr>
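
As a usage note for the two new settings above, here is a minimal sketch of a driver configuration that opts into the experimental shuffle-tracking mode instead of the external shuffle service. The application name and the 5-minute shuffle timeout are illustrative values, not defaults; the conf would be passed to `SparkContext` or supplied through `spark-submit --conf` as usual.

```scala
import org.apache.spark.SparkConf

// Dynamic allocation without the external shuffle service: shuffle tracking keeps
// executors alive while they hold shuffle data needed by active jobs.
val conf = new SparkConf()
  .setAppName("shuffle-tracking-example") // placeholder name
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "false")
  .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  // Optional: bound how long an executor holding shuffle data for finished jobs is kept.
  .set("spark.dynamicAllocation.shuffleTimeout", "300s")
```

Without `spark.dynamicAllocation.shuffleTracking.enabled`, the check in `ExecutorAllocationManager` still throws when dynamic allocation is enabled but `spark.shuffle.service.enabled` is false.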