Skip to content

Commit f68105d

Browse files
sryzapwendell
authored and committed
SPARK-2664. Deal with --conf options in spark-submit that relate to fl...
...ags Author: Sandy Ryza <[email protected]> Closes #1665 from sryza/sandy-spark-2664 and squashes the following commits: 0518c63 [Sandy Ryza] SPARK-2664. Deal with `--conf` options in spark-submit that relate to flags
1 parent f193312 commit f68105d

File tree

3 files changed

+38
-15
lines changed

3 files changed

+38
-15
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ object SparkSubmit {
184184
OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),
185185

186186
// Yarn cluster only
187-
OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name", sysProp = "spark.app.name"),
187+
OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"),
188188
OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
189189
OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
190190
OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
@@ -268,14 +268,17 @@ object SparkSubmit {
268268
}
269269
}
270270

271+
// Properties given with --conf are superseded by other options, but take precedence over
272+
// properties in the defaults file.
273+
for ((k, v) <- args.sparkProperties) {
274+
sysProps.getOrElseUpdate(k, v)
275+
}
276+
271277
// Read from default spark properties, if any
272278
for ((k, v) <- args.getDefaultSparkProperties) {
273279
sysProps.getOrElseUpdate(k, v)
274280
}
275281

276-
// Spark properties included on command line take precedence
277-
sysProps ++= args.sparkProperties
278-
279282
(childArgs, childClasspath, sysProps, childMainClass)
280283
}
281284

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
5858
val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
5959

6060
parseOpts(args.toList)
61-
loadDefaults()
61+
mergeSparkProperties()
6262
checkRequiredArguments()
6363

6464
/** Return default present in the currently defined defaults file. */
@@ -79,9 +79,11 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
7979
defaultProperties
8080
}
8181

82-
/** Fill in any undefined values based on the current properties file or built-in defaults. */
83-
private def loadDefaults(): Unit = {
84-
82+
/**
83+
* Fill in any undefined values based on the default properties file or options passed in through
84+
* the '--conf' flag.
85+
*/
86+
private def mergeSparkProperties(): Unit = {
8587
// Use common defaults file, if not specified by user
8688
if (propertiesFile == null) {
8789
sys.env.get("SPARK_HOME").foreach { sparkHome =>
@@ -94,18 +96,20 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
9496
}
9597
}
9698

97-
val defaultProperties = getDefaultSparkProperties
99+
val properties = getDefaultSparkProperties
100+
properties.putAll(sparkProperties)
101+
98102
// Use properties file as fallback for values which have a direct analog to
99103
// arguments in this script.
100-
master = Option(master).getOrElse(defaultProperties.get("spark.master").orNull)
104+
master = Option(master).getOrElse(properties.get("spark.master").orNull)
101105
executorMemory = Option(executorMemory)
102-
.getOrElse(defaultProperties.get("spark.executor.memory").orNull)
106+
.getOrElse(properties.get("spark.executor.memory").orNull)
103107
executorCores = Option(executorCores)
104-
.getOrElse(defaultProperties.get("spark.executor.cores").orNull)
108+
.getOrElse(properties.get("spark.executor.cores").orNull)
105109
totalExecutorCores = Option(totalExecutorCores)
106-
.getOrElse(defaultProperties.get("spark.cores.max").orNull)
107-
name = Option(name).getOrElse(defaultProperties.get("spark.app.name").orNull)
108-
jars = Option(jars).getOrElse(defaultProperties.get("spark.jars").orNull)
110+
.getOrElse(properties.get("spark.cores.max").orNull)
111+
name = Option(name).getOrElse(properties.get("spark.app.name").orNull)
112+
jars = Option(jars).getOrElse(properties.get("spark.jars").orNull)
109113

110114
// This supports env vars in older versions of Spark
111115
master = Option(master).getOrElse(System.getenv("MASTER"))

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,22 @@ class SparkSubmitSuite extends FunSuite with Matchers {
253253
sysProps("spark.shuffle.spill") should be ("false")
254254
}
255255

256+
test("handles confs with flag equivalents") {
257+
val clArgs = Seq(
258+
"--deploy-mode", "cluster",
259+
"--executor-memory", "5g",
260+
"--class", "org.SomeClass",
261+
"--conf", "spark.executor.memory=4g",
262+
"--conf", "spark.master=yarn",
263+
"thejar.jar",
264+
"arg1", "arg2")
265+
val appArgs = new SparkSubmitArguments(clArgs)
266+
val (_, _, sysProps, mainClass) = createLaunchEnv(appArgs)
267+
sysProps("spark.executor.memory") should be ("5g")
268+
sysProps("spark.master") should be ("yarn-cluster")
269+
mainClass should be ("org.apache.spark.deploy.yarn.Client")
270+
}
271+
256272
test("launch simple application with spark-submit") {
257273
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
258274
val args = Seq(

0 commit comments

Comments
 (0)