@@ -143,6 +143,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
143143 // Ignoring the task kill since the executor is not registered.
144144 logWarning(s " Attempted to kill task $taskId for unknown executor $executorId. " )
145145 }
146+
147+ case RemoveExecutor (executorId, reason) =>
148+ // We will remove the executor's state and cannot restore it. However, the connection
149+ // between the driver and the executor may be still alive so that the executor won't exit
150+ // automatically, so try to tell the executor to stop itself. See SPARK-13519.
151+ executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor ))
152+ removeExecutor(executorId, reason)
146153 }
147154
148155 override def receiveAndReply (context : RpcCallContext ): PartialFunction [Any , Unit ] = {
@@ -196,14 +203,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
196203 }
197204 context.reply(true )
198205
199- case RemoveExecutor (executorId, reason) =>
200- // We will remove the executor's state and cannot restore it. However, the connection
201- // between the driver and the executor may be still alive so that the executor won't exit
202- // automatically, so try to tell the executor to stop itself. See SPARK-13519.
203- executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor ))
204- removeExecutor(executorId, reason)
205- context.reply(true )
206-
207206 case RetrieveSparkProps =>
208207 context.reply(sparkProperties)
209208 }
@@ -407,20 +406,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
407406 conf.getInt(" spark.default.parallelism" , math.max(totalCoreCount.get(), 2 ))
408407 }
409408
410- // Called by subclasses when notified of a lost worker
409+ /**
410+ * Called by subclasses when notified of a lost worker. It just fires the message and returns
411+ * at once.
412+ */
411413 protected def removeExecutor (executorId : String , reason : ExecutorLossReason ): Unit = {
412- try {
413- driverEndpoint.askWithRetry[Boolean ](RemoveExecutor (executorId, reason))
414- } catch {
415- case e : Exception =>
416- throw new SparkException (" Error notifying standalone scheduler's driver endpoint" , e)
417- }
418- }
419-
420- protected def removeExecutorAsync (
421- executorId : String ,
422- reason : ExecutorLossReason ): Future [Boolean ] = {
423- driverEndpoint.ask[Boolean ](RemoveExecutor (executorId, reason))
414+ // Fire-and-forget: the message is sent without waiting for a reply, since the result is not needed.
415+ driverEndpoint.send(RemoveExecutor (executorId, reason))
424416 }
425417
426418 def sufficientResourcesRegistered (): Boolean = true
0 commit comments