Skip to content

Commit d2b1ef8

Browse files
author
Andrew Or
committed
Comment changes + minor code refactoring across the board
This is a comprehensive pass across the entire patch. The most semantically significant change included in this commit is the correction of server behavior when an error occurs. Previously we would attempt to find the unknown fields of a faulty request, which would cause an exception on the server. This is, however, a client error, not a server error. This behavior is now fixed.
1 parent 9c82a36 commit d2b1ef8

File tree

10 files changed

+172
-157
lines changed

10 files changed

+172
-157
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -101,20 +101,19 @@ object SparkSubmit {
101101
}
102102
}
103103

104-
/**
105-
* Kill an existing driver using the REST application submission protocol.
106-
* Standalone cluster mode only.
107-
*/
104+
/** Kill an existing submission using the REST protocol. Standalone cluster mode only. */
108105
private def kill(args: SparkSubmitArguments): Unit = {
109-
new StandaloneRestClient().killSubmission(args.master, args.driverToKill)
106+
new StandaloneRestClient()
107+
.killSubmission(args.master, args.submissionToKill)
110108
}
111109

112110
/**
113-
* Request the status of an existing driver using the REST application submission protocol.
111+
* Request the status of an existing submission using the REST protocol.
114112
* Standalone cluster mode only.
115113
*/
116114
private def requestStatus(args: SparkSubmitArguments): Unit = {
117-
new StandaloneRestClient().requestSubmissionStatus(args.master, args.driverToRequestStatusFor)
115+
new StandaloneRestClient()
116+
.requestSubmissionStatus(args.master, args.submissionToRequestStatusFor)
118117
}
119118

120119
/**
@@ -300,6 +299,7 @@ object SparkSubmit {
300299
sysProp = "spark.driver.extraLibraryPath"),
301300

302301
// Standalone cluster only
302+
// Do not set CL arguments here because there are multiple possibilities for the main class
303303
OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
304304
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
305305
OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, sysProp = "spark.driver.memory"),
@@ -368,8 +368,7 @@ object SparkSubmit {
368368
}
369369

370370
// In standalone cluster mode, use the REST client to submit the application (Spark 1.3+).
371-
// All parameters except application arguments are expected to be passed to the main class
372-
// through system properties.
371+
// All Spark parameters are expected to be passed to the client through system properties.
373372
if (args.isStandaloneCluster) {
374373
if (args.useRest) {
375374
childMainClass = "org.apache.spark.deploy.rest.StandaloneRestClient"
@@ -383,7 +382,6 @@ object SparkSubmit {
383382
childArgs += "launch"
384383
childArgs += (args.master, args.primaryResource, args.mainClass)
385384
}
386-
// Whether or not we use REST, pass application arguments through the command line
387385
if (args.childArgs != null) {
388386
childArgs ++= args.childArgs
389387
}

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
6262
var useRest: Boolean = true
6363
var supervise: Boolean = false
6464
var driverCores: String = null
65-
var driverToKill: String = null
66-
var driverToRequestStatusFor: String = null
65+
var submissionToKill: String = null
66+
var submissionToRequestStatusFor: String = null
6767

6868
/** Default properties present in the currently defined defaults file. */
6969
lazy val defaultSparkProperties: HashMap[String, String] = {
@@ -220,21 +220,21 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
220220
}
221221

222222
private def validateKillArguments(): Unit = {
223-
if (!isStandaloneCluster) {
224-
SparkSubmit.printErrorAndExit("Killing drivers is only supported in standalone cluster mode")
223+
if (!master.startsWith("spark://")) {
224+
SparkSubmit.printErrorAndExit("Killing submissions is only supported in standalone mode!")
225225
}
226-
if (driverToKill == null) {
227-
SparkSubmit.printErrorAndExit("Please specify a driver to kill")
226+
if (submissionToKill == null) {
227+
SparkSubmit.printErrorAndExit("Please specify a submission to kill.")
228228
}
229229
}
230230

231231
private def validateStatusRequestArguments(): Unit = {
232-
if (!isStandaloneCluster) {
232+
if (!master.startsWith("spark://")) {
233233
SparkSubmit.printErrorAndExit(
234-
"Requesting driver statuses is only supported in standalone cluster mode")
234+
"Requesting submission statuses is only supported in standalone mode!")
235235
}
236-
if (driverToRequestStatusFor == null) {
237-
SparkSubmit.printErrorAndExit("Please specify a driver to request status for")
236+
if (submissionToRequestStatusFor == null) {
237+
SparkSubmit.printErrorAndExit("Please specify a submission to request status for.")
238238
}
239239
}
240240

@@ -351,15 +351,15 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
351351
parse(tail)
352352

353353
case ("--kill") :: value :: tail =>
354-
driverToKill = value
354+
submissionToKill = value
355355
if (action != null) {
356356
SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $KILL.")
357357
}
358358
action = KILL
359359
parse(tail)
360360

361361
case ("--status") :: value :: tail =>
362-
driverToRequestStatusFor = value
362+
submissionToRequestStatusFor = value
363363
if (action != null) {
364364
SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $REQUEST_STATUS.")
365365
}
@@ -439,6 +439,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
439439
}
440440
outStream.println(
441441
"""Usage: spark-submit [options] <app jar | python file> [app arguments]
442+
|Usage: spark-submit --kill [submission ID] --master [spark://...]
443+
|Usage: spark-submit --status [submission ID] --master [spark://...]
444+
|
442445
|Options:
443446
| --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local.
444447
| --deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ private[spark] class Master(
127127
private val restServer =
128128
if (restServerEnabled) {
129129
val port = conf.getInt("spark.master.rest.port", 6066)
130-
Some(new StandaloneRestServer(this, host, port))
130+
Some(new StandaloneRestServer(host, port, self, masterUrl, conf))
131131
} else {
132132
None
133133
}

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala

Lines changed: 36 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import java.net.{HttpURLConnection, SocketException, URL}
2222

2323
import scala.io.Source
2424

25+
import com.fasterxml.jackson.databind.JsonMappingException
2526
import com.google.common.base.Charsets
2627

2728
import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
@@ -54,15 +55,15 @@ private[spark] class StandaloneRestClient extends Logging {
5455
import StandaloneRestClient._
5556

5657
/**
57-
* Submit an application specified by the provided arguments.
58+
* Submit an application specified by the parameters in the provided request.
5859
*
5960
* If the submission was successful, poll the status of the submission and report
6061
* it to the user. Otherwise, report the error message provided by the server.
6162
*/
6263
def createSubmission(
6364
master: String,
6465
request: CreateSubmissionRequest): SubmitRestProtocolResponse = {
65-
logInfo(s"Submitting a request to launch a driver in $master.")
66+
logInfo(s"Submitting a request to launch an application in $master.")
6667
validateMaster(master)
6768
val url = getSubmitUrl(master)
6869
val response = postJson(url, request.toJson)
@@ -103,6 +104,24 @@ private[spark] class StandaloneRestClient extends Logging {
103104
response
104105
}
105106

107+
/** Construct a message that captures the specified parameters for submitting an application. */
108+
def constructSubmitRequest(
109+
appResource: String,
110+
mainClass: String,
111+
appArgs: Array[String],
112+
sparkProperties: Map[String, String],
113+
environmentVariables: Map[String, String]): CreateSubmissionRequest = {
114+
val message = new CreateSubmissionRequest
115+
message.clientSparkVersion = sparkVersion
116+
message.appResource = appResource
117+
message.mainClass = mainClass
118+
message.appArgs = appArgs
119+
message.sparkProperties = sparkProperties
120+
message.environmentVariables = environmentVariables
121+
message.validate()
122+
message
123+
}
124+
106125
/** Send a GET request to the specified URL. */
107126
private def get(url: URL): SubmitRestProtocolResponse = {
108127
logDebug(s"Sending GET request to server at $url.")
@@ -134,40 +153,33 @@ private[spark] class StandaloneRestClient extends Logging {
134153
}
135154

136155
/**
137-
* Read the response from the given connection.
138-
*
139-
* The response is expected to represent a [[SubmitRestProtocolResponse]] in the form of JSON.
140-
* Additionally, this validates the response to ensure that it is properly constructed.
141-
* If the response represents an error, report the message from the server.
156+
* Read the response from the server and return it as a validated [[SubmitRestProtocolResponse]].
157+
* If the response represents an error, report the embedded message to the user.
142158
*/
143159
private def readResponse(connection: HttpURLConnection): SubmitRestProtocolResponse = {
144160
try {
145161
val responseJson = Source.fromInputStream(connection.getInputStream).mkString
146162
logDebug(s"Response from the server:\n$responseJson")
147163
val response = SubmitRestProtocolMessage.fromJson(responseJson)
148-
// The response should have already been validated on the server.
149-
// In case this is not true, validate it ourselves to avoid potential NPEs.
150-
try {
151-
response.validate()
152-
} catch {
153-
case e: SubmitRestProtocolException =>
154-
throw new SubmitRestProtocolException("Malformed response received from server", e)
155-
}
156-
// If the response is an error, log the message
157-
// Otherwise, simply return the response
164+
response.validate()
158165
response match {
166+
// If the response is an error, log the message
159167
case error: ErrorResponse =>
160168
logError(s"Server responded with error:\n${error.message}")
161169
error
170+
// Otherwise, simply return the response
162171
case response: SubmitRestProtocolResponse => response
163172
case unexpected =>
164173
throw new SubmitRestProtocolException(
165174
s"Message received from server was not a response:\n${unexpected.toJson}")
166175
}
167176
} catch {
168-
case e @ (_: FileNotFoundException | _: SocketException) =>
177+
case unreachable @ (_: FileNotFoundException | _: SocketException) =>
169178
throw new SubmitRestConnectionException(
170-
s"Unable to connect to server ${connection.getURL}", e)
179+
s"Unable to connect to server ${connection.getURL}", unreachable)
180+
case malformed @ (_: SubmitRestProtocolException | _: JsonMappingException) =>
181+
throw new SubmitRestProtocolException(
182+
"Malformed response received from server", malformed)
171183
}
172184
}
173185

@@ -202,24 +214,6 @@ private[spark] class StandaloneRestClient extends Logging {
202214
}
203215
}
204216

205-
/** Construct a message that captures the specified parameters for submitting an application. */
206-
def constructSubmitRequest(
207-
appResource: String,
208-
mainClass: String,
209-
appArgs: Array[String],
210-
sparkProperties: Map[String, String],
211-
environmentVariables: Map[String, String]): CreateSubmissionRequest = {
212-
val message = new CreateSubmissionRequest
213-
message.clientSparkVersion = sparkVersion
214-
message.appResource = appResource
215-
message.mainClass = mainClass
216-
message.appArgs = appArgs
217-
message.sparkProperties = sparkProperties
218-
message.environmentVariables = environmentVariables
219-
message.validate()
220-
message
221-
}
222-
223217
/** Report the status of a newly created submission. */
224218
private def reportSubmissionStatus(
225219
master: String,
@@ -292,7 +286,7 @@ private[spark] object StandaloneRestClient {
292286
val REPORT_DRIVER_STATUS_MAX_TRIES = 10
293287
val PROTOCOL_VERSION = "v1"
294288

295-
/** Submit an application, assuming parameters are specified through system properties. */
289+
/** Submit an application, assuming Spark parameters are specified through system properties. */
296290
def main(args: Array[String]): Unit = {
297291
if (args.size < 2) {
298292
sys.error("Usage: StandaloneRestClient [app resource] [main class] [app args*]")
@@ -301,12 +295,13 @@ private[spark] object StandaloneRestClient {
301295
val appResource = args(0)
302296
val mainClass = args(1)
303297
val appArgs = args.slice(2, args.size)
304-
val client = new StandaloneRestClient
305-
val master = sys.props.get("spark.master").getOrElse {
298+
val conf = new SparkConf
299+
val master = conf.getOption("spark.master").getOrElse {
306300
throw new IllegalArgumentException("'spark.master' must be set.")
307301
}
308-
val sparkProperties = new SparkConf().getAll.toMap
302+
val sparkProperties = conf.getAll.toMap
309303
val environmentVariables = sys.env.filter { case (k, _) => k.startsWith("SPARK_") }
304+
val client = new StandaloneRestClient
310305
val submitRequest = client.constructSubmitRequest(
311306
appResource, mainClass, appArgs, sparkProperties, environmentVariables)
312307
client.createSubmission(master, submitRequest)

0 commit comments

Comments
 (0)