Skip to content

Commit 9229433

Browse files
author
Andrew Or
committed
Reduce duplicate naming in REST field
This commit also fixes a the standalone REST protocol test, which would fail with ClassCastException if the server returns error for the same reason explained in the previous commit.
1 parent ade28fd commit 9229433

File tree

11 files changed

+91
-65
lines changed

11 files changed

+91
-65
lines changed

core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusRequest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ package org.apache.spark.deploy.rest
2121
* A request to query the status of a driver in the REST application submission protocol.
2222
*/
2323
class DriverStatusRequest extends SubmitRestProtocolRequest {
24-
private val driverId = new SubmitRestProtocolField[String]("driverId")
24+
private val driverId = new SubmitRestProtocolField[String]
2525
def getDriverId: String = driverId.toString
2626
def setDriverId(s: String): this.type = setField(driverId, s)
2727
protected override def doValidate(): Unit = {
2828
super.doValidate()
29-
assertFieldIsSet(driverId)
29+
assertFieldIsSet(driverId, "driverId")
3030
}
3131
}

core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusResponse.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ package org.apache.spark.deploy.rest
2121
* A response to the [[DriverStatusRequest]] in the REST application submission protocol.
2222
*/
2323
class DriverStatusResponse extends SubmitRestProtocolResponse {
24-
private val driverId = new SubmitRestProtocolField[String]("driverId")
24+
private val driverId = new SubmitRestProtocolField[String]
2525
// standalone cluster mode only
26-
private val driverState = new SubmitRestProtocolField[String]("driverState")
27-
private val workerId = new SubmitRestProtocolField[String]("workerId")
28-
private val workerHostPort = new SubmitRestProtocolField[String]("workerHostPort")
26+
private val driverState = new SubmitRestProtocolField[String]
27+
private val workerId = new SubmitRestProtocolField[String]
28+
private val workerHostPort = new SubmitRestProtocolField[String]
2929

3030
def getDriverId: String = driverId.toString
3131
def getDriverState: String = driverState.toString
@@ -39,6 +39,6 @@ class DriverStatusResponse extends SubmitRestProtocolResponse {
3939

4040
protected override def doValidate(): Unit = {
4141
super.doValidate()
42-
assertFieldIsSet(driverId)
42+
assertFieldIsSet(driverId, "driverId")
4343
}
4444
}

core/src/main/scala/org/apache/spark/deploy/rest/ErrorResponse.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,20 @@
1717

1818
package org.apache.spark.deploy.rest
1919

20+
import com.fasterxml.jackson.annotation.JsonIgnore
21+
2022
/**
2123
* An error response message used in the REST application submission protocol.
2224
*/
2325
class ErrorResponse extends SubmitRestProtocolResponse {
24-
// request was unsuccessful
2526
setSuccess("false")
2627

28+
// Don't bother logging success = false in the JSON
29+
@JsonIgnore
30+
override def getSuccess: String = super.getSuccess
31+
2732
protected override def doValidate(): Unit = {
2833
super.doValidate()
29-
assertFieldIsSet(message)
30-
assert(!getSuccess.toBoolean, "The 'success' field cannot be true in an error response.")
34+
assertFieldIsSet(message, "message")
3135
}
3236
}

core/src/main/scala/org/apache/spark/deploy/rest/KillDriverRequest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ package org.apache.spark.deploy.rest
2121
* A request to kill a driver in the REST application submission protocol.
2222
*/
2323
class KillDriverRequest extends SubmitRestProtocolRequest {
24-
private val driverId = new SubmitRestProtocolField[String]("driverId")
24+
private val driverId = new SubmitRestProtocolField[String]
2525
def getDriverId: String = driverId.toString
2626
def setDriverId(s: String): this.type = setField(driverId, s)
2727
protected override def doValidate(): Unit = {
2828
super.doValidate()
29-
assertFieldIsSet(driverId)
29+
assertFieldIsSet(driverId, "driverId")
3030
}
3131
}

core/src/main/scala/org/apache/spark/deploy/rest/KillDriverResponse.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ package org.apache.spark.deploy.rest
2121
* A response to the [[KillDriverRequest]] in the REST application submission protocol.
2222
*/
2323
class KillDriverResponse extends SubmitRestProtocolResponse {
24-
private val driverId = new SubmitRestProtocolField[String]("driverId")
24+
private val driverId = new SubmitRestProtocolField[String]
2525
def getDriverId: String = driverId.toString
2626
def setDriverId(s: String): this.type = setField(driverId, s)
2727
protected override def doValidate(): Unit = {
2828
super.doValidate()
29-
assertFieldIsSet(driverId)
29+
assertFieldIsSet(driverId, "driverId")
3030
}
3131
}

core/src/main/scala/org/apache/spark/deploy/rest/SubmitDriverRequest.scala

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,20 +29,20 @@ import org.apache.spark.util.JsonProtocol
2929
* A request to submit a driver in the REST application submission protocol.
3030
*/
3131
class SubmitDriverRequest extends SubmitRestProtocolRequest {
32-
private val appName = new SubmitRestProtocolField[String]("appName")
33-
private val appResource = new SubmitRestProtocolField[String]("appResource")
34-
private val mainClass = new SubmitRestProtocolField[String]("mainClass")
35-
private val jars = new SubmitRestProtocolField[String]("jars")
36-
private val files = new SubmitRestProtocolField[String]("files")
37-
private val pyFiles = new SubmitRestProtocolField[String]("pyFiles")
38-
private val driverMemory = new SubmitRestProtocolField[String]("driverMemory")
39-
private val driverCores = new SubmitRestProtocolField[Int]("driverCores")
40-
private val driverExtraJavaOptions = new SubmitRestProtocolField[String]("driverExtraJavaOptions")
41-
private val driverExtraClassPath = new SubmitRestProtocolField[String]("driverExtraClassPath")
42-
private val driverExtraLibraryPath = new SubmitRestProtocolField[String]("driverExtraLibraryPath")
43-
private val superviseDriver = new SubmitRestProtocolField[Boolean]("superviseDriver")
44-
private val executorMemory = new SubmitRestProtocolField[String]("executorMemory")
45-
private val totalExecutorCores = new SubmitRestProtocolField[Int]("totalExecutorCores")
32+
private val appName = new SubmitRestProtocolField[String]
33+
private val appResource = new SubmitRestProtocolField[String]
34+
private val mainClass = new SubmitRestProtocolField[String]
35+
private val jars = new SubmitRestProtocolField[String]
36+
private val files = new SubmitRestProtocolField[String]
37+
private val pyFiles = new SubmitRestProtocolField[String]
38+
private val driverMemory = new SubmitRestProtocolField[String]
39+
private val driverCores = new SubmitRestProtocolField[Int]
40+
private val driverExtraJavaOptions = new SubmitRestProtocolField[String]
41+
private val driverExtraClassPath = new SubmitRestProtocolField[String]
42+
private val driverExtraLibraryPath = new SubmitRestProtocolField[String]
43+
private val superviseDriver = new SubmitRestProtocolField[Boolean]
44+
private val executorMemory = new SubmitRestProtocolField[String]
45+
private val totalExecutorCores = new SubmitRestProtocolField[Int]
4646

4747
// Special fields
4848
private val appArgs = new ArrayBuffer[String]
@@ -140,7 +140,7 @@ class SubmitDriverRequest extends SubmitRestProtocolRequest {
140140

141141
protected override def doValidate(): Unit = {
142142
super.doValidate()
143-
assertFieldIsSet(appName)
144-
assertFieldIsSet(appResource)
143+
assertFieldIsSet(appName, "appName")
144+
assertFieldIsSet(appResource, "appResource")
145145
}
146146
}

core/src/main/scala/org/apache/spark/deploy/rest/SubmitDriverResponse.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ package org.apache.spark.deploy.rest
2121
* A response to the [[SubmitDriverRequest]] in the REST application submission protocol.
2222
*/
2323
class SubmitDriverResponse extends SubmitRestProtocolResponse {
24-
private val driverId = new SubmitRestProtocolField[String]("driverId")
24+
private val driverId = new SubmitRestProtocolField[String]
2525
def getDriverId: String = driverId.toString
2626
def setDriverId(s: String): this.type = setField(driverId, s)
2727
}

core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolField.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.deploy.rest
2020
/**
2121
* A field used in [[SubmitRestProtocolMessage]]s.
2222
*/
23-
class SubmitRestProtocolField[T](val name: String) {
23+
class SubmitRestProtocolField[T] {
2424
protected var value: Option[T] = None
2525
def isSet: Boolean = value.isDefined
2626
def getValue: Option[T] = value

core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.deploy.rest
2020
import com.fasterxml.jackson.annotation._
2121
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility
2222
import com.fasterxml.jackson.annotation.JsonInclude.Include
23-
import com.fasterxml.jackson.databind.ObjectMapper
23+
import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
2424
import org.json4s.JsonAST._
2525
import org.json4s.jackson.JsonMethods._
2626

@@ -42,7 +42,7 @@ abstract class SubmitRestProtocolMessage {
4242
val messageType = Utils.getFormattedClassName(this)
4343
protected val action: String = messageType
4444
protected val sparkVersion: SubmitRestProtocolField[String]
45-
protected val message = new SubmitRestProtocolField[String]("message")
45+
protected val message = new SubmitRestProtocolField[String]
4646

4747
// Required for JSON de/serialization and not explicitly used
4848
private def getAction: String = action
@@ -64,7 +64,8 @@ abstract class SubmitRestProtocolMessage {
6464
def toJson: String = {
6565
validate()
6666
val mapper = new ObjectMapper
67-
pretty(parse(mapper.writeValueAsString(this)))
67+
mapper.enable(SerializationFeature.INDENT_OUTPUT)
68+
mapper.writeValueAsString(this)
6869
}
6970

7071
/**
@@ -85,14 +86,13 @@ abstract class SubmitRestProtocolMessage {
8586
if (action == null) {
8687
throw new SubmitRestMissingFieldException(s"The action field is missing in $messageType")
8788
}
88-
assertFieldIsSet(sparkVersion)
8989
}
9090

9191
/** Assert that the specified field is set in this message. */
92-
protected def assertFieldIsSet(field: SubmitRestProtocolField[_]): Unit = {
92+
protected def assertFieldIsSet(field: SubmitRestProtocolField[_], name: String): Unit = {
9393
if (!field.isSet) {
9494
throw new SubmitRestMissingFieldException(
95-
s"Field '${field.name}' is missing in message $messageType.")
95+
s"Field '$name' is missing in message $messageType.")
9696
}
9797
}
9898

@@ -143,19 +143,23 @@ abstract class SubmitRestProtocolMessage {
143143
* An abstract request sent from the client in the REST application submission protocol.
144144
*/
145145
abstract class SubmitRestProtocolRequest extends SubmitRestProtocolMessage {
146-
protected override val sparkVersion = new SubmitRestProtocolField[String]("client_spark_version")
146+
protected override val sparkVersion = new SubmitRestProtocolField[String]
147147
def getClientSparkVersion: String = sparkVersion.toString
148148
def setClientSparkVersion(s: String): this.type = setField(sparkVersion, s)
149149
override def getSparkVersion: String = getClientSparkVersion
150150
override def setSparkVersion(s: String) = setClientSparkVersion(s)
151+
protected override def doValidate(): Unit = {
152+
super.doValidate()
153+
assertFieldIsSet(sparkVersion, "clientSparkVersion")
154+
}
151155
}
152156

153157
/**
154158
* An abstract response sent from the server in the REST application submission protocol.
155159
*/
156160
abstract class SubmitRestProtocolResponse extends SubmitRestProtocolMessage {
157-
protected override val sparkVersion = new SubmitRestProtocolField[String]("server_spark_version")
158-
private val success = new SubmitRestProtocolField[Boolean]("success")
161+
protected override val sparkVersion = new SubmitRestProtocolField[String]
162+
private val success = new SubmitRestProtocolField[Boolean]
159163

160164
override def getSparkVersion: String = getServerSparkVersion
161165
def getServerSparkVersion: String = sparkVersion.toString
@@ -167,7 +171,8 @@ abstract class SubmitRestProtocolResponse extends SubmitRestProtocolMessage {
167171

168172
protected override def doValidate(): Unit = {
169173
super.doValidate()
170-
assertFieldIsSet(success)
174+
assertFieldIsSet(sparkVersion, "serverSparkVersion")
175+
assertFieldIsSet(success, "success")
171176
}
172177
}
173178

core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestProtocolSuite.scala

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B
6060

6161
test("kill empty driver") {
6262
val response = client.killDriver(masterRestUrl, "driver-that-does-not-exist")
63-
val killResponse = getResponse[KillDriverResponse](response, client)
63+
val killResponse = getKillResponse(response)
6464
val killSuccess = killResponse.getSuccess
6565
assert(killSuccess === "false")
6666
}
@@ -71,11 +71,11 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B
7171
val size = 500
7272
val driverId = submitApplication(resultsFile, numbers, size)
7373
val response = client.killDriver(masterRestUrl, driverId)
74-
val killResponse = getResponse[KillDriverResponse](response, client)
74+
val killResponse = getKillResponse(response)
7575
val killSuccess = killResponse.getSuccess
7676
waitUntilFinished(driverId)
7777
val response2 = client.requestDriverStatus(masterRestUrl, driverId)
78-
val statusResponse = getResponse[DriverStatusResponse](response2, client)
78+
val statusResponse = getStatusResponse(response2)
7979
val statusSuccess = statusResponse.getSuccess
8080
val driverState = statusResponse.getDriverState
8181
assert(killSuccess === "true")
@@ -87,7 +87,7 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B
8787

8888
test("request status for empty driver") {
8989
val response = client.requestDriverStatus(masterRestUrl, "driver-that-does-not-exist")
90-
val statusResponse = getResponse[DriverStatusResponse](response, client)
90+
val statusResponse = getStatusResponse(response)
9191
val statusSuccess = statusResponse.getSuccess
9292
assert(statusSuccess === "false")
9393
}
@@ -130,7 +130,7 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B
130130
val args = new SparkSubmitArguments(commandLineArgs)
131131
SparkSubmit.prepareSubmitEnvironment(args)
132132
val response = client.submitDriver(args)
133-
val submitResponse = getResponse[SubmitDriverResponse](response, client)
133+
val submitResponse = getSubmitResponse(response)
134134
submitResponse.getDriverId
135135
}
136136

@@ -140,7 +140,7 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B
140140
val expireTime = System.currentTimeMillis + maxSeconds * 1000
141141
while (!finished) {
142142
val response = client.requestDriverStatus(masterRestUrl, driverId)
143-
val statusResponse = getResponse[DriverStatusResponse](response, client)
143+
val statusResponse = getStatusResponse(response)
144144
val driverState = statusResponse.getDriverState
145145
finished =
146146
driverState != DriverState.SUBMITTED.toString &&
@@ -151,17 +151,30 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B
151151
}
152152
}
153153

154-
/** Return the response as the expected type, or fail with an informative error message. */
155-
private def getResponse[T <: SubmitRestProtocolResponse](
156-
response: SubmitRestProtocolResponse,
157-
client: StandaloneRestClient): T = {
154+
/** Return the response as a submit driver response, or fail with error otherwise. */
155+
private def getSubmitResponse(response: SubmitRestProtocolResponse): SubmitDriverResponse = {
158156
response match {
159-
case error: ErrorResponse =>
160-
fail(s"Error from the server:\n${error.getMessage}")
161-
case _ =>
162-
client.getResponse[T](response).getOrElse {
163-
fail(s"Response type was unexpected: ${response.toJson}")
164-
}
157+
case s: SubmitDriverResponse => s
158+
case e: ErrorResponse => fail(s"Server returned error: ${e.toJson}")
159+
case r => fail(s"Expected submit response. Actual: ${r.toJson}")
160+
}
161+
}
162+
163+
/** Return the response as a kill driver response, or fail with error otherwise. */
164+
private def getKillResponse(response: SubmitRestProtocolResponse): KillDriverResponse = {
165+
response match {
166+
case k: KillDriverResponse => k
167+
case e: ErrorResponse => fail(s"Server returned error: ${e.toJson}")
168+
case r => fail(s"Expected kill response. Actual: ${r.toJson}")
169+
}
170+
}
171+
172+
/** Return the response as a driver status response, or fail with error otherwise. */
173+
private def getStatusResponse(response: SubmitRestProtocolResponse): DriverStatusResponse = {
174+
response match {
175+
case s: DriverStatusResponse => s
176+
case e: ErrorResponse => fail(s"Server returned error: ${e.toJson}")
177+
case r => fail(s"Expected status response. Actual: ${r.toJson}")
165178
}
166179
}
167180

0 commit comments

Comments
 (0)