Skip to content

Commit b2fef8b

Browse files
author
Andrew Or
committed
Abstract the success field to the general response
This was common basically across all response messages.
1 parent 6c57b4b commit b2fef8b

File tree

10 files changed

+43
-53
lines changed

10 files changed

+43
-53
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,15 +93,19 @@ object SparkSubmit {
9393
* Standalone cluster mode only.
9494
*/
9595
private def kill(args: SparkSubmitArguments): Unit = {
96-
new StandaloneRestClient().killDriver(args.master, args.driverToKill)
96+
val client = new StandaloneRestClient
97+
val response = client.killDriver(args.master, args.driverToKill)
98+
printStream.println(response.toJson)
9799
}
98100

99101
/**
100102
* Request the status of an existing driver using the REST application submission protocol.
101103
* Standalone cluster mode only.
102104
*/
103105
private def requestStatus(args: SparkSubmitArguments): Unit = {
104-
new StandaloneRestClient().requestDriverStatus(args.master, args.driverToRequestStatusFor)
106+
val client = new StandaloneRestClient
107+
val response = client.requestDriverStatus(args.master, args.driverToRequestStatusFor)
108+
printStream.println(response.toJson)
105109
}
106110

107111
/**

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import java.util.jar.JarFile
2222

2323
import scala.collection.mutable.{ArrayBuffer, HashMap}
2424

25-
import org.apache.spark.deploy.SparkSubmitAction.SparkSubmitAction
25+
import org.apache.spark.deploy.SparkSubmitAction._
2626
import org.apache.spark.util.Utils
2727

2828
/**
@@ -52,6 +52,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
5252
var verbose: Boolean = false
5353
var isPython: Boolean = false
5454
var pyFiles: String = null
55+
var action: SparkSubmitAction = null
5556
val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
5657

5758
// Standalone cluster mode only
@@ -62,17 +63,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
6263

6364
private val restEnabledKey = "spark.submit.rest.enabled"
6465

65-
def action: SparkSubmitAction = {
66-
(driverToKill, driverToRequestStatusFor) match {
67-
case (null, null) => SparkSubmitAction.SUBMIT
68-
case (_, null) => SparkSubmitAction.KILL
69-
case (null, _) => SparkSubmitAction.REQUEST_STATUS
70-
case _ => SparkSubmit.printErrorAndExit(
71-
"Requested to both kill and request status for a driver. Choose only one.")
72-
null // never reached
73-
}
74-
}
75-
7666
/** Default properties present in the currently defined defaults file. */
7767
lazy val defaultSparkProperties: HashMap[String, String] = {
7868
val defaultProperties = new HashMap[String, String]()
@@ -189,14 +179,17 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
189179
if (name == null && primaryResource != null) {
190180
name = Utils.stripDirectory(primaryResource)
191181
}
182+
183+
// Action should be SUBMIT unless otherwise specified
184+
action = Option(action).getOrElse(SUBMIT)
192185
}
193186

194187
/** Ensure that required fields exists. Call this only once all defaults are loaded. */
195188
private def validateArguments(): Unit = {
196189
action match {
197-
case SparkSubmitAction.SUBMIT => validateSubmitArguments()
198-
case SparkSubmitAction.KILL => validateKillArguments()
199-
case SparkSubmitAction.REQUEST_STATUS => validateStatusRequestArguments()
190+
case SUBMIT => validateSubmitArguments()
191+
case KILL => validateKillArguments()
192+
case REQUEST_STATUS => validateStatusRequestArguments()
200193
}
201194
}
202195

@@ -379,10 +372,18 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
379372

380373
case ("--kill") :: value :: tail =>
381374
driverToKill = value
375+
if (action != null) {
376+
SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $KILL.")
377+
}
378+
action = KILL
382379
parse(tail)
383380

384381
case ("--status") :: value :: tail =>
385382
driverToRequestStatusFor = value
383+
if (action != null) {
384+
SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $REQUEST_STATUS.")
385+
}
386+
action = REQUEST_STATUS
386387
parse(tail)
387388

388389
case ("--supervise") :: tail =>

core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusRequest.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,8 @@ package org.apache.spark.deploy.rest
2222
*/
2323
class DriverStatusRequest extends SubmitRestProtocolRequest {
2424
private val driverId = new SubmitRestProtocolField[String]("driverId")
25-
2625
def getDriverId: String = driverId.toString
2726
def setDriverId(s: String): this.type = setField(driverId, s)
28-
2927
protected override def doValidate(): Unit = {
3028
super.doValidate()
3129
assertFieldIsSet(driverId)

core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusResponse.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,27 +22,23 @@ package org.apache.spark.deploy.rest
2222
*/
2323
class DriverStatusResponse extends SubmitRestProtocolResponse {
2424
private val driverId = new SubmitRestProtocolField[String]("driverId")
25-
private val success = new SubmitRestProtocolField[Boolean]("success")
2625
// standalone cluster mode only
2726
private val driverState = new SubmitRestProtocolField[String]("driverState")
2827
private val workerId = new SubmitRestProtocolField[String]("workerId")
2928
private val workerHostPort = new SubmitRestProtocolField[String]("workerHostPort")
3029

3130
def getDriverId: String = driverId.toString
32-
def getSuccess: String = success.toString
3331
def getDriverState: String = driverState.toString
3432
def getWorkerId: String = workerId.toString
3533
def getWorkerHostPort: String = workerHostPort.toString
3634

3735
def setDriverId(s: String): this.type = setField(driverId, s)
38-
def setSuccess(s: String): this.type = setBooleanField(success, s)
3936
def setDriverState(s: String): this.type = setField(driverState, s)
4037
def setWorkerId(s: String): this.type = setField(workerId, s)
4138
def setWorkerHostPort(s: String): this.type = setField(workerHostPort, s)
4239

4340
protected override def doValidate(): Unit = {
4441
super.doValidate()
4542
assertFieldIsSet(driverId)
46-
assertFieldIsSet(success)
4743
}
4844
}

core/src/main/scala/org/apache/spark/deploy/rest/ErrorResponse.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,12 @@ package org.apache.spark.deploy.rest
2121
* An error response message used in the REST application submission protocol.
2222
*/
2323
class ErrorResponse extends SubmitRestProtocolResponse {
24+
// request was unsuccessful
25+
setSuccess("false")
26+
2427
protected override def doValidate(): Unit = {
2528
super.doValidate()
2629
assertFieldIsSet(message)
30+
assert(!getSuccess.toBoolean, "The 'success' field cannot be true in an error response.")
2731
}
2832
}

core/src/main/scala/org/apache/spark/deploy/rest/KillDriverRequest.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,8 @@ package org.apache.spark.deploy.rest
2222
*/
2323
class KillDriverRequest extends SubmitRestProtocolRequest {
2424
private val driverId = new SubmitRestProtocolField[String]("driverId")
25-
2625
def getDriverId: String = driverId.toString
2726
def setDriverId(s: String): this.type = setField(driverId, s)
28-
2927
protected override def doValidate(): Unit = {
3028
super.doValidate()
3129
assertFieldIsSet(driverId)

core/src/main/scala/org/apache/spark/deploy/rest/KillDriverResponse.scala

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,10 @@ package org.apache.spark.deploy.rest
2222
*/
2323
class KillDriverResponse extends SubmitRestProtocolResponse {
2424
private val driverId = new SubmitRestProtocolField[String]("driverId")
25-
private val success = new SubmitRestProtocolField[Boolean]("success")
26-
2725
def getDriverId: String = driverId.toString
28-
def getSuccess: String = success.toString
29-
3026
def setDriverId(s: String): this.type = setField(driverId, s)
31-
def setSuccess(s: String): this.type = setBooleanField(success, s)
32-
3327
protected override def doValidate(): Unit = {
3428
super.doValidate()
3529
assertFieldIsSet(driverId)
36-
assertFieldIsSet(success)
3730
}
3831
}

core/src/main/scala/org/apache/spark/deploy/rest/SubmitDriverResponse.scala

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,7 @@ package org.apache.spark.deploy.rest
2121
* A response to the [[SubmitDriverRequest]] in the REST application submission protocol.
2222
*/
2323
class SubmitDriverResponse extends SubmitRestProtocolResponse {
24-
private val success = new SubmitRestProtocolField[Boolean]("success")
2524
private val driverId = new SubmitRestProtocolField[String]("driverId")
26-
27-
def getSuccess: String = success.toString
2825
def getDriverId: String = driverId.toString
29-
30-
def setSuccess(s: String): this.type = setBooleanField(success, s)
3126
def setDriverId(s: String): this.type = setField(driverId, s)
32-
33-
protected override def doValidate(): Unit = {
34-
super.doValidate()
35-
assertFieldIsSet(success)
36-
}
3727
}

core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestClient.scala

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ private[spark] abstract class SubmitRestClient extends Logging {
3939
val url = getHttpUrl(args.master)
4040
val request = constructSubmitRequest(args)
4141
val response = sendHttp(url, request)
42-
handleResponse(response)
42+
validateResponse(response)
4343
}
4444

4545
/** Request that the REST server kill the specified driver. */
@@ -48,7 +48,7 @@ private[spark] abstract class SubmitRestClient extends Logging {
4848
val url = getHttpUrl(master)
4949
val request = constructKillRequest(master, driverId)
5050
val response = sendHttp(url, request)
51-
handleResponse(response)
51+
validateResponse(response)
5252
}
5353

5454
/** Request the status of the specified driver from the REST server. */
@@ -57,7 +57,7 @@ private[spark] abstract class SubmitRestClient extends Logging {
5757
val url = getHttpUrl(master)
5858
val request = constructStatusRequest(master, driverId)
5959
val response = sendHttp(url, request)
60-
handleResponse(response)
60+
validateResponse(response)
6161
}
6262

6363
/** Return the HTTP URL of the REST server that corresponds to the given master URL. */
@@ -95,14 +95,10 @@ private[spark] abstract class SubmitRestClient extends Logging {
9595
}
9696
}
9797

98-
/** Validate the response and log any error messages produced by the server. */
99-
private def handleResponse(response: SubmitRestProtocolResponse): SubmitRestProtocolResponse = {
98+
/** Validate the response... */
99+
private def validateResponse(response: SubmitRestProtocolResponse): SubmitRestProtocolResponse = {
100100
try {
101101
response.validate()
102-
response match {
103-
case error: ErrorResponse => logError(s"Server returned error:\n${error.getMessage}")
104-
case _ =>
105-
}
106102
} catch {
107103
case e: SubmitRestProtocolException =>
108104
throw new SubmitRestProtocolException("Malformed response received from server", e)

core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,10 +155,20 @@ abstract class SubmitRestProtocolRequest extends SubmitRestProtocolMessage {
155155
*/
156156
abstract class SubmitRestProtocolResponse extends SubmitRestProtocolMessage {
157157
protected override val sparkVersion = new SubmitRestProtocolField[String]("server_spark_version")
158-
def getServerSparkVersion: String = sparkVersion.toString
159-
def setServerSparkVersion(s: String): this.type = setField(sparkVersion, s)
158+
private val success = new SubmitRestProtocolField[Boolean]("success")
159+
160160
override def getSparkVersion: String = getServerSparkVersion
161+
def getServerSparkVersion: String = sparkVersion.toString
162+
def getSuccess: String = success.toString
163+
161164
override def setSparkVersion(s: String) = setServerSparkVersion(s)
165+
def setServerSparkVersion(s: String): this.type = setField(sparkVersion, s)
166+
def setSuccess(s: String): this.type = setBooleanField(success, s)
167+
168+
protected override def doValidate(): Unit = {
169+
super.doValidate()
170+
assertFieldIsSet(success)
171+
}
162172
}
163173

164174
object SubmitRestProtocolMessage {

0 commit comments

Comments
 (0)