|
18 | 18 | package org.apache.spark.deploy |
19 | 19 |
|
20 | 20 | import java.net.URI |
| 21 | +import java.util.{List => JList} |
21 | 22 | import java.util.jar.JarFile |
22 | 23 |
|
| 24 | +import scala.collection.JavaConversions._ |
23 | 25 | import scala.collection.mutable.{ArrayBuffer, HashMap} |
24 | 26 |
|
| 27 | +import org.apache.spark.launcher.SparkSubmitOptionParser |
25 | 28 | import org.apache.spark.util.Utils |
26 | 29 |
|
27 | 30 | /** |
28 | 31 | * Parses and encapsulates arguments from the spark-submit script. |
29 | 32 | * The env argument is used for testing. |
30 | 33 | */ |
31 | | -private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env) { |
| 34 | +private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env) |
| 35 | + extends SparkSubmitOptionParser { |
32 | 36 | var master: String = null |
33 | 37 | var deployMode: String = null |
34 | 38 | var executorMemory: String = null |
@@ -73,7 +77,12 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St |
73 | 77 | } |
74 | 78 |
|
75 | 79 | // Set parameters from command line arguments |
76 | | - parseOpts(args.toList) |
| 80 | + try { |
| 81 | + parse(args.toList) |
| 82 | + } catch { |
| 83 | + case e: IllegalArgumentException => |
| 84 | + SparkSubmit.printErrorAndExit(e.getMessage()) |
| 85 | + } |
77 | 86 | // Populate `sparkProperties` map from properties file |
78 | 87 | mergeDefaultSparkProperties() |
79 | 88 | // Use `sparkProperties` map along with env vars to fill in any missing parameters |
@@ -224,138 +233,116 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St |
224 | 233 | """.stripMargin |
225 | 234 | } |
226 | 235 |
|
227 | | - /** |
228 | | - * Fill in values by parsing user options. |
229 | | - * NOTE: Any changes here must be reflected in YarnClientSchedulerBackend. |
230 | | - */ |
231 | | - private def parseOpts(opts: Seq[String]): Unit = { |
232 | | - val EQ_SEPARATED_OPT="""(--[^=]+)=(.+)""".r |
233 | | - |
234 | | - // Delineates parsing of Spark options from parsing of user options. |
235 | | - parse(opts) |
236 | | - |
237 | | - /** |
238 | | - * NOTE: If you add or remove spark-submit options, |
239 | | - * modify NOT ONLY this file but also utils.sh |
240 | | - */ |
241 | | - def parse(opts: Seq[String]): Unit = opts match { |
242 | | - case ("--name") :: value :: tail => |
| 236 | + import SparkSubmitOptionParser._ |
| 237 | + |
| 238 | + /** Fill in values by parsing user options. */ |
| 239 | + override protected def handle(opt: String, value: String): Boolean = { |
| 240 | + opt match { |
| 241 | + case NAME => |
243 | 242 | name = value |
244 | | - parse(tail) |
245 | 243 |
|
246 | | - case ("--master") :: value :: tail => |
| 244 | + case MASTER => |
247 | 245 | master = value |
248 | | - parse(tail) |
249 | 246 |
|
250 | | - case ("--class") :: value :: tail => |
| 247 | + case CLASS => |
251 | 248 | mainClass = value |
252 | | - parse(tail) |
253 | 249 |
|
254 | | - case ("--deploy-mode") :: value :: tail => |
| 250 | + case DEPLOY_MODE => |
255 | 251 | if (value != "client" && value != "cluster") { |
256 | 252 | SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"") |
257 | 253 | } |
258 | 254 | deployMode = value |
259 | | - parse(tail) |
260 | 255 |
|
261 | | - case ("--num-executors") :: value :: tail => |
| 256 | + case NUM_EXECUTORS => |
262 | 257 | numExecutors = value |
263 | | - parse(tail) |
264 | 258 |
|
265 | | - case ("--total-executor-cores") :: value :: tail => |
| 259 | + case TOTAL_EXECUTOR_CORES => |
266 | 260 | totalExecutorCores = value |
267 | | - parse(tail) |
268 | 261 |
|
269 | | - case ("--executor-cores") :: value :: tail => |
| 262 | + case EXECUTOR_CORES => |
270 | 263 | executorCores = value |
271 | | - parse(tail) |
272 | 264 |
|
273 | | - case ("--executor-memory") :: value :: tail => |
| 265 | + case EXECUTOR_MEMORY => |
274 | 266 | executorMemory = value |
275 | | - parse(tail) |
276 | 267 |
|
277 | | - case ("--driver-memory") :: value :: tail => |
| 268 | + case DRIVER_MEMORY => |
278 | 269 | driverMemory = value |
279 | | - parse(tail) |
280 | 270 |
|
281 | | - case ("--driver-cores") :: value :: tail => |
| 271 | + case DRIVER_CORES => |
282 | 272 | driverCores = value |
283 | | - parse(tail) |
284 | 273 |
|
285 | | - case ("--driver-class-path") :: value :: tail => |
| 274 | + case DRIVER_CLASS_PATH => |
286 | 275 | driverExtraClassPath = value |
287 | | - parse(tail) |
288 | 276 |
|
289 | | - case ("--driver-java-options") :: value :: tail => |
| 277 | + case DRIVER_JAVA_OPTIONS => |
290 | 278 | driverExtraJavaOptions = value |
291 | | - parse(tail) |
292 | 279 |
|
293 | | - case ("--driver-library-path") :: value :: tail => |
| 280 | + case DRIVER_LIBRARY_PATH => |
294 | 281 | driverExtraLibraryPath = value |
295 | | - parse(tail) |
296 | 282 |
|
297 | | - case ("--properties-file") :: value :: tail => |
| 283 | + case PROPERTIES_FILE => |
298 | 284 | propertiesFile = value |
299 | | - parse(tail) |
300 | 285 |
|
301 | | - case ("--supervise") :: tail => |
| 286 | + case SUPERVISE => |
302 | 287 | supervise = true |
303 | | - parse(tail) |
304 | 288 |
|
305 | | - case ("--queue") :: value :: tail => |
| 289 | + case QUEUE => |
306 | 290 | queue = value |
307 | | - parse(tail) |
308 | 291 |
|
309 | | - case ("--files") :: value :: tail => |
| 292 | + case FILES => |
310 | 293 | files = Utils.resolveURIs(value) |
311 | | - parse(tail) |
312 | 294 |
|
313 | | - case ("--py-files") :: value :: tail => |
| 295 | + case PY_FILES => |
314 | 296 | pyFiles = Utils.resolveURIs(value) |
315 | | - parse(tail) |
316 | 297 |
|
317 | | - case ("--archives") :: value :: tail => |
| 298 | + case ARCHIVES => |
318 | 299 | archives = Utils.resolveURIs(value) |
319 | | - parse(tail) |
320 | 300 |
|
321 | | - case ("--jars") :: value :: tail => |
| 301 | + case JARS => |
322 | 302 | jars = Utils.resolveURIs(value) |
323 | | - parse(tail) |
324 | 303 |
|
325 | | - case ("--conf" | "-c") :: value :: tail => |
| 304 | + case CONF => |
326 | 305 | value.split("=", 2).toSeq match { |
327 | 306 | case Seq(k, v) => sparkProperties(k) = v |
328 | 307 | case _ => SparkSubmit.printErrorAndExit(s"Spark config without '=': $value") |
329 | 308 | } |
330 | | - parse(tail) |
331 | 309 |
|
332 | | - case ("--help" | "-h") :: tail => |
| 310 | + case HELP => |
333 | 311 | printUsageAndExit(0) |
334 | 312 |
|
335 | | - case ("--verbose" | "-v") :: tail => |
| 313 | + case VERBOSE => |
336 | 314 | verbose = true |
337 | | - parse(tail) |
338 | 315 |
|
339 | | - case EQ_SEPARATED_OPT(opt, value) :: tail => |
340 | | - parse(opt :: value :: tail) |
| 316 | + case _ => |
| 317 | + throw new IllegalArgumentException(s"Unexpected argument '$opt'.") |
| 318 | + } |
| 319 | + true |
| 320 | + } |
341 | 321 |
|
342 | | - case value :: tail if value.startsWith("-") => |
343 | | - SparkSubmit.printErrorAndExit(s"Unrecognized option '$value'.") |
| 322 | + /** |
| 323 | + * The first unrecognized option is treated as the "primary resource". Everything else is |
| 324 | + * treated as application arguments. |
| 325 | + */ |
| 326 | + override protected def handleUnknown(opt: String): Boolean = { |
| 327 | + if (opt.startsWith("-")) { |
| 328 | + SparkSubmit.printErrorAndExit(s"Unrecognized option '$opt'.") |
| 329 | + } |
344 | 330 |
|
345 | | - case value :: tail => |
346 | | - primaryResource = |
347 | | - if (!SparkSubmit.isShell(value) && !SparkSubmit.isInternal(value)) { |
348 | | - Utils.resolveURI(value).toString |
349 | | - } else { |
350 | | - value |
351 | | - } |
352 | | - isPython = SparkSubmit.isPython(value) |
353 | | - childArgs ++= tail |
| 331 | + primaryResource = |
| 332 | + if (!SparkSubmit.isShell(opt) && !SparkSubmit.isInternal(opt)) { |
| 333 | + Utils.resolveURI(opt).toString |
| 334 | + } else { |
| 335 | + opt |
| 336 | + } |
| 337 | + isPython = SparkSubmit.isPython(opt) |
| 338 | + false |
| 339 | + } |
354 | 340 |
|
355 | | - case Nil => |
356 | | - } |
| 341 | + override protected def handleExtraArgs(extra: JList[String]): Unit = { |
| 342 | + childArgs ++= extra |
357 | 343 | } |
358 | 344 |
|
| 345 | + |
359 | 346 | private def printUsageAndExit(exitCode: Int, unknownParam: Any = null): Unit = { |
360 | 347 | val outStream = SparkSubmit.printStream |
361 | 348 | if (unknownParam != null) { |
|
0 commit comments