Skip to content

Commit 69c348c

Browse files
committed
Cancel retryTimer on restart of Worker or AppClient
1 parent 55100da commit 69c348c

File tree

2 files changed

+15
-8
lines changed

2 files changed

+15
-8
lines changed

core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ private[spark] class AppClient(
6060
var master: ActorSelection = null
6161
var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
6262
var alreadyDead = false // To avoid calling listener.dead() multiple times
63+
var retryTimer: Option[Cancellable] = None
6364

6465
override def preStart() {
6566
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
@@ -83,22 +84,21 @@ private[spark] class AppClient(
8384

8485
def registerWithMaster() {
8586
tryRegisterAllMasters()
86-
8787
import context.dispatcher
8888
var retries = 0
89-
lazy val retryTimer: Cancellable =
89+
retryTimer = Some {
9090
context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
9191
retries += 1
9292
if (registered) {
93-
retryTimer.cancel()
93+
retryTimer.foreach(_.cancel())
9494
} else if (retries >= REGISTRATION_RETRIES) {
9595
logError("All masters are unresponsive! Giving up.")
9696
markDead()
9797
} else {
9898
tryRegisterAllMasters()
9999
}
100100
}
101-
retryTimer // start timer
101+
}
102102
}
103103

104104
def changeMaster(url: String) {
@@ -179,6 +179,11 @@ private[spark] class AppClient(
179179
alreadyDead = true
180180
}
181181
}
182+
183+
override def postStop() {
184+
retryTimer.foreach(_.cancel())
185+
}
186+
182187
}
183188

184189
def start() {

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ private[spark] class Worker(
102102
val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr)
103103
val workerSource = new WorkerSource(this)
104104

105+
var retryTimer: Option[Cancellable] = None
106+
105107
def coresFree: Int = cores - coresUsed
106108
def memoryFree: Int = memory - memoryUsed
107109

@@ -163,21 +165,20 @@ private[spark] class Worker(
163165

164166
def registerWithMaster() {
165167
tryRegisterAllMasters()
166-
167168
var retries = 0
168-
lazy val retryTimer: Cancellable =
169+
retryTimer = Some {
169170
context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
170171
retries += 1
171172
if (registered) {
172-
retryTimer.cancel()
173+
retryTimer.foreach(_.cancel())
173174
} else if (retries >= REGISTRATION_RETRIES) {
174175
logError("All masters are unresponsive! Giving up.")
175176
System.exit(1)
176177
} else {
177178
tryRegisterAllMasters()
178179
}
179180
}
180-
retryTimer // start timer
181+
}
181182
}
182183

183184
override def receive = {
@@ -346,6 +347,7 @@ private[spark] class Worker(
346347
}
347348

348349
override def postStop() {
350+
retryTimer.foreach(_.cancel())
349351
executors.values.foreach(_.kill())
350352
drivers.values.foreach(_.kill())
351353
webUi.stop()

0 commit comments

Comments
 (0)