Skip to content

Commit ff2d3e2

Browse files
author
Marcelo Vanzin
committed
Fix other send call site.
1 parent cb3f972 commit ff2d3e2

File tree

1 file changed

+17
-14
lines changed
  • core/src/main/scala/org/apache/spark/deploy/worker

1 file changed

+17
-14
lines changed

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -213,18 +213,7 @@ private[deploy] class Worker(
213213
logInfo("Connecting to master " + masterAddress + "...")
214214
val masterEndpoint =
215215
rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
216-
masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
217-
workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
218-
.onComplete {
219-
// This is a very fast action so we can use "ThreadUtils.sameThread"
220-
case Success(msg) =>
221-
Utils.tryLogNonFatalError {
222-
handleRegisterResponse(msg)
223-
}
224-
case Failure(e) =>
225-
logError(s"Cannot register with master: $masterAddress", e)
226-
System.exit(1)
227-
}(ThreadUtils.sameThread)
216+
registerWithMaster(masterEndpoint)
228217
} catch {
229218
case ie: InterruptedException => // Cancelled
230219
case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
@@ -281,8 +270,7 @@ private[deploy] class Worker(
281270
logInfo("Connecting to master " + masterAddress + "...")
282271
val masterEndpoint =
283272
rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
284-
masterEndpoint.send(RegisterWorker(
285-
workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
273+
registerWithMaster(masterEndpoint)
286274
} catch {
287275
case ie: InterruptedException => // Cancelled
288276
case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
@@ -351,6 +339,21 @@ private[deploy] class Worker(
351339
}
352340
}
353341

342+
private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
343+
masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
344+
workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
345+
.onComplete {
346+
// This is a very fast action so we can use "ThreadUtils.sameThread"
347+
case Success(msg) =>
348+
Utils.tryLogNonFatalError {
349+
handleRegisterResponse(msg)
350+
}
351+
case Failure(e) =>
352+
logError(s"Cannot register with master: ${masterEndpoint.address}", e)
353+
System.exit(1)
354+
}(ThreadUtils.sameThread)
355+
}
356+
354357
private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
355358
msg match {
356359
case RegisteredWorker(masterRef, masterWebUiUrl) =>

0 commit comments

Comments
 (0)