Commit e42c131

Author: Andrew Or
Add end-to-end tests for standalone REST protocol
This is actually non-trivial because we must run in standalone mode instead of relying on the existing local-cluster mode. This means we must manually start our own Master and Workers and package our own jar so that a real jar is provided when submitting the test application. Further, since the driver output is difficult to obtain programmatically in cluster mode, we write the results to a special file and verify them later.
1 parent 837475b commit e42c131
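
A rough sketch of the shape of such a test, following the approach described in the commit message (the Worker startup, jar packaging, and result-file path are illustrative assumptions, not the actual suite code):

// Sketch only: a hypothetical test skeleton, not the code added in this commit.
import java.io.File
import scala.io.Source
import org.apache.spark.SparkConf
import org.apache.spark.deploy.master.Master

val conf = new SparkConf(false)
// Start a real standalone Master; port 0 means "pick any free port".
val (masterSystem, masterPort, _, _) =
  Master.startSystemAndActor("localhost", 0, 0, conf)
val masterUrl = "spark://localhost:" + masterPort

// ... start a Worker against masterUrl, package a small jar whose main
// method writes a marker to `resultFile`, and submit it in cluster mode ...

// The driver runs on the Worker, so its output is not visible to this
// process; verify the run by reading the file the driver wrote instead.
val resultFile = new File("/tmp/rest-submit-result")  // hypothetical path
assert(Source.fromFile(resultFile).mkString.trim == "success")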

File tree

8 files changed: +321 -29 lines


core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala

Lines changed: 1 addition & 1 deletion

@@ -45,7 +45,7 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
 
     /* Start the Master */
     val conf = new SparkConf(false)
-    val (masterSystem, masterPort, _) = Master.startSystemAndActor(localHostname, 0, 0, conf)
+    val (masterSystem, masterPort, _, _) = Master.startSystemAndActor(localHostname, 0, 0, conf)
     masterActorSystems += masterSystem
     val masterUrl = "spark://" + localHostname + ":" + masterPort
     val masters = Array(masterUrl)

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 31 additions & 14 deletions

@@ -117,19 +117,10 @@ object SparkSubmit {
    * parameters directly to a REST client, which will submit the application using the
    * REST protocol instead.
    */
-  private def submit(args: SparkSubmitArguments): Unit = {
+  private[spark] def submit(args: SparkSubmitArguments): Unit = {
     val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
-    val restKey = "spark.submit.rest.enabled"
-    val restEnabled = args.sparkProperties.get(restKey).getOrElse("false").toBoolean
-    if (args.isStandaloneCluster && restEnabled) {
+    if (args.isStandaloneCluster && args.isRestEnabled) {
       printStream.println("Running standalone cluster mode using the stable REST protocol.")
-      // NOTE: since we mutate the values of some configs in `prepareSubmitEnvironment`, we
-      // must update the corresponding fields in the original SparkSubmitArguments to reflect
-      // these changes.
-      args.sparkProperties.clear()
-      args.sparkProperties ++= sysProps
-      sysProps.get("spark.jars").foreach { args.jars = _ }
-      sysProps.get("spark.files").foreach { args.files = _ }
       new StandaloneRestClient().submitDriver(args)
     } else {
       runMain(childArgs, childClasspath, sysProps, childMainClass)

@@ -159,7 +150,7 @@ object SparkSubmit {
       case m if m.startsWith("spark") => STANDALONE
       case m if m.startsWith("mesos") => MESOS
       case m if m.startsWith("local") => LOCAL
-      case _ => printErrorAndExit("Master must start with yarn, spark, mesos or local"); -1
+      case _ => printErrorAndExit("Master must start with yarn, spark, mesos, or local"); -1
     }
 
     // Set the deploy mode; default is client mode

@@ -249,6 +240,8 @@ object SparkSubmit {
 
       // Standalone cluster only
       OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
+      OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
+      OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),
 
       // Yarn client only
       OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),

@@ -299,6 +292,20 @@ object SparkSubmit {
       }
     }
 
+    // In standalone cluster mode, use Client as a wrapper around the user class.
+    // Note that we won't actually launch this class if we're using the stable REST protocol.
+    if (args.isStandaloneCluster && !args.isRestEnabled) {
+      childMainClass = "org.apache.spark.deploy.Client"
+      if (args.supervise) {
+        childArgs += "--supervise"
+      }
+      childArgs += "launch"
+      childArgs += (args.master, args.primaryResource, args.mainClass)
+      if (args.childArgs != null) {
+        childArgs ++= args.childArgs
+      }
+    }
+
     // Add the application jar automatically so the user doesn't have to call sc.addJar
     // For YARN cluster mode, the jar is already distributed on each node as "app.jar"
     // For python files, the primary resource is already distributed as a regular file

@@ -356,14 +363,24 @@ object SparkSubmit {
       sysProps("spark.submit.pyFiles") = formattedPyFiles
     }
 
+    // NOTE: If we are using the REST gateway, we will use the original arguments directly.
+    // Since we mutate the values of some configs in this method, we must update the
+    // corresponding fields in the original SparkSubmitArguments to reflect these changes.
+    if (args.isStandaloneCluster && args.isRestEnabled) {
+      args.sparkProperties.clear()
+      args.sparkProperties ++= sysProps
+      sysProps.get("spark.jars").foreach { args.jars = _ }
+      sysProps.get("spark.files").foreach { args.files = _ }
+    }
+
     (childArgs, childClasspath, sysProps, childMainClass)
   }
 
   /**
    * Run the main method of the child class using the provided launch environment.
    *
-   * Depending on the deploy mode, cluster manager, and the type of the application,
-   * this main class may not necessarily be the one provided by the user.
+   * Note that this main class will not be the one provided by the user if we're
+   * running cluster deploy mode or python applications.
    */
   private def runMain(
       childArgs: Seq[String],
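
Taken together, submission now routes through the REST client only when the application is in standalone cluster mode and the flag is set. A minimal sketch of driving this path (the master URL, class, and jar names are placeholders):

// Sketch: placeholder values; assumes calling code inside org.apache.spark,
// since submit() is now private[spark] rather than private.
import org.apache.spark.deploy.{SparkSubmit, SparkSubmitArguments}

val args = new SparkSubmitArguments(Seq(
  "--deploy-mode", "cluster",                  // isStandaloneCluster needs this...
  "--master", "spark://localhost:7077",        // ...and a spark:// master URL
  "--class", "org.SomeClass",
  "--conf", "spark.submit.rest.enabled=true",  // turns on isRestEnabled
  "thejar.jar"))
// With the flag set, this takes the StandaloneRestClient.submitDriver branch;
// without it, it falls through to runMain via org.apache.spark.deploy.Client.
SparkSubmit.submit(args)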

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 5 additions & 0 deletions

@@ -256,6 +256,11 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
     master.startsWith("spark://") && deployMode == "cluster"
   }
 
+  /** Return whether the stable application submission REST gateway is enabled. */
+  def isRestEnabled: Boolean = {
+    sparkProperties.get("spark.submit.rest.enabled").getOrElse("false").toBoolean
+  }
+
   override def toString = {
     s"""Parsed arguments:
     |  master  $master
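
The helper defaults to false when the property is absent; note that Scala's toBoolean accepts only "true"/"false" and throws on anything else. A quick illustration of the same lookup on a bare property map:

import scala.collection.mutable

// Mirrors the isRestEnabled lookup: absent key falls back to "false".
val props = mutable.HashMap[String, String]()
assert(!props.get("spark.submit.rest.enabled").getOrElse("false").toBoolean)
props("spark.submit.rest.enabled") = "true"
assert(props.get("spark.submit.rest.enabled").getOrElse("false").toBoolean)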

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 18 additions & 11 deletions

@@ -53,12 +53,12 @@ private[spark] class Master(
     host: String,
     port: Int,
     webUiPort: Int,
-    val securityMgr: SecurityManager)
+    val securityMgr: SecurityManager,
+    val conf: SparkConf)
   extends Actor with ActorLogReceive with Logging with LeaderElectable {
 
   import context.dispatcher // to use Akka's scheduler.schedule()
 
-  val conf = new SparkConf
   val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
 
   def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs

@@ -444,8 +444,8 @@ private[spark] class Master(
       timeOutDeadWorkers()
     }
 
-    case BoundPortsRequest => {
-      sender ! WebUIPortResponse(webUi.boundPort)
+    case BoundPortsRequest => {
+      sender ! BoundPortsResponse(port, webUi.boundPort, restServerBoundPort)
     }
   }

@@ -866,7 +866,7 @@ private[spark] object Master extends Logging {
     SignalLogger.register(log)
     val conf = new SparkConf
     val args = new MasterArguments(argStrings, conf)
-    val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
+    val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
     actorSystem.awaitTermination()
   }

@@ -890,19 +890,26 @@ private[spark] object Master extends Logging {
     Address("akka.tcp", systemName, host, port)
  }
 
+  /**
+   * Start the Master and return a four-tuple of:
+   *   (1) The Master actor system
+   *   (2) The bound port
+   *   (3) The web UI bound port
+   *   (4) The REST server bound port, if any
+   */
   def startSystemAndActor(
       host: String,
       port: Int,
       webUiPort: Int,
-      conf: SparkConf): (ActorSystem, Int, Int) = {
+      conf: SparkConf): (ActorSystem, Int, Int, Option[Int]) = {
     val securityMgr = new SecurityManager(conf)
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
       securityManager = securityMgr)
-    val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort,
-      securityMgr), actorName)
+    val actor = actorSystem.actorOf(
+      Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
     val timeout = AkkaUtils.askTimeout(conf)
-    val respFuture = actor.ask(RequestWebUIPort)(timeout)
-    val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]
-    (actorSystem, boundPort, resp.webUIBoundPort)
+    val portsRequest = actor.ask(BoundPortsRequest)(timeout)
+    val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]
+    (actorSystem, boundPort, portsResponse.webUIPort, portsResponse.stablePort)
   }
 }
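
Callers can now recover the REST server's bound port (None if the server was not started) directly from the returned tuple. A small usage sketch with illustrative host/port values:

import org.apache.spark.SparkConf
import org.apache.spark.deploy.master.Master

val conf = new SparkConf(false)
// Port 0 asks the OS for free ports, which keeps concurrent tests isolated.
val (actorSystem, boundPort, webUiPort, restPort) =
  Master.startSystemAndActor("localhost", 0, 0, conf)
restPort match {
  case Some(p) => println(s"REST server listening on port $p")
  case None    => println("REST server not started")
}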

core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala

Lines changed: 2 additions & 2 deletions

@@ -36,7 +36,7 @@ private[master] object MasterMessages {
 
   case object CompleteRecovery
 
-  case object RequestWebUIPort
+  case object BoundPortsRequest
 
-  case class WebUIPortResponse(webUIBoundPort: Int)
+  case class BoundPortsResponse(actorPort: Int, webUIPort: Int, stablePort: Option[Int])
 }

core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala

Lines changed: 2 additions & 1 deletion

@@ -68,7 +68,8 @@ class JsonProtocolSuite extends FunSuite {
     val completedApps = Array[ApplicationInfo]()
     val activeDrivers = Array(createDriverInfo())
     val completedDrivers = Array(createDriverInfo())
-    val stateResponse = new MasterStateResponse("host", 8080, workers, activeApps, completedApps,
+    val stateResponse = new MasterStateResponse(
+      "host", 8080, None, workers, activeApps, completedApps,
       activeDrivers, completedDrivers, RecoveryState.ALIVE)
     val output = JsonProtocol.writeMasterState(stateResponse)
     assertValidJson(output)

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 28 additions & 0 deletions

@@ -197,6 +197,33 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
     sysProps("spark.shuffle.spill") should be ("false")
   }
 
+  test("handles standalone cluster mode") {
+    val clArgs = Seq(
+      "--deploy-mode", "cluster",
+      "--master", "spark://h:p",
+      "--class", "org.SomeClass",
+      "--supervise",
+      "--driver-memory", "4g",
+      "--driver-cores", "5",
+      "--conf", "spark.shuffle.spill=false",
+      "thejar.jar",
+      "arg1", "arg2")
+    val appArgs = new SparkSubmitArguments(clArgs)
+    val (childArgs, classpath, sysProps, mainClass) = prepareSubmitEnvironment(appArgs)
+    val childArgsStr = childArgs.mkString(" ")
+    childArgsStr should startWith ("--memory 4g --cores 5 --supervise")
+    childArgsStr should include regex ("launch spark://h:p .*thejar.jar org.SomeClass arg1 arg2")
+    mainClass should be ("org.apache.spark.deploy.Client")
+    classpath should have size (0)
+    sysProps should have size (5)
+    sysProps.keys should contain ("SPARK_SUBMIT")
+    sysProps.keys should contain ("spark.master")
+    sysProps.keys should contain ("spark.app.name")
+    sysProps.keys should contain ("spark.jars")
+    sysProps.keys should contain ("spark.shuffle.spill")
+    sysProps("spark.shuffle.spill") should be ("false")
+  }
+
   test("handles standalone client mode") {
     val clArgs = Seq(
       "--deploy-mode", "client",

@@ -279,6 +306,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
       "--master", "local-cluster[2,1,512]",
       "--jars", jarsString,
       "--conf", "spark.ui.enabled=false",
+      "--conf", "spark.master.rest.enabled=false",
       unusedJar.toString)
     runSparkSubmit(args)
   }
