From 50ac2e4276d9dbc45ca6dfde2cfcc37bf9a9ad5a Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Wed, 1 Mar 2017 13:54:21 +0800
Subject: [PATCH 1/8] Improve the blacklist mechanism to handle the case where
 the external shuffle service is unavailable

Change-Id: I1c0776ec98866c5294ea4ed5d98793fdcebf44ae
---
 .../spark/scheduler/BlacklistTracker.scala    | 83 +++++++++++++------
 .../spark/scheduler/TaskSetManager.scala      |  6 ++
 .../spark/scheduler/TaskSetManagerSuite.scala | 33 ++++++++
 3 files changed, 97 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index e130e609e4f6..9c3eededf614 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
-import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkContext}
+import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -145,6 +145,61 @@ private[scheduler] class BlacklistTracker (
     nextExpiryTime = math.min(execMinExpiry, nodeMinExpiry)
   }
 
+  private def killBlacklistedExecutor(exec: String): Unit = {
+    if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
+      allocationClient match {
+        case Some(a) =>
+          logInfo(s"Killing blacklisted executor id $exec " +
+            s"since spark.blacklist.killBlacklistedExecutors is set.")
+          a.killExecutors(Seq(exec), true, true)
+        case None =>
+          logWarning(s"Not attempting to kill blacklisted executor id $exec " +
+            s"since allocation client is not defined.")
+      }
+    }
+  }
+
+  private def killExecutorsOnBlacklistedNode(node: String): Unit = {
+    if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
+      allocationClient match {
+        case Some(a) =>
+          logInfo(s"Killing all executors on blacklisted host $node " +
+            s"since spark.blacklist.killBlacklistedExecutors is set.")
+          if (a.killExecutorsOnHost(node) == false) {
+            logError(s"Killing executors on node $node failed.")
+          }
+        case None =>
+          logWarning(s"Not attempting to kill executors on blacklisted host $node " +
+            s"since allocation client is not defined.")
+      }
+    }
+  }
+
+  def updateBlacklistForFetchFailure(host: String, exec: String, numFailedTasks: Int): Unit = {
+    val now = clock.getTimeMillis()
+    val expiryTimeForNewBlacklists = now + BLACKLIST_TIMEOUT_MILLIS
+    if (!executorIdToBlacklistStatus.contains(exec)) {
+      logInfo(s"Blacklisting executor $exec due to fetch failure")
+
+      executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(host, expiryTimeForNewBlacklists))
+      listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, numFailedTasks))
+      updateNextExpiryTime()
+      killBlacklistedExecutor(exec)
+
+      val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(host, HashSet[String]())
+      blacklistedExecsOnNode += exec
+
+      if (SparkEnv.get.blockManager.externalShuffleServiceEnabled &&
+        !nodeIdToBlacklistExpiryTime.contains(host)) {
+        logInfo(s"blacklisting node $host due to fetch failure of external shuffle service")
+
+        nodeIdToBlacklistExpiryTime.put(host, expiryTimeForNewBlacklists)
+        listenerBus.post(SparkListenerNodeBlacklisted(now, exec, blacklistedExecsOnNode.size))
+        _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
+        killExecutorsOnBlacklistedNode(host)
+      }
+    }
+  }
 
   def updateBlacklistForSuccessfulTaskSet(
       stageId: Int,
@@ -174,17 +229,7 @@ private[scheduler] class BlacklistTracker (
         listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
         executorIdToFailureList.remove(exec)
         updateNextExpiryTime()
-        if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
-          allocationClient match {
-            case Some(allocationClient) =>
-              logInfo(s"Killing blacklisted executor id $exec " +
-                s"since spark.blacklist.killBlacklistedExecutors is set.")
-              allocationClient.killExecutors(Seq(exec), true, true)
-            case None =>
-              logWarning(s"Not attempting to kill blacklisted executor id $exec " +
-                s"since allocation client is not defined.")
-          }
-        }
+        killBlacklistedExecutor(exec)
 
         // In addition to blacklisting the executor, we also update the data for failures on the
         // node, and potentially put the entire node into a blacklist as well.
@@ -199,19 +244,7 @@ private[scheduler] class BlacklistTracker (
           nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
           listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
           _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
-          if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
-            allocationClient match {
-              case Some(allocationClient) =>
-                logInfo(s"Killing all executors on blacklisted host $node " +
-                  s"since spark.blacklist.killBlacklistedExecutors is set.")
-                if (allocationClient.killExecutorsOnHost(node) == false) {
-                  logError(s"Killing executors on node $node failed.")
-                }
-              case None =>
-                logWarning(s"Not attempting to kill executors on blacklisted host $node " +
-                  s"since allocation client is not defined.")
-            }
-          }
+          killExecutorsOnBlacklistedNode(node)
         }
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a41b059fa7de..97ab02c99b03 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -774,6 +774,12 @@ private[spark] class TaskSetManager(
           tasksSuccessful += 1
         }
         isZombie = true
+
+        if (fetchFailed.bmAddress != null) {
+          blacklistTracker.foreach(_.updateBlacklistForFetchFailure(fetchFailed.bmAddress.host,
+            fetchFailed.bmAddress.executorId, numTasks - tasksSuccessful))
+        }
+
         None
 
       case ef: ExceptionFailure =>
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index db14c9acfdce..7dfeead5c830 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1140,6 +1140,39 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       .updateBlacklistForFailedTask(anyString(), anyString(), anyInt())
   }
 
+  test("update application blacklist for shuffle-fetch") {
+    // Setup a taskset, and fail some tasks for a fetch failure, preemption, denied commit,
+    // and killed task.
+    val conf = new SparkConf()
+      .set(config.BLACKLIST_ENABLED, true)
+      .set(config.SHUFFLE_SERVICE_ENABLED, true)
+    sc = new SparkContext("local", "test", conf)
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    val taskSet = FakeTask.createTaskSet(4)
+    val blacklistTracker = new BlacklistTracker(sc, None)
+    val tsm = new TaskSetManager(sched, taskSet, 4, Some(blacklistTracker))
+
+    // make some offers to our taskset, to get tasks we will fail
+    val taskDescs = Seq(
+      "exec1" -> "host1",
+      "exec2" -> "host2"
+    ).flatMap { case (exec, host) =>
+      // offer each executor twice (simulating 2 cores per executor)
+      (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)}
+    }
+    assert(taskDescs.size === 4)
+
+    assert(!blacklistTracker.isExecutorBlacklisted(taskDescs(0).executorId))
+    assert(!blacklistTracker.isNodeBlacklisted("host1"))
+
+    // Fail the task with fetch failure
+    tsm.handleFailedTask(taskDescs(0).taskId, TaskState.FAILED,
+      FetchFailed(BlockManagerId(taskDescs(0).executorId, "host1", 12345), 0, 0, 0, "ignored"))
+
+    assert(blacklistTracker.isExecutorBlacklisted(taskDescs(0).executorId))
+    assert(blacklistTracker.isNodeBlacklisted("host1"))
+  }
+
   private def createTaskResult(
       id: Int,
       accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {
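Note on the mechanism patch 1 relies on: every blacklist entry is keyed by an expiry
timestamp, and a periodic sweep un-blacklists entries once the timeout passes. A minimal,
self-contained sketch of that bookkeeping follows; the object and method names are
illustrative, not Spark APIs, and only the one-hour default of spark.blacklist.timeout
is taken from Spark:

    import scala.collection.mutable.HashMap

    object BlacklistExpirySketch {
      // mirrors the 1h default of spark.blacklist.timeout
      private val timeoutMillis = 60L * 60L * 1000L
      private val nodeToExpiry = new HashMap[String, Long]()

      // record an expiry time; the first failure wins, like nodeIdToBlacklistExpiryTime
      def blacklist(host: String, now: Long): Unit =
        if (!nodeToExpiry.contains(host)) nodeToExpiry.put(host, now + timeoutMillis)

      // the applyBlacklistTimeout-style pass that drops expired entries
      def sweep(now: Long): Unit = nodeToExpiry.retain((_, expiry) => expiry > now)

      def main(args: Array[String]): Unit = {
        blacklist("host1", now = 0L)
        sweep(now = timeoutMillis - 1)
        assert(nodeToExpiry.contains("host1")) // still blacklisted before the timeout
        sweep(now = timeoutMillis + 1)
        assert(nodeToExpiry.isEmpty)           // aged out after the timeout
      }
    }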
From 759ebc91f42f6b3d0a5c7b92a96fcda3eaaf5a82 Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Thu, 9 Mar 2017 10:46:37 +0800
Subject: [PATCH 2/8] Add configuration to turn off the feature

Change-Id: I9a930d63eb23a86e8823b206222ab3a5c41ad930
---
 .../spark/internal/config/package.scala       |  5 +++
 .../spark/scheduler/BlacklistTracker.scala    | 39 ++++++++++---------
 .../spark/scheduler/TaskSetManagerSuite.scala |  1 +
 docs/configuration.md                         |  9 +++++
 4 files changed, 36 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index e193ed222e22..98da8146e016 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -149,6 +149,11 @@ package object config {
       .internal()
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
+
+  private[spark] val BLACKLIST_FETCH_FAILURE_ENABLED =
+    ConfigBuilder("spark.blacklist.application.fetchFailure.enabled")
+      .booleanConf
+      .createWithDefault(false)
   // End blacklist confs
 
   private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index 9c3eededf614..95fd9afdd82d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -61,6 +61,7 @@ private[scheduler] class BlacklistTracker (
   private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC)
   private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE)
   val BLACKLIST_TIMEOUT_MILLIS = BlacklistTracker.getBlacklistTimeout(conf)
+  private val BLACKLIST_FETCH_FAILURE_ENABLED = conf.get(config.BLACKLIST_FETCH_FAILURE_ENABLED)
 
   /**
    * A map from executorId to information on task failures. Tracks the time of each task failure,
@@ -176,28 +177,30 @@ private[scheduler] class BlacklistTracker (
   }
 
   def updateBlacklistForFetchFailure(host: String, exec: String, numFailedTasks: Int): Unit = {
-    val now = clock.getTimeMillis()
-    val expiryTimeForNewBlacklists = now + BLACKLIST_TIMEOUT_MILLIS
-    if (!executorIdToBlacklistStatus.contains(exec)) {
-      logInfo(s"Blacklisting executor $exec due to fetch failure")
+    if (BLACKLIST_FETCH_FAILURE_ENABLED) {
+      val now = clock.getTimeMillis()
+      val expiryTimeForNewBlacklists = now + BLACKLIST_TIMEOUT_MILLIS
+      if (!executorIdToBlacklistStatus.contains(exec)) {
+        logInfo(s"Blacklisting executor $exec due to fetch failure")
 
-      executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(host, expiryTimeForNewBlacklists))
-      listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, numFailedTasks))
-      updateNextExpiryTime()
-      killBlacklistedExecutor(exec)
+        executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(host, expiryTimeForNewBlacklists))
+        listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, numFailedTasks))
+        updateNextExpiryTime()
+        killBlacklistedExecutor(exec)
 
-      val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(host, HashSet[String]())
-      blacklistedExecsOnNode += exec
+        val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(host, HashSet[String]())
+        blacklistedExecsOnNode += exec
 
-      if (SparkEnv.get.blockManager.externalShuffleServiceEnabled &&
-        !nodeIdToBlacklistExpiryTime.contains(host)) {
-        logInfo(s"blacklisting node $host due to fetch failure of external shuffle service")
+        if (SparkEnv.get.blockManager.externalShuffleServiceEnabled &&
+          !nodeIdToBlacklistExpiryTime.contains(host)) {
+          logInfo(s"blacklisting node $host due to fetch failure of external shuffle service")
 
-        nodeIdToBlacklistExpiryTime.put(host, expiryTimeForNewBlacklists)
-        listenerBus.post(SparkListenerNodeBlacklisted(now, exec, blacklistedExecsOnNode.size))
-        _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
-        killExecutorsOnBlacklistedNode(host)
-      }
+          nodeIdToBlacklistExpiryTime.put(host, expiryTimeForNewBlacklists)
+          listenerBus.post(SparkListenerNodeBlacklisted(now, exec, blacklistedExecsOnNode.size))
+          _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
+          killExecutorsOnBlacklistedNode(host)
+        }
+      }
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 7dfeead5c830..29a02c4e16dd 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1146,6 +1146,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     val conf = new SparkConf()
       .set(config.BLACKLIST_ENABLED, true)
       .set(config.SHUFFLE_SERVICE_ENABLED, true)
+      .set(config.BLACKLIST_FETCH_FAILURE_ENABLED, true)
     sc = new SparkContext("local", "test", conf)
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
     val taskSet = FakeTask.createTaskSet(4)
diff --git a/docs/configuration.md b/docs/configuration.md
index a6b6d5dfa5f9..579ea28a205f 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1448,6 +1448,15 @@ Apart from these, the following properties are also available, and may be useful
     all of the executors on that node will be killed.
   </td>
 </tr>
+<tr>
+  <td><code>spark.blacklist.application.fetchFailure.enabled</code></td>
+  <td>false</td>
+  <td>
+    (Experimental) If set to "true", Spark will blacklist the executors immediately when the fetch failure
+    happened. If external shuffle service is enabled, then the whole node will be blacklisted. This configuration
+    is to handle some scenarios where shuffle fetch is unavailable and cannot be recovered through retry.
+  </td>
+</tr>
 <tr>
   <td><code>spark.speculation</code></td>
   <td>false</td>
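To make the new knob concrete, here is how an application would opt in once patch 2
is applied. This is a sketch only: the master, app name, and job are placeholders, and
the spark.shuffle.service.enabled line assumes an external shuffle service is actually
running (drop it to exercise the executor-only path):

    import org.apache.spark.{SparkConf, SparkContext}

    object EnableFetchFailureBlacklisting {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("fetch-failure-blacklist-demo")
          .set("spark.blacklist.enabled", "true")
          .set("spark.blacklist.application.fetchFailure.enabled", "true")
          .set("spark.shuffle.service.enabled", "true")
        val sc = new SparkContext(conf)
        try {
          // any shuffle-heavy job; a fetch failure in it now triggers the new code path
          sc.parallelize(1 to 1000, 8).map(i => (i % 10, i)).reduceByKey(_ + _).count()
        } finally {
          sc.stop()
        }
      }
    }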
From 533ee170cb4d5d0b10b5b297498ff513c604e16e Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Wed, 22 Mar 2017 18:16:23 +0800
Subject: [PATCH 3/8] Address the comments

Change-Id: I4ccbd63f33bc013d5b97afa41b005d83a63be86a
---
 .../spark/scheduler/BlacklistTracker.scala    | 20 ++++++++++++++---
 .../spark/scheduler/TaskSetManager.scala      |  4 ++--
 2 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index 95fd9afdd82d..b2e311798492 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicReference
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
 import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkContext, SparkEnv}
+import org.apache.spark.annotation.Experimental
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -176,15 +177,28 @@ private[scheduler] class BlacklistTracker (
     }
   }
 
-  def updateBlacklistForFetchFailure(host: String, exec: String, numFailedTasks: Int): Unit = {
+  @Experimental
+  def updateBlacklistForFetchFailure(host: String, exec: String): Unit = {
     if (BLACKLIST_FETCH_FAILURE_ENABLED) {
+      logWarning(
+        s"""
+          |${config.BLACKLIST_FETCH_FAILURE_ENABLED.key} is enabled. If we blacklist
+          |on fetch failures, we are implicitly saying that we believe the failure is
+          |non-transient, and can't be recovered from (even if this is the first fetch failure).
+          |If the external shuffle-service is on, then every other executor on this node would
+          |be suffering from the same issue, so we should blacklist (and potentially kill) all
+          |of them immediately.
+        """.stripMargin)
+
       val now = clock.getTimeMillis()
       val expiryTimeForNewBlacklists = now + BLACKLIST_TIMEOUT_MILLIS
       if (!executorIdToBlacklistStatus.contains(exec)) {
         logInfo(s"Blacklisting executor $exec due to fetch failure")
 
         executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(host, expiryTimeForNewBlacklists))
-        listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, numFailedTasks))
+        // We hardcoded number of failure tasks to 1 for fetch failure, because there's no
+        // reattempt for this failure.
+        listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, 1))
         updateNextExpiryTime()
         killBlacklistedExecutor(exec)
@@ -192,7 +206,7 @@ private[scheduler] class BlacklistTracker (
         blacklistedExecsOnNode += exec
 
         if (SparkEnv.get.blockManager.externalShuffleServiceEnabled &&
-          !nodeIdToBlacklistExpiryTime.contains(host)) {
+            !nodeIdToBlacklistExpiryTime.contains(host)) {
           logInfo(s"blacklisting node $host due to fetch failure of external shuffle service")
 
           nodeIdToBlacklistExpiryTime.put(host, expiryTimeForNewBlacklists)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 97ab02c99b03..02d374dc37cd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -776,8 +776,8 @@ private[spark] class TaskSetManager(
         isZombie = true
 
         if (fetchFailed.bmAddress != null) {
-          blacklistTracker.foreach(_.updateBlacklistForFetchFailure(fetchFailed.bmAddress.host,
-            fetchFailed.bmAddress.executorId, numTasks - tasksSuccessful))
+          blacklistTracker.foreach(_.updateBlacklistForFetchFailure(
+            fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId))
         }
 
         None
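Since the event posted above now always carries a failure count of 1, a listener can
still observe these blacklistings. The following sketch consumes the public
SparkListener events that the tracker posts; BlacklistLogger is an illustrative name:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorBlacklisted,
      SparkListenerNodeBlacklisted}

    class BlacklistLogger extends SparkListener {
      override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit =
        // for the fetch-failure path above, taskFailures is the hardcoded 1
        println(s"executor ${event.executorId} blacklisted after ${event.taskFailures} failure(s)")

      override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit =
        println(s"node ${event.hostId} blacklisted (${event.executorFailures} failed executor(s))")
    }

It would be registered on a live context with sc.addSparkListener(new BlacklistLogger).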
From f633a3f227ab4ddf53072b3a17e7b4a7e36258f7 Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Thu, 23 Mar 2017 20:35:54 +0800
Subject: [PATCH 4/8] Address the comments

Change-Id: I060d0722f48c1d028a53f95860cd355b6b7915fc
---
 .../spark/scheduler/BlacklistTracker.scala    |  8 ++--
 .../scheduler/BlacklistTrackerSuite.scala     | 40 +++++++++++++++++++
 .../spark/scheduler/TaskSetManagerSuite.scala |  3 +-
 docs/configuration.md                         |  3 +-
 4 files changed, 45 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index b2e311798492..bc1f30729fa1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -21,8 +21,7 @@ import java.util.concurrent.atomic.AtomicReference
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
-import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkContext, SparkEnv}
-import org.apache.spark.annotation.Experimental
+import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -177,7 +176,6 @@ private[scheduler] class BlacklistTracker (
     }
   }
 
-  @Experimental
   def updateBlacklistForFetchFailure(host: String, exec: String): Unit = {
     if (BLACKLIST_FETCH_FAILURE_ENABLED) {
       logWarning(
@@ -197,7 +195,7 @@ private[scheduler] class BlacklistTracker (
 
         executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(host, expiryTimeForNewBlacklists))
         // We hardcoded number of failure tasks to 1 for fetch failure, because there's no
-        // reattempt for this failure.
+        // reattempt for such failure.
         listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, 1))
         updateNextExpiryTime()
         killBlacklistedExecutor(exec)
@@ -205,7 +203,7 @@ private[scheduler] class BlacklistTracker (
         blacklistedExecsOnNode += exec
 
-        if (SparkEnv.get.blockManager.externalShuffleServiceEnabled &&
+        if (conf.getBoolean("spark.shuffle.service.enabled", false) &&
             !nodeIdToBlacklistExpiryTime.contains(host)) {
           logInfo(s"blacklisting node $host due to fetch failure of external shuffle service")
 
           nodeIdToBlacklistExpiryTime.put(host, expiryTimeForNewBlacklists)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
index 2b18ebee79a2..04ce398fa758 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
@@ -529,4 +529,44 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     verify(allocationClientMock).killExecutors(Seq("2"), true, true)
     verify(allocationClientMock).killExecutorsOnHost("hostA")
   }
+
+  test("fetch failure blacklisting kills executors, configured by BLACKLIST_KILL_ENABLED") {
+    val allocationClientMock = mock[ExecutorAllocationClient]
+    when(allocationClientMock.killExecutors(any(), any(), any())).thenReturn(Seq("called"))
+    when(allocationClientMock.killExecutorsOnHost("hostA")).thenAnswer(new Answer[Boolean] {
+      // To avoid a race between blacklisting and killing, it is important that the nodeBlacklist
+      // is updated before we ask the executor allocation client to kill all the executors
+      // on a particular host.
+      override def answer(invocation: InvocationOnMock): Boolean = {
+        if (blacklist.nodeBlacklist.contains("hostA") == false) {
+          throw new IllegalStateException("hostA should be on the blacklist")
+        }
+        true
+      }
+    })
+
+    conf.set(config.BLACKLIST_FETCH_FAILURE_ENABLED, true)
+    blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
+
+    // Disable auto-kill. Blacklist an executor and make sure killExecutors is not called.
+    conf.set(config.BLACKLIST_KILL_ENABLED, false)
+    blacklist.updateBlacklistForFetchFailure("hostA", exec = "1")
+
+    verify(allocationClientMock, never).killExecutors(any(), any(), any())
+    verify(allocationClientMock, never).killExecutorsOnHost(any())
+
+    // Enable auto-kill. Blacklist an executor and make sure killExecutors is called.
+    conf.set(config.BLACKLIST_KILL_ENABLED, true)
+    blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
+    blacklist.updateBlacklistForFetchFailure("hostA", exec = "1")
+
+    verify(allocationClientMock).killExecutors(Seq("1"), true, true)
+
+    // Enable external shuffle service to see if all the executors on this node will be killed.
+    conf.set("spark.shuffle.service.enabled", "true")
+    blacklist.updateBlacklistForFetchFailure("hostA", exec = "2")
+
+    verify(allocationClientMock).killExecutors(Seq("2"), true, true)
+    verify(allocationClientMock).killExecutorsOnHost("hostA")
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 29a02c4e16dd..a566740a0a75 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1141,8 +1141,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   }
 
   test("update application blacklist for shuffle-fetch") {
-    // Setup a taskset, and fail some tasks for a fetch failure, preemption, denied commit,
-    // and killed task.
+    // Setup a taskset, and fail one task with a fetch failure.
     val conf = new SparkConf()
       .set(config.BLACKLIST_ENABLED, true)
       .set(config.SHUFFLE_SERVICE_ENABLED, true)
diff --git a/docs/configuration.md b/docs/configuration.md
index 579ea28a205f..4a00afe22c1a 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1453,8 +1453,7 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.blacklist.application.fetchFailure.enabled</code></td>
   <td>false</td>
   <td>
     (Experimental) If set to "true", Spark will blacklist the executors immediately when the fetch failure
-    happened. If external shuffle service is enabled, then the whole node will be blacklisted. This configuration
-    is to handle some scenarios where shuffle fetch is unavailable and cannot be recovered through retry.
+    happened. If external shuffle service is enabled, then the whole node will be blacklisted.
   </td>
 </tr>
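The Answer-based stub in the new suite is worth calling out: it asserts the ordering
invariant (node blacklisted before the kill is issued) at the exact moment the mocked
call fires. The same pattern in isolation, as a sketch with illustrative names rather
than Spark code:

    import org.mockito.Mockito.{mock, when}
    import org.mockito.invocation.InvocationOnMock
    import org.mockito.stubbing.Answer

    trait Killer { def killAllOn(host: String): Boolean }

    object AnswerOrderingSketch {
      def main(args: Array[String]): Unit = {
        @volatile var blacklisted = Set.empty[String]
        val killer = mock(classOf[Killer])
        when(killer.killAllOn("hostA")).thenAnswer(new Answer[Boolean] {
          override def answer(invocation: InvocationOnMock): Boolean = {
            // fails fast if the kill ever arrives before the blacklist update
            if (!blacklisted.contains("hostA")) {
              throw new IllegalStateException("hostA should be blacklisted first")
            }
            true
          }
        })
        blacklisted += "hostA"            // update the state first...
        assert(killer.killAllOn("hostA")) // ...then the stubbed call passes
      }
    }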
+ conf.set("spark.shuffle.service.enabled", "true") + blacklist.updateBlacklistForFetchFailure("hostA", exec = "2") + + verify(allocationClientMock).killExecutors(Seq("2"), true, true) + verify(allocationClientMock).killExecutorsOnHost("hostA") + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 29a02c4e16dd..a566740a0a75 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1141,8 +1141,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg } test("update application blacklist for shuffle-fetch") { - // Setup a taskset, and fail some tasks for a fetch failure, preemption, denied commit, - // and killed task. + // Setup a taskset, and fail some one task for fetch failure. val conf = new SparkConf() .set(config.BLACKLIST_ENABLED, true) .set(config.SHUFFLE_SERVICE_ENABLED, true) diff --git a/docs/configuration.md b/docs/configuration.md index 579ea28a205f..4a00afe22c1a 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1453,8 +1453,7 @@ Apart from these, the following properties are also available, and may be useful false (Experimental) If set to "true", Spark will blacklist the executors immediately when the fetch failure - happened. If external shuffle service is enabled, then the whole node will be blacklisted. This configuration - is to handle some scenarios where shuffle fetch is available and cannot be recovered through retry. + happened. If external shuffle service is enabled, then the whole node will be blacklisted. From 44c7108bdf478f823f567d44ed703d445febf6fe Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 25 May 2017 14:55:40 +0800 Subject: [PATCH 5/8] Address the comments Change-Id: Ied1e7078e38188099a08f71bd8db2d141a8e093e --- .../spark/scheduler/BlacklistTracker.scala | 37 +++++++++---------- .../scheduler/BlacklistTrackerSuite.scala | 5 ++- .../spark/scheduler/TaskSetManagerSuite.scala | 1 - 3 files changed, 20 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index bc1f30729fa1..74f0cfc6fe20 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -178,19 +178,26 @@ private[scheduler] class BlacklistTracker ( def updateBlacklistForFetchFailure(host: String, exec: String): Unit = { if (BLACKLIST_FETCH_FAILURE_ENABLED) { - logWarning( - s""" - |${config.BLACKLIST_FETCH_FAILURE_ENABLED.key} is enabled. If we blacklist - |on fetch failures, we are implicitly saying that we believe the failure is - |non-transient, and can't be recovered from (even if this is the first fetch failure). - |If the external shuffle-service is on, then every other executor on this node would - |be suffering from the same issue, so we should blacklist (and potentially kill) all - |of them immediately. - """.stripMargin) + // spark.blacklist.application.fetchFailure.enabled is enabled. If we blacklist + // on fetch failures, we are implicitly saying that we believe the failure is + // non-transient, and can't be recovered from (even if this is the first fetch failure). 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
index 04ce398fa758..d0143248c025 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
@@ -561,12 +561,13 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     blacklist.updateBlacklistForFetchFailure("hostA", exec = "1")
 
     verify(allocationClientMock).killExecutors(Seq("1"), true, true)
+    verify(allocationClientMock, never).killExecutorsOnHost(any())
 
     // Enable external shuffle service to see if all the executors on this node will be killed.
-    conf.set("spark.shuffle.service.enabled", "true")
+    conf.set(config.SHUFFLE_SERVICE_ENABLED, true)
     blacklist.updateBlacklistForFetchFailure("hostA", exec = "2")
 
-    verify(allocationClientMock).killExecutors(Seq("2"), true, true)
+    verify(allocationClientMock, never).killExecutors(Seq("2"), true, true)
     verify(allocationClientMock).killExecutorsOnHost("hostA")
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index a566740a0a75..80fb67472581 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1169,7 +1169,6 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     tsm.handleFailedTask(taskDescs(0).taskId, TaskState.FAILED,
       FetchFailed(BlockManagerId(taskDescs(0).executorId, "host1", 12345), 0, 0, 0, "ignored"))
 
-    assert(blacklistTracker.isExecutorBlacklisted(taskDescs(0).executorId))
     assert(blacklistTracker.isNodeBlacklisted("host1"))
   }
 
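The control flow patch 5 settles on can be summarized in a few lines. A reduced sketch
of just the decision (names illustrative; the real method also records expiry times,
posts listener events, and optionally kills):

    object FetchFailurePolicySketch {
      def decide(host: String, exec: String, externalShuffleService: Boolean): String =
        if (externalShuffleService) {
          // shuffle blocks are served by the node's shuffle service: the whole node is suspect
          s"blacklist node $host"
        } else {
          // shuffle blocks are served by the executor itself: only that executor is suspect
          s"blacklist executor $exec"
        }

      def main(args: Array[String]): Unit = {
        assert(decide("hostA", "1", externalShuffleService = true) == "blacklist node hostA")
        assert(decide("hostA", "1", externalShuffleService = false) == "blacklist executor 1")
      }
    }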
- conf.set("spark.shuffle.service.enabled", "true") + conf.set(config.SHUFFLE_SERVICE_ENABLED, true) blacklist.updateBlacklistForFetchFailure("hostA", exec = "2") - verify(allocationClientMock).killExecutors(Seq("2"), true, true) + verify(allocationClientMock, never).killExecutors(Seq("2"), true, true) verify(allocationClientMock).killExecutorsOnHost("hostA") } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index a566740a0a75..80fb67472581 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1169,7 +1169,6 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg tsm.handleFailedTask(taskDescs(0).taskId, TaskState.FAILED, FetchFailed(BlockManagerId(taskDescs(0).executorId, "host1", 12345), 0, 0, 0, "ignored")) - assert(blacklistTracker.isExecutorBlacklisted(taskDescs(0).executorId)) assert(blacklistTracker.isNodeBlacklisted("host1")) } From 524fbfcdd661f5919515b8e18515d346003a3ebe Mon Sep 17 00:00:00 2001 From: jerryshao Date: Sat, 27 May 2017 11:44:02 +0800 Subject: [PATCH 6/8] Fix kill executor cannot work bug and address comments Change-Id: I420fba198ce9ad87eb7fc232ce1e183e973028bd --- .../apache/spark/scheduler/BlacklistTracker.scala | 4 ++-- .../apache/spark/scheduler/TaskSchedulerImpl.scala | 10 ++++++---- .../spark/scheduler/BlacklistTrackerSuite.scala | 14 ++++++++++++++ docs/configuration.md | 5 +++-- 4 files changed, 25 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index 74f0cfc6fe20..1165741396b3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -178,8 +178,7 @@ private[scheduler] class BlacklistTracker ( def updateBlacklistForFetchFailure(host: String, exec: String): Unit = { if (BLACKLIST_FETCH_FAILURE_ENABLED) { - // spark.blacklist.application.fetchFailure.enabled is enabled. If we blacklist - // on fetch failures, we are implicitly saying that we believe the failure is + // If we blacklist on fetch failures, we are implicitly saying that we believe the failure is // non-transient, and can't be recovered from (even if this is the first fetch failure). 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
index d0143248c025..99a4553d54f6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
@@ -558,16 +558,30 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     // Enable auto-kill. Blacklist an executor and make sure killExecutors is called.
     conf.set(config.BLACKLIST_KILL_ENABLED, true)
     blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
+    clock.advance(1000)
     blacklist.updateBlacklistForFetchFailure("hostA", exec = "1")
 
     verify(allocationClientMock).killExecutors(Seq("1"), true, true)
     verify(allocationClientMock, never).killExecutorsOnHost(any())
 
+    assert(blacklist.executorIdToBlacklistStatus.contains("1"))
+    assert(blacklist.executorIdToBlacklistStatus("1").node === "hostA")
+    assert(blacklist.executorIdToBlacklistStatus("1").expiryTime ===
+      1000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
+    assert(blacklist.nextExpiryTime === 1000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
+    assert(blacklist.nodeIdToBlacklistExpiryTime.isEmpty)
+
     // Enable external shuffle service to see if all the executors on this node will be killed.
     conf.set(config.SHUFFLE_SERVICE_ENABLED, true)
+    clock.advance(1000)
     blacklist.updateBlacklistForFetchFailure("hostA", exec = "2")
 
     verify(allocationClientMock, never).killExecutors(Seq("2"), true, true)
     verify(allocationClientMock).killExecutorsOnHost("hostA")
+
+    assert(blacklist.nodeIdToBlacklistExpiryTime.contains("hostA"))
+    assert(blacklist.nodeIdToBlacklistExpiryTime("hostA") ===
+      2000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
+    assert(blacklist.nextExpiryTime === 1000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
   }
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index 4a00afe22c1a..127cd5310c1f 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1452,8 +1452,9 @@ Apart from these, the following properties are also available, and may be useful
 <tr>
   <td><code>spark.blacklist.application.fetchFailure.enabled</code></td>
   <td>false</td>
   <td>
-    (Experimental) If set to "true", Spark will blacklist the executors immediately when the fetch failure
-    happened. If external shuffle service is enabled, then the whole node will be blacklisted.
+    (Experimental) If set to "true", Spark will blacklist the executor immediately when a fetch
+    failure happens. If external shuffle service is enabled, then the whole node will be
+    blacklisted.
   </td>
 </tr>
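The new assertions are deterministic because the suite drives time through a manual
clock rather than wall time. The same pattern in miniature, assuming in-tree test code
since org.apache.spark.util.ManualClock is private[spark]:

    import org.apache.spark.util.ManualClock

    object ManualClockSketch {
      def main(args: Array[String]): Unit = {
        val timeoutMillis = 60L * 60L * 1000L // stand-in for BLACKLIST_TIMEOUT_MILLIS
        val clock = new ManualClock(0L)
        clock.advance(1000)
        val expiry = clock.getTimeMillis() + timeoutMillis
        // matches the suite's expiryTime assertions: advance(1000) yields 1000 + timeout
        assert(expiry == 1000L + timeoutMillis)
      }
    }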
From 9a14105f88a7a932a4e831ff2377327500250645 Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Wed, 31 May 2017 13:52:57 +0800
Subject: [PATCH 7/8] Further address the comments

Change-Id: I63998f3eadcd32589b35d667ac10b65275a862eb
---
 .../spark/scheduler/BlacklistTracker.scala    |  8 +++++---
 .../spark/scheduler/TaskSchedulerImpl.scala   | 18 +++---------------
 .../scheduler/TaskSchedulerImplSuite.scala    |  4 +++-
 3 files changed, 11 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index 1165741396b3..cd8e61d6d020 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -151,7 +151,7 @@ private[scheduler] class BlacklistTracker (
       allocationClient match {
         case Some(a) =>
           logInfo(s"Killing blacklisted executor id $exec " +
-            s"since spark.blacklist.killBlacklistedExecutors is set.")
+            s"since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
           a.killExecutors(Seq(exec), true, true)
         case None =>
           logWarning(s"Not attempting to kill blacklisted executor id $exec " +
@@ -165,7 +165,7 @@ private[scheduler] class BlacklistTracker (
       allocationClient match {
         case Some(a) =>
           logInfo(s"Killing all executors on blacklisted host $node " +
-            s"since spark.blacklist.killBlacklistedExecutors is set.")
+            s"since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
           if (a.killExecutorsOnHost(node) == false) {
             logError(s"Killing executors on node $node failed.")
           }
@@ -179,7 +179,9 @@ private[scheduler] class BlacklistTracker (
   def updateBlacklistForFetchFailure(host: String, exec: String): Unit = {
     if (BLACKLIST_FETCH_FAILURE_ENABLED) {
       // If we blacklist on fetch failures, we are implicitly saying that we believe the failure is
-      // non-transient, and can't be recovered from (even if this is the first fetch failure).
+      // non-transient, and can't be recovered from (even if this is the first fetch failure,
+      // stage is retried after just one failure, so we don't always get a chance to collect
+      // multiple fetch failures).
       // If the external shuffle-service is on, then every other executor on this node would
       // be suffering from the same issue, so we should blacklist (and potentially kill) all
       // of them immediately.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 4c7f61f1d475..f3aec5950f6c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -51,31 +51,19 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}
  * acquire a lock on us, so we need to make sure that we don't try to lock the backend while
  * we are holding a lock on ourselves.
  */
-private[spark] class TaskSchedulerImpl private[scheduler](
+private[spark] class TaskSchedulerImpl(
     val sc: SparkContext,
     val maxTaskFailures: Int,
-    mockBlacklistTracker: Option[BlacklistTracker] = None,
     isLocal: Boolean = false)
   extends TaskScheduler with Logging {
 
   import TaskSchedulerImpl._
 
   def this(sc: SparkContext) = {
-    this(
-      sc,
-      sc.conf.get(config.MAX_TASK_FAILURES))
+    this(sc, sc.conf.get(config.MAX_TASK_FAILURES))
   }
 
-  def this(sc: SparkContext, maxTaskFailures: Int, isLocal: Boolean) = {
-    this(
-      sc,
-      maxTaskFailures,
-      mockBlacklistTracker = None,
-      isLocal = isLocal)
-  }
-
-  private[scheduler] lazy val blacklistTrackerOpt =
-    mockBlacklistTracker.orElse(maybeCreateBlacklistTracker(sc))
+  private[scheduler] lazy val blacklistTrackerOpt = maybeCreateBlacklistTracker(sc)
 
   val conf = sc.conf
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 8b9d45f734cd..a00337776dad 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -87,7 +87,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     conf.set(config.BLACKLIST_ENABLED, true)
     sc = new SparkContext(conf)
     taskScheduler =
-      new TaskSchedulerImpl(sc, sc.conf.getInt("spark.task.maxFailures", 4), Some(blacklist)) {
+      new TaskSchedulerImpl(sc, sc.conf.getInt("spark.task.maxFailures", 4)) {
         override def createTaskSetManager(taskSet: TaskSet, maxFailures: Int): TaskSetManager = {
           val tsm = super.createTaskSetManager(taskSet, maxFailures)
           // we need to create a spied tsm just so we can set the TaskSetBlacklist
@@ -98,6 +98,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
           stageToMockTaskSetBlacklist(taskSet.stageId) = taskSetBlacklist
           tsmSpy
         }
+
+        override private[scheduler] lazy val blacklistTrackerOpt = Some(blacklist)
       }
     setupHelper()
   }
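The test seam patch 7 switches to relies on Scala allowing a lazy val to be overridden
by another lazy val in an anonymous subclass. A generic sketch of that pattern, with
illustrative names:

    class Scheduler {
      // production: the collaborator is created lazily, after the rest of the wiring exists
      protected lazy val tracker: Option[String] = Some("real tracker")
      def describe(): String = tracker.getOrElse("none")
    }

    object LazyOverrideSketch {
      def main(args: Array[String]): Unit = {
        val testScheduler = new Scheduler {
          // the test swaps in its own collaborator without touching the constructor
          override protected lazy val tracker: Option[String] = Some("mock tracker")
        }
        assert(testScheduler.describe() == "mock tracker")
      }
    }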
From 3cf9cfd0ef78e0d0cc2780e563968ed2bd22ac39 Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Thu, 1 Jun 2017 10:59:56 +0800
Subject: [PATCH 8/8] Add the comments to address why blacklistTrackerOpt
 should be lazily initialized

Change-Id: I2a58f4bc5d84591cd23c93aea5ee50331a6b80b5
---
 .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index f3aec5950f6c..a521391b79e2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -63,6 +63,8 @@ private[spark] class TaskSchedulerImpl(
     this(sc, sc.conf.get(config.MAX_TASK_FAILURES))
   }
 
+  // Lazily initializing blacklistTrackerOpt to avoid getting an empty ExecutorAllocationClient,
+  // because ExecutorAllocationClient is created after this TaskSchedulerImpl.
   private[scheduler] lazy val blacklistTrackerOpt = maybeCreateBlacklistTracker(sc)
 
   val conf = sc.conf
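Taken together, the series touches four knobs. A sketch of the full opt-in, with the
defaults these values override noted inline (the values are examples, not
recommendations):

    object BlacklistConfSummary {
      def main(args: Array[String]): Unit = {
        val conf = new org.apache.spark.SparkConf()
          .set("spark.blacklist.enabled", "true")                           // master switch, default false
          .set("spark.blacklist.application.fetchFailure.enabled", "true")  // added by this series, default false
          .set("spark.blacklist.killBlacklistedExecutors", "true")          // also kill what we blacklist, default false
          .set("spark.shuffle.service.enabled", "true")                     // selects the node-level path, default false
        println(conf.toDebugString)
      }
    }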