Skip to content

Commit b575c55

Browse files
author
jinxing
committed
[SPARK-19450] Replace askWithRetry with askSync.
1 parent 2f523fa commit b575c55

File tree

24 files changed

+58
-119
lines changed

24 files changed

+58
-119
lines changed

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
9999
*/
100100
protected def askTracker[T: ClassTag](message: Any): T = {
101101
try {
102-
trackerEndpoint.askWithRetry[T](message)
102+
trackerEndpoint.askSync[T](message)
103103
} catch {
104104
case e: Exception =>
105105
logError("Error communicating with MapOutputTracker", e)

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -608,7 +608,7 @@ class SparkContext(config: SparkConf) extends Logging {
608608
Some(Utils.getThreadDump())
609609
} else {
610610
val endpointRef = env.blockManager.master.getExecutorEndpointRef(executorId).get
611-
Some(endpointRef.askWithRetry[Array[ThreadStackTrace]](TriggerThreadDump))
611+
Some(endpointRef.askSync[Array[ThreadStackTrace]](TriggerThreadDump))
612612
}
613613
} catch {
614614
case e: Exception =>

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ private class ClientEndpoint(
123123
Thread.sleep(5000)
124124
logInfo("... polling master for driver state")
125125
val statusResponse =
126-
activeMasterEndpoint.askWithRetry[DriverStatusResponse](RequestDriverStatus(driverId))
126+
activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
127127
if (statusResponse.found) {
128128
logInfo(s"State of $driverId is ${statusResponse.state.get}")
129129
// Worker node, if present

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1045,7 +1045,7 @@ private[deploy] object Master extends Logging {
10451045
val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
10461046
val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
10471047
new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
1048-
val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
1048+
val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
10491049
(rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
10501050
}
10511051
}

core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
3434
/** Executor details for a particular application */
3535
def render(request: HttpServletRequest): Seq[Node] = {
3636
val appId = request.getParameter("appId")
37-
val state = master.askWithRetry[MasterStateResponse](RequestMasterState)
37+
val state = master.askSync[MasterStateResponse](RequestMasterState)
3838
val app = state.activeApps.find(_.id == appId)
3939
.getOrElse(state.completedApps.find(_.id == appId).orNull)
4040
if (app == null) {

core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
3333
private val master = parent.masterEndpointRef
3434

3535
def getMasterState: MasterStateResponse = {
36-
master.askWithRetry[MasterStateResponse](RequestMasterState)
36+
master.askSync[MasterStateResponse](RequestMasterState)
3737
}
3838

3939
override def renderJson(request: HttpServletRequest): JValue = {

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ private[rest] class StandaloneKillRequestServlet(masterEndpoint: RpcEndpointRef,
7171
extends KillRequestServlet {
7272

7373
protected def handleKill(submissionId: String): KillSubmissionResponse = {
74-
val response = masterEndpoint.askWithRetry[DeployMessages.KillDriverResponse](
74+
val response = masterEndpoint.askSync[DeployMessages.KillDriverResponse](
7575
DeployMessages.RequestKillDriver(submissionId))
7676
val k = new KillSubmissionResponse
7777
k.serverSparkVersion = sparkVersion
@@ -89,7 +89,7 @@ private[rest] class StandaloneStatusRequestServlet(masterEndpoint: RpcEndpointRe
8989
extends StatusRequestServlet {
9090

9191
protected def handleStatus(submissionId: String): SubmissionStatusResponse = {
92-
val response = masterEndpoint.askWithRetry[DeployMessages.DriverStatusResponse](
92+
val response = masterEndpoint.askSync[DeployMessages.DriverStatusResponse](
9393
DeployMessages.RequestDriverStatus(submissionId))
9494
val message = response.exception.map { s"Exception from the cluster:\n" + formatException(_) }
9595
val d = new SubmissionStatusResponse
@@ -174,7 +174,7 @@ private[rest] class StandaloneSubmitRequestServlet(
174174
requestMessage match {
175175
case submitRequest: CreateSubmissionRequest =>
176176
val driverDescription = buildDriverDescription(submitRequest)
177-
val response = masterEndpoint.askWithRetry[DeployMessages.SubmitDriverResponse](
177+
val response = masterEndpoint.askSync[DeployMessages.SubmitDriverResponse](
178178
DeployMessages.RequestSubmitDriver(driverDescription))
179179
val submitResponse = new CreateSubmissionResponse
180180
submitResponse.serverSparkVersion = sparkVersion

core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,12 @@ private[ui] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") {
3434
private val workerEndpoint = parent.worker.self
3535

3636
override def renderJson(request: HttpServletRequest): JValue = {
37-
val workerState = workerEndpoint.askWithRetry[WorkerStateResponse](RequestWorkerState)
37+
val workerState = workerEndpoint.askSync[WorkerStateResponse](RequestWorkerState)
3838
JsonProtocol.writeWorkerState(workerState)
3939
}
4040

4141
def render(request: HttpServletRequest): Seq[Node] = {
42-
val workerState = workerEndpoint.askWithRetry[WorkerStateResponse](RequestWorkerState)
42+
val workerState = workerEndpoint.askSync[WorkerStateResponse](RequestWorkerState)
4343

4444
val executorHeaders = Seq("ExecutorID", "Cores", "State", "Memory", "Job Details", "Logs")
4545
val runningExecutors = workerState.executors

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
199199
new SecurityManager(executorConf),
200200
clientMode = true)
201201
val driver = fetcher.setupEndpointRefByURI(driverUrl)
202-
val cfg = driver.askWithRetry[SparkAppConfig](RetrieveSparkAppConfig)
202+
val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
203203
val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
204204
fetcher.shutdown()
205205

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -677,7 +677,7 @@ private[spark] class Executor(
677677

678678
val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
679679
try {
680-
val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
680+
val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
681681
message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
682682
if (response.reregisterBlockManager) {
683683
logInfo("Told to re-register on heartbeat")

0 commit comments

Comments
 (0)