Skip to content

Commit 9e2c59e

Browse files
markhamstramateiz
authored andcommitted
[SPARK-1685] Cancel retryTimer on restart of Worker or AppClient
See https://issues.apache.org/jira/browse/SPARK-1685 for a more complete description, but in essence: If the Worker or AppClient actor restarts before successfully registering with Master, multiple retryTimers will be running, which will lead to less than the full number of registration retries being attempted before the new actor is forced to give up. Author: Mark Hamstra <[email protected]> Closes #602 from markhamstra/SPARK-1685 and squashes the following commits: 11cc088 [Mark Hamstra] retryTimer -> registrationRetryTimer 69c348c [Mark Hamstra] Cancel retryTimer on restart of Worker or AppClient
1 parent 45561cd commit 9e2c59e

File tree

2 files changed

+15
-8
lines changed

2 files changed

+15
-8
lines changed

core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ private[spark] class AppClient(
6060
var master: ActorSelection = null
6161
var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
6262
var alreadyDead = false // To avoid calling listener.dead() multiple times
63+
var registrationRetryTimer: Option[Cancellable] = None
6364

6465
override def preStart() {
6566
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
@@ -83,22 +84,21 @@ private[spark] class AppClient(
8384

8485
def registerWithMaster() {
8586
tryRegisterAllMasters()
86-
8787
import context.dispatcher
8888
var retries = 0
89-
lazy val retryTimer: Cancellable =
89+
registrationRetryTimer = Some {
9090
context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
9191
retries += 1
9292
if (registered) {
93-
retryTimer.cancel()
93+
registrationRetryTimer.foreach(_.cancel())
9494
} else if (retries >= REGISTRATION_RETRIES) {
9595
logError("All masters are unresponsive! Giving up.")
9696
markDead()
9797
} else {
9898
tryRegisterAllMasters()
9999
}
100100
}
101-
retryTimer // start timer
101+
}
102102
}
103103

104104
def changeMaster(url: String) {
@@ -178,6 +178,11 @@ private[spark] class AppClient(
178178
alreadyDead = true
179179
}
180180
}
181+
182+
override def postStop() {
183+
registrationRetryTimer.foreach(_.cancel())
184+
}
185+
181186
}
182187

183188
def start() {

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ private[spark] class Worker(
9595
val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf)
9696
val workerSource = new WorkerSource(this)
9797

98+
var registrationRetryTimer: Option[Cancellable] = None
99+
98100
def coresFree: Int = cores - coresUsed
99101
def memoryFree: Int = memory - memoryUsed
100102

@@ -158,21 +160,20 @@ private[spark] class Worker(
158160

159161
def registerWithMaster() {
160162
tryRegisterAllMasters()
161-
162163
var retries = 0
163-
lazy val retryTimer: Cancellable =
164+
registrationRetryTimer = Some {
164165
context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
165166
retries += 1
166167
if (registered) {
167-
retryTimer.cancel()
168+
registrationRetryTimer.foreach(_.cancel())
168169
} else if (retries >= REGISTRATION_RETRIES) {
169170
logError("All masters are unresponsive! Giving up.")
170171
System.exit(1)
171172
} else {
172173
tryRegisterAllMasters()
173174
}
174175
}
175-
retryTimer // start timer
176+
}
176177
}
177178

178179
override def receive = {
@@ -313,6 +314,7 @@ private[spark] class Worker(
313314
}
314315

315316
override def postStop() {
317+
registrationRetryTimer.foreach(_.cancel())
316318
executors.values.foreach(_.kill())
317319
drivers.values.foreach(_.kill())
318320
webUi.stop()

0 commit comments

Comments
 (0)