Skip to content

Commit 6fc7670

Browse files
author
Andrew Or
committed
Report REST server response back to the user
1 parent 40e6095 commit 6fc7670

File tree

2 files changed

+37
-33
lines changed

2 files changed

+37
-33
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -106,25 +106,15 @@ object SparkSubmit {
106106
* Standalone cluster mode only.
107107
*/
108108
private def kill(args: SparkSubmitArguments): Unit = {
109-
val client = new StandaloneRestClient
110-
val response = client.killSubmission(args.master, args.driverToKill)
111-
response match {
112-
case k: KillSubmissionResponse => handleRestResponse(k)
113-
case r => handleUnexpectedRestResponse(r)
114-
}
109+
new StandaloneRestClient().killSubmission(args.master, args.driverToKill)
115110
}
116111

117112
/**
118113
* Request the status of an existing driver using the REST application submission protocol.
119114
* Standalone cluster mode only.
120115
*/
121116
private def requestStatus(args: SparkSubmitArguments): Unit = {
122-
val client = new StandaloneRestClient
123-
val response = client.requestSubmissionStatus(args.master, args.driverToRequestStatusFor)
124-
response match {
125-
case s: SubmissionStatusResponse => handleRestResponse(s)
126-
case r => handleUnexpectedRestResponse(r)
127-
}
117+
new StandaloneRestClient().requestSubmissionStatus(args.master, args.driverToRequestStatusFor)
128118
}
129119

130120
/**
@@ -150,8 +140,8 @@ object SparkSubmit {
150140
} catch {
151141
// Fail over to use the legacy submission gateway
152142
case e: SubmitRestConnectionException =>
153-
printStream.println(s"Master endpoint ${args.master} was not a " +
154-
s"REST server. Falling back to legacy submission gateway instead.")
143+
printWarning(s"Master endpoint ${args.master} was not a REST server. " +
144+
"Falling back to legacy submission gateway instead.")
155145
args.useRest = false
156146
submit(args)
157147
}
@@ -540,17 +530,6 @@ object SparkSubmit {
540530
}
541531
}
542532

543-
/** Log the response sent by the server in the REST application submission protocol. */
544-
private def handleRestResponse(response: SubmitRestProtocolResponse): Unit = {
545-
printStream.println(s"Server responded with ${response.messageType}:\n${response.toJson}")
546-
}
547-
548-
/** Log an appropriate error if the response sent by the server is not of the expected type. */
549-
private def handleUnexpectedRestResponse(unexpected: SubmitRestProtocolResponse): Unit = {
550-
printStream.println(
551-
s"Error: Server responded with message of unexpected type ${unexpected.messageType}.")
552-
}
553-
554533
/**
555534
* Return whether the given primary resource represents a user jar.
556535
*/

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,11 @@ private[spark] class StandaloneRestClient extends Logging {
6565
val request = constructSubmitRequest(appArgs, sparkProperties, environmentVariables)
6666
val response = postJson(url, request.toJson)
6767
response match {
68-
case s: CreateSubmissionResponse => reportSubmissionStatus(master, s)
69-
case _ => // unexpected type, let upstream caller handle it
68+
case s: CreateSubmissionResponse =>
69+
reportSubmissionStatus(master, s)
70+
handleRestResponse(s)
71+
case unexpected =>
72+
handleUnexpectedRestResponse(unexpected)
7073
}
7174
response
7275
}
@@ -75,14 +78,27 @@ private[spark] class StandaloneRestClient extends Logging {
7578
def killSubmission(master: String, submissionId: String): SubmitRestProtocolResponse = {
7679
logInfo(s"Submitting a request to kill submission $submissionId in $master.")
7780
validateMaster(master)
78-
post(getKillUrl(master, submissionId))
81+
val response = post(getKillUrl(master, submissionId))
82+
response match {
83+
case k: KillSubmissionResponse => handleRestResponse(k)
84+
case unexpected => handleUnexpectedRestResponse(unexpected)
85+
}
86+
response
7987
}
8088

8189
/** Request the status of a submission from the server. */
82-
def requestSubmissionStatus(master: String, submissionId: String): SubmitRestProtocolResponse = {
90+
def requestSubmissionStatus(
91+
master: String,
92+
submissionId: String,
93+
quiet: Boolean = false): SubmitRestProtocolResponse = {
8394
logInfo(s"Submitting a request for the status of submission $submissionId in $master.")
8495
validateMaster(master)
85-
get(getStatusUrl(master, submissionId))
96+
val response = get(getStatusUrl(master, submissionId))
97+
response match {
98+
case s: SubmissionStatusResponse => if (!quiet) { handleRestResponse(s) }
99+
case unexpected => handleUnexpectedRestResponse(unexpected)
100+
}
101+
response
86102
}
87103

88104
/** Send a GET request to the specified URL. */
@@ -224,7 +240,7 @@ private[spark] class StandaloneRestClient extends Logging {
224240
*/
225241
private def pollSubmissionStatus(master: String, submissionId: String): Unit = {
226242
(1 to REPORT_DRIVER_STATUS_MAX_TRIES).foreach { _ =>
227-
val response = requestSubmissionStatus(master, submissionId)
243+
val response = requestSubmissionStatus(master, submissionId, quiet = true)
228244
val statusResponse = response match {
229245
case s: SubmissionStatusResponse => s
230246
case _ => return // unexpected type, let upstream caller handle it
@@ -253,6 +269,16 @@ private[spark] class StandaloneRestClient extends Logging {
253269
}
254270
logError(s"Error: Master did not recognize driver $submissionId.")
255271
}
272+
273+
/** Log the response sent by the server in the REST application submission protocol. */
274+
private def handleRestResponse(response: SubmitRestProtocolResponse): Unit = {
275+
logInfo(s"Server responded with ${response.messageType}:\n${response.toJson}")
276+
}
277+
278+
/** Log an appropriate error if the response sent by the server is not of the expected type. */
279+
private def handleUnexpectedRestResponse(unexpected: SubmitRestProtocolResponse): Unit = {
280+
logError(s"Error: Server responded with message of unexpected type ${unexpected.messageType}.")
281+
}
256282
}
257283

258284
private[spark] object StandaloneRestClient {
@@ -266,12 +292,11 @@ private[spark] object StandaloneRestClient {
266292
*/
267293
def main(args: Array[String]): Unit = {
268294
val client = new StandaloneRestClient
269-
val appArgs = args.slice(1, args.size)
270295
val master = sys.props.get("spark.master").getOrElse {
271296
throw new IllegalArgumentException("'spark.master' must be set.")
272297
}
273298
val sparkProperties = new SparkConf().getAll.toMap
274299
val environmentVariables = sys.env.filter { case (k, _) => k.startsWith("SPARK_") }
275-
client.createSubmission(master, appArgs, sparkProperties, environmentVariables)
300+
client.createSubmission(master, args, sparkProperties, environmentVariables)
276301
}
277302
}

0 commit comments

Comments
 (0)