
Commit b7be05a

erenavsarogullari authored and kayousterhout committed
[SPARK-19567][CORE][SCHEDULER] Support some Schedulable variables immutability and access
## What changes were proposed in this pull request?

Some `Schedulable` entities (`Pool` and `TaskSetManager`) have variables that need refactoring for _immutability_ and _access modifier_ levels, as follows:

- From `var` to `val` (where there is no requirement for mutability): this supports immutability as much as possible.
  - Sample => `Pool`: `weight`, `minShare`, `priority`, `name` and `taskSetSchedulingAlgorithm`.
- Access modifiers: specifically, access to `var`s needs to be restricted from other parts of the codebase to prevent potential side effects.
  - `TaskSetManager`: `tasksSuccessful`, `totalResultSize`, `calculatedTasks`, etc.

This PR is related to #15604 and has been created separately to keep the patch content isolated and to help the reviewers.

## How was this patch tested?

Added new UTs and existing UT coverage.

Author: erenavsarogullari <[email protected]>

Closes #16905 from erenavsarogullari/SPARK-19567.
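As a rough illustration of the refactoring pattern described above, here is a minimal sketch with hypothetical names (`ExamplePool` is not Spark source): fields that are never reassigned become `val`s, while genuinely mutable state keeps `var` but has its visibility narrowed to the scheduler package.

```scala
package org.example.scheduler

// Hypothetical class sketching the var -> val and access-modifier pattern.
class ExamplePool(poolName: String, initWeight: Int, initMinShare: Int) {

  // Before: `var weight = initWeight` was reassignable from anywhere.
  // After: immutable, since nothing in the codebase ever reassigns it.
  val weight: Int = initWeight
  val minShare: Int = initMinShare
  val name: String = poolName

  // Genuinely mutable state keeps `var`, but writes are confined to the
  // enclosing `scheduler` package to prevent side effects from elsewhere.
  private[scheduler] var runningTasks: Int = 0

  def increaseRunningTasks(taskNum: Int): Unit = {
    runningTasks += taskNum
  }
}
```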
1 parent 746a558 commit b7be05a

File tree

7 files changed (+58 / -39 lines)


core/src/main/scala/org/apache/spark/scheduler/Pool.scala

Lines changed: 6 additions & 6 deletions
```diff
@@ -37,24 +37,24 @@ private[spark] class Pool(

   val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
   val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
-  var weight = initWeight
-  var minShare = initMinShare
+  val weight = initWeight
+  val minShare = initMinShare
   var runningTasks = 0
-  var priority = 0
+  val priority = 0

   // A pool's stage id is used to break the tie in scheduling.
   var stageId = -1
-  var name = poolName
+  val name = poolName
   var parent: Pool = null

-  var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
+  private val taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
     schedulingMode match {
       case SchedulingMode.FAIR =>
         new FairSchedulingAlgorithm()
       case SchedulingMode.FIFO =>
         new FIFOSchedulingAlgorithm()
       case _ =>
-        val msg = "Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead."
+        val msg = s"Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead."
         throw new IllegalArgumentException(msg)
     }
   }
```
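Note the second fix in this hunk: the error message gains the `s` interpolator. A standalone demo (not from the patch) of why the prefix matters:

```scala
object InterpolationDemo extends App {
  val schedulingMode = "NONE"
  // Without the `s` prefix, Scala keeps "$schedulingMode" as literal text:
  println("Unsupported scheduling mode: $schedulingMode")
  // With the prefix, the value is substituted into the message:
  println(s"Unsupported scheduling mode: $schedulingMode") // "...mode: NONE"
}
```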

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 12 additions & 7 deletions
```diff
@@ -59,6 +59,8 @@ private[spark] class TaskSchedulerImpl private[scheduler](
   extends TaskScheduler with Logging
 {

+  import TaskSchedulerImpl._
+
   def this(sc: SparkContext) = {
     this(
       sc,
@@ -130,17 +132,18 @@ private[spark] class TaskSchedulerImpl private[scheduler](

   val mapOutputTracker = SparkEnv.get.mapOutputTracker

-  var schedulableBuilder: SchedulableBuilder = null
-  var rootPool: Pool = null
+  private var schedulableBuilder: SchedulableBuilder = null
   // default scheduler is FIFO
-  private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
+  private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, SchedulingMode.FIFO.toString)
   val schedulingMode: SchedulingMode = try {
     SchedulingMode.withName(schedulingModeConf.toUpperCase)
   } catch {
     case e: java.util.NoSuchElementException =>
-      throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf")
+      throw new SparkException(s"Unrecognized $SCHEDULER_MODE_PROPERTY: $schedulingModeConf")
   }

+  val rootPool: Pool = new Pool("", schedulingMode, 0, 0)
+
   // This is a var so that we can reset it for testing purposes.
   private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
@@ -150,16 +153,15 @@ private[spark] class TaskSchedulerImpl private[scheduler](

   def initialize(backend: SchedulerBackend) {
     this.backend = backend
-    // temporarily set rootPool name to empty
-    rootPool = new Pool("", schedulingMode, 0, 0)
     schedulableBuilder = {
       schedulingMode match {
         case SchedulingMode.FIFO =>
           new FIFOSchedulableBuilder(rootPool)
         case SchedulingMode.FAIR =>
           new FairSchedulableBuilder(rootPool, conf)
         case _ =>
-          throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
+          throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
+            s"$schedulingMode")
       }
     }
     schedulableBuilder.buildPools()
@@ -683,6 +685,9 @@

 private[spark] object TaskSchedulerImpl {
+
+  val SCHEDULER_MODE_PROPERTY = "spark.scheduler.mode"
+
   /**
    * Used to balance containers across hosts.
    *
```
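A simplified sketch (hypothetical names, not Spark source) of the two changes in this file: the `"spark.scheduler.mode"` literal is hoisted into a companion-object constant, and `rootPool` becomes an eagerly constructed `val` rather than a null-initialized `var` assigned later in `initialize()`.

```scala
class ExamplePool(val name: String, val weight: Int, val minShare: Int)

object ExampleScheduler {
  // Single definition of the property key, shared by the scheduler and tests.
  val SCHEDULER_MODE_PROPERTY = "spark.scheduler.mode"
}

class ExampleScheduler(conf: Map[String, String]) {
  import ExampleScheduler._

  private val schedulingModeConf: String =
    conf.getOrElse(SCHEDULER_MODE_PROPERTY, "FIFO")

  // Eager, immutable construction: rootPool is never null and never reassigned.
  val rootPool: ExamplePool = new ExamplePool("", 0, 0)
}
```

Making `rootPool` a `val` removes the window in which it was null before `initialize()` ran, which is also what lets the updated test suites below construct real pools instead of returning null.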

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 19 additions & 17 deletions
```diff
@@ -78,24 +78,24 @@ private[spark] class TaskSetManager(
   private val numFailures = new Array[Int](numTasks)

   val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
-  var tasksSuccessful = 0
+  private[scheduler] var tasksSuccessful = 0

-  var weight = 1
-  var minShare = 0
+  val weight = 1
+  val minShare = 0
   var priority = taskSet.priority
   var stageId = taskSet.stageId
   val name = "TaskSet_" + taskSet.id
   var parent: Pool = null
-  var totalResultSize = 0L
-  var calculatedTasks = 0
+  private var totalResultSize = 0L
+  private var calculatedTasks = 0

   private[scheduler] val taskSetBlacklistHelperOpt: Option[TaskSetBlacklist] = {
     blacklistTracker.map { _ =>
       new TaskSetBlacklist(conf, stageId, clock)
     }
   }

-  val runningTasksSet = new HashSet[Long]
+  private[scheduler] val runningTasksSet = new HashSet[Long]

   override def runningTasks: Int = runningTasksSet.size
@@ -105,7 +105,7 @@ private[spark] class TaskSetManager(
   // state until all tasks have finished running; we keep TaskSetManagers that are in the zombie
   // state in order to continue to track and account for the running tasks.
   // TODO: We should kill any running task attempts when the task set manager becomes a zombie.
-  var isZombie = false
+  private[scheduler] var isZombie = false

   // Set of pending tasks for each executor. These collections are actually
   // treated as stacks, in which new tasks are added to the end of the
@@ -129,17 +129,17 @@ private[spark] class TaskSetManager(
   private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]

   // Set containing pending tasks with no locality preferences.
-  var pendingTasksWithNoPrefs = new ArrayBuffer[Int]
+  private[scheduler] var pendingTasksWithNoPrefs = new ArrayBuffer[Int]

   // Set containing all pending tasks (also used as a stack, as above).
-  val allPendingTasks = new ArrayBuffer[Int]
+  private val allPendingTasks = new ArrayBuffer[Int]

   // Tasks that can be speculated. Since these will be a small fraction of total
   // tasks, we'll just hold them in a HashSet.
-  val speculatableTasks = new HashSet[Int]
+  private[scheduler] val speculatableTasks = new HashSet[Int]

   // Task index, start and finish time for each task attempt (indexed by task ID)
-  val taskInfos = new HashMap[Long, TaskInfo]
+  private val taskInfos = new HashMap[Long, TaskInfo]

   // How frequently to reprint duplicate exceptions in full, in milliseconds
   val EXCEPTION_PRINT_INTERVAL =
@@ -148,7 +148,7 @@ private[spark] class TaskSetManager(
   // Map of recent exceptions (identified by string representation and top stack frame) to
   // duplicate count (how many times the same exception has appeared) and time the full exception
   // was printed. This should ideally be an LRU map that can drop old exceptions automatically.
-  val recentExceptions = HashMap[String, (Int, Long)]()
+  private val recentExceptions = HashMap[String, (Int, Long)]()

   // Figure out the current map output tracker epoch and set it on all tasks
   val epoch = sched.mapOutputTracker.getEpoch
@@ -169,20 +169,22 @@
   * This allows a performance optimization, of skipping levels that aren't relevant (eg., skip
   * PROCESS_LOCAL if no tasks could be run PROCESS_LOCAL for the current set of executors).
   */
-  var myLocalityLevels = computeValidLocalityLevels()
-  var localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level
+  private[scheduler] var myLocalityLevels = computeValidLocalityLevels()
+
+  // Time to wait at each level
+  private[scheduler] var localityWaits = myLocalityLevels.map(getLocalityWait)

   // Delay scheduling variables: we keep track of our current locality level and the time we
   // last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
   // We then move down if we manage to launch a "more local" task.
-  var currentLocalityIndex = 0 // Index of our current locality level in validLocalityLevels
-  var lastLaunchTime = clock.getTimeMillis() // Time we last launched a task at this level
+  private var currentLocalityIndex = 0 // Index of our current locality level in validLocalityLevels
+  private var lastLaunchTime = clock.getTimeMillis() // Time we last launched a task at this level

   override def schedulableQueue: ConcurrentLinkedQueue[Schedulable] = null

   override def schedulingMode: SchedulingMode = SchedulingMode.NONE

-  var emittedTaskSizeWarning = false
+  private[scheduler] var emittedTaskSizeWarning = false

   /** Add a task to all the pending-task lists that it should be on. */
   private def addPendingTask(index: Int) {
```

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 4 additions & 4 deletions
```diff
@@ -110,8 +110,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     val cancelledStages = new HashSet[Int]()

     val taskScheduler = new TaskScheduler() {
-      override def rootPool: Pool = null
-      override def schedulingMode: SchedulingMode = SchedulingMode.NONE
+      override def schedulingMode: SchedulingMode = SchedulingMode.FIFO
+      override def rootPool: Pool = new Pool("", schedulingMode, 0, 0)
       override def start() = {}
       override def stop() = {}
       override def executorHeartbeatReceived(
@@ -542,8 +542,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     // make sure that the DAGScheduler doesn't crash when the TaskScheduler
     // doesn't implement killTask()
     val noKillTaskScheduler = new TaskScheduler() {
-      override def rootPool: Pool = null
-      override def schedulingMode: SchedulingMode = SchedulingMode.NONE
+      override def schedulingMode: SchedulingMode = SchedulingMode.FIFO
+      override def rootPool: Pool = new Pool("", schedulingMode, 0, 0)
       override def start(): Unit = {}
       override def stop(): Unit = {}
       override def submitTasks(taskSet: TaskSet): Unit = {
```

core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -73,8 +73,8 @@ private class DummySchedulerBackend extends SchedulerBackend {

 private class DummyTaskScheduler extends TaskScheduler {
   var initialized = false
-  override def rootPool: Pool = null
-  override def schedulingMode: SchedulingMode = SchedulingMode.NONE
+  override def schedulingMode: SchedulingMode = SchedulingMode.FIFO
+  override def rootPool: Pool = new Pool("", schedulingMode, 0, 0)
   override def start(): Unit = {}
   override def stop(): Unit = {}
   override def submitTasks(taskSet: TaskSet): Unit = {}
```

core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala

Lines changed: 6 additions & 0 deletions
```diff
@@ -286,6 +286,12 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     assert(testPool.getSchedulableByName(taskSetManager.name) === taskSetManager)
   }

+  test("Pool should throw IllegalArgumentException when schedulingMode is not supported") {
+    intercept[IllegalArgumentException] {
+      new Pool("TestPool", SchedulingMode.NONE, 0, 1)
+    }
+  }
+
   private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int,
       expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = {
     val selectedPool = rootPool.getSchedulableByName(poolName)
```

core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala

Lines changed: 9 additions & 3 deletions
```diff
@@ -75,9 +75,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B

   def setupScheduler(confs: (String, String)*): TaskSchedulerImpl = {
     val conf = new SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite")
-    confs.foreach { case (k, v) =>
-      conf.set(k, v)
-    }
+    confs.foreach { case (k, v) => conf.set(k, v) }
     sc = new SparkContext(conf)
     taskScheduler = new TaskSchedulerImpl(sc)
     setupHelper()
@@ -904,4 +902,12 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(taskDescs.size === 1)
     assert(taskDescs.head.executorId === "exec2")
   }
+
+  test("TaskScheduler should throw IllegalArgumentException when schedulingMode is not supported") {
+    intercept[IllegalArgumentException] {
+      val taskScheduler = setupScheduler(
+        TaskSchedulerImpl.SCHEDULER_MODE_PROPERTY -> SchedulingMode.NONE.toString)
+      taskScheduler.initialize(new FakeSchedulerBackend)
+    }
+  }
 }
```
