 
 package org.apache.spark.deploy
 
-import scala.collection.mutable.ArrayBuffer
-import java.io.File
+import java.io.{File, FileInputStream, IOException}
+import java.util.Properties
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{HashMap, ArrayBuffer}
+
+import org.apache.spark.SparkException
 
 /**
  * Parses and encapsulates arguments from the spark-submit script.
  */
 private[spark] class SparkSubmitArguments(args: Array[String]) {
-  var master: String = "local"
+  var master: String = null
   var deployMode: String = null
   var executorMemory: String = null
   var executorCores: String = null
@@ -47,22 +52,70 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
   var jars: String = null
   var verbose: Boolean = false
 
-  loadEnvVars()
   parseOpts(args.toList)
+  loadDefaults()
+  checkRequiredArguments()
+
+  /** Return defaults present in the currently defined defaults file. */
+  def getDefaultSparkProperties = {
+    val defaultProperties = new HashMap[String, String]()
+    if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
+    Option(propertiesFile).foreach { filename =>
+      val file = new File(filename)
+      SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
+        if (k.startsWith("spark")) {
+          defaultProperties(k) = v
+          if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
+        } else {
+          SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
+        }
+      }
+    }
+    defaultProperties
+  }
 
-  // Sanity checks
-  if (args.length == 0) printUsageAndExit(-1)
-  if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
-  if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")
-  if (propertiesFile == null) {
-    sys.env.get("SPARK_HOME").foreach { sparkHome =>
-      val sep = File.separator
-      val defaultPath = s"${sparkHome}${sep}conf${sep}spark-defaults.properties"
-      val file = new File(defaultPath)
-      if (file.exists()) {
-        propertiesFile = file.getAbsolutePath
-      }
+  /** Fill in any undefined values based on the current properties file or built-in defaults. */
+  private def loadDefaults() = {
+
+    // Use common defaults file, if not specified by user
+    if (propertiesFile == null) {
+      sys.env.get("SPARK_HOME").foreach { sparkHome =>
+        val sep = File.separator
+        val defaultPath = s"${sparkHome}${sep}conf${sep}spark-defaults.properties"
+        val file = new File(defaultPath)
+        if (file.exists()) {
+          propertiesFile = file.getAbsolutePath
+        }
+      }
     }
+
+    val defaultProperties = getDefaultSparkProperties
+    // Use properties file as fallback for values which have a direct analog to
+    // arguments in this script.
+    master = Option(master).getOrElse(defaultProperties.get("spark.master").orNull)
+    executorMemory = Option(executorMemory)
+      .getOrElse(defaultProperties.get("spark.executor.memory").orNull)
+    executorCores = Option(executorCores)
+      .getOrElse(defaultProperties.get("spark.executor.cores").orNull)
+    totalExecutorCores = Option(totalExecutorCores)
+      .getOrElse(defaultProperties.get("spark.cores.max").orNull)
+    name = Option(name).getOrElse(defaultProperties.get("spark.app.name").orNull)
+    jars = Option(jars).getOrElse(defaultProperties.get("spark.jars").orNull)
+
+    // This supports env vars in older versions of Spark
+    master = Option(master).getOrElse(System.getenv("MASTER"))
+    deployMode = Option(deployMode).getOrElse(System.getenv("DEPLOY_MODE"))
+
+    // Global defaults. These should be kept to a minimum to avoid confusing behavior.
+    master = Option(master).getOrElse("local")
+  }
+
+  /** Ensure that required fields exist. Call this only once all defaults are loaded. */
+  private def checkRequiredArguments() = {
+    if (args.length == 0) printUsageAndExit(-1)
+    if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
+    if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")
   }
 
   override def toString = {
@@ -89,14 +142,12 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
     |  childArgs           [${childArgs.mkString(" ")}]
     |  jars                $jars
     |  verbose             $verbose
+    |
+    |Default properties from $propertiesFile:
+    |${getDefaultSparkProperties.mkString("  ", "\n  ", "\n")}
     """.stripMargin
   }
 
-  private def loadEnvVars() {
-    Option(System.getenv("MASTER")).map(master = _)
-    Option(System.getenv("DEPLOY_MODE")).map(deployMode = _)
-  }
-
   private def parseOpts(opts: List[String]): Unit = opts match {
     case ("--name") :: value :: tail =>
       name = value
@@ -189,6 +240,18 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
       parseOpts(tail)
 
     case value :: tail =>
+      if (value.startsWith("-")) {
+        val errMessage = s"Unrecognized option '$value'."
+        val suggestion: Option[String] = value match {
+          case v if v.startsWith("--") && v.contains("=") =>
+            val parts = v.split("=")
+            Some(s"Perhaps you meant '${parts(0)} ${parts(1)}'?")
+          case _ =>
+            None
+        }
+        SparkSubmit.printErrorAndExit(errMessage + suggestion.map(" " + _).getOrElse(""))
+      }
+
       if (primaryResource != null) {
         val error = s"Found two conflicting resources, $value and $primaryResource." +
           " Expecting only one resource."
@@ -217,6 +280,8 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
        |  --jars JARS                A comma-separated list of local jars to include on the
        |                             driver classpath and that SparkContext.addJar will work
        |                             with. Doesn't work on standalone with 'cluster' deploy mode.
+       |  --files FILES              Comma separated list of files to be placed in the working dir
+       |                             of each executor.
        |  --properties-file FILE     Path to a file from which to load extra properties. If not
        |                             specified, this will look for conf/spark-defaults.properties.
        |
@@ -225,6 +290,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
        |  --driver-library-path      Extra library path entries to pass to the driver
        |  --driver-class-path        Extra class path entries to pass to the driver
        |
+       |  --executor-memory MEM      Memory per executor (e.g. 1000M, 2G) (Default: 1G).
        |
        | Spark standalone with cluster deploy mode only:
        |  --driver-cores NUM         Cores for driver (Default: 1).
@@ -235,14 +301,28 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
        |
        | YARN-only:
        |  --executor-cores NUM       Number of cores per executor (Default: 1).
-       |  --executor-memory MEM      Memory per executor (e.g. 1000M, 2G) (Default: 1G).
        |  --queue QUEUE_NAME         The YARN queue to submit to (Default: 'default').
        |  --num-executors NUM        Number of executors to launch (Default: 2).
-       |  --files FILES              Comma separated list of files to be placed in the working dir
-       |                             of each executor.
        |  --archives ARCHIVES        Comma separated list of archives to be extracted into the
        |                             working dir of each executor.""".stripMargin
     )
     SparkSubmit.exitFn()
   }
 }
+
+object SparkSubmitArguments {
+  /** Load properties present in the given file. */
+  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
+    require(file.exists(), s"Properties file ${file.getName} does not exist")
+    val inputStream = new FileInputStream(file)
+    val properties = new Properties()
+    try {
+      properties.load(inputStream)
+    } catch {
+      case e: IOException =>
+        val message = s"Failed when loading Spark properties file ${file.getName}"
+        throw new SparkException(message, e)
+    }
+    properties.stringPropertyNames().toSeq.map(k => (k, properties(k)))
+  }
+}
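
The net effect of loadDefaults() is a fixed resolution order for every setting: an explicit command-line flag wins, then the properties file, then the legacy MASTER/DEPLOY_MODE environment variables, and finally the built-in default ("local" for the master). Below is a minimal standalone sketch of that chain, not the patch's actual code: ResolveMasterExample and loadSparkDefaults are hypothetical names, and loadSparkDefaults merely stands in for SparkSubmitArguments.getPropertiesFromFile plus the spark.* filtering done in getDefaultSparkProperties.

import java.io.{File, FileInputStream}
import java.util.Properties
import scala.collection.JavaConverters._

object ResolveMasterExample {
  // Hypothetical helper standing in for getPropertiesFromFile: load the file
  // (if present) and keep only keys in the spark.* namespace, as the patch does.
  def loadSparkDefaults(path: String): Map[String, String] = {
    val file = new File(path)
    if (!file.exists()) return Map.empty
    val props = new Properties()
    val in = new FileInputStream(file)
    try props.load(in) finally in.close()
    props.stringPropertyNames().asScala
      .filter(_.startsWith("spark"))
      .map(k => k -> props.getProperty(k))
      .toMap
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for a value parsed from --master; None means the flag was absent.
    val cliMaster: Option[String] = args.headOption
    val defaults = loadSparkDefaults("conf/spark-defaults.properties")

    // The repeated Option(x).getOrElse(...) assignments in loadDefaults()
    // collapse into one orElse chain per setting:
    val master = cliMaster
      .orElse(defaults.get("spark.master"))
      .orElse(Option(System.getenv("MASTER"))) // legacy env var support
      .getOrElse("local")                      // global built-in default
    println(s"Resolved master: $master")
  }
}

The ordering in the constructor matters for the same reason: parseOpts runs first so a properties-file value can never clobber an explicit flag, and checkRequiredArguments runs last so every fallback has had a chance to fill in a value before anything is declared missing.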