
Commit 4d95bf6

Marcelo Vanzin authored and committed
[SPARK-27963][core] Allow dynamic allocation without a shuffle service.
This change adds a new option that enables dynamic allocation without the need for a shuffle service. This mode works by tracking which stages generate shuffle files, and keeping executors that generate data for those shuffles alive while the jobs that use them are active.

A separate timeout is also added for shuffle data, so that executors holding shuffle data can use a different timeout before being removed for being idle. This allows the shuffle data to be kept around in case it is needed by some new job, or allows users to be more aggressive in timing out executors that don't have shuffle data in active use.

The code also hooks into the context cleaner, so that shuffles that are garbage collected are detected and the respective executors are not held unnecessarily.

Testing was done with the added unit tests, and also with TPC-DS workloads on YARN without a shuffle service.
1 parent eadb538 commit 4d95bf6
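
For reference, here is a minimal sketch of how an application might opt into this mode using the configuration keys introduced by this change; the application name and the 2-minute shuffle timeout below are example values only, not defaults:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("dynalloc-no-shuffle-service")                      // example name
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "false")                  // no external shuffle service
  .set("spark.dynamicAllocation.shuffleTracking.enabled", "true") // new option from this commit
  // Optional: reclaim idle executors holding shuffle data after 2 minutes instead of
  // keeping them for the default (effectively unbounded) shuffle timeout.
  .set("spark.dynamicAllocation.shuffleTimeout", "120s")

val sc = new SparkContext(conf)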

File tree

7 files changed: 332 additions, 29 deletions

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 10 additions & 6 deletions
@@ -94,6 +94,7 @@ private[spark] class ExecutorAllocationManager(
     client: ExecutorAllocationClient,
     listenerBus: LiveListenerBus,
     conf: SparkConf,
+    cleaner: Option[ContextCleaner] = None,
     clock: Clock = new SystemClock())
   extends Logging {

@@ -148,7 +149,7 @@ private[spark] class ExecutorAllocationManager(
   // Listener for Spark events that impact the allocation policy
   val listener = new ExecutorAllocationListener

-  val executorMonitor = new ExecutorMonitor(conf, client, clock)
+  val executorMonitor = new ExecutorMonitor(conf, client, listenerBus, clock)

   // Executor that handles the scheduling task.
   private val executor =

@@ -194,11 +195,13 @@ private[spark] class ExecutorAllocationManager(
       throw new SparkException(
         s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!")
     }
-    // Require external shuffle service for dynamic allocation
-    // Otherwise, we may lose shuffle files when killing executors
-    if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) && !testing) {
-      throw new SparkException("Dynamic allocation of executors requires the external " +
-        "shuffle service. You may enable this through spark.shuffle.service.enabled.")
+    if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
+      if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING)) {
+        logWarning("Dynamic allocation without a shuffle service is an experimental feature.")
+      } else {
+        throw new SparkException("Dynamic allocation of executors requires the external " +
+          "shuffle service. You may enable this through spark.shuffle.service.enabled.")
+      }
     }

     if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) {

@@ -214,6 +217,7 @@ private[spark] class ExecutorAllocationManager(
   def start(): Unit = {
     listenerBus.addToManagementQueue(listener)
     listenerBus.addToManagementQueue(executorMonitor)
+    cleaner.foreach(_.attachListener(executorMonitor))

     val scheduleTask = new Runnable() {
       override def run(): Unit = {
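
To make the tracking idea concrete, below is a small self-contained sketch of the bookkeeping the commit message describes: remember which executors hold output for which shuffle, drop a shuffle once the ContextCleaner reports it collected, and treat only executors with no live shuffle data as ordinary idle-removal candidates. The class and method names are hypothetical; the actual logic lives in ExecutorMonitor, which the hunk above also registers as a cleaner listener.

import scala.collection.mutable

class ShuffleTrackingSketch {
  // shuffleId -> executors that wrote output for that shuffle.
  private val executorsByShuffle = mutable.Map.empty[Int, mutable.Set[String]]

  // A shuffle-map task finished on this executor, so it now holds shuffle output.
  def shuffleOutputWritten(shuffleId: Int, executorId: String): Unit =
    executorsByShuffle.getOrElseUpdate(shuffleId, mutable.Set.empty[String]) += executorId

  // The ContextCleaner reported that this shuffle was garbage collected.
  def shuffleCleaned(shuffleId: Int): Unit =
    executorsByShuffle -= shuffleId

  // Executors still holding live shuffle data use the (longer) shuffle timeout;
  // everyone else is a regular idle-removal candidate.
  def holdsLiveShuffleData(executorId: String): Boolean =
    executorsByShuffle.values.exists(_.contains(executorId))
}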

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 10 additions & 10 deletions
@@ -578,14 +578,22 @@ class SparkContext(config: SparkConf) extends Logging {
         None
       }

-    // Optionally scale number of executors dynamically based on workload. Exposed for testing.
+    _cleaner =
+      if (_conf.get(CLEANER_REFERENCE_TRACKING)) {
+        Some(new ContextCleaner(this))
+      } else {
+        None
+      }
+    _cleaner.foreach(_.start())
+
     val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
     _executorAllocationManager =
       if (dynamicAllocationEnabled) {
         schedulerBackend match {
           case b: ExecutorAllocationClient =>
             Some(new ExecutorAllocationManager(
-              schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
+              schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
+              cleaner = cleaner))
           case _ =>
             None
         }

@@ -594,14 +602,6 @@ class SparkContext(config: SparkConf) extends Logging {
       }
     _executorAllocationManager.foreach(_.start())

-    _cleaner =
-      if (_conf.get(CLEANER_REFERENCE_TRACKING)) {
-        Some(new ContextCleaner(this))
-      } else {
-        None
-      }
-    _cleaner.foreach(_.start())
-
     setupAndStartListenerBus()
     postEnvironmentUpdate()
     postApplicationStart()

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 11 additions & 0 deletions
@@ -367,6 +367,17 @@ package object config {
       .checkValue(_ >= 0L, "Timeout must be >= 0.")
       .createWithDefault(60)

+  private[spark] val DYN_ALLOCATION_SHUFFLE_TRACKING =
+    ConfigBuilder("spark.dynamicAllocation.shuffleTracking.enabled")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val DYN_ALLOCATION_SHUFFLE_TIMEOUT =
+    ConfigBuilder("spark.dynamicAllocation.shuffleTimeout")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .checkValue(_ >= 0L, "Timeout must be >= 0.")
+      .createWithDefault(Long.MaxValue)
+
   private[spark] val DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT =
     ConfigBuilder("spark.dynamicAllocation.schedulerBacklogTimeout")
       .timeConf(TimeUnit.SECONDS).createWithDefault(1)
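
For illustration only, Spark-internal code would read these entries like any other dynamic-allocation setting. The helper below is hypothetical and would have to live under org.apache.spark, since both the config entries and SparkConf.get(ConfigEntry) are private[spark]; it only shows the shape of that access:

import org.apache.spark.SparkConf
import org.apache.spark.internal.config._

private[spark] object ShuffleTrackingSettings {
  // Returns (trackingEnabled, shuffleTimeoutMs); the helper name is hypothetical.
  def read(conf: SparkConf): (Boolean, Long) = {
    val trackingEnabled = conf.get(DYN_ALLOCATION_SHUFFLE_TRACKING)
    val shuffleTimeoutMs = conf.get(DYN_ALLOCATION_SHUFFLE_TIMEOUT)
    (trackingEnabled, shuffleTimeoutMs)
  }
}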

core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala

Lines changed: 8 additions & 2 deletions
@@ -37,7 +37,8 @@ class StageInfo(
     val parentIds: Seq[Int],
     val details: String,
     val taskMetrics: TaskMetrics = null,
-    private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty) {
+    private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty,
+    private[spark] val shuffleDepId: Option[Int] = None) {
   /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
   var submissionTime: Option[Long] = None
   /** Time when all tasks in the stage completed or when the stage was cancelled. */

@@ -90,6 +91,10 @@ private[spark] object StageInfo {
     ): StageInfo = {
     val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
     val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
+    val shuffleDepId = stage match {
+      case sms: ShuffleMapStage => Option(sms.shuffleDep).map(_.shuffleId)
+      case _ => None
+    }
     new StageInfo(
       stage.id,
       attemptId,

@@ -99,6 +104,7 @@ private[spark] object StageInfo {
       stage.parents.map(_.id),
       stage.details,
       taskMetrics,
-      taskLocalityPreferences)
+      taskLocalityPreferences,
+      shuffleDepId)
   }
 }
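
The new shuffleDepId field is what lets a listener on the event bus tell which shuffle a submitted stage writes. A rough sketch of that consumption pattern follows; the listener class is hypothetical (and would need to live under org.apache.spark, since shuffleDepId is private[spark]), while in this change the consumer is the ExecutorMonitor wired up earlier:

import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted}

class ShuffleStageListener extends SparkListener {
  // stageId -> shuffleId written by that stage (shuffle-map stages only).
  private val shuffleByStage = mutable.Map.empty[Int, Int]

  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
    event.stageInfo.shuffleDepId.foreach { shuffleId =>
      shuffleByStage(event.stageInfo.stageId) = shuffleId
    }
  }
}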
