Skip to content

Commit 544de1d

Browse files
author
Andrew Or
committed
Major clean ups in code and comments
This involves refactoring SparkSubmit a little to put the code that launches the REST client in the right place. This commit also adds port retry logic in the REST server, which was previously missing.
1 parent e958cae commit 544de1d

14 files changed

+151
-143
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 38 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -73,30 +73,24 @@ object SparkSubmit {
7373
if (appArgs.verbose) {
7474
printStream.println(appArgs)
7575
}
76-
77-
// In standalone cluster mode, use the brand new REST client to submit the application
78-
val isStandaloneCluster =
79-
appArgs.master.startsWith("spark://") && appArgs.deployMode == "cluster"
80-
if (isStandaloneCluster) {
81-
new StandaloneRestClient().submitDriver(appArgs)
82-
return
83-
}
84-
85-
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
86-
launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)
76+
launch(appArgs)
8777
}
8878

8979
/**
90-
* @return a tuple containing
91-
* (1) the arguments for the child process,
92-
* (2) a list of classpath entries for the child,
93-
* (3) a list of system properties and env vars, and
94-
* (4) the main class for the child
80+
* Launch the application using the provided parameters.
81+
*
82+
* This runs in two steps. First, we prepare the launch environment by setting up
83+
* the appropriate classpath, system properties, and application arguments for
84+
* running the child main class based on the cluster manager and the deploy mode.
85+
* Second, we use this launch environment to invoke the main method of the child
86+
* main class.
87+
*
88+
* Note that standalone cluster mode is an exception in that we do not invoke the
89+
* main method of a child class. Instead, we pass the submit parameters directly to
90+
* a REST client, which will submit the application using the stable REST protocol.
9591
*/
96-
private[spark] def createLaunchEnv(args: SparkSubmitArguments)
97-
: (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
98-
99-
// Values to return
92+
private[spark] def launch(args: SparkSubmitArguments): Unit = {
93+
// Environment needed to launch the child main class
10094
val childArgs = new ArrayBuffer[String]()
10195
val childClasspath = new ArrayBuffer[String]()
10296
val sysProps = new HashMap[String, String]()
@@ -198,8 +192,6 @@ object SparkSubmit {
198192

199193
// Standalone cluster only
200194
OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
201-
OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
202-
OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),
203195

204196
// Yarn client only
205197
OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
@@ -228,6 +220,9 @@ object SparkSubmit {
228220
sysProp = "spark.files")
229221
)
230222

223+
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
224+
val isStandaloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
225+
231226
// In client mode, launch the application main class directly
232227
// In addition, add the main application jar and any added jars (if any) to the classpath
233228
if (deployMode == CLIENT) {
@@ -239,7 +234,6 @@ object SparkSubmit {
239234
if (args.childArgs != null) { childArgs ++= args.childArgs }
240235
}
241236

242-
243237
// Map all arguments to command-line options or system properties for our chosen mode
244238
for (opt <- options) {
245239
if (opt.value != null &&
@@ -253,7 +247,6 @@ object SparkSubmit {
253247
// Add the application jar automatically so the user doesn't have to call sc.addJar
254248
// For YARN cluster mode, the jar is already distributed on each node as "app.jar"
255249
// For python files, the primary resource is already distributed as a regular file
256-
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
257250
if (!isYarnCluster && !args.isPython) {
258251
var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
259252
if (isUserJar(args.primaryResource)) {
@@ -262,19 +255,6 @@ object SparkSubmit {
262255
sysProps.put("spark.jars", jars.mkString(","))
263256
}
264257

265-
// In standalone-cluster mode, use Client as a wrapper around the user class
266-
if (clusterManager == STANDALONE && deployMode == CLUSTER) {
267-
childMainClass = "org.apache.spark.deploy.Client"
268-
if (args.supervise) {
269-
childArgs += "--supervise"
270-
}
271-
childArgs += "launch"
272-
childArgs += (args.master, args.primaryResource, args.mainClass)
273-
if (args.childArgs != null) {
274-
childArgs ++= args.childArgs
275-
}
276-
}
277-
278258
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
279259
if (isYarnCluster) {
280260
childMainClass = "org.apache.spark.deploy.yarn.Client"
@@ -294,7 +274,7 @@ object SparkSubmit {
294274

295275
// Ignore invalid spark.driver.host in cluster modes.
296276
if (deployMode == CLUSTER) {
297-
sysProps -= ("spark.driver.host")
277+
sysProps -= "spark.driver.host"
298278
}
299279

300280
// Resolve paths in certain spark properties
@@ -320,10 +300,28 @@ object SparkSubmit {
320300
sysProps("spark.submit.pyFiles") = formattedPyFiles
321301
}
322302

323-
(childArgs, childClasspath, sysProps, childMainClass)
303+
// In standalone cluster mode, use the stable application submission REST protocol.
304+
// Otherwise, just call the main method of the child class.
305+
if (isStandaloneCluster) {
306+
// NOTE: since we mutate the values of some configs in this method, we must update the
307+
// corresponding fields in the original SparkSubmitArguments to reflect these changes.
308+
args.sparkProperties.clear()
309+
args.sparkProperties ++= sysProps
310+
sysProps.get("spark.jars").foreach { args.jars = _ }
311+
sysProps.get("spark.files").foreach { args.files = _ }
312+
new StandaloneRestClient().submitDriver(args)
313+
} else {
314+
runMain(childArgs, childClasspath, sysProps, childMainClass)
315+
}
324316
}
325317

326-
private def launch(
318+
/**
319+
* Run the main method of the child class using the provided launch environment.
320+
*
321+
* Depending on the deploy mode, cluster manager, and the type of the application,
322+
* this main class may not necessarily be the one provided by the user.
323+
*/
324+
private def runMain(
327325
childArgs: ArrayBuffer[String],
328326
childClasspath: ArrayBuffer[String],
329327
sysProps: Map[String, String],

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ private[spark] class Master(
180180
recoveryCompletionTask.cancel()
181181
}
182182
webUi.stop()
183+
restServer.stop()
183184
masterMetricsSystem.stop()
184185
applicationMetricsSystem.stop()
185186
persistenceEngine.close()

core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusRequestMessage.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,16 @@ private[spark] object DriverStatusRequestField
3333
}
3434

3535
/**
36-
* A request sent to the cluster manager to query the status of a driver.
36+
* A request sent to the cluster manager to query the status of a driver
37+
* in the stable application submission REST protocol.
3738
*/
3839
private[spark] class DriverStatusRequestMessage extends SubmitRestProtocolMessage(
39-
SubmitRestProtocolAction.DRIVER_STATUS_REQUEST,
40-
DriverStatusRequestField.ACTION,
41-
DriverStatusRequestField.requiredFields)
40+
SubmitRestProtocolAction.DRIVER_STATUS_REQUEST,
41+
DriverStatusRequestField.ACTION,
42+
DriverStatusRequestField.requiredFields)
4243

4344
private[spark] object DriverStatusRequestMessage
4445
extends SubmitRestProtocolMessageCompanion[DriverStatusRequestMessage] {
4546
protected override def newMessage() = new DriverStatusRequestMessage
46-
protected override def fieldWithName(field: String) = DriverStatusRequestField.withName(field)
47+
protected override def fieldFromString(field: String) = DriverStatusRequestField.fromString(field)
4748
}

core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusResponseMessage.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,16 @@ private[spark] object DriverStatusResponseField
3737
}
3838

3939
/**
40-
* A message sent from the cluster manager in response to a DriverStatusResponseMessage.
40+
* A message sent from the cluster manager in response to a DriverStatusRequestMessage
41+
* in the stable application submission REST protocol.
4142
*/
4243
private[spark] class DriverStatusResponseMessage extends SubmitRestProtocolMessage(
43-
SubmitRestProtocolAction.DRIVER_STATUS_RESPONSE,
44-
DriverStatusResponseField.ACTION,
45-
DriverStatusResponseField.requiredFields)
44+
SubmitRestProtocolAction.DRIVER_STATUS_RESPONSE,
45+
DriverStatusResponseField.ACTION,
46+
DriverStatusResponseField.requiredFields)
4647

4748
private[spark] object DriverStatusResponseMessage
4849
extends SubmitRestProtocolMessageCompanion[DriverStatusResponseMessage] {
4950
protected override def newMessage() = new DriverStatusResponseMessage
50-
protected override def fieldWithName(field: String) = DriverStatusResponseField.withName(field)
51+
protected override def fieldFromString(field: String) = DriverStatusResponseField.fromString(field)
5152
}

core/src/main/scala/org/apache/spark/deploy/rest/ErrorMessage.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.deploy.rest
1919

2020
/**
21-
* A field used in a ErrorMessage.
21+
* A field used in an ErrorMessage.
2222
*/
2323
private[spark] abstract class ErrorField extends SubmitRestProtocolField
2424
private[spark] object ErrorField extends SubmitRestProtocolFieldCompanion[ErrorField] {
@@ -30,14 +30,14 @@ private[spark] object ErrorField extends SubmitRestProtocolFieldCompanion[ErrorF
3030
}
3131

3232
/**
33-
* An error message exchanged in the stable application submission protocol.
33+
* An error message exchanged in the stable application submission REST protocol.
3434
*/
3535
private[spark] class ErrorMessage extends SubmitRestProtocolMessage(
36-
SubmitRestProtocolAction.ERROR,
37-
ErrorField.ACTION,
38-
ErrorField.requiredFields)
36+
SubmitRestProtocolAction.ERROR,
37+
ErrorField.ACTION,
38+
ErrorField.requiredFields)
3939

4040
private[spark] object ErrorMessage extends SubmitRestProtocolMessageCompanion[ErrorMessage] {
4141
protected override def newMessage() = new ErrorMessage
42-
protected override def fieldWithName(field: String) = ErrorField.withName(field)
42+
protected override def fieldFromString(field: String) = ErrorField.fromString(field)
4343
}

core/src/main/scala/org/apache/spark/deploy/rest/KillDriverRequestMessage.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,16 @@ private[spark] object KillDriverRequestField
3333
}
3434

3535
/**
36-
* A request sent to the cluster manager to kill a driver.
36+
* A request sent to the cluster manager to kill a driver
37+
* in the stable application submission REST protocol.
3738
*/
3839
private[spark] class KillDriverRequestMessage extends SubmitRestProtocolMessage(
39-
SubmitRestProtocolAction.KILL_DRIVER_REQUEST,
40-
KillDriverRequestField.ACTION,
41-
KillDriverRequestField.requiredFields)
40+
SubmitRestProtocolAction.KILL_DRIVER_REQUEST,
41+
KillDriverRequestField.ACTION,
42+
KillDriverRequestField.requiredFields)
4243

4344
private[spark] object KillDriverRequestMessage
4445
extends SubmitRestProtocolMessageCompanion[KillDriverRequestMessage] {
4546
protected override def newMessage() = new KillDriverRequestMessage
46-
protected override def fieldWithName(field: String) = KillDriverRequestField.withName(field)
47+
protected override def fieldFromString(field: String) = KillDriverRequestField.fromString(field)
4748
}

core/src/main/scala/org/apache/spark/deploy/rest/KillDriverResponseMessage.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,16 @@ private[spark] object KillDriverResponseField
3434
}
3535

3636
/**
37-
* A message sent from the cluster manager in response to a KillDriverResponseMessage.
37+
* A message sent from the cluster manager in response to a KillDriverRequestMessage
38+
* in the stable application submission REST protocol.
3839
*/
3940
private[spark] class KillDriverResponseMessage extends SubmitRestProtocolMessage(
40-
SubmitRestProtocolAction.KILL_DRIVER_RESPONSE,
41-
KillDriverResponseField.ACTION,
42-
KillDriverResponseField.requiredFields)
41+
SubmitRestProtocolAction.KILL_DRIVER_RESPONSE,
42+
KillDriverResponseField.ACTION,
43+
KillDriverResponseField.requiredFields)
4344

4445
private[spark] object KillDriverResponseMessage
4546
extends SubmitRestProtocolMessageCompanion[KillDriverResponseMessage] {
4647
protected override def newMessage() = new KillDriverResponseMessage
47-
protected override def fieldWithName(field: String) = KillDriverResponseField.withName(field)
48+
protected override def fieldFromString(field: String) = KillDriverResponseField.fromString(field)
4849
}

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.spark.deploy.SparkSubmitArguments
2424
import org.apache.spark.util.Utils
2525

2626
/**
27-
* A client that submits Spark applications to the standalone Master using a stable REST protocol.
27+
* A client that submits applications to the standalone Master using the stable REST protocol.
2828
* This client is intended to communicate with the StandaloneRestServer. Cluster mode only.
2929
*/
3030
private[spark] class StandaloneRestClient extends SubmitRestClient {
@@ -33,12 +33,8 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
3333
override protected def constructSubmitRequest(
3434
args: SparkSubmitArguments): SubmitDriverRequestMessage = {
3535
import SubmitDriverRequestField._
36-
val driverMemory = Option(args.driverMemory)
37-
.map { m => Utils.memoryStringToMb(m).toString }
38-
.orNull
39-
val executorMemory = Option(args.executorMemory)
40-
.map { m => Utils.memoryStringToMb(m).toString }
41-
.orNull
36+
val dm = Option(args.driverMemory).map { m => Utils.memoryStringToMb(m).toString }.orNull
37+
val em = Option(args.executorMemory).map { m => Utils.memoryStringToMb(m).toString }.orNull
4238
val message = new SubmitDriverRequestMessage()
4339
.setField(SPARK_VERSION, sparkVersion)
4440
.setField(MASTER, args.master)
@@ -47,17 +43,17 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
4743
.setFieldIfNotNull(MAIN_CLASS, args.mainClass)
4844
.setFieldIfNotNull(JARS, args.jars)
4945
.setFieldIfNotNull(FILES, args.files)
50-
.setFieldIfNotNull(DRIVER_MEMORY, driverMemory)
46+
.setFieldIfNotNull(DRIVER_MEMORY, dm)
5147
.setFieldIfNotNull(DRIVER_CORES, args.driverCores)
5248
.setFieldIfNotNull(DRIVER_EXTRA_JAVA_OPTIONS, args.driverExtraJavaOptions)
5349
.setFieldIfNotNull(DRIVER_EXTRA_CLASS_PATH, args.driverExtraClassPath)
5450
.setFieldIfNotNull(DRIVER_EXTRA_LIBRARY_PATH, args.driverExtraLibraryPath)
5551
.setFieldIfNotNull(SUPERVISE_DRIVER, args.supervise.toString)
56-
.setFieldIfNotNull(EXECUTOR_MEMORY, executorMemory)
52+
.setFieldIfNotNull(EXECUTOR_MEMORY, em)
5753
.setFieldIfNotNull(TOTAL_EXECUTOR_CORES, args.totalExecutorCores)
5854
args.childArgs.foreach(message.appendAppArg)
5955
args.sparkProperties.foreach { case (k, v) => message.setSparkProperty(k, v) }
60-
// TODO: set environment variables?
56+
// TODO: send special environment variables?
6157
message.validate()
6258
}
6359

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@ package org.apache.spark.deploy.rest
1919

2020
import java.io.File
2121

22-
import scala.collection.mutable
23-
import scala.collection.mutable.ArrayBuffer
22+
import akka.actor.ActorRef
2423

2524
import org.apache.spark.{SPARK_VERSION => sparkVersion}
2625
import org.apache.spark.SparkConf
@@ -29,22 +28,19 @@ import org.apache.spark.deploy.{Command, DriverDescription}
2928
import org.apache.spark.deploy.ClientArguments._
3029
import org.apache.spark.deploy.DeployMessages._
3130
import org.apache.spark.deploy.master.Master
32-
import akka.actor.ActorRef
3331

3432
/**
3533
* A server that responds to requests submitted by the StandaloneRestClient.
3634
* This is intended to be embedded in the standalone Master. Cluster mode only.
3735
*/
38-
private[spark] class StandaloneRestServer(
39-
master: Master,
40-
host: String,
41-
requestedPort: Int)
42-
extends SubmitRestServer(host, requestedPort) {
36+
private[spark] class StandaloneRestServer(master: Master, host: String, requestedPort: Int)
37+
extends SubmitRestServer(host, requestedPort, master.conf) {
4338
override protected val handler = new StandaloneRestServerHandler(master)
4439
}
4540

4641
/**
47-
* A handler for requests submitted to the standalone Master through the REST protocol.
42+
* A handler for requests submitted to the standalone Master
43+
* via the stable application submission REST protocol.
4844
*/
4945
private[spark] class StandaloneRestServerHandler(
5046
conf: SparkConf,
@@ -141,9 +137,7 @@ private[spark] class StandaloneRestServerHandler(
141137
// Otherwise, once the driver is launched it will contact with the wrong server
142138
.set("spark.master", masterUrl)
143139
.set("spark.app.name", appName)
144-
// Include main app resource on the executor classpath
145-
// The corresponding behavior in client mode is handled in SparkSubmit
146-
.set("spark.jars", jars.map(_ + ",").getOrElse("") + appResource)
140+
jars.foreach { j => conf.set("spark.jars", j) }
147141
files.foreach { f => conf.set("spark.files", f) }
148142
driverExtraJavaOptions.foreach { j => conf.set("spark.driver.extraJavaOptions", j) }
149143
driverExtraClassPath.foreach { cp => conf.set("spark.driver.extraClassPath", cp) }

0 commit comments

Comments (0)