Skip to content

Commit fabe4c4

Browse files
committed
Reuse more code in YarnClientSchedulerBackend
We implement a while loop to monitor an application's state in four separate places (stable/Client, alpha/Client, and twice in YarnClientSchedulerBackend). This commit reduces them to one: YarnClientSchedulerBackend now reuses ClientBase.monitorApplication.
1 parent 3f941dc commit fabe4c4

File tree

3 files changed

+72
-61
lines changed

3 files changed

+72
-61
lines changed

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -435,36 +435,51 @@ private[spark] trait ClientBase extends Logging {
435435

436436
/**
437437
* Report the state of an application until it has exited, either successfully or
438-
* due to some failure.
438+
* due to some failure, then return the application state.
439+
*
440+
* @param returnOnRunning Whether to also return the application state when it is RUNNING.
441+
* @param logApplicationReport Whether to log details of the application report every iteration.
442+
* @return state of the application: one of FINISHED, FAILED, or KILLED, or RUNNING if returnOnRunning is set.
439443
*/
440-
def monitorApplication(appId: ApplicationId): Unit = {
444+
def monitorApplication(
445+
appId: ApplicationId,
446+
returnOnRunning: Boolean = false,
447+
logApplicationReport: Boolean = true): YarnApplicationState = {
441448
val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
442449
while (true) {
443450
Thread.sleep(interval)
444451
val report = getApplicationReport(appId)
445452
val state = report.getYarnApplicationState
446453

447-
logInfo(s"Application report from ResourceManager for ${appId.getId} (state: $state)")
448-
449-
logDebug(
450-
s"\t full application identifier: $appId\n" +
451-
s"\t clientToken: ${getClientToken(report)}\n" +
452-
s"\t appDiagnostics: ${report.getDiagnostics}\n" +
453-
s"\t appMasterHost: ${report.getHost}\n" +
454-
s"\t appQueue: ${report.getQueue}\n" +
455-
s"\t appMasterRpcPort: ${report.getRpcPort}\n" +
456-
s"\t appStartTime: ${report.getStartTime}\n" +
457-
s"\t yarnAppState: $state\n" +
458-
s"\t distributedFinalState: ${report.getFinalApplicationStatus}\n" +
459-
s"\t appTrackingUrl: ${report.getTrackingUrl}\n" +
460-
s"\t appUser: ${report.getUser}")
454+
if (logApplicationReport) {
455+
logInfo(s"Application report from ResourceManager for application ${appId.getId} " +
456+
s"(state: $state)")
457+
logDebug(
458+
s"\t full application identifier: $appId\n" +
459+
s"\t clientToken: ${getClientToken(report)}\n" +
460+
s"\t appDiagnostics: ${report.getDiagnostics}\n" +
461+
s"\t appMasterHost: ${report.getHost}\n" +
462+
s"\t appQueue: ${report.getQueue}\n" +
463+
s"\t appMasterRpcPort: ${report.getRpcPort}\n" +
464+
s"\t appStartTime: ${report.getStartTime}\n" +
465+
s"\t yarnAppState: $state\n" +
466+
s"\t distributedFinalState: ${report.getFinalApplicationStatus}\n" +
467+
s"\t appTrackingUrl: ${report.getTrackingUrl}\n" +
468+
s"\t appUser: ${report.getUser}")
469+
}
461470

462471
if (state == YarnApplicationState.FINISHED ||
463472
state == YarnApplicationState.FAILED ||
464473
state == YarnApplicationState.KILLED) {
465-
return
474+
return state
475+
}
476+
477+
if (returnOnRunning && state == YarnApplicationState.RUNNING) {
478+
return state
466479
}
467480
}
481+
// Never reached, but keeps compiler happy
482+
throw new SparkException("While loop is depleted! This should never happen...")
468483
}
469484

470485
/**

yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala

Lines changed: 39 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster
1919

2020
import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
2121
import org.apache.spark.{SparkException, Logging, SparkContext}
22-
import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil}
22+
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
2323
import org.apache.spark.scheduler.TaskSchedulerImpl
2424

2525
import scala.collection.mutable.ArrayBuffer
@@ -36,7 +36,6 @@ private[spark] class YarnClientSchedulerBackend(
3636

3737
var client: Client = null
3838
var appId: ApplicationId = null
39-
var checkerThread: Thread = null
4039
var stopping: Boolean = false
4140
var totalExpectedExecutors = 0
4241

@@ -83,66 +82,62 @@ private[spark] class YarnClientSchedulerBackend(
8382
totalExpectedExecutors = args.numExecutors
8483
client = new Client(args, conf)
8584
appId = client.submitApplication()
86-
waitForApp()
87-
checkerThread = yarnApplicationStateCheckerThread()
85+
waitForApplication()
86+
asyncMonitorApplication()
8887
}
8988

90-
def waitForApp() {
91-
92-
// TODO : need a better way to find out whether the executors are ready or not
93-
// maybe by resource usage report?
94-
while(true) {
95-
val report = client.getApplicationReport(appId)
96-
97-
logInfo("Application report from ASM: \n" +
98-
"\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
99-
"\t appStartTime: " + report.getStartTime() + "\n" +
100-
"\t yarnAppState: " + report.getYarnApplicationState() + "\n"
101-
)
102-
103-
// Ready to go, or already gone.
104-
val state = report.getYarnApplicationState()
105-
if (state == YarnApplicationState.RUNNING) {
106-
return
107-
} else if (state == YarnApplicationState.FINISHED ||
108-
state == YarnApplicationState.FAILED ||
109-
state == YarnApplicationState.KILLED) {
110-
throw new SparkException("Yarn application already ended," +
111-
"might be killed or not able to launch application master.")
112-
}
113-
114-
Thread.sleep(1000)
89+
/**
90+
* Report the state of the application until it is running.
91+
* If the application has finished, failed or been killed in the process, throw an exception.
92+
* This assumes both `client` and `appId` have already been set.
93+
*/
94+
private def waitForApplication(): Unit = {
95+
assert(client != null && appId != null, "Application has not been submitted yet!")
96+
val state = client.monitorApplication(appId, returnOnRunning = true) // blocking
97+
if (state == YarnApplicationState.FINISHED ||
98+
state == YarnApplicationState.FAILED ||
99+
state == YarnApplicationState.KILLED) {
100+
throw new SparkException("Yarn application has already ended! " +
101+
"It might have been killed or unable to launch application master.")
102+
}
103+
if (state == YarnApplicationState.RUNNING) {
104+
logInfo(s"Application ${appId.getId} has started running.")
115105
}
116106
}
117107

118-
private def yarnApplicationStateCheckerThread(): Thread = {
108+
/**
109+
* Monitor the application state in a separate thread.
110+
* If the application has exited for any reason, stop the SparkContext.
111+
* This assumes both `client` and `appId` have already been set.
112+
*/
113+
private def asyncMonitorApplication(): Thread = {
114+
assert(client != null && appId != null, "Application has not been submitted yet!")
119115
val t = new Thread {
120116
override def run() {
121-
while (!stopping) {
122-
val report = client.getApplicationReport(appId)
123-
val state = report.getYarnApplicationState()
124-
if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.KILLED
125-
|| state == YarnApplicationState.FAILED) {
126-
logError(s"Yarn application already ended: $state")
127-
sc.stop()
128-
stopping = true
129-
}
130-
Thread.sleep(1000L)
117+
val state = client.monitorApplication(appId, logApplicationReport = false) // blocking
118+
if (state == YarnApplicationState.FINISHED ||
119+
state == YarnApplicationState.KILLED ||
120+
state == YarnApplicationState.FAILED) {
121+
logWarning(s"Yarn application has exited: $state")
122+
sc.stop()
123+
stopping = true
131124
}
132-
checkerThread = null
133-
Thread.currentThread().interrupt()
134125
}
135126
}
136-
t.setName("Yarn Application State Checker")
127+
t.setName("Yarn Application State Monitor")
137128
t.setDaemon(true)
138129
t.start()
139130
t
140131
}
141132

133+
/**
134+
* Stop the scheduler. This assumes `start()` has already been called.
135+
*/
142136
override def stop() {
137+
assert(client != null, "Attempted to stop this scheduler before starting it!")
143138
stopping = true
144139
super.stop()
145-
client.stop
140+
client.stop()
146141
logInfo("Stopped")
147142
}
148143

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ private[spark] class Client(
109109
yarnClient.getApplicationReport(appId)
110110

111111
/** */
112+
// FIXME: This could throw an NPE if report.getClientToAMToken returns null
112113
override def getClientToken(report: ApplicationReport): String =
113114
report.getClientToAMToken.toString
114115
}

0 commit comments

Comments
 (0)