From 93116220bea8d444695666ea09b09ca014db7b90 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Tue, 8 Mar 2016 17:42:06 +0800 Subject: [PATCH 01/10] Simplify the functionality of Yarn ClientArguments --- .../org/apache/spark/deploy/SparkSubmit.scala | 17 +- .../org/apache/spark/deploy/yarn/Client.scala | 85 +++++--- .../spark/deploy/yarn/ClientArguments.scala | 187 +----------------- .../deploy/yarn/YarnSparkHadoopUtil.scala | 2 +- .../org/apache/spark/deploy/yarn/config.scala | 20 +- .../cluster/YarnClientSchedulerBackend.scala | 42 +--- 6 files changed, 82 insertions(+), 271 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 4049fc0c41c5..fbbf0dede950 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -452,7 +452,7 @@ object SparkSubmit { OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.driver.extraLibraryPath"), - // Yarn client only + // Yarn only OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"), OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES, sysProp = "spark.executor.instances"), @@ -461,19 +461,6 @@ object SparkSubmit { OptionAssigner(args.principal, YARN, CLIENT, sysProp = "spark.yarn.principal"), OptionAssigner(args.keytab, YARN, CLIENT, sysProp = "spark.yarn.keytab"), - // Yarn cluster only - OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"), - OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"), - OptionAssigner(args.driverCores, YARN, CLUSTER, clOption = "--driver-cores"), - OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"), - OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"), - OptionAssigner(args.executorCores, YARN, CLUSTER, clOption = "--executor-cores"), - OptionAssigner(args.files, YARN, CLUSTER, clOption = "--files"), - OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"), - OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"), - OptionAssigner(args.principal, YARN, CLUSTER, clOption = "--principal"), - OptionAssigner(args.keytab, YARN, CLUSTER, clOption = "--keytab"), - // Other options OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES, sysProp = "spark.executor.cores"), @@ -577,7 +564,7 @@ object SparkSubmit { if (args.isPython) { childArgs += ("--primary-py-file", args.primaryResource) if (args.pyFiles != null) { - childArgs += ("--py-files", args.pyFiles) + sysProps("spark.submit.pyFiles") = args.pyFiles } childArgs += ("--class", "org.apache.spark.deploy.PythonRunner") } else if (args.isR) { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 6bbc8c2dfa19..a05f6b4ec4b2 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -64,21 +64,46 @@ private[spark] class Client( extends Logging { import Client._ + import YarnSparkHadoopUtil._ def this(clientArgs: ClientArguments, spConf: SparkConf) = this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf) private val yarnClient = YarnClient.createYarnClient private val yarnConf = new YarnConfiguration(hadoopConf) - private var credentials: Credentials = null - private val amMemoryOverhead = args.amMemoryOverhead // MB - 
private val executorMemoryOverhead = args.executorMemoryOverhead // MB + + private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster" + logInfo(s">>>>> is cluster mdoe: ${isClusterMode}") + + // AM related configurations + private val amMemory = if (isClusterMode) { + sparkConf.get(DRIVER_MEMORY).toInt + } else { + sparkConf.get(AM_MEMORY).toInt + } + private val amMemoryOverhead = { + val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD + sparkConf.get(amMemoryOverheadEntry).getOrElse( + math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt + } + private val amCores = if (isClusterMode) { + sparkConf.get(DRIVER_CORES) + } else { + sparkConf.get(AM_CORES) + } + + // Executor related configurations + private val executorMemory = sparkConf.get(EXECUTOR_MEMORY) + private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse( + math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt + private val executorCores = sparkConf.get(EXECUTOR_CORES) + private val distCacheMgr = new ClientDistributedCacheManager() - private val isClusterMode = args.isClusterMode private var loginFromKeytab = false private var principal: String = null private var keytab: String = null + private var credentials: Credentials = null private val launcherBackend = new LauncherBackend() { override def onStopRequest(): Unit = { @@ -179,8 +204,8 @@ private[spark] class Client( newApp: YarnClientApplication, containerContext: ContainerLaunchContext): ApplicationSubmissionContext = { val appContext = newApp.getApplicationSubmissionContext - appContext.setApplicationName(args.appName) - appContext.setQueue(args.amQueue) + appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark")) + appContext.setQueue(sparkConf.get(QUEUE_NAME)) appContext.setAMContainerSpec(containerContext) appContext.setApplicationType("SPARK") @@ -217,8 +242,8 @@ private[spark] class Client( } val capability = Records.newRecord(classOf[Resource]) - capability.setMemory(args.amMemory + amMemoryOverhead) - capability.setVirtualCores(args.amCores) + capability.setMemory(amMemory + amMemoryOverhead) + capability.setVirtualCores(amCores) sparkConf.get(AM_NODE_LABEL_EXPRESSION) match { case Some(expr) => @@ -272,16 +297,16 @@ private[spark] class Client( val maxMem = newAppResponse.getMaximumResourceCapability().getMemory() logInfo("Verifying our application has not requested more than the maximum " + s"memory capability of the cluster ($maxMem MB per container)") - val executorMem = args.executorMemory + executorMemoryOverhead + val executorMem = executorMemory + executorMemoryOverhead if (executorMem > maxMem) { - throw new IllegalArgumentException(s"Required executor memory (${args.executorMemory}" + + throw new IllegalArgumentException(s"Required executor memory ($executorMemory" + s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! " + "Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or " + "'yarn.nodemanager.resource.memory-mb'.") } - val amMem = args.amMemory + amMemoryOverhead + val amMem = amMemory + amMemoryOverhead if (amMem > maxMem) { - throw new IllegalArgumentException(s"Required AM memory (${args.amMemory}" + + throw new IllegalArgumentException(s"Required AM memory ($amMemory" + s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! 
" + "Please increase the value of 'yarn.scheduler.maximum-allocation-mb'.") } @@ -510,10 +535,16 @@ private[spark] class Client( * (3) whether to add these resources to the classpath */ val cachedSecondaryJarLinks = ListBuffer.empty[String] + val files = sparkConf.get(FILES_TO_DISTRIBUTE).map(p => Utils.resolveURIs(p)) + .orElse(sys.env.get("SPARK_YARN_DIST_FILES")) + .orNull + val archives = sparkConf.get(ARCHIVES_TO_DISTRIBUTE).map(p => Utils.resolveURIs(p)) + .orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES")) + .orNull List( - (args.addJars, LocalResourceType.FILE, true), - (args.files, LocalResourceType.FILE, false), - (args.archives, LocalResourceType.ARCHIVE, false) + (sparkConf.getOption("spark.jars").orNull, LocalResourceType.FILE, true), + (files, LocalResourceType.FILE, false), + (archives, LocalResourceType.ARCHIVE, false) ).foreach { case (flist, resType, addToClasspath) => if (flist != null && !flist.isEmpty()) { flist.split(',').foreach { file => @@ -537,7 +568,7 @@ private[spark] class Client( // The python files list needs to be treated especially. All files that are not an // archive need to be placed in a subdirectory that will be added to PYTHONPATH. - args.pyFiles.foreach { f => + sparkConf.get(PY_FILES).foreach { f => val targetDir = if (f.endsWith(".py")) Some(LOCALIZED_PYTHON_DIR) else None distribute(f, targetDir = targetDir) } @@ -694,7 +725,7 @@ private[spark] class Client( // // NOTE: the code currently does not handle .py files defined with a "local:" scheme. val pythonPath = new ListBuffer[String]() - val (pyFiles, pyArchives) = args.pyFiles.partition(_.endsWith(".py")) + val (pyFiles, pyArchives) = sparkConf.get(PY_FILES).partition(_.endsWith(".py")) if (pyFiles.nonEmpty) { pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), LOCALIZED_PYTHON_DIR) @@ -791,7 +822,7 @@ private[spark] class Client( var prefixEnv: Option[String] = None // Add Xmx for AM memory - javaOpts += "-Xmx" + args.amMemory + "m" + javaOpts += "-Xmx" + amMemory + "m" val tmpDir = new Path( YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), @@ -895,8 +926,8 @@ private[spark] class Client( val amArgs = Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++ Seq( - "--executor-memory", args.executorMemory.toString + "m", - "--executor-cores", args.executorCores.toString, + "--executor-memory", executorMemory.toString + "m", + "--executor-cores", executorCores.toString, "--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) @@ -935,10 +966,10 @@ private[spark] class Client( } def setupCredentials(): Unit = { - loginFromKeytab = args.principal != null || sparkConf.contains(PRINCIPAL.key) + loginFromKeytab = sparkConf.contains(PRINCIPAL.key) if (loginFromKeytab) { - principal = Option(args.principal).orElse(sparkConf.get(PRINCIPAL)).get - keytab = Option(args.keytab).orElse(sparkConf.get(KEYTAB)).orNull + principal = sparkConf.get(PRINCIPAL).get + keytab = sparkConf.get(KEYTAB).orNull require(keytab != null, "Keytab must be specified when principal is specified.") logInfo("Attempting to login to the Kerberos" + @@ -1113,11 +1144,7 @@ object Client extends Logging { System.setProperty("SPARK_YARN_MODE", "true") val sparkConf = new SparkConf - val args = new ClientArguments(argStrings, sparkConf) - // to maintain backwards-compatibility - if (!Utils.isDynamicAllocationEnabled(sparkConf)) { - sparkConf.setIfMissing(EXECUTOR_INSTANCES, args.numExecutors) - } + val args = new 
ClientArguments(argStrings) new Client(args, sparkConf).run() } @@ -1264,7 +1291,7 @@ object Client extends Logging { val secondaryJars = if (args != null) { - getSecondaryJarUris(Option(args.addJars).map(_.split(",").toSeq)) + getSecondaryJarUris(sparkConf.getOption("spark.jars").map(_.split(",").toSeq)) } else { getSecondaryJarUris(sparkConf.get(SECONDARY_JARS)) } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 47b4cc300907..f32c7cc86a1d 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -19,113 +19,16 @@ package org.apache.spark.deploy.yarn import scala.collection.mutable.ArrayBuffer -import org.apache.spark.{SparkConf, SparkException} -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ -import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.internal.config._ -import org.apache.spark.util.{IntParam, MemoryParam, Utils} - // TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware ! -private[spark] class ClientArguments( - args: Array[String], - sparkConf: SparkConf) { +private[spark] class ClientArguments(args: Array[String]) { - var addJars: String = null - var files: String = null - var archives: String = null var userJar: String = null var userClass: String = null - var pyFiles: Seq[String] = Nil var primaryPyFile: String = null var primaryRFile: String = null var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]() - var executorMemory = 1024 // MB - var executorCores = 1 - var numExecutors = DEFAULT_NUMBER_EXECUTORS - var amQueue = sparkConf.get(QUEUE_NAME) - var amMemory: Int = _ - var amCores: Int = _ - var appName: String = "Spark" - var priority = 0 - var principal: String = null - var keytab: String = null - def isClusterMode: Boolean = userClass != null - - private var driverMemory: Int = Utils.DEFAULT_DRIVER_MEM_MB // MB - private var driverCores: Int = 1 - private val isDynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(sparkConf) parseArgs(args.toList) - loadEnvironmentArgs() - validateArgs() - - // Additional memory to allocate to containers - val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD - val amMemoryOverhead = sparkConf.get(amMemoryOverheadEntry).getOrElse( - math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt - - val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse( - math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt - - /** Load any default arguments provided through environment variables and Spark properties. */ - private def loadEnvironmentArgs(): Unit = { - // For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://, - // while spark.yarn.dist.{archives/files} should be resolved to file:// (SPARK-2051). - files = Option(files) - .orElse(sparkConf.get(FILES_TO_DISTRIBUTE).map(p => Utils.resolveURIs(p))) - .orElse(sys.env.get("SPARK_YARN_DIST_FILES")) - .orNull - archives = Option(archives) - .orElse(sparkConf.get(ARCHIVES_TO_DISTRIBUTE).map(p => Utils.resolveURIs(p))) - .orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES")) - .orNull - // If dynamic allocation is enabled, start at the configured initial number of executors. - // Default to minExecutors if no initialExecutors is set. 
- numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf, numExecutors) - principal = Option(principal) - .orElse(sparkConf.get(PRINCIPAL)) - .orNull - keytab = Option(keytab) - .orElse(sparkConf.get(KEYTAB)) - .orNull - } - - /** - * Fail fast if any arguments provided are invalid. - * This is intended to be called only after the provided arguments have been parsed. - */ - private def validateArgs(): Unit = { - if (numExecutors < 0 || (!isDynamicAllocationEnabled && numExecutors == 0)) { - throw new IllegalArgumentException( - s""" - |Number of executors was $numExecutors, but must be at least 1 - |(or 0 if dynamic executor allocation is enabled). - |${getUsageMessage()} - """.stripMargin) - } - if (executorCores < sparkConf.get(CPUS_PER_TASK)) { - throw new SparkException(s"Executor cores must not be less than ${CPUS_PER_TASK.key}.") - } - // scalastyle:off println - if (isClusterMode) { - for (key <- Seq(AM_MEMORY.key, AM_MEMORY_OVERHEAD.key, AM_CORES.key)) { - if (sparkConf.contains(key)) { - println(s"$key is set but does not apply in cluster mode.") - } - } - amMemory = driverMemory - amCores = driverCores - } else { - for (key <- Seq(DRIVER_MEMORY_OVERHEAD.key, DRIVER_CORES.key)) { - if (sparkConf.contains(key)) { - println(s"$key is set but does not apply in client mode.") - } - } - amMemory = sparkConf.get(AM_MEMORY).toInt - amCores = sparkConf.get(AM_CORES) - } - // scalastyle:on println - } private def parseArgs(inputArgs: List[String]): Unit = { var args = inputArgs @@ -149,81 +52,10 @@ private[spark] class ClientArguments( primaryRFile = value args = tail - case ("--args" | "--arg") :: value :: tail => - if (args(0) == "--args") { - println("--args is deprecated. Use --arg instead.") - } + case ("--arg") :: value :: tail => userArgs += value args = tail - case ("--master-class" | "--am-class") :: value :: tail => - println(s"${args(0)} is deprecated and is not used anymore.") - args = tail - - case ("--master-memory" | "--driver-memory") :: MemoryParam(value) :: tail => - if (args(0) == "--master-memory") { - println("--master-memory is deprecated. Use --driver-memory instead.") - } - driverMemory = value - args = tail - - case ("--driver-cores") :: IntParam(value) :: tail => - driverCores = value - args = tail - - case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail => - if (args(0) == "--num-workers") { - println("--num-workers is deprecated. Use --num-executors instead.") - } - numExecutors = value - args = tail - - case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail => - if (args(0) == "--worker-memory") { - println("--worker-memory is deprecated. Use --executor-memory instead.") - } - executorMemory = value - args = tail - - case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail => - if (args(0) == "--worker-cores") { - println("--worker-cores is deprecated. 
Use --executor-cores instead.") - } - executorCores = value - args = tail - - case ("--queue") :: value :: tail => - amQueue = value - args = tail - - case ("--name") :: value :: tail => - appName = value - args = tail - - case ("--addJars") :: value :: tail => - addJars = value - args = tail - - case ("--py-files") :: value :: tail => - pyFiles = value.split(",") - args = tail - - case ("--files") :: value :: tail => - files = value - args = tail - - case ("--archives") :: value :: tail => - archives = value - args = tail - - case ("--principal") :: value :: tail => - principal = value - args = tail - - case ("--keytab") :: value :: tail => - keytab = value - args = tail - case Nil => case _ => @@ -240,7 +72,6 @@ private[spark] class ClientArguments( private def getUsageMessage(unknownParam: List[String] = null): String = { val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else "" - val mem_mb = Utils.DEFAULT_DRIVER_MEM_MB message + s""" |Usage: org.apache.spark.deploy.yarn.Client [options] @@ -252,20 +83,6 @@ private[spark] class ClientArguments( | --primary-r-file A main R file | --arg ARG Argument to be passed to your application's main class. | Multiple invocations are possible, each will be passed in order. - | --num-executors NUM Number of executors to start (Default: 2) - | --executor-cores NUM Number of cores per executor (Default: 1). - | --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: $mem_mb Mb) - | --driver-cores NUM Number of cores used by the driver (Default: 1). - | --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G) - | --name NAME The name of your application (Default: Spark) - | --queue QUEUE The hadoop queue to use for allocation requests (Default: - | 'default') - | --addJars jars Comma separated list of local jars that want SparkContext.addJar - | to work with. - | --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to - | place on the PYTHONPATH for Python apps. - | --files files Comma separated list of files to be distributed with the job. - | --archives archives Comma separated list of archives to be distributed with the job. 
""".stripMargin } } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 2915e664beff..5af2c2980879 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -512,7 +512,7 @@ object YarnSparkHadoopUtil { val initialNumExecutors = conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS) val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS) require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors, - s"initial executor number $initialNumExecutors must between min executor number" + + s"initial executor number $initialNumExecutors must between min executor number " + s"$minNumExecutors and max executor number $maxNumExecutors") initialNumExecutors diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index 10cd6d00b0ed..aad8259fe069 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -183,14 +183,26 @@ package object config { private[spark] val DRIVER_CORES = ConfigBuilder("spark.driver.cores") .intConf - .optional + .withDefault(1) private[spark] val DRIVER_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.driver.memoryOverhead") .bytesConf(ByteUnit.MiB) .optional + private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory") + .bytesConf(ByteUnit.MiB) + .withDefaultString("1g") + /* Executor configuration. */ + private[spark] val EXECUTOR_CORES = ConfigBuilder("spark.executor.cores") + .intConf + .withDefault(1) + + private[spark] val EXECUTOR_MEMORY = ConfigBuilder("spark.executor.memory") + .bytesConf(ByteUnit.MiB) + .withDefaultString("1g") + private[spark] val EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.executor.memoryOverhead") .bytesConf(ByteUnit.MiB) .optional @@ -246,4 +258,10 @@ package object config { .toSequence .optional + private[spark] val PY_FILES = ConfigBuilder("spark.submit.pyFiles") + .internal + .stringConf + .toSequence + .withDefault(Nil) + } diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 9fc727904b1e..56dc0004d04c 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -48,11 +48,10 @@ private[spark] class YarnClientSchedulerBackend( val argsArrayBuf = new ArrayBuffer[String]() argsArrayBuf += ("--arg", hostport) - argsArrayBuf ++= getExtraClientArguments logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" ")) - val args = new ClientArguments(argsArrayBuf.toArray, conf) - totalExpectedExecutors = args.numExecutors + val args = new ClientArguments(argsArrayBuf.toArray) + totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(conf) client = new Client(args, conf) bindToYarn(client.submitApplication(), None) @@ -72,43 +71,6 @@ private[spark] class YarnClientSchedulerBackend( monitorThread.start() } - /** - * Return any extra command line arguments to be passed to Client provided in the form of - * environment variables or Spark properties. 
- */ - private def getExtraClientArguments: Seq[String] = { - val extraArgs = new ArrayBuffer[String] - // List of (target Client argument, environment variable, Spark property) - val optionTuples = - List( - ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"), - ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"), - ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"), - ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"), - ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"), - ("--py-files", null, "spark.submit.pyFiles") - ) - // Warn against the following deprecated environment variables: env var -> suggestion - val deprecatedEnvVars = Map( - "SPARK_WORKER_MEMORY" -> "SPARK_EXECUTOR_MEMORY or --executor-memory through spark-submit", - "SPARK_WORKER_CORES" -> "SPARK_EXECUTOR_CORES or --executor-cores through spark-submit") - optionTuples.foreach { case (optionName, envVar, sparkProp) => - if (sc.getConf.contains(sparkProp)) { - extraArgs += (optionName, sc.getConf.get(sparkProp)) - } else if (envVar != null && System.getenv(envVar) != null) { - extraArgs += (optionName, System.getenv(envVar)) - if (deprecatedEnvVars.contains(envVar)) { - logWarning(s"NOTE: $envVar is deprecated. Use ${deprecatedEnvVars(envVar)} instead.") - } - } - } - // The app name is a special case because "spark.app.name" is required of all applications. - // As a result, the corresponding "SPARK_YARN_APP_NAME" is already handled preemptively in - // SparkSubmitArguments if "spark.app.name" is not explicitly set by the user. (SPARK-5222) - sc.getConf.getOption("spark.app.name").foreach(v => extraArgs += ("--name", v)) - extraArgs - } - /** * Report the state of the application until it is running. * If the application has finished, failed or been killed in the process, throw an exception. 
From f99cb19f8e1e94fbcebb324ad2da4171c774a9ee Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 9 Mar 2016 15:01:33 +0800 Subject: [PATCH 02/10] Fix test failure --- .../org/apache/spark/deploy/SparkSubmit.scala | 9 +++++---- .../org/apache/spark/deploy/yarn/Client.scala | 1 - .../apache/spark/deploy/yarn/ClientSuite.scala | 17 ++++++++++------- 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index fbbf0dede950..d0bf303b30b1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -537,6 +537,10 @@ object SparkSubmit { if (args.isPython) { sysProps.put("spark.yarn.isPython", "true") } + + if (args.pyFiles != null) { + sysProps("spark.submit.pyFiles") = args.pyFiles + } } // assure a keytab is available from any place in a JVM @@ -563,9 +567,6 @@ object SparkSubmit { childMainClass = "org.apache.spark.deploy.yarn.Client" if (args.isPython) { childArgs += ("--primary-py-file", args.primaryResource) - if (args.pyFiles != null) { - sysProps("spark.submit.pyFiles") = args.pyFiles - } childArgs += ("--class", "org.apache.spark.deploy.PythonRunner") } else if (args.isR) { val mainFile = new Path(args.primaryResource).getName @@ -647,7 +648,7 @@ object SparkSubmit { childMainClass: String, verbose: Boolean): Unit = { // scalastyle:off println - if (verbose) { + if (true) { printStream.println(s"Main class:\n$childMainClass") printStream.println(s"Arguments:\n${childArgs.mkString("\n")}") printStream.println(s"System properties:\n${sysProps.mkString("\n")}") diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index a05f6b4ec4b2..84cb4c71bd34 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -73,7 +73,6 @@ private[spark] class Client( private val yarnConf = new YarnConfiguration(hadoopConf) private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster" - logInfo(s">>>>> is cluster mdoe: ${isClusterMode}") // AM related configurations private val amMemory = if (isClusterMode) { diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index 24472e006b87..c19ba63fad82 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -118,8 +118,9 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll val sparkConf = new SparkConf() .set(SPARK_JARS, Seq(SPARK)) .set(USER_CLASS_PATH_FIRST, true) + .set("spark.jars", ADDED) val env = new MutableHashMap[String, String]() - val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf) + val args = new ClientArguments(Array("--jar", USER)) populateClasspath(args, conf, sparkConf, env, true) @@ -138,9 +139,11 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll } test("Jar path propagation through SparkConf") { - val sparkConf = new SparkConf().set(SPARK_JARS, Seq(SPARK)) - val client = createClient(sparkConf, - args = Array("--jar", USER, "--addJars", ADDED)) + val conf = new Configuration() + val sparkConf = new SparkConf() + .set(SPARK_JARS, Seq(SPARK)) + 
.set("spark.jars", ADDED) + val client = createClient(sparkConf, args = Array("--jar", USER)) val tempDir = Utils.createTempDir() try { @@ -193,9 +196,9 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll val sparkConf = new SparkConf() .set(APPLICATION_TAGS.key, ",tag1, dup,tag2 , ,multi word , dup") .set(MAX_APP_ATTEMPTS, 42) - val args = new ClientArguments(Array( - "--name", "foo-test-app", - "--queue", "staging-queue"), sparkConf) + .set("spark.app.name", "foo-test-app") + .set(QUEUE_NAME, "staging-queue") + val args = new ClientArguments(Array()) val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse]) From e27a5a03f0e17e7805084abab0e802000bb6513f Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 9 Mar 2016 16:28:16 +0800 Subject: [PATCH 03/10] continue dealing with properties --- .../org/apache/spark/deploy/SparkSubmit.scala | 18 +++++++++--------- .../apache/spark/deploy/SparkSubmitSuite.scala | 16 ++++++++-------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index d0bf303b30b1..de2d9a5185dc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -453,13 +453,13 @@ object SparkSubmit { sysProp = "spark.driver.extraLibraryPath"), // Yarn only - OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"), + OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"), OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES, sysProp = "spark.executor.instances"), - OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"), - OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"), - OptionAssigner(args.principal, YARN, CLIENT, sysProp = "spark.yarn.principal"), - OptionAssigner(args.keytab, YARN, CLIENT, sysProp = "spark.yarn.keytab"), + OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.files"), + OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.archives"), + OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.principal"), + OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.keytab"), // Other options OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES, @@ -470,10 +470,10 @@ object SparkSubmit { sysProp = "spark.cores.max"), OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES, sysProp = "spark.files"), - OptionAssigner(args.jars, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars"), - OptionAssigner(args.driverMemory, STANDALONE | MESOS, CLUSTER, + OptionAssigner(args.jars, STANDALONE | MESOS | YARN, CLUSTER, sysProp = "spark.jars"), + OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER, sysProp = "spark.driver.memory"), - OptionAssigner(args.driverCores, STANDALONE | MESOS, CLUSTER, + OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER, sysProp = "spark.driver.cores"), OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER, sysProp = "spark.driver.supervise"), @@ -648,7 +648,7 @@ object SparkSubmit { childMainClass: String, verbose: Boolean): Unit = { // scalastyle:off println - if (true) { + if (verbose) { printStream.println(s"Main class:\n$childMainClass") 
printStream.println(s"Arguments:\n${childArgs.mkString("\n")}") printStream.println(s"System properties:\n${sysProps.mkString("\n")}") diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 96cb4fd0ebee..58bcdb5a8e51 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -199,21 +199,21 @@ class SparkSubmitSuite val (childArgs, classpath, sysProps, mainClass) = prepareSubmitEnvironment(appArgs) val childArgsStr = childArgs.mkString(" ") childArgsStr should include ("--class org.SomeClass") - childArgsStr should include ("--executor-memory 5g") - childArgsStr should include ("--driver-memory 4g") - childArgsStr should include ("--executor-cores 5") childArgsStr should include ("--arg arg1 --arg arg2") - childArgsStr should include ("--queue thequeue") childArgsStr should include regex ("--jar .*thejar.jar") - childArgsStr should include regex ("--addJars .*one.jar,.*two.jar,.*three.jar") - childArgsStr should include regex ("--files .*file1.txt,.*file2.txt") - childArgsStr should include regex ("--archives .*archive1.txt,.*archive2.txt") mainClass should be ("org.apache.spark.deploy.yarn.Client") classpath should have length (0) + + sysProps("spark.executor.memory") should be ("5g") + sysProps("spark.driver.memory") should be ("4g") + sysProps("spark.executor.cores") should be ("5") + sysProps("spark.yarn.queue") should be ("thequeue") + sysProps("spark.jars") should include regex (".*one.jar,.*two.jar,.*three.jar") + sysProps("spark.yarn.dist.files") should include regex (".*file1.txt,.*file2.txt") + sysProps("spark.yarn.dist.archives") should include regex (".*archive1.txt,.*archive2.txt") sysProps("spark.app.name") should be ("beauty") sysProps("spark.ui.enabled") should be ("false") sysProps("SPARK_SUBMIT") should be ("true") - sysProps.keys should not contain ("spark.jars") } test("handles YARN client mode") { From 75c5b367be500fdc6660fefd25c4fff22b325c3a Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 10 Mar 2016 13:17:09 +0800 Subject: [PATCH 04/10] Address the comments --- .../org/apache/spark/deploy/SparkSubmit.scala | 5 +++-- .../apache/spark/internal/config/package.scala | 14 ++++++++++++++ .../apache/spark/deploy/SparkSubmitSuite.scala | 5 +++-- .../spark/deploy/yarn/ApplicationMaster.scala | 2 +- .../deploy/yarn/ApplicationMasterArguments.scala | 16 +--------------- .../org/apache/spark/deploy/yarn/Client.scala | 9 +++------ .../apache/spark/deploy/yarn/YarnAllocator.scala | 6 +++--- .../apache/spark/deploy/yarn/YarnRMClient.scala | 5 ++--- .../org/apache/spark/deploy/yarn/config.scala | 14 ++------------ .../apache/spark/deploy/yarn/ClientSuite.scala | 4 ++-- .../spark/deploy/yarn/YarnAllocatorSuite.scala | 8 ++++---- 11 files changed, 38 insertions(+), 50 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index de2d9a5185dc..49b3a78e9419 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -441,7 +441,6 @@ object SparkSubmit { OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.submit.deployMode"), OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"), - OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = 
"spark.jars"), OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"), OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.driver.memory"), @@ -456,6 +455,7 @@ object SparkSubmit { OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"), OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES, sysProp = "spark.executor.instances"), + OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.jars"), OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.files"), OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.archives"), OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.principal"), @@ -470,7 +470,8 @@ object SparkSubmit { sysProp = "spark.cores.max"), OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES, sysProp = "spark.files"), - OptionAssigner(args.jars, STANDALONE | MESOS | YARN, CLUSTER, sysProp = "spark.jars"), + OptionAssigner(args.jars, LOCAL, CLIENT, sysProp = "spark.jars"), + OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, sysProp = "spark.jars"), OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER, sysProp = "spark.driver.memory"), OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER, diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index f2f20b320757..34a64c4318a4 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -18,6 +18,7 @@ package org.apache.spark.internal import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.network.util.ByteUnit package object config { @@ -33,6 +34,10 @@ package object config { private[spark] val DRIVER_USER_CLASS_PATH_FIRST = ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.withDefault(false) + private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory") + .bytesConf(ByteUnit.MiB) + .withDefaultString("1g") + private[spark] val EXECUTOR_CLASS_PATH = ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.optional @@ -45,6 +50,10 @@ package object config { private[spark] val EXECUTOR_USER_CLASS_PATH_FIRST = ConfigBuilder("spark.executor.userClassPathFirst").booleanConf.withDefault(false) + private[spark] val EXECUTOR_MEMORY = ConfigBuilder("spark.executor.memory") + .bytesConf(ByteUnit.MiB) + .withDefaultString("1g") + private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal .booleanConf.withDefault(false) @@ -73,4 +82,9 @@ package object config { private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances").intConf.optional + private[spark] val PY_FILES = ConfigBuilder("spark.submit.pyFiles") + .internal + .stringConf + .toSequence + .withDefault(Nil) } diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 58bcdb5a8e51..271897699201 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -208,7 +208,7 @@ class SparkSubmitSuite sysProps("spark.driver.memory") should be ("4g") sysProps("spark.executor.cores") should be ("5") sysProps("spark.yarn.queue") should be ("thequeue") - sysProps("spark.jars") should include 
regex (".*one.jar,.*two.jar,.*three.jar") + sysProps("spark.yarn.dist.jars") should include regex (".*one.jar,.*two.jar,.*three.jar") sysProps("spark.yarn.dist.files") should include regex (".*file1.txt,.*file2.txt") sysProps("spark.yarn.dist.archives") should include regex (".*archive1.txt,.*archive2.txt") sysProps("spark.app.name") should be ("beauty") @@ -249,7 +249,8 @@ class SparkSubmitSuite sysProps("spark.executor.instances") should be ("6") sysProps("spark.yarn.dist.files") should include regex (".*file1.txt,.*file2.txt") sysProps("spark.yarn.dist.archives") should include regex (".*archive1.txt,.*archive2.txt") - sysProps("spark.jars") should include regex (".*one.jar,.*two.jar,.*three.jar,.*thejar.jar") + sysProps("spark.yarn.dist.jars") should include + regex (".*one.jar,.*two.jar,.*three.jar,.*thejar.jar") sysProps("SPARK_SUBMIT") should be ("true") sysProps("spark.ui.enabled") should be ("false") } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index e941089d1b09..9e8453429c9b 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -662,7 +662,7 @@ object ApplicationMaster extends Logging { SignalLogger.register(log) val amArgs = new ApplicationMasterArguments(args) SparkHadoopUtil.get.runAsSparkUser { () => - master = new ApplicationMaster(amArgs, new YarnRMClient(amArgs)) + master = new ApplicationMaster(amArgs, new YarnRMClient) System.exit(master.run()) } } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala index 6987e5a55fc3..b3bd5605b6f3 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala @@ -27,8 +27,6 @@ class ApplicationMasterArguments(val args: Array[String]) { var primaryPyFile: String = null var primaryRFile: String = null var userArgs: Seq[String] = Nil - var executorMemory = 1024 - var executorCores = 1 var propertiesFile: String = null parseArgs(args.toList) @@ -58,18 +56,10 @@ class ApplicationMasterArguments(val args: Array[String]) { primaryRFile = value args = tail - case ("--args" | "--arg") :: value :: tail => + case ("--arg") :: value :: tail => userArgsBuffer += value args = tail - case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail => - executorMemory = value - args = tail - - case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail => - executorCores = value - args = tail - case ("--properties-file") :: value :: tail => propertiesFile = value args = tail @@ -101,12 +91,8 @@ class ApplicationMasterArguments(val args: Array[String]) { | --class CLASS_NAME Name of your application's main class | --primary-py-file A main Python file | --primary-r-file A main R file - | --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to - | place on the PYTHONPATH for Python apps. | --args ARGS Arguments to be passed to your application's main class. | Multiple invocations are possible, each will be passed in order. - | --executor-cores NUM Number of cores for the executors (Default: 1) - | --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G) | --properties-file FILE Path to a custom Spark properties file. 
""".stripMargin) // scalastyle:on println diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 84cb4c71bd34..1aa2be1912b6 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -95,7 +95,6 @@ private[spark] class Client( private val executorMemory = sparkConf.get(EXECUTOR_MEMORY) private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse( math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt - private val executorCores = sparkConf.get(EXECUTOR_CORES) private val distCacheMgr = new ClientDistributedCacheManager() @@ -541,7 +540,7 @@ private[spark] class Client( .orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES")) .orNull List( - (sparkConf.getOption("spark.jars").orNull, LocalResourceType.FILE, true), + (sparkConf.get(JARS_TO_DISTRIBUTE).orNull, LocalResourceType.FILE, true), (files, LocalResourceType.FILE, false), (archives, LocalResourceType.ARCHIVE, false) ).foreach { case (flist, resType, addToClasspath) => @@ -925,8 +924,6 @@ private[spark] class Client( val amArgs = Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++ Seq( - "--executor-memory", executorMemory.toString + "m", - "--executor-cores", executorCores.toString, "--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) @@ -1130,7 +1127,7 @@ private[spark] class Client( } -object Client extends Logging { +private object Client extends Logging { def main(argStrings: Array[String]) { if (!sys.props.contains("SPARK_SUBMIT")) { @@ -1290,7 +1287,7 @@ object Client extends Logging { val secondaryJars = if (args != null) { - getSecondaryJarUris(sparkConf.getOption("spark.jars").map(_.split(",").toSeq)) + getSecondaryJarUris(sparkConf.get(JARS_TO_DISTRIBUTE).map(_.split(",").toSeq)) } else { getSecondaryJarUris(sparkConf.get(SECONDARY_JARS)) } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index d09430236285..7d71a642f62f 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -36,6 +36,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef} import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor @@ -61,7 +62,6 @@ private[yarn] class YarnAllocator( sparkConf: SparkConf, amClient: AMRMClient[ContainerRequest], appAttemptId: ApplicationAttemptId, - args: ApplicationMasterArguments, securityMgr: SecurityManager) extends Logging { @@ -107,12 +107,12 @@ private[yarn] class YarnAllocator( private val containerIdToExecutorId = new HashMap[ContainerId, String] // Executor memory in MB. - protected val executorMemory = args.executorMemory + protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt // Additional memory overhead. 
protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse( math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt // Number of cores per executor. - protected val executorCores = args.executorCores + protected val executorCores = sparkConf.get(EXECUTOR_CORES) // Resource capability requested for each executors private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala index 83d30b7352a0..e7f75446641c 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala @@ -39,7 +39,7 @@ import org.apache.spark.util.Utils /** * Handles registering and unregistering the application with the YARN ResourceManager. */ -private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logging { +private[spark] class YarnRMClient extends Logging { private var amClient: AMRMClient[ContainerRequest] = _ private var uiHistoryAddress: String = _ @@ -72,8 +72,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress) registered = true } - new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), args, - securityMgr) + new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr) } /** diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index aad8259fe069..902d9ae1a58d 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -189,20 +189,12 @@ package object config { .bytesConf(ByteUnit.MiB) .optional - private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory") - .bytesConf(ByteUnit.MiB) - .withDefaultString("1g") - /* Executor configuration. 
*/ private[spark] val EXECUTOR_CORES = ConfigBuilder("spark.executor.cores") .intConf .withDefault(1) - private[spark] val EXECUTOR_MEMORY = ConfigBuilder("spark.executor.memory") - .bytesConf(ByteUnit.MiB) - .withDefaultString("1g") - private[spark] val EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.executor.memoryOverhead") .bytesConf(ByteUnit.MiB) .optional @@ -258,10 +250,8 @@ package object config { .toSequence .optional - private[spark] val PY_FILES = ConfigBuilder("spark.submit.pyFiles") + private[spark] val JARS_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.jars") .internal .stringConf - .toSequence - .withDefault(Nil) - + .optional } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index c19ba63fad82..1cb42d10ce04 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -118,7 +118,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll val sparkConf = new SparkConf() .set(SPARK_JARS, Seq(SPARK)) .set(USER_CLASS_PATH_FIRST, true) - .set("spark.jars", ADDED) + .set("spark.yarn.dist.jars", ADDED) val env = new MutableHashMap[String, String]() val args = new ClientArguments(Array("--jar", USER)) @@ -142,7 +142,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll val conf = new Configuration() val sparkConf = new SparkConf() .set(SPARK_JARS, Seq(SPARK)) - .set("spark.jars", ADDED) + .set("spark.yarn.dist.jars", ADDED) val client = createClient(sparkConf, args = Array("--jar", USER)) val tempDir = Utils.createTempDir() diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 0587444a334b..a641a6e73e85 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -90,12 +90,13 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter def createAllocator(maxExecutors: Int = 5): YarnAllocator = { val args = Array( - "--executor-cores", "5", - "--executor-memory", "2048", "--jar", "somejar.jar", "--class", "SomeClass") val sparkConfClone = sparkConf.clone() - sparkConfClone.set("spark.executor.instances", maxExecutors.toString) + sparkConfClone + .set("spark.executor.instances", maxExecutors.toString) + .set("spark.executor.cores", "5") + .set("spark.executor.memory", "2048") new YarnAllocator( "not used", mock(classOf[RpcEndpointRef]), @@ -103,7 +104,6 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter sparkConfClone, rmClient, appAttemptId, - new ApplicationMasterArguments(args), new SecurityManager(sparkConf)) } From 7f414039c06c24cacebcf31a9b8ef86a29fea28e Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 17 Mar 2016 17:14:59 +0800 Subject: [PATCH 05/10] Fix rebase issue and add unit tests --- .../spark/deploy/yarn/ClientSuite.scala | 2 +- .../spark/deploy/yarn/YarnClusterSuite.scala | 46 ++++++++++++++++++- 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index 1cb42d10ce04..9a238dbd5345 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ 
b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -349,7 +349,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll sparkConf: SparkConf, conf: Configuration = new Configuration(), args: Array[String] = Array()): Client = { - val clientArgs = new ClientArguments(args, sparkConf) + val clientArgs = new ClientArguments(args) val client = spy(new Client(clientArgs, conf, sparkConf)) doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]), any(classOf[Path]), anyShort()) diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 26520529ecab..b2b4d84f53d8 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -85,6 +85,35 @@ class YarnClusterSuite extends BaseYarnClusterSuite { testBasicYarnApp(false) } + test("run Spark in yarn-client mode with different configurations") { + testBasicYarnApp(true, + Map( + "spark.driver.memory" -> "512m", + "spark.executor.cores" -> "1", + "spark.executor.memory" -> "512m", + "spark.executor.instances" -> "2" + )) + } + + test("run Spark in yarn-cluster mode with different configurations") { + testBasicYarnApp(true, + Map( + "spark.driver.memory" -> "512m", + "spark.driver.cores" -> "1", + "spark.executor.cores" -> "1", + "spark.executor.memory" -> "512m", + "spark.executor.instances" -> "2" + )) + } + + test("run Spark in yarn-client mode with additional jar") { + testWithAddJar(true) + } + + test("run Spark in yarn-cluster mode with additional jar") { + testWithAddJar(false) + } + test("run Spark in yarn-cluster mode unsuccessfully") { // Don't provide arguments so the driver will fail. 
val finalState = runSpark(false, mainClassName(YarnClusterDriver.getClass)) @@ -139,13 +168,26 @@ class YarnClusterSuite extends BaseYarnClusterSuite { } } - private def testBasicYarnApp(clientMode: Boolean): Unit = { + private def testBasicYarnApp(clientMode: Boolean, conf: Map[String, String] = Map()): Unit = { val result = File.createTempFile("result", null, tempDir) val finalState = runSpark(clientMode, mainClassName(YarnClusterDriver.getClass), - appArgs = Seq(result.getAbsolutePath())) + appArgs = Seq(result.getAbsolutePath()), + extraConf = conf) checkResult(finalState, result) } + private def testWithAddJar(clientMode: Boolean): Unit = { + val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir) + val driverResult = File.createTempFile("driver", null, tempDir) + val executorResult = File.createTempFile("executor", null, tempDir) + val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass), + appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()), + extraClassPath = Seq(originalJar.getPath()), + extraJars = Seq("local:" + originalJar.getPath())) + checkResult(finalState, driverResult, "ORIGINAL") + checkResult(finalState, executorResult, "ORIGINAL") + } + private def testPySpark(clientMode: Boolean): Unit = { val primaryPyFile = new File(tempDir, "test.py") Files.write(TEST_PYFILE, primaryPyFile, StandardCharsets.UTF_8) From 6d3c62d59c292b1aeee6e896c1dc5609091f4c11 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Tue, 22 Mar 2016 14:53:13 +0800 Subject: [PATCH 06/10] Minor fix --- .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 8 ++------ .../org/apache/spark/deploy/yarn/ClientArguments.scala | 2 -- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 1aa2be1912b6..101e60f5a860 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -533,12 +533,8 @@ private[spark] class Client( * (3) whether to add these resources to the classpath */ val cachedSecondaryJarLinks = ListBuffer.empty[String] - val files = sparkConf.get(FILES_TO_DISTRIBUTE).map(p => Utils.resolveURIs(p)) - .orElse(sys.env.get("SPARK_YARN_DIST_FILES")) - .orNull - val archives = sparkConf.get(ARCHIVES_TO_DISTRIBUTE).map(p => Utils.resolveURIs(p)) - .orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES")) - .orNull + val files = sparkConf.get(FILES_TO_DISTRIBUTE).map(p => Utils.resolveURIs(p)).orNull + val archives = sparkConf.get(ARCHIVES_TO_DISTRIBUTE).map(p => Utils.resolveURIs(p)).orNull List( (sparkConf.get(JARS_TO_DISTRIBUTE).orNull, LocalResourceType.FILE, true), (files, LocalResourceType.FILE, false), diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index f32c7cc86a1d..61c027ec4483 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -33,7 +33,6 @@ private[spark] class ClientArguments(args: Array[String]) { private def parseArgs(inputArgs: List[String]): Unit = { var args = inputArgs - // scalastyle:off println while (!args.isEmpty) { args match { case ("--jar") :: value :: tail => @@ -62,7 +61,6 @@ private[spark] class ClientArguments(args: Array[String]) { throw new 
IllegalArgumentException(getUsageMessage(args)) } } - // scalastyle:on println if (primaryPyFile != null && primaryRFile != null) { throw new IllegalArgumentException("Cannot have primary-py-file and primary-r-file" + From f9b62a1450e75437f44751c0d7e2a70aee792a58 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 24 Mar 2016 16:25:01 +0800 Subject: [PATCH 07/10] remove old log4j environment --- .../org/apache/spark/deploy/yarn/Client.scala | 33 +++++-------------- 1 file changed, 8 insertions(+), 25 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 101e60f5a860..2050d889ef90 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -374,14 +374,6 @@ private[spark] class Client( val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() - val oldLog4jConf = Option(System.getenv("SPARK_LOG4J_CONF")) - if (oldLog4jConf.isDefined) { - logWarning( - "SPARK_LOG4J_CONF detected in the system environment. This variable has been " + - "deprecated. Please refer to the \"Launching Spark on YARN\" documentation " + - "for alternatives.") - } - def addDistributedUri(uri: URI): Boolean = { val uriStr = uri.toString() if (distributedUris.contains(uriStr)) { @@ -503,25 +495,16 @@ private[spark] class Client( } /** - * Copy a few resources to the distributed cache if their scheme is not "local". + * Copy user jar to the distributed cache if their scheme is not "local". * Otherwise, set the corresponding key in our SparkConf to handle it downstream. - * Each resource is represented by a 3-tuple of: - * (1) destination resource name, - * (2) local path to the resource, - * (3) Spark property key to set if the scheme is not local */ - List( - (APP_JAR_NAME, args.userJar, APP_JAR), - ("log4j.properties", oldLog4jConf.orNull, null) - ).foreach { case (destName, path, confKey) => - if (path != null && !path.trim().isEmpty()) { - val (isLocal, localizedPath) = distribute(path, destName = Some(destName)) - if (isLocal && confKey != null) { - require(localizedPath != null, s"Path $path already distributed.") - // If the resource is intended for local use only, handle this downstream - // by setting the appropriate property - sparkConf.set(confKey, localizedPath) - } + for (jar <- Option(args.userJar); if !jar.trim.isEmpty) { + val (isLocal, localizedPath) = distribute(jar, destName = Some(APP_JAR_NAME)) + if (isLocal) { + require(localizedPath != null, s"Path $jar already distributed") + // If the resource is intended for local use only, handle this downstream + // by setting the appropriate property + sparkConf.set(APP_JAR, localizedPath) } } From d152f9f2023688a11011183b6c859873c8b2f8fb Mon Sep 17 00:00:00 2001 From: jerryshao Date: Tue, 29 Mar 2016 16:00:24 +0800 Subject: [PATCH 08/10] Address the comments --- .../main/scala/org/apache/spark/deploy/SparkSubmit.scala | 3 ++- .../scala/org/apache/spark/internal/config/package.scala | 2 +- .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 8 +++----- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 49b3a78e9419..926e1ff7a874 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -616,7 +616,8 @@ object SparkSubmit { 
"spark.jars", "spark.files", "spark.yarn.dist.files", - "spark.yarn.dist.archives") + "spark.yarn.dist.archives", + "spark.yarn.dist.jars") pathConfigs.foreach { config => // Replace old URIs with resolved URIs, if they exist sysProps.get(config).foreach { oldValue => diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 34a64c4318a4..968c5192ac67 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -34,7 +34,7 @@ package object config { private[spark] val DRIVER_USER_CLASS_PATH_FIRST = ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.withDefault(false) - private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory") + private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory") .bytesConf(ByteUnit.MiB) .withDefaultString("1g") diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 2050d889ef90..91a74b9ba214 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -498,7 +498,7 @@ private[spark] class Client( * Copy user jar to the distributed cache if their scheme is not "local". * Otherwise, set the corresponding key in our SparkConf to handle it downstream. */ - for (jar <- Option(args.userJar); if !jar.trim.isEmpty) { + Option(args.userJar).filter(_.trim.nonEmpty).foreach { jar => val (isLocal, localizedPath) = distribute(jar, destName = Some(APP_JAR_NAME)) if (isLocal) { require(localizedPath != null, s"Path $jar already distributed") @@ -516,12 +516,10 @@ private[spark] class Client( * (3) whether to add these resources to the classpath */ val cachedSecondaryJarLinks = ListBuffer.empty[String] - val files = sparkConf.get(FILES_TO_DISTRIBUTE).map(p => Utils.resolveURIs(p)).orNull - val archives = sparkConf.get(ARCHIVES_TO_DISTRIBUTE).map(p => Utils.resolveURIs(p)).orNull List( (sparkConf.get(JARS_TO_DISTRIBUTE).orNull, LocalResourceType.FILE, true), - (files, LocalResourceType.FILE, false), - (archives, LocalResourceType.ARCHIVE, false) + (sparkConf.get(FILES_TO_DISTRIBUTE).orNull, LocalResourceType.FILE, false), + (sparkConf.get(ARCHIVES_TO_DISTRIBUTE).orNull, LocalResourceType.ARCHIVE, false) ).foreach { case (flist, resType, addToClasspath) => if (flist != null && !flist.isEmpty()) { flist.split(',').foreach { file => From 7feae6e376d368f8aecaeda6578fde6703969bd1 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 30 Mar 2016 13:55:21 +0800 Subject: [PATCH 09/10] Add spark.yarn.dist.jars to doc and fix minor issue --- docs/running-on-yarn.md | 7 +++++++ .../spark/deploy/yarn/ApplicationMasterArguments.scala | 2 +- .../main/scala/org/apache/spark/deploy/yarn/config.scala | 9 ++++----- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index c775fe710ffd..bb83272ec80e 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -215,6 +215,13 @@ If you need a reference to the proper location to put log files in the YARN so t Comma-separated list of files to be placed in the working directory of each executor. + + spark.yarn.dist.jars + (none) + + Comma-separated list of jars to be placed in the working directory of each executor. 
+ + spark.executor.cores 1 in YARN mode, all the available cores on the worker in standalone mode. diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala index b3bd5605b6f3..5cdec87667a5 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala @@ -91,7 +91,7 @@ class ApplicationMasterArguments(val args: Array[String]) { | --class CLASS_NAME Name of your application's main class | --primary-py-file A main Python file | --primary-r-file A main R file - | --args ARGS Arguments to be passed to your application's main class. + | --arg ARG Argument to be passed to your application's main class. | Multiple invocations are possible, each will be passed in order. | --properties-file FILE Path to a custom Spark properties file. """.stripMargin) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index 902d9ae1a58d..02793a41c37c 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -91,6 +91,10 @@ package object config { .stringConf .optional + private[spark] val JARS_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.jars") + .stringConf + .optional + private[spark] val PRESERVE_STAGING_FILES = ConfigBuilder("spark.yarn.preserve.staging.files") .doc("Whether to preserve temporary files created by the job in HDFS.") .booleanConf @@ -249,9 +253,4 @@ package object config { .stringConf .toSequence .optional - - private[spark] val JARS_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.jars") - .internal - .stringConf - .optional } From 3bb44b4b1b84f9a972ad8ea4876b70369ba07d0c Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 31 Mar 2016 13:17:12 +0800 Subject: [PATCH 10/10] minor fix --- .../org/apache/spark/deploy/yarn/Client.scala | 20 +++++++++---------- .../org/apache/spark/deploy/yarn/config.scala | 9 ++++++--- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 91a74b9ba214..3049f1123ef6 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -517,17 +517,15 @@ private[spark] class Client( */ val cachedSecondaryJarLinks = ListBuffer.empty[String] List( - (sparkConf.get(JARS_TO_DISTRIBUTE).orNull, LocalResourceType.FILE, true), - (sparkConf.get(FILES_TO_DISTRIBUTE).orNull, LocalResourceType.FILE, false), - (sparkConf.get(ARCHIVES_TO_DISTRIBUTE).orNull, LocalResourceType.ARCHIVE, false) + (sparkConf.get(JARS_TO_DISTRIBUTE), LocalResourceType.FILE, true), + (sparkConf.get(FILES_TO_DISTRIBUTE), LocalResourceType.FILE, false), + (sparkConf.get(ARCHIVES_TO_DISTRIBUTE), LocalResourceType.ARCHIVE, false) ).foreach { case (flist, resType, addToClasspath) => - if (flist != null && !flist.isEmpty()) { - flist.split(',').foreach { file => - val (_, localizedPath) = distribute(file, resType = resType) - require(localizedPath != null) - if (addToClasspath) { - cachedSecondaryJarLinks += localizedPath - } + flist.foreach { file => + val (_, localizedPath) = distribute(file, resType = resType) + require(localizedPath != null) + if (addToClasspath) { + 
cachedSecondaryJarLinks += localizedPath } } } @@ -1264,7 +1262,7 @@ private object Client extends Logging { val secondaryJars = if (args != null) { - getSecondaryJarUris(sparkConf.get(JARS_TO_DISTRIBUTE).map(_.split(",").toSeq)) + getSecondaryJarUris(Option(sparkConf.get(JARS_TO_DISTRIBUTE))) } else { getSecondaryJarUris(sparkConf.get(SECONDARY_JARS)) } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index 02793a41c37c..61ed4b706e83 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -85,15 +85,18 @@ package object config { private[spark] val ARCHIVES_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.archives") .stringConf - .optional + .toSequence + .withDefault(Nil) private[spark] val FILES_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.files") .stringConf - .optional + .toSequence + .withDefault(Nil) private[spark] val JARS_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.jars") .stringConf - .optional + .toSequence + .withDefault(Nil) private[spark] val PRESERVE_STAGING_FILES = ConfigBuilder("spark.yarn.preserve.staging.files") .doc("Whether to preserve temporary files created by the job in HDFS.")
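
A minimal, self-contained sketch of the list-pattern-match parsing idiom that the simplified ClientArguments above relies on (case ("--jar") :: value :: tail => ...). The Args holder and the option subset here are illustrative only, not Spark's:

    // Sketch of the "--flag value :: tail" parsing idiom used in ClientArguments.
    // The option names and the Args case class below are illustrative stand-ins.
    object ArgParseSketch {
      case class Args(jar: Option[String] = None, userArgs: List[String] = Nil)

      @annotation.tailrec
      def parse(remaining: List[String], acc: Args): Args = remaining match {
        case "--jar" :: value :: tail =>
          parse(tail, acc.copy(jar = Some(value)))
        case "--arg" :: value :: tail =>
          // --arg may be repeated; each occurrence is appended in order.
          parse(tail, acc.copy(userArgs = acc.userArgs :+ value))
        case Nil =>
          acc
        case unknown :: _ =>
          throw new IllegalArgumentException(s"Unknown option: $unknown")
      }

      def main(args: Array[String]): Unit = {
        val parsed = parse(List("--jar", "app.jar", "--arg", "a", "--arg", "b"), Args())
        println(parsed) // Args(Some(app.jar),List(a, b))
      }
    }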
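
A sketch of why the last patch can drop the null checks and the manual split(','): once spark.yarn.dist.jars, spark.yarn.dist.files and spark.yarn.dist.archives are sequence-typed entries defaulting to Nil, the distribution loop simply iterates. ResType and distribute below are stand-ins for the real helpers in Client.scala, not the Spark implementation:

    // Contrast of the old nullable-string handling with the new Seq-based handling.
    object DistConfSketch {
      sealed trait ResType
      case object FILE extends ResType
      case object ARCHIVE extends ResType

      // Old shape: a nullable comma-separated string every caller had to
      // null-check and split.
      def parseOld(files: String): Seq[String] =
        if (files == null || files.trim.isEmpty) Nil else files.split(',').toSeq

      // New shape: the config value is already a Seq[String] (default Nil),
      // so the loop mirroring prepareLocalResources can just iterate.
      def distributeAll(
          jars: Seq[String],
          files: Seq[String],
          archives: Seq[String]): Seq[String] = {
        val cachedSecondaryJarLinks = scala.collection.mutable.ListBuffer.empty[String]
        List(
          (jars, FILE: ResType, true),  // jars are also added to the classpath
          (files, FILE: ResType, false),
          (archives, ARCHIVE: ResType, false)
        ).foreach { case (flist, resType, addToClasspath) =>
          flist.foreach { file =>
            val localizedPath = distribute(file, resType) // stand-in for Client.distribute
            if (addToClasspath) {
              cachedSecondaryJarLinks += localizedPath
            }
          }
        }
        cachedSecondaryJarLinks.toList
      }

      // Placeholder: the real method copies the resource to the YARN staging
      // directory and returns the localized path.
      private def distribute(file: String, resType: ResType): String =
        s"localized:$file"

      def main(args: Array[String]): Unit = {
        println(parseOld(null))                                        // List()
        println(distributeAll(Seq("a.jar", "b.jar"), Seq("conf.xml"), Nil))
      }
    }

As the new running-on-yarn.md row describes, spark.yarn.dist.jars is an ordinary configuration property, so it is supplied like any other setting, for example via --conf spark.yarn.dist.jars=a.jar,b.jar on spark-submit or in spark-defaults.conf.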