Skip to content

Commit 1f1c03f

Browse files
author
Andrew Or
committed
Use Jackson's DefaultScalaModule to simplify messages
Instead of explicitly defining getters and setters in the messages, we let Jackson's scala module do the work. This simplifies the code for each message significantly, though at the expense of reducing the level of type safety for users who implement their own clients and servers.
1 parent 9229433 commit 1f1c03f

17 files changed

+360
-495
lines changed

core/pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,14 @@
213213
<groupId>com.codahale.metrics</groupId>
214214
<artifactId>metrics-graphite</artifactId>
215215
</dependency>
216+
<dependency>
217+
<groupId>com.fasterxml.jackson.core</groupId>
218+
<artifactId>jackson-databind</artifactId>
219+
</dependency>
220+
<dependency>
221+
<groupId>com.fasterxml.jackson.module</groupId>
222+
<artifactId>jackson-module-scala_2.10</artifactId>
223+
</dependency>
216224
<dependency>
217225
<groupId>org.apache.derby</groupId>
218226
<artifactId>derby</artifactId>

core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusRequest.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@ package org.apache.spark.deploy.rest
2121
* A request to query the status of a driver in the REST application submission protocol.
2222
*/
2323
class DriverStatusRequest extends SubmitRestProtocolRequest {
24-
private val driverId = new SubmitRestProtocolField[String]
25-
def getDriverId: String = driverId.toString
26-
def setDriverId(s: String): this.type = setField(driverId, s)
24+
var driverId: String = null
2725
protected override def doValidate(): Unit = {
2826
super.doValidate()
2927
assertFieldIsSet(driverId, "driverId")

core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusResponse.scala

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,12 @@ package org.apache.spark.deploy.rest
2121
* A response to the [[DriverStatusRequest]] in the REST application submission protocol.
2222
*/
2323
class DriverStatusResponse extends SubmitRestProtocolResponse {
24-
private val driverId = new SubmitRestProtocolField[String]
25-
// standalone cluster mode only
26-
private val driverState = new SubmitRestProtocolField[String]
27-
private val workerId = new SubmitRestProtocolField[String]
28-
private val workerHostPort = new SubmitRestProtocolField[String]
29-
30-
def getDriverId: String = driverId.toString
31-
def getDriverState: String = driverState.toString
32-
def getWorkerId: String = workerId.toString
33-
def getWorkerHostPort: String = workerHostPort.toString
24+
var driverId: String = null
3425

35-
def setDriverId(s: String): this.type = setField(driverId, s)
36-
def setDriverState(s: String): this.type = setField(driverState, s)
37-
def setWorkerId(s: String): this.type = setField(workerId, s)
38-
def setWorkerHostPort(s: String): this.type = setField(workerHostPort, s)
26+
// standalone cluster mode only
27+
var driverState: String = null
28+
var workerId: String = null
29+
var workerHostPort: String = null
3930

4031
protected override def doValidate(): Unit = {
4132
super.doValidate()

core/src/main/scala/org/apache/spark/deploy/rest/ErrorResponse.scala

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,17 @@
1717

1818
package org.apache.spark.deploy.rest
1919

20-
import com.fasterxml.jackson.annotation.JsonIgnore
21-
2220
/**
2321
* An error response message used in the REST application submission protocol.
2422
*/
2523
class ErrorResponse extends SubmitRestProtocolResponse {
26-
setSuccess("false")
2724

28-
// Don't bother logging success = false in the JSON
29-
@JsonIgnore
30-
override def getSuccess: String = super.getSuccess
25+
// request was unsuccessful
26+
success = "false"
3127

3228
protected override def doValidate(): Unit = {
3329
super.doValidate()
3430
assertFieldIsSet(message, "message")
31+
assert(!success.toBoolean, s"The 'success' field must be false in $messageType.")
3532
}
3633
}

core/src/main/scala/org/apache/spark/deploy/rest/KillDriverRequest.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@ package org.apache.spark.deploy.rest
2121
* A request to kill a driver in the REST application submission protocol.
2222
*/
2323
class KillDriverRequest extends SubmitRestProtocolRequest {
24-
private val driverId = new SubmitRestProtocolField[String]
25-
def getDriverId: String = driverId.toString
26-
def setDriverId(s: String): this.type = setField(driverId, s)
24+
var driverId: String = null
2725
protected override def doValidate(): Unit = {
2826
super.doValidate()
2927
assertFieldIsSet(driverId, "driverId")

core/src/main/scala/org/apache/spark/deploy/rest/KillDriverResponse.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@ package org.apache.spark.deploy.rest
2121
* A response to the [[KillDriverRequest]] in the REST application submission protocol.
2222
*/
2323
class KillDriverResponse extends SubmitRestProtocolResponse {
24-
private val driverId = new SubmitRestProtocolField[String]
25-
def getDriverId: String = driverId.toString
26-
def setDriverId(s: String): this.type = setField(driverId, s)
24+
var driverId: String = null
2725
protected override def doValidate(): Unit = {
2826
super.doValidate()
2927
assertFieldIsSet(driverId, "driverId")

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala

Lines changed: 39 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,19 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
4343
case s: SubmitDriverResponse => s
4444
case _ => return response
4545
}
46-
val submitSuccess = submitResponse.getSuccess.toBoolean
46+
// Report status of submitted driver to user
47+
val submitSuccess = submitResponse.success.toBoolean
4748
if (submitSuccess) {
48-
val driverId = submitResponse.getDriverId
49-
logInfo(s"Driver successfully submitted as $driverId. Polling driver state...")
50-
pollSubmittedDriverStatus(args.master, driverId)
49+
val driverId = submitResponse.driverId
50+
if (driverId != null) {
51+
logInfo(s"Driver successfully submitted as $driverId. Polling driver state...")
52+
pollSubmittedDriverStatus(args.master, driverId)
53+
} else {
54+
logError("Application successfully submitted, but driver ID was not provided!")
55+
}
5156
} else {
52-
val submitMessage = submitResponse.getMessage
53-
logError(s"Application submission failed: $submitMessage")
57+
val failMessage = Option(submitResponse.message).map { ": " + _ }.getOrElse("")
58+
logError("Application submission failed" + failMessage)
5459
}
5560
submitResponse
5661
}
@@ -78,12 +83,12 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
7883
case s: DriverStatusResponse => s
7984
case _ => return
8085
}
81-
val statusSuccess = statusResponse.getSuccess.toBoolean
86+
val statusSuccess = statusResponse.success.toBoolean
8287
if (statusSuccess) {
83-
val driverState = Option(statusResponse.getDriverState)
84-
val workerId = Option(statusResponse.getWorkerId)
85-
val workerHostPort = Option(statusResponse.getWorkerHostPort)
86-
val exception = Option(statusResponse.getMessage)
88+
val driverState = Option(statusResponse.driverState)
89+
val workerId = Option(statusResponse.workerId)
90+
val workerHostPort = Option(statusResponse.workerHostPort)
91+
val exception = Option(statusResponse.message)
8792
// Log driver state, if present
8893
driverState match {
8994
case Some(state) => logInfo(s"State of driver $driverId is now $state.")
@@ -105,21 +110,21 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
105110

106111
/** Construct a submit driver request message. */
107112
protected override def constructSubmitRequest(args: SparkSubmitArguments): SubmitDriverRequest = {
108-
val message = new SubmitDriverRequest()
109-
.setSparkVersion(sparkVersion)
110-
.setAppName(args.name)
111-
.setAppResource(args.primaryResource)
112-
.setMainClass(args.mainClass)
113-
.setJars(args.jars)
114-
.setFiles(args.files)
115-
.setDriverMemory(args.driverMemory)
116-
.setDriverCores(args.driverCores)
117-
.setDriverExtraJavaOptions(args.driverExtraJavaOptions)
118-
.setDriverExtraClassPath(args.driverExtraClassPath)
119-
.setDriverExtraLibraryPath(args.driverExtraLibraryPath)
120-
.setSuperviseDriver(args.supervise.toString)
121-
.setExecutorMemory(args.executorMemory)
122-
.setTotalExecutorCores(args.totalExecutorCores)
113+
val message = new SubmitDriverRequest
114+
message.clientSparkVersion = sparkVersion
115+
message.appName = args.name
116+
message.appResource = args.primaryResource
117+
message.mainClass = args.mainClass
118+
message.jars = args.jars
119+
message.files = args.files
120+
message.driverMemory = args.driverMemory
121+
message.driverCores = args.driverCores
122+
message.driverExtraJavaOptions = args.driverExtraJavaOptions
123+
message.driverExtraClassPath = args.driverExtraClassPath
124+
message.driverExtraLibraryPath = args.driverExtraLibraryPath
125+
message.superviseDriver = args.supervise.toString
126+
message.executorMemory = args.executorMemory
127+
message.totalExecutorCores = args.totalExecutorCores
123128
args.childArgs.foreach(message.addAppArg)
124129
args.sparkProperties.foreach { case (k, v) => message.setSparkProperty(k, v) }
125130
sys.env.foreach { case (k, v) =>
@@ -132,18 +137,20 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
132137
protected override def constructKillRequest(
133138
master: String,
134139
driverId: String): KillDriverRequest = {
135-
new KillDriverRequest()
136-
.setSparkVersion(sparkVersion)
137-
.setDriverId(driverId)
140+
val k = new KillDriverRequest
141+
k.clientSparkVersion = sparkVersion
142+
k.driverId = driverId
143+
k
138144
}
139145

140146
/** Construct a driver status request message. */
141147
protected override def constructStatusRequest(
142148
master: String,
143149
driverId: String): DriverStatusRequest = {
144-
new DriverStatusRequest()
145-
.setSparkVersion(sparkVersion)
146-
.setDriverId(driverId)
150+
val d = new DriverStatusRequest
151+
d.clientSparkVersion = sparkVersion
152+
d.driverId = driverId
153+
d
147154
}
148155

149156
/** Extract the URL portion of the master address. */

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala

Lines changed: 39 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -58,39 +58,42 @@ private[spark] class StandaloneRestServerHandler(
5858
val driverDescription = buildDriverDescription(request)
5959
val response = AkkaUtils.askWithReply[DeployMessages.SubmitDriverResponse](
6060
DeployMessages.RequestSubmitDriver(driverDescription), masterActor, askTimeout)
61-
new SubmitDriverResponse()
62-
.setSparkVersion(sparkVersion)
63-
.setMessage(response.message)
64-
.setSuccess(response.success.toString)
65-
.setDriverId(response.driverId.orNull)
61+
val s = new SubmitDriverResponse
62+
s.serverSparkVersion = sparkVersion
63+
s.message = response.message
64+
s.success = response.success.toString
65+
s.driverId = response.driverId.orNull
66+
s
6667
}
6768

6869
/** Handle a request to kill a driver. */
6970
protected override def handleKill(request: KillDriverRequest): KillDriverResponse = {
70-
val driverId = request.getDriverId
71+
val driverId = request.driverId
7172
val response = AkkaUtils.askWithReply[DeployMessages.KillDriverResponse](
7273
DeployMessages.RequestKillDriver(driverId), masterActor, askTimeout)
73-
new KillDriverResponse()
74-
.setSparkVersion(sparkVersion)
75-
.setMessage(response.message)
76-
.setDriverId(driverId)
77-
.setSuccess(response.success.toString)
74+
val k = new KillDriverResponse
75+
k.serverSparkVersion = sparkVersion
76+
k.message = response.message
77+
k.driverId = driverId
78+
k.success = response.success.toString
79+
k
7880
}
7981

8082
/** Handle a request for a driver's status. */
8183
protected override def handleStatus(request: DriverStatusRequest): DriverStatusResponse = {
82-
val driverId = request.getDriverId
84+
val driverId = request.driverId
8385
val response = AkkaUtils.askWithReply[DeployMessages.DriverStatusResponse](
8486
DeployMessages.RequestDriverStatus(driverId), masterActor, askTimeout)
8587
val message = response.exception.map { s"Exception from the cluster:\n" + formatException(_) }
86-
new DriverStatusResponse()
87-
.setSparkVersion(sparkVersion)
88-
.setDriverId(driverId)
89-
.setSuccess(response.found.toString)
90-
.setDriverState(response.state.map(_.toString).orNull)
91-
.setWorkerId(response.workerId.orNull)
92-
.setWorkerHostPort(response.workerHostPort.orNull)
93-
.setMessage(message.orNull)
88+
val d = new DriverStatusResponse
89+
d.serverSparkVersion = sparkVersion
90+
d.driverId = driverId
91+
d.success = response.found.toString
92+
d.driverState = response.state.map(_.toString).orNull
93+
d.workerId = response.workerId.orNull
94+
d.workerHostPort = response.workerHostPort.orNull
95+
d.message = message.orNull
96+
d
9497
}
9598

9699
/**
@@ -101,27 +104,27 @@ private[spark] class StandaloneRestServerHandler(
101104
*/
102105
private def buildDriverDescription(request: SubmitDriverRequest): DriverDescription = {
103106
// Required fields, including the main class because python is not yet supported
104-
val appName = request.getAppName
105-
val appResource = request.getAppResource
106-
val mainClass = request.getMainClass
107+
val appName = request.appName
108+
val appResource = request.appResource
109+
val mainClass = request.mainClass
107110
if (mainClass == null) {
108111
throw new SubmitRestMissingFieldException("Main class must be set in submit request.")
109112
}
110113

111114
// Optional fields
112-
val jars = Option(request.getJars)
113-
val files = Option(request.getFiles)
114-
val driverMemory = Option(request.getDriverMemory)
115-
val driverCores = Option(request.getDriverCores)
116-
val driverExtraJavaOptions = Option(request.getDriverExtraJavaOptions)
117-
val driverExtraClassPath = Option(request.getDriverExtraClassPath)
118-
val driverExtraLibraryPath = Option(request.getDriverExtraLibraryPath)
119-
val superviseDriver = Option(request.getSuperviseDriver)
120-
val executorMemory = Option(request.getExecutorMemory)
121-
val totalExecutorCores = Option(request.getTotalExecutorCores)
122-
val appArgs = request.getAppArgs
123-
val sparkProperties = request.getSparkProperties
124-
val environmentVariables = request.getEnvironmentVariables
115+
val jars = Option(request.jars)
116+
val files = Option(request.files)
117+
val driverMemory = Option(request.driverMemory)
118+
val driverCores = Option(request.driverCores)
119+
val driverExtraJavaOptions = Option(request.driverExtraJavaOptions)
120+
val driverExtraClassPath = Option(request.driverExtraClassPath)
121+
val driverExtraLibraryPath = Option(request.driverExtraLibraryPath)
122+
val superviseDriver = Option(request.superviseDriver)
123+
val executorMemory = Option(request.executorMemory)
124+
val totalExecutorCores = Option(request.totalExecutorCores)
125+
val appArgs = request.appArgs
126+
val sparkProperties = request.sparkProperties
127+
val environmentVariables = request.environmentVariables
125128

126129
// Translate all fields to the relevant Spark properties
127130
val conf = new SparkConf(false)

0 commit comments

Comments
 (0)