
Commit 8ba2b7f

jerryshao authored and Marcelo Vanzin committed
[SPARK-12343][YARN] Simplify Yarn client and client argument
## What changes were proposed in this pull request?

Currently in Spark on YARN, configurations can be passed through SparkConf, environment variables, and command-line arguments, and some parts are duplicated, such as the client arguments and SparkConf. This proposes to simplify the command-line arguments.

## How was this patch tested?

This patch is tested manually and with unit tests.

CC vanzin tgravescs, please help review this proposal. The original purpose of this JIRA is to remove `ClientArguments`; during refactoring, some arguments like `--class` and `--arg` turned out not to be easy to replace, so here I remove most of the command-line arguments and keep only the minimal set.

Author: jerryshao <[email protected]>

Closes #11603 from jerryshao/SPARK-12343.
1 parent 58e6bc8 commit 8ba2b7f
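
The direction of the change, as a hedged, self-contained sketch: options that used to be forwarded to the YARN `Client` as command-line flags (`--addJars`, `--files`, `--queue`, ...) now travel as ordinary `SparkConf` entries that both client and cluster mode read. The property keys come from the patch; the snippet around them is illustrative, not code from the commit.

```scala
import org.apache.spark.SparkConf

// Illustrative only: the same settings that were once Client CLI flags,
// expressed as plain configuration properties.
val conf = new SparkConf()
  .set("spark.yarn.queue", "thequeue")
  .set("spark.yarn.dist.jars", "one.jar,two.jar")

// e.g. launcher-side code can recover the jar list from the property
val distJars: Seq[String] =
  conf.getOption("spark.yarn.dist.jars").map(_.split(",").toSeq).getOrElse(Nil)
```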

File tree

16 files changed: +186 lines, -342 lines


core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 17 additions & 27 deletions
@@ -441,7 +441,6 @@ object SparkSubmit {
       OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         sysProp = "spark.submit.deployMode"),
       OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
-      OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
       OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"),
       OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
         sysProp = "spark.driver.memory"),
@@ -452,27 +451,15 @@ object SparkSubmit {
       OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         sysProp = "spark.driver.extraLibraryPath"),

-      // Yarn client only
-      OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
+      // Yarn only
+      OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
       OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
         sysProp = "spark.executor.instances"),
-      OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"),
-      OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),
-      OptionAssigner(args.principal, YARN, CLIENT, sysProp = "spark.yarn.principal"),
-      OptionAssigner(args.keytab, YARN, CLIENT, sysProp = "spark.yarn.keytab"),
-
-      // Yarn cluster only
-      OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"),
-      OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
-      OptionAssigner(args.driverCores, YARN, CLUSTER, clOption = "--driver-cores"),
-      OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
-      OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"),
-      OptionAssigner(args.executorCores, YARN, CLUSTER, clOption = "--executor-cores"),
-      OptionAssigner(args.files, YARN, CLUSTER, clOption = "--files"),
-      OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
-      OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
-      OptionAssigner(args.principal, YARN, CLUSTER, clOption = "--principal"),
-      OptionAssigner(args.keytab, YARN, CLUSTER, clOption = "--keytab"),
+      OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.jars"),
+      OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.files"),
+      OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.archives"),
+      OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.principal"),
+      OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.keytab"),

       // Other options
       OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,
@@ -483,10 +470,11 @@ object SparkSubmit {
         sysProp = "spark.cores.max"),
       OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
         sysProp = "spark.files"),
-      OptionAssigner(args.jars, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars"),
-      OptionAssigner(args.driverMemory, STANDALONE | MESOS, CLUSTER,
+      OptionAssigner(args.jars, LOCAL, CLIENT, sysProp = "spark.jars"),
+      OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, sysProp = "spark.jars"),
+      OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER,
         sysProp = "spark.driver.memory"),
-      OptionAssigner(args.driverCores, STANDALONE | MESOS, CLUSTER,
+      OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER,
         sysProp = "spark.driver.cores"),
       OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
         sysProp = "spark.driver.supervise"),
@@ -550,6 +538,10 @@ object SparkSubmit {
       if (args.isPython) {
         sysProps.put("spark.yarn.isPython", "true")
       }
+
+      if (args.pyFiles != null) {
+        sysProps("spark.submit.pyFiles") = args.pyFiles
+      }
     }

     // assure a keytab is available from any place in a JVM
@@ -576,9 +568,6 @@ object SparkSubmit {
       childMainClass = "org.apache.spark.deploy.yarn.Client"
       if (args.isPython) {
         childArgs += ("--primary-py-file", args.primaryResource)
-        if (args.pyFiles != null) {
-          childArgs += ("--py-files", args.pyFiles)
-        }
         childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
       } else if (args.isR) {
         val mainFile = new Path(args.primaryResource).getName
@@ -627,7 +616,8 @@ object SparkSubmit {
       "spark.jars",
       "spark.files",
       "spark.yarn.dist.files",
-      "spark.yarn.dist.archives")
+      "spark.yarn.dist.archives",
+      "spark.yarn.dist.jars")
     pathConfigs.foreach { config =>
       // Replace old URIs with resolved URIs, if they exist
       sysProps.get(config).foreach { oldValue =>
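
A minimal sketch of the mechanism this file relies on: an OptionAssigner-style table maps each submit argument to a system property (rather than to a Client CLI flag), and only set options are propagated. Names and types here are simplified assumptions for illustration, not the actual `SparkSubmit` code.

```scala
// Simplified stand-in for the real OptionAssigner, which also carries
// cluster-manager and deploy-mode masks.
case class OptionAssigner(value: String, sysProp: String)

def assignOptions(assigners: Seq[OptionAssigner]): Map[String, String] =
  assigners
    .filter(_.value != null)            // skip options the user did not set
    .map(a => a.sysProp -> a.value)     // e.g. --jars x.jar -> spark.yarn.dist.jars=x.jar
    .toMap

// After this change, YARN resources flow through spark.* properties in both
// client and cluster mode instead of through --addJars/--files style flags.
val sysProps = assignOptions(Seq(
  OptionAssigner("a.jar,b.jar", "spark.yarn.dist.jars"),
  OptionAssigner("thequeue", "spark.yarn.queue")))
```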

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 14 additions & 0 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.internal

 import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.network.util.ByteUnit

 package object config {

@@ -33,6 +34,10 @@ package object config {
   private[spark] val DRIVER_USER_CLASS_PATH_FIRST =
     ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.withDefault(false)

+  private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory")
+    .bytesConf(ByteUnit.MiB)
+    .withDefaultString("1g")
+
   private[spark] val EXECUTOR_CLASS_PATH =
     ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.optional

@@ -45,6 +50,10 @@ package object config {
   private[spark] val EXECUTOR_USER_CLASS_PATH_FIRST =
     ConfigBuilder("spark.executor.userClassPathFirst").booleanConf.withDefault(false)

+  private[spark] val EXECUTOR_MEMORY = ConfigBuilder("spark.executor.memory")
+    .bytesConf(ByteUnit.MiB)
+    .withDefaultString("1g")
+
   private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal
     .booleanConf.withDefault(false)

@@ -73,4 +82,9 @@ package object config {

   private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances").intConf.optional

+  private[spark] val PY_FILES = ConfigBuilder("spark.submit.pyFiles")
+    .internal
+    .stringConf
+    .toSequence
+    .withDefault(Nil)
 }
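
The entries above follow a typed-config pattern: each one couples a key, a parser, and a default, so internal callers read a typed value instead of re-parsing strings at every use site. The following is a self-contained sketch of that idea in plain Scala; the names are illustrative, not the real `ConfigBuilder`/`ConfigEntry` API.

```scala
// Toy version of a typed config entry: key + parser + default.
final case class ConfEntry[T](key: String, parse: String => T, default: T)

def get[T](settings: Map[String, String], entry: ConfEntry[T]): T =
  settings.get(entry.key).map(entry.parse).getOrElse(entry.default)

// spark.driver.memory parsed to MiB with a 1g (1024 MiB) default,
// mirroring the DRIVER_MEMORY entry above (simplified size parsing).
val DriverMemoryMiB = ConfEntry[Long](
  "spark.driver.memory",
  s => if (s.endsWith("g")) s.dropRight(1).toLong * 1024 else s.stripSuffix("m").toLong,
  default = 1024L)

val mem = get(Map("spark.driver.memory" -> "4g"), DriverMemoryMiB)  // 4096
```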

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 10 additions & 9 deletions
@@ -199,21 +199,21 @@ class SparkSubmitSuite
     val (childArgs, classpath, sysProps, mainClass) = prepareSubmitEnvironment(appArgs)
     val childArgsStr = childArgs.mkString(" ")
     childArgsStr should include ("--class org.SomeClass")
-    childArgsStr should include ("--executor-memory 5g")
-    childArgsStr should include ("--driver-memory 4g")
-    childArgsStr should include ("--executor-cores 5")
     childArgsStr should include ("--arg arg1 --arg arg2")
-    childArgsStr should include ("--queue thequeue")
     childArgsStr should include regex ("--jar .*thejar.jar")
-    childArgsStr should include regex ("--addJars .*one.jar,.*two.jar,.*three.jar")
-    childArgsStr should include regex ("--files .*file1.txt,.*file2.txt")
-    childArgsStr should include regex ("--archives .*archive1.txt,.*archive2.txt")
     mainClass should be ("org.apache.spark.deploy.yarn.Client")
     classpath should have length (0)
+
+    sysProps("spark.executor.memory") should be ("5g")
+    sysProps("spark.driver.memory") should be ("4g")
+    sysProps("spark.executor.cores") should be ("5")
+    sysProps("spark.yarn.queue") should be ("thequeue")
+    sysProps("spark.yarn.dist.jars") should include regex (".*one.jar,.*two.jar,.*three.jar")
+    sysProps("spark.yarn.dist.files") should include regex (".*file1.txt,.*file2.txt")
+    sysProps("spark.yarn.dist.archives") should include regex (".*archive1.txt,.*archive2.txt")
     sysProps("spark.app.name") should be ("beauty")
     sysProps("spark.ui.enabled") should be ("false")
     sysProps("SPARK_SUBMIT") should be ("true")
-    sysProps.keys should not contain ("spark.jars")
   }

   test("handles YARN client mode") {
@@ -249,7 +249,8 @@ class SparkSubmitSuite
     sysProps("spark.executor.instances") should be ("6")
     sysProps("spark.yarn.dist.files") should include regex (".*file1.txt,.*file2.txt")
     sysProps("spark.yarn.dist.archives") should include regex (".*archive1.txt,.*archive2.txt")
-    sysProps("spark.jars") should include regex (".*one.jar,.*two.jar,.*three.jar,.*thejar.jar")
+    sysProps("spark.yarn.dist.jars") should include
+      regex (".*one.jar,.*two.jar,.*three.jar,.*thejar.jar")
     sysProps("SPARK_SUBMIT") should be ("true")
     sysProps("spark.ui.enabled") should be ("false")
   }
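
A hedged sketch of the test pattern the diff above moves to: build spark-submit style arguments, run them through `prepareSubmitEnvironment`, and assert on the resulting `sysProps` rather than on Client CLI flags. The argument list here is illustrative; only the call shape and property names are taken from the diff, and it assumes the surrounding `SparkSubmitSuite` context.

```scala
// Inside SparkSubmitSuite (not a standalone program):
val clArgs = Seq(
  "--master", "yarn",
  "--deploy-mode", "cluster",
  "--queue", "thequeue",
  "--jars", "one.jar,two.jar,three.jar",
  "--class", "org.SomeClass",
  "thejar.jar", "arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, sysProps, mainClass) = prepareSubmitEnvironment(appArgs)
sysProps("spark.yarn.queue") should be ("thequeue")
sysProps("spark.yarn.dist.jars") should include regex (".*one.jar,.*two.jar,.*three.jar")
```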

docs/running-on-yarn.md

Lines changed: 7 additions & 0 deletions
@@ -215,6 +215,13 @@ If you need a reference to the proper location to put log files in the YARN so t
   Comma-separated list of files to be placed in the working directory of each executor.
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.dist.jars</code></td>
+  <td>(none)</td>
+  <td>
+    Comma-separated list of jars to be placed in the working directory of each executor.
+  </td>
+</tr>
 <tr>
   <td><code>spark.executor.cores</code></td>
   <td>1 in YARN mode, all the available cores on the worker in standalone mode.</td>
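
A small hedged sketch of how launcher-side code might consume the new `spark.yarn.dist.jars` property documented above: read the comma-separated list and hand each entry to whatever stages resources into the executors' working directory. Only the property name comes from the doc entry; the code itself is illustrative, not the actual YARN `Client`.

```scala
// Parse the comma-separated jar list from a settings map (illustrative helper).
def distJars(settings: Map[String, String]): Seq[String] =
  settings.get("spark.yarn.dist.jars")
    .map(_.split(",").map(_.trim).filter(_.nonEmpty).toSeq)
    .getOrElse(Nil)

distJars(Map("spark.yarn.dist.jars" -> "one.jar, two.jar"))  // Seq("one.jar", "two.jar")
```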

yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 1 addition & 1 deletion
@@ -662,7 +662,7 @@ object ApplicationMaster extends Logging {
     SignalLogger.register(log)
     val amArgs = new ApplicationMasterArguments(args)
     SparkHadoopUtil.get.runAsSparkUser { () =>
-      master = new ApplicationMaster(amArgs, new YarnRMClient(amArgs))
+      master = new ApplicationMaster(amArgs, new YarnRMClient)
       System.exit(master.run())
     }
   }

yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala

Lines changed: 2 additions & 16 deletions
@@ -27,8 +27,6 @@ class ApplicationMasterArguments(val args: Array[String]) {
   var primaryPyFile: String = null
   var primaryRFile: String = null
   var userArgs: Seq[String] = Nil
-  var executorMemory = 1024
-  var executorCores = 1
   var propertiesFile: String = null

   parseArgs(args.toList)
@@ -58,18 +56,10 @@ class ApplicationMasterArguments(val args: Array[String]) {
         primaryRFile = value
         args = tail

-      case ("--args" | "--arg") :: value :: tail =>
+      case ("--arg") :: value :: tail =>
         userArgsBuffer += value
         args = tail

-      case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
-        executorMemory = value
-        args = tail
-
-      case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail =>
-        executorCores = value
-        args = tail
-
       case ("--properties-file") :: value :: tail =>
         propertiesFile = value
         args = tail
@@ -101,12 +91,8 @@ class ApplicationMasterArguments(val args: Array[String]) {
       | --class CLASS_NAME      Name of your application's main class
       | --primary-py-file       A main Python file
       | --primary-r-file        A main R file
-      | --py-files PY_FILES     Comma-separated list of .zip, .egg, or .py files to
-      |                         place on the PYTHONPATH for Python apps.
-      | --args ARGS             Arguments to be passed to your application's main class.
+      | --arg ARG               Argument to be passed to your application's main class.
       |                         Multiple invocations are possible, each will be passed in order.
-      | --executor-cores NUM    Number of cores for the executors (Default: 1)
-      | --executor-memory MEM   Memory per executor (e.g. 1000M, 2G) (Default: 1G)
       | --properties-file FILE  Path to a custom Spark properties file.
     """.stripMargin)
     // scalastyle:on println
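
For reference, a self-contained sketch of the list-pattern-match parsing style used above, reduced to the options that remain after this change (a repeated `--arg` accumulates in order). This is an illustration only, not the actual `ApplicationMasterArguments` code.

```scala
// Tail-recursive toy parser: collects --arg values and an optional --properties-file.
def parse(args: List[String],
          userArgs: Vector[String] = Vector.empty,
          props: Option[String] = None): (Vector[String], Option[String]) =
  args match {
    case "--arg" :: value :: tail             => parse(tail, userArgs :+ value, props)
    case "--properties-file" :: value :: tail => parse(tail, userArgs, Some(value))
    case Nil                                  => (userArgs, props)
    case unknown :: _                         => sys.error(s"Unknown/unsupported param $unknown")
  }

parse(List("--arg", "a", "--arg", "b"))  // (Vector("a", "b"), None)
```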
