From 0b01ff86e56a9471acd52abc820e4bd98946eb36 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 9 Jul 2014 13:12:05 -0700 Subject: [PATCH 1/4] Clean up SparkSubmit for readability --- .../org/apache/spark/deploy/SparkSubmit.scala | 293 +++++++++--------- 1 file changed, 143 insertions(+), 150 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index b050dccb6d57..9759d2406c89 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -32,20 +32,33 @@ import org.apache.spark.util.Utils * modes that Spark supports. */ object SparkSubmit { + + // Cluster managers private val YARN = 1 private val STANDALONE = 2 private val MESOS = 4 private val LOCAL = 8 private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL - private var clusterManager: Int = LOCAL + // Deploy modes + private val CLIENT = 1 + private val CLUSTER = 2 + private val ALL_DEPLOY_MODES = CLIENT | CLUSTER - /** - * Special primary resource names that represent shells rather than application jars. - */ + // Special primary resource names that represent shells rather than application jars. private val SPARK_SHELL = "spark-shell" private val PYSPARK_SHELL = "pyspark-shell" + // Exposed for testing + private[spark] var exitFn: () => Unit = () => System.exit(-1) + private[spark] var printStream: PrintStream = System.err + private[spark] def printWarning(str: String) = printStream.println("Warning: " + str) + private[spark] def printErrorAndExit(str: String) = { + printStream.println("Error: " + str) + printStream.println("Run with --help for usage help or --verbose for debug output") + exitFn() + } + def main(args: Array[String]) { val appArgs = new SparkSubmitArguments(args) if (appArgs.verbose) { @@ -55,119 +68,74 @@ object SparkSubmit { launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose) } - // Exposed for testing - private[spark] var printStream: PrintStream = System.err - private[spark] var exitFn: () => Unit = () => System.exit(-1) - - private[spark] def printErrorAndExit(str: String) = { - printStream.println("Error: " + str) - printStream.println("Run with --help for usage help or --verbose for debug output") - exitFn() - } - private[spark] def printWarning(str: String) = printStream.println("Warning: " + str) - /** - * @return a tuple containing the arguments for the child, a list of classpath - * entries for the child, a list of system properties, a list of env vars - * and the main class for the child + * @return a tuple containing + * (1) the arguments for the child process, + * (2) a list of classpath entries for the child, + * (3) a list of system properties and env vars, and + * (4) the main class for the child */ private[spark] def createLaunchEnv(args: SparkSubmitArguments) : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = { - if (args.master.startsWith("local")) { - clusterManager = LOCAL - } else if (args.master.startsWith("yarn")) { - clusterManager = YARN - } else if (args.master.startsWith("spark")) { - clusterManager = STANDALONE - } else if (args.master.startsWith("mesos")) { - clusterManager = MESOS - } else { - printErrorAndExit("Master must start with yarn, mesos, spark, or local") - } - // Because "yarn-cluster" and "yarn-client" encapsulate both the master - // and deploy mode, we have some logic to infer the master and deploy mode - // from each other if only one is 
specified, or exit early if they are at odds. - if (args.deployMode == null && - (args.master == "yarn-standalone" || args.master == "yarn-cluster")) { - args.deployMode = "cluster" - } - if (args.deployMode == "cluster" && args.master == "yarn-client") { - printErrorAndExit("Deploy mode \"cluster\" and master \"yarn-client\" are not compatible") - } - if (args.deployMode == "client" && - (args.master == "yarn-standalone" || args.master == "yarn-cluster")) { - printErrorAndExit("Deploy mode \"client\" and master \"" + args.master - + "\" are not compatible") - } - if (args.deployMode == "cluster" && args.master.startsWith("yarn")) { - args.master = "yarn-cluster" - } - if (args.deployMode != "cluster" && args.master.startsWith("yarn")) { - args.master = "yarn-client" - } - - val deployOnCluster = Option(args.deployMode).getOrElse("client") == "cluster" - - val childClasspath = new ArrayBuffer[String]() + // Values to return val childArgs = new ArrayBuffer[String]() + val childClasspath = new ArrayBuffer[String]() val sysProps = new HashMap[String, String]() var childMainClass = "" - val isPython = args.isPython - val isYarnCluster = clusterManager == YARN && deployOnCluster - - // For mesos, only client mode is supported - if (clusterManager == MESOS && deployOnCluster) { - printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.") + // Set the cluster manager + val clusterManager: Int = args.master match { + case m if m.startsWith("local") => LOCAL + case m if m.startsWith("yarn") => YARN + case m if m.startsWith("spark") => STANDALONE + case m if m.startsWith("mesos") => MESOS + case _ => printErrorAndExit("Master must start with yarn, mesos, spark, or local"); -1 } - // For standalone, only client mode is supported - if (clusterManager == STANDALONE && deployOnCluster) { - printErrorAndExit("Cluster deploy mode is currently not supported for standalone clusters.") + // Set the deploy mode; default is client mode + var deployMode: Int = args.deployMode match { + case "client" | null => CLIENT + case "cluster" => CLUSTER + case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1 } - // For shells, only client mode is applicable - if (isShell(args.primaryResource) && deployOnCluster) { - printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.") + // The following modes are not supported or applicable + (clusterManager, deployMode) match { + case (MESOS, CLUSTER) => + printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.") + case (STANDALONE, CLUSTER) => + printErrorAndExit("Cluster deploy mode is currently not supported for Standalone clusters.") + case (_, CLUSTER) if args.isPython => + printErrorAndExit("Cluster deploy mode is currently not supported for python applications.") + case (_, CLUSTER) if isShell(args.primaryResource) => + printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.") + case _ => } - // If we're running a python app, set the main class to our specific python runner - if (isPython) { - if (deployOnCluster) { - printErrorAndExit("Cluster deploy mode is currently not supported for python.") - } - if (args.primaryResource == PYSPARK_SHELL) { - args.mainClass = "py4j.GatewayServer" - args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0") - } else { - // If a python file is provided, add it to the child arguments and list of files to deploy. - // Usage: PythonAppRunner
<main python file> <extra python files>
[app arguments] - args.mainClass = "org.apache.spark.deploy.PythonRunner" - args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs - args.files = mergeFileLists(args.files, args.primaryResource) + // Because "yarn-cluster" and "yarn-client" encapsulate both the master + // and deploy mode, we have some logic to infer the master and deploy mode + // from each other if only one is specified, or exit early if they are at odds. + if (clusterManager == YARN) { + if (args.master == "yarn-standalone") { + printWarning("\"yarn-standalone\" is deprecated. Use \"yarn-cluster\" instead.") + args.master = "yarn-cluster" } - args.files = mergeFileLists(args.files, args.pyFiles) - // Format python file paths properly before adding them to the PYTHONPATH - sysProps("spark.submit.pyFiles") = PythonRunner.formatPaths(args.pyFiles).mkString(",") - } - - // If we're deploying into YARN, use yarn.Client as a wrapper around the user class - if (!deployOnCluster) { - childMainClass = args.mainClass - if (isUserJar(args.primaryResource)) { - childClasspath += args.primaryResource + (args.master, args.deployMode) match { + case ("yarn-cluster", null) => + deployMode = CLUSTER + case ("yarn-cluster", "client") => + printErrorAndExit("Client deploy mode is not compatible with master \"yarn-cluster\"") + case ("yarn-client", "cluster") => + printErrorAndExit("Cluster deploy mode is not compatible with master \"yarn-client\"") + case (_, mode) => + args.master = "yarn-" + Option(mode).getOrElse("client") } - } else if (clusterManager == YARN) { - childMainClass = "org.apache.spark.deploy.yarn.Client" - childArgs += ("--jar", args.primaryResource) - childArgs += ("--class", args.mainClass) - } - // Make sure YARN is included in our build if we're trying to use it - if (clusterManager == YARN) { + // Make sure YARN is included in our build if we're trying to use it if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) { - printErrorAndExit("Could not load YARN classes. " + + printErrorAndExit( + "Could not load YARN classes. 
" + "This copy of Spark may not have been compiled with YARN support.") } } @@ -178,94 +146,119 @@ object SparkSubmit { // A list of rules to map each argument to system properties or command-line options in // each deploy mode; we iterate through these below val options = List[OptionAssigner]( - OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"), - OptionAssigner(args.name, ALL_CLUSTER_MGRS, false, sysProp = "spark.app.name"), - OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"), - OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true, + OptionAssigner(args.master, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.master"), + OptionAssigner(args.name, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.app.name"), + OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name", sysProp = "spark.app.name"), + OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, CLUSTER, sysProp = "spark.driver.extraClassPath"), - OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true, + OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, CLUSTER, sysProp = "spark.driver.extraJavaOptions"), - OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true, + OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER, sysProp = "spark.driver.extraLibraryPath"), - OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"), - OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"), - OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"), - OptionAssigner(args.queue, YARN, true, clOption = "--queue"), - OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"), - OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"), - OptionAssigner(args.numExecutors, YARN, false, sysProp = "spark.executor.instances"), - OptionAssigner(args.executorMemory, YARN, true, clOption = "--executor-memory"), - OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, false, + OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"), + OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"), + OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"), + OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"), + OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"), + OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"), + OptionAssigner(args.numExecutors, YARN, CLIENT, sysProp = "spark.executor.instances"), + OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"), + OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, CLIENT, sysProp = "spark.executor.memory"), - OptionAssigner(args.executorCores, YARN, true, clOption = "--executor-cores"), - OptionAssigner(args.executorCores, YARN, false, sysProp = "spark.executor.cores"), - OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, false, + OptionAssigner(args.executorCores, YARN, CLUSTER, clOption = "--executor-cores"), + OptionAssigner(args.executorCores, YARN, CLIENT, sysProp = "spark.executor.cores"), + OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, CLIENT, sysProp = "spark.cores.max"), - OptionAssigner(args.files, YARN, false, sysProp = "spark.yarn.dist.files"), - OptionAssigner(args.files, YARN, true, clOption = "--files"), - OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"), - 
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"), - OptionAssigner(args.archives, YARN, false, sysProp = "spark.yarn.dist.archives"), - OptionAssigner(args.archives, YARN, true, clOption = "--archives"), - OptionAssigner(args.jars, YARN, true, clOption = "--addJars"), - OptionAssigner(args.jars, ALL_CLUSTER_MGRS, false, sysProp = "spark.jars") + OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"), + OptionAssigner(args.files, YARN, CLUSTER, clOption = "--files"), + OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, CLIENT, sysProp = "spark.files"), + OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, CLUSTER, sysProp = "spark.files"), + OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"), + OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"), + OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"), + OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars") ) - // For client mode make any added jars immediately visible on the classpath - if (args.jars != null && !deployOnCluster) { - for (jar <- args.jars.split(",")) { - childClasspath += jar + // In client mode, launch the application main class directly + // In addition, add the main application jar and any added jars (if any) to the classpath + if (deployMode == CLIENT) { + childMainClass = args.mainClass + if (isUserJar(args.primaryResource)) { + childClasspath += args.primaryResource + } + if (args.jars != null) { + childClasspath ++= args.jars.split(",") + } + if (args.childArgs != null) { + childArgs ++= args.childArgs + } + } + + // If we're running a python app, set the main class to our specific python runner + if (args.isPython) { + if (args.primaryResource == PYSPARK_SHELL) { + args.mainClass = "py4j.GatewayServer" + args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0") + } else { + // If a python file is provided, add it to the child arguments and list of files to deploy. + // Usage: PythonAppRunner
<main python file> <extra python files>
[app arguments] + args.mainClass = "org.apache.spark.deploy.PythonRunner" + args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs + args.files = mergeFileLists(args.files, args.primaryResource) } + args.files = mergeFileLists(args.files, args.pyFiles) + // Format python file paths properly before adding them to the PYTHONPATH + sysProps("spark.submit.pyFiles") = PythonRunner.formatPaths(args.pyFiles).mkString(",") } // Map all arguments to command-line options or system properties for our chosen mode for (opt <- options) { - if (opt.value != null && deployOnCluster == opt.deployOnCluster && + if (opt.value != null && + (deployMode & opt.deployMode) != 0 && (clusterManager & opt.clusterManager) != 0) { - if (opt.clOption != null) { - childArgs += (opt.clOption, opt.value) - } - if (opt.sysProp != null) { - sysProps.put(opt.sysProp, opt.value) - } + if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) } + if (opt.sysProp != null) { sysProps.put(opt.sysProp, opt.value) } } } // Add the application jar automatically so the user doesn't have to call sc.addJar // For YARN cluster mode, the jar is already distributed on each node as "app.jar" // For python files, the primary resource is already distributed as a regular file - if (!isYarnCluster && !isPython) { - var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq()) + val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER + if (!isYarnCluster && !args.isPython) { + var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty) if (isUserJar(args.primaryResource)) { jars = jars ++ Seq(args.primaryResource) } sysProps.put("spark.jars", jars.mkString(",")) } - // Standalone cluster specific configurations - if (deployOnCluster && clusterManager == STANDALONE) { + // In standalone-cluster mode, use Client as a wrapper around the user class + if (clusterManager == STANDALONE && deployMode == CLUSTER) { + childMainClass = "org.apache.spark.deploy.Client" if (args.supervise) { childArgs += "--supervise" } - childMainClass = "org.apache.spark.deploy.Client" childArgs += "launch" childArgs += (args.master, args.primaryResource, args.mainClass) + if (args.childArgs != null) { + childArgs ++= args.childArgs + } } - // Arguments to be passed to user program - if (args.childArgs != null) { - if (!deployOnCluster || clusterManager == STANDALONE) { - childArgs ++= args.childArgs - } else if (clusterManager == YARN) { - for (arg <- args.childArgs) { - childArgs += ("--arg", arg) - } + // In yarn-cluster mode, use yarn.Client as a wrapper around the user class + if (clusterManager == YARN && deployMode == CLUSTER) { + childMainClass = "org.apache.spark.deploy.yarn.Client" + childArgs += ("--jar", args.primaryResource) + childArgs += ("--class", args.mainClass) + if (args.childArgs != null) { + args.childArgs.foreach { arg => childArgs += ("--arg", arg) } } } // Read from default spark properties, if any for ((k, v) <- args.getDefaultSparkProperties) { - if (!sysProps.contains(k)) sysProps(k) = v + sysProps.getOrElseUpdate(k, v) } (childArgs, childClasspath, sysProps, childMainClass) @@ -364,6 +357,6 @@ object SparkSubmit { private[spark] case class OptionAssigner( value: String, clusterManager: Int, - deployOnCluster: Boolean, + deployMode: Int, clOption: String = null, sysProp: String = null) From 7167824f6a3953dd037fb35d74f0c30384cf7cd1 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 9 Jul 2014 14:44:34 -0700 Subject: [PATCH 2/4] Re-order config 
options and update comments This also moves the code block for python applications back to the right place. --- .../org/apache/spark/deploy/SparkSubmit.scala | 98 ++++++++++--------- 1 file changed, 53 insertions(+), 45 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 9759d2406c89..d5da75de6781 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -27,9 +27,10 @@ import org.apache.spark.executor.ExecutorURLClassLoader import org.apache.spark.util.Utils /** - * Scala code behind the spark-submit script. The script handles setting up the classpath with - * relevant Spark dependencies and provides a layer over the different cluster managers and deploy - * modes that Spark supports. + * Main gateway of launching a Spark application. + * + * This script handles setting up the classpath with relevant Spark dependencies and provides + * a layer over the different cluster managers and deploy modes that Spark supports. */ object SparkSubmit { @@ -86,11 +87,11 @@ object SparkSubmit { // Set the cluster manager val clusterManager: Int = args.master match { - case m if m.startsWith("local") => LOCAL case m if m.startsWith("yarn") => YARN case m if m.startsWith("spark") => STANDALONE case m if m.startsWith("mesos") => MESOS - case _ => printErrorAndExit("Master must start with yarn, mesos, spark, or local"); -1 + case m if m.startsWith("local") => LOCAL + case _ => printErrorAndExit("Master must start with yarn, spark, mesos, or local"); -1 } // Set the deploy mode; default is client mode @@ -140,43 +141,70 @@ object SparkSubmit { } } + // If we're running a python app, set the main class to our specific python runner + if (args.isPython) { + if (args.primaryResource == PYSPARK_SHELL) { + args.mainClass = "py4j.GatewayServer" + args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0") + } else { + // If a python file is provided, add it to the child arguments and list of files to deploy. + // Usage: PythonAppRunner
<main python file> <extra python files>
[app arguments] + args.mainClass = "org.apache.spark.deploy.PythonRunner" + args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs + args.files = mergeFileLists(args.files, args.primaryResource) + } + args.files = mergeFileLists(args.files, args.pyFiles) + // Format python file paths properly before adding them to the PYTHONPATH + sysProps("spark.submit.pyFiles") = PythonRunner.formatPaths(args.pyFiles).mkString(",") + } + // Special flag to avoid deprecation warnings at the client sysProps("SPARK_SUBMIT") = "true" // A list of rules to map each argument to system properties or command-line options in // each deploy mode; we iterate through these below val options = List[OptionAssigner]( + + // All cluster managers OptionAssigner(args.master, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.master"), OptionAssigner(args.name, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.app.name"), + OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"), + + // Standalone cluster only + OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"), + OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"), + + // Yarn client only + OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"), + OptionAssigner(args.numExecutors, YARN, CLIENT, sysProp = "spark.executor.instances"), + OptionAssigner(args.executorCores, YARN, CLIENT, sysProp = "spark.executor.cores"), + OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"), + OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"), + + // Yarn cluster only OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name", sysProp = "spark.app.name"), + OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"), + OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"), + OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"), + OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"), + OptionAssigner(args.executorCores, YARN, CLUSTER, clOption = "--executor-cores"), + OptionAssigner(args.files, YARN, CLUSTER, clOption = "--files"), + OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"), + OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"), + + // Other options OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, CLUSTER, sysProp = "spark.driver.extraClassPath"), OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, CLUSTER, sysProp = "spark.driver.extraJavaOptions"), OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER, sysProp = "spark.driver.extraLibraryPath"), - OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"), - OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"), - OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"), - OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"), - OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"), - OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"), - OptionAssigner(args.numExecutors, YARN, CLIENT, sysProp = "spark.executor.instances"), - OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"), OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, CLIENT, sysProp = "spark.executor.memory"), - OptionAssigner(args.executorCores, YARN, CLUSTER, clOption = "--executor-cores"), - 
OptionAssigner(args.executorCores, YARN, CLIENT, sysProp = "spark.executor.cores"), OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, CLIENT, sysProp = "spark.cores.max"), - OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"), - OptionAssigner(args.files, YARN, CLUSTER, clOption = "--files"), - OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, CLIENT, sysProp = "spark.files"), - OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, CLUSTER, sysProp = "spark.files"), - OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"), - OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"), - OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"), - OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars") + OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES, + sysProp = "spark.files") ) // In client mode, launch the application main class directly @@ -186,30 +214,10 @@ object SparkSubmit { if (isUserJar(args.primaryResource)) { childClasspath += args.primaryResource } - if (args.jars != null) { - childClasspath ++= args.jars.split(",") - } - if (args.childArgs != null) { - childArgs ++= args.childArgs - } + if (args.jars != null) { childClasspath ++= args.jars.split(",") } + if (args.childArgs != null) { childArgs ++= args.childArgs } } - // If we're running a python app, set the main class to our specific python runner - if (args.isPython) { - if (args.primaryResource == PYSPARK_SHELL) { - args.mainClass = "py4j.GatewayServer" - args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0") - } else { - // If a python file is provided, add it to the child arguments and list of files to deploy. - // Usage: PythonAppRunner
<main python file> <extra python files> [app arguments] - args.mainClass = "org.apache.spark.deploy.PythonRunner" - args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs - args.files = mergeFileLists(args.files, args.primaryResource) - } - args.files = mergeFileLists(args.files, args.pyFiles) - // Format python file paths properly before adding them to the PYTHONPATH - sysProps("spark.submit.pyFiles") = PythonRunner.formatPaths(args.pyFiles).mkString(",") - } // Map all arguments to command-line options or system properties for our chosen mode for (opt <- options) { From fe484a18c4ece79b3f6af25cf3d0287f1f7b6833 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 10 Jul 2014 18:08:14 -0700 Subject: [PATCH 3/4] Move deploy mode checks after yarn code In the yarn code block, we set the deploy mode to "cluster" if the master is "yarn-cluster" and the deploy mode is not specified. However, by then we have already passed the error checks that prevent users from launching python and shell applications in cluster mode. This is fixed by re-ordering the two code blocks. --- .../org/apache/spark/deploy/SparkSubmit.scala | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index d5da75de6781..28dff1042400 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -101,19 +101,6 @@ object SparkSubmit { case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1 } - // The following modes are not supported or applicable - (clusterManager, deployMode) match { - case (MESOS, CLUSTER) => - printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.") - case (STANDALONE, CLUSTER) => - printErrorAndExit("Cluster deploy mode is currently not supported for Standalone clusters.") - case (_, CLUSTER) if args.isPython => - printErrorAndExit("Cluster deploy mode is currently not supported for python applications.") - case (_, CLUSTER) if isShell(args.primaryResource) => - printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.") - case _ => - } - // Because "yarn-cluster" and "yarn-client" encapsulate both the master // and deploy mode, we have some logic to infer the master and deploy mode // from each other if only one is specified, or exit early if they are at odds.
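To make the ordering bug described in this commit message concrete, here is a minimal, runnable Scala sketch. It is not the real SparkSubmit code: the helpers inferMode and rejected are invented stand-ins, and only the CLIENT/CLUSTER constants and the check-versus-inference ordering mirror the patch.

    // Illustrative sketch of the bug fixed by this patch, under the assumptions above.
    object DeployModeOrderingSketch {
      val CLIENT = 1
      val CLUSTER = 2

      // Stand-in for the yarn block: infer cluster mode from a "yarn-cluster"
      // master when no --deploy-mode was given.
      def inferMode(master: String, explicitMode: Option[String]): Int = explicitMode match {
        case Some("cluster") => CLUSTER
        case Some(_)         => CLIENT
        case None            => if (master == "yarn-cluster") CLUSTER else CLIENT
      }

      // Stand-in for the "not supported" checks: python apps may not run in cluster mode.
      def rejected(isPython: Boolean, deployMode: Int): Boolean =
        isPython && deployMode == CLUSTER

      def main(args: Array[String]): Unit = {
        val (master, explicitMode, isPython) = ("yarn-cluster", None, true)

        // Buggy order: the checks run against the default mode (CLIENT),
        // and only afterwards does the yarn block flip it to CLUSTER.
        val modeAtCheckTime = CLIENT
        println(s"buggy order rejects python app: ${rejected(isPython, modeAtCheckTime)}")  // false

        // Fixed order: infer first, then check, so the checks see CLUSTER.
        val inferred = inferMode(master, explicitMode)
        println(s"fixed order rejects python app: ${rejected(isPython, inferred)}")  // true
      }
    }

Run as written, the first line prints false (the python app slips through) and the second prints true, which is exactly the behavior change this re-ordering produces.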
@@ -141,6 +128,19 @@ object SparkSubmit { } } + // The following modes are not supported or applicable + (clusterManager, deployMode) match { + case (MESOS, CLUSTER) => + printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.") + case (STANDALONE, CLUSTER) => + printErrorAndExit("Cluster deploy mode is currently not supported for Standalone clusters.") + case (_, CLUSTER) if args.isPython => + printErrorAndExit("Cluster deploy mode is currently not supported for python applications.") + case (_, CLUSTER) if isShell(args.primaryResource) => + printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.") + case _ => + } + // If we're running a python app, set the main class to our specific python runner if (args.isPython) { if (args.primaryResource == PYSPARK_SHELL) { From 8f99200c043d7035b71d4e68cce6651356189ce3 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 10 Jul 2014 18:20:26 -0700 Subject: [PATCH 4/4] script -> program (minor) --- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 28dff1042400..3d8373d8175e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -29,7 +29,7 @@ import org.apache.spark.util.Utils /** * Main gateway of launching a Spark application. * - * This script handles setting up the classpath with relevant Spark dependencies and provides + * This program handles setting up the classpath with relevant Spark dependencies and provides * a layer over the different cluster managers and deploy modes that Spark supports. */ object SparkSubmit {
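As a closing illustration of where the series lands, here is a minimal, runnable Scala sketch of the bitmask rule table. OptionAssigner, the flag constants, and the (deployMode & opt.deployMode) test mirror the final code; the three sample rules and main are invented for demonstration only.

    // Sketch of the bitmask dispatch these patches converge on (sample rules are illustrative).
    object OptionAssignerSketch {
      // Cluster managers
      val YARN = 1; val STANDALONE = 2; val MESOS = 4; val LOCAL = 8
      val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
      // Deploy modes
      val CLIENT = 1; val CLUSTER = 2
      val ALL_DEPLOY_MODES = CLIENT | CLUSTER

      case class OptionAssigner(
          value: String,
          clusterManager: Int,
          deployMode: Int,
          clOption: String = null,
          sysProp: String = null)

      def main(args: Array[String]): Unit = {
        val options = List(
          OptionAssigner("2g", YARN, CLUSTER, clOption = "--driver-memory"),
          OptionAssigner("myApp", ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.app.name"),
          OptionAssigner("a.txt", LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
            sysProp = "spark.files"))

        // Select the rules for a yarn-cluster submission: a rule fires only when
        // its bitmasks overlap both the chosen manager and the chosen mode.
        val (clusterManager, deployMode) = (YARN, CLUSTER)
        for (opt <- options) {
          if (opt.value != null &&
              (deployMode & opt.deployMode) != 0 &&
              (clusterManager & opt.clusterManager) != 0) {
            println(s"applies: $opt")  // only the --driver-memory rule fires here
          }
        }
      }
    }

Encoding managers and modes as bit flags lets a single flat list express rules such as LOCAL | STANDALONE | MESOS combined with ALL_DEPLOY_MODES, replacing the per-mode if/else branching that the first patch in this series removes.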