From 0566bb89318a848ee6d2f551430d9fd135a22c7d Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Thu, 4 Dec 2014 21:42:47 +0800 Subject: [PATCH 01/16] yarn client mode Application Master memory size is same as driver memory size --- .../org/apache/spark/deploy/SparkSubmit.scala | 1 + .../spark/deploy/SparkSubmitArguments.scala | 15 ++++++++++++++- .../org/apache/spark/deploy/yarn/ClientBase.scala | 7 ++++++- 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 0c7d247519447..0226634f0eff0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -196,6 +196,7 @@ object SparkSubmit { OptionAssigner(args.executorCores, YARN, CLIENT, sysProp = "spark.executor.cores"), OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"), OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"), + OptionAssigner(args.amMemory, YARN, CLIENT, sysProp = "spark.yarn.appMaster.memory"), // Yarn cluster only OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"), diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index f0e9ee67f6a67..e2f500eeb8483 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -39,6 +39,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St var driverExtraLibraryPath: String = null var driverExtraJavaOptions: String = null var driverCores: String = null + var amMemory: String = null var supervise: Boolean = false var queue: String = null var numExecutors: String = null @@ -107,6 +108,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St .orElse(sparkProperties.get("spark.driver.memory")) .orElse(env.get("SPARK_DRIVER_MEMORY")) .orNull + amMemory = Option(amMemory) + .orElse(sparkProperties.get("spark.yarn.appMaster.memory")) + .orElse(env.get("SPARK_YARN_AM_MEMORY")) + .orNull executorMemory = Option(executorMemory) .orElse(sparkProperties.get("spark.executor.memory")) .orElse(env.get("SPARK_EXECUTOR_MEMORY")) @@ -193,6 +198,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St | driverExtraClassPath $driverExtraClassPath | driverExtraLibraryPath $driverExtraLibraryPath | driverExtraJavaOptions $driverExtraJavaOptions + | amMemory $amMemory | supervise $supervise | queue $queue | numExecutors $numExecutors @@ -279,6 +285,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St driverExtraLibraryPath = value parse(tail) + case ("--am-memory") :: value :: tail => + amMemory = value + parse(tail) + case ("--properties-file") :: value :: tail => propertiesFile = value parse(tail) @@ -390,7 +400,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St | --queue QUEUE_NAME The YARN queue to submit to (Default: "default"). | --num-executors NUM Number of executors to launch (Default: 2). | --archives ARCHIVES Comma separated list of archives to be extracted into the - | working directory of each executor.""".stripMargin + | working directory of each executor. + | --am-memory MEM Memory for ApplicationMaster(e.g. 1000M, 2G) (Default: 512M). 
+ | Only available on yarn-client mode. + """.stripMargin ) SparkSubmit.exitFn() } diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index f95d72379171c..c6c63e54b4b35 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -318,7 +318,12 @@ private[spark] trait ClientBase extends Logging { var prefixEnv: Option[String] = None // Add Xmx for AM memory - javaOpts += "-Xmx" + args.amMemory + "m" + val amMemory = if (isLaunchingDriver) { + args.amMemory + } else { + Utils.memoryStringToMb(sparkConf.get("spark.yarn.appMaster.memory", "512m")) + } + javaOpts += "-Xmx" + amMemory + "m" val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) javaOpts += "-Djava.io.tmpdir=" + tmpDir From 6fd13e1bda3513204375736b3b5551a6bfb0d5a0 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Fri, 5 Dec 2014 10:24:59 +0800 Subject: [PATCH 02/16] add overhead mem and remove some configs --- .../org/apache/spark/deploy/SparkSubmit.scala | 1 - .../spark/deploy/SparkSubmitArguments.scala | 11 -------- .../spark/deploy/yarn/ClientArguments.scala | 25 +++++++++++++------ .../apache/spark/deploy/yarn/ClientBase.scala | 7 +----- 4 files changed, 18 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 0226634f0eff0..0c7d247519447 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -196,7 +196,6 @@ object SparkSubmit { OptionAssigner(args.executorCores, YARN, CLIENT, sysProp = "spark.executor.cores"), OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"), OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"), - OptionAssigner(args.amMemory, YARN, CLIENT, sysProp = "spark.yarn.appMaster.memory"), // Yarn cluster only OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"), diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index e2f500eeb8483..0176a6bd796c2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -39,7 +39,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St var driverExtraLibraryPath: String = null var driverExtraJavaOptions: String = null var driverCores: String = null - var amMemory: String = null var supervise: Boolean = false var queue: String = null var numExecutors: String = null @@ -108,10 +107,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St .orElse(sparkProperties.get("spark.driver.memory")) .orElse(env.get("SPARK_DRIVER_MEMORY")) .orNull - amMemory = Option(amMemory) - .orElse(sparkProperties.get("spark.yarn.appMaster.memory")) - .orElse(env.get("SPARK_YARN_AM_MEMORY")) - .orNull executorMemory = Option(executorMemory) .orElse(sparkProperties.get("spark.executor.memory")) .orElse(env.get("SPARK_EXECUTOR_MEMORY")) @@ -285,10 +280,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St driverExtraLibraryPath = value parse(tail) - case ("--am-memory") :: value :: tail => 
- amMemory = value - parse(tail) - case ("--properties-file") :: value :: tail => propertiesFile = value parse(tail) @@ -401,8 +392,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St | --num-executors NUM Number of executors to launch (Default: 2). | --archives ARCHIVES Comma separated list of archives to be extracted into the | working directory of each executor. - | --am-memory MEM Memory for ApplicationMaster(e.g. 1000M, 2G) (Default: 512M). - | Only available on yarn-client mode. """.stripMargin ) SparkSubmit.exitFn() diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 4d859450efc63..180e2c2347036 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -39,14 +39,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) var appName: String = "Spark" var priority = 0 - // Additional memory to allocate to containers - // For now, use driver's memory overhead as our AM container's memory overhead - val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead", - math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN)) - - val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead", - math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)) - private val isDynamicAllocationEnabled = sparkConf.getBoolean("spark.dynamicAllocation.enabled", false) @@ -54,8 +46,25 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) loadEnvironmentArgs() validateArgs() + // Additional memory to allocate to containers + // For now, use driver's memory overhead as our AM container's memory overhead + val memOverheadStr = if (userClass == null) { + "spark.yarn.driver.memoryOverhead" + } else { + "spark.yarn.am.memoryOverhead" + } + val amMemoryOverhead = sparkConf.getInt(memOverheadStr, + math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN)) + + val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead", + math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)) + /** Load any default arguments provided through environment variables and Spark properties. */ private def loadEnvironmentArgs(): Unit = { + // We use spark.yarn.am.memory to initialize Application Master in yarn-client mode. + if (userClass == null) { + amMemory = Utils.memoryStringToMb(sparkConf.get("spark.yarn.am.memory", "512m")) + } // For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://, // while spark.yarn.dist.{archives/files} should be resolved to file:// (SPARK-2051). 
files = Option(files) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index c6c63e54b4b35..f95d72379171c 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -318,12 +318,7 @@ private[spark] trait ClientBase extends Logging { var prefixEnv: Option[String] = None // Add Xmx for AM memory - val amMemory = if (isLaunchingDriver) { - args.amMemory - } else { - Utils.memoryStringToMb(sparkConf.get("spark.yarn.appMaster.memory", "512m")) - } - javaOpts += "-Xmx" + amMemory + "m" + javaOpts += "-Xmx" + args.amMemory + "m" val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) javaOpts += "-Djava.io.tmpdir=" + tmpDir From 44e48c226bacd970abb338190e44d587479a995c Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Fri, 5 Dec 2014 10:27:30 +0800 Subject: [PATCH 03/16] minor fix --- .../scala/org/apache/spark/deploy/SparkSubmitArguments.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 0176a6bd796c2..6875f08a1c4ca 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -193,7 +193,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St | driverExtraClassPath $driverExtraClassPath | driverExtraLibraryPath $driverExtraLibraryPath | driverExtraJavaOptions $driverExtraJavaOptions - | amMemory $amMemory | supervise $supervise | queue $queue | numExecutors $numExecutors From ab16bb5c996ff499ca3ef76a6d48617fe1efab8c Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Tue, 9 Dec 2014 11:17:53 +0800 Subject: [PATCH 04/16] fix bug and add comments --- docs/running-on-yarn.md | 17 ++++++++++++++++- .../spark/deploy/yarn/ClientArguments.scala | 14 ++++++++------ 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index dfe2db4b3fce8..b20ba09e7073e 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -21,6 +21,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes + + + + + @@ -88,7 +96,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes + + + + + diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 180e2c2347036..538a3f84a6f08 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -43,17 +43,19 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) sparkConf.getBoolean("spark.dynamicAllocation.enabled", false) parseArgs(args.toList) + + val isClusterMode = userClass != null + loadEnvironmentArgs() validateArgs() - // Additional memory to allocate to containers - // For now, use driver's memory overhead as our AM container's memory overhead - val memOverheadStr = if (userClass == null) { + // Additional memory to allocate to containers. In different modes, we use different configs. 
+ val amMemOverheadConf = if (isClusterMode) { "spark.yarn.driver.memoryOverhead" } else { "spark.yarn.am.memoryOverhead" } - val amMemoryOverhead = sparkConf.getInt(memOverheadStr, + val amMemoryOverhead = sparkConf.getInt(amMemOverheadConf, math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN)) val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead", @@ -61,8 +63,8 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) /** Load any default arguments provided through environment variables and Spark properties. */ private def loadEnvironmentArgs(): Unit = { - // We use spark.yarn.am.memory to initialize Application Master in yarn-client mode. - if (userClass == null) { + // In cluster mode, the driver and the AM live in the same JVM, so this does not apply + if (!isClusterMode) { amMemory = Utils.memoryStringToMb(sparkConf.get("spark.yarn.am.memory", "512m")) } // For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://, From b09c309cc62f3d1c105f2c29a1ec8fdec92172bf Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Tue, 9 Dec 2014 11:41:05 +0800 Subject: [PATCH 05/16] use code format --- docs/running-on-yarn.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index b20ba09e7073e..2b16904bb50a5 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -25,7 +25,7 @@ Most of the configs are the same for Spark on YARN as for other deployment modes From f6bee0e75d677314e7ba01a34c239a33565b4fcf Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Fri, 12 Dec 2014 10:31:41 +0800 Subject: [PATCH 06/16] docs issue --- docs/running-on-yarn.md | 8 ++++---- .../org/apache/spark/deploy/yarn/ClientArguments.scala | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index d43cc55f5f523..7096f7a793bcb 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -25,7 +25,7 @@ Most of the configs are the same for Spark on YARN as for other deployment modes @@ -96,14 +96,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes - + diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index caa5c492193f2..fc7b2ff31e034 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -50,12 +50,12 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) validateArgs() // Additional memory to allocate to containers. In different modes, we use different configs. 
- val amMemOverheadConf = if (isClusterMode) { + val amMemoryOverheadConf = if (isClusterMode) { "spark.yarn.driver.memoryOverhead" } else { "spark.yarn.am.memoryOverhead" } - val amMemoryOverhead = sparkConf.getInt(amMemOverheadConf, + val amMemoryOverhead = sparkConf.getInt(amMemoryOverheadConf, math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN)) val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead", From 7fa9e2e26f3de07ce7934cf623d57583fa917611 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Sat, 13 Dec 2014 09:45:50 +0800 Subject: [PATCH 07/16] log a warning --- .../scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index fc7b2ff31e034..0d88f14b9371a 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -66,6 +66,9 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) // In cluster mode, the driver and the AM live in the same JVM, so this does not apply if (!isClusterMode) { amMemory = Utils.memoryStringToMb(sparkConf.get("spark.yarn.am.memory", "512m")) + } else { + println("spark.yarn.am.memory is set but does not apply in client mode, " + + "use spark.driver.memory instead.") } // For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://, // while spark.yarn.dist.{archives/files} should be resolved to file:// (SPARK-2051). From 1960d16bdd7779427165116d360ccb27941e9529 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Wed, 17 Dec 2014 20:44:41 +0800 Subject: [PATCH 08/16] fix wrong comment --- .../scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 0d88f14b9371a..8dbbb2258475c 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -67,7 +67,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) if (!isClusterMode) { amMemory = Utils.memoryStringToMb(sparkConf.get("spark.yarn.am.memory", "512m")) } else { - println("spark.yarn.am.memory is set but does not apply in client mode, " + + println("spark.yarn.am.memory is set but does not apply in cluster mode, " + "use spark.driver.memory instead.") } // For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://, From 42075b0ed8eca5e0e92672a1cfb7680ebdf22218 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Fri, 19 Dec 2014 10:56:50 +0800 Subject: [PATCH 09/16] arrange the args and warn logging --- docs/running-on-yarn.md | 6 +-- .../spark/deploy/yarn/ClientArguments.scala | 37 +++++++++++-------- .../apache/spark/deploy/yarn/ClientBase.scala | 2 +- 3 files changed, 26 insertions(+), 19 deletions(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index b597a792751d3..5db00802f4612 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -25,8 +25,8 @@ Most of the configs are the same for Spark on YARN as for other deployment modes @@ -98,7 +98,7 @@ Most of the configs are the same for Spark on YARN 
as for other deployment modes diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 8dbbb2258475c..fc71aef27e928 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -38,23 +38,22 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) var amMemory: Int = 512 // MB var appName: String = "Spark" var priority = 0 + var isClusterMode = false + + private val driverMemKey = "spark.driver.memory" + private val driverMemOverheadKey = "spark.yarn.driver.memoryOverhead" + private val amMemKey = "spark.yarn.am.memory" + private val amMemOverheadKey = "spark.yarn.am.memoryOverhead" private val isDynamicAllocationEnabled = sparkConf.getBoolean("spark.dynamicAllocation.enabled", false) parseArgs(args.toList) - - val isClusterMode = userClass != null - loadEnvironmentArgs() validateArgs() // Additional memory to allocate to containers. In different modes, we use different configs. - val amMemoryOverheadConf = if (isClusterMode) { - "spark.yarn.driver.memoryOverhead" - } else { - "spark.yarn.am.memoryOverhead" - } + val amMemoryOverheadConf = if (isClusterMode) driverMemOverheadKey else amMemOverheadKey val amMemoryOverhead = sparkConf.getInt(amMemoryOverheadConf, math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN)) @@ -63,13 +62,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) /** Load any default arguments provided through environment variables and Spark properties. */ private def loadEnvironmentArgs(): Unit = { - // In cluster mode, the driver and the AM live in the same JVM, so this does not apply - if (!isClusterMode) { - amMemory = Utils.memoryStringToMb(sparkConf.get("spark.yarn.am.memory", "512m")) - } else { - println("spark.yarn.am.memory is set but does not apply in cluster mode, " + - "use spark.driver.memory instead.") - } // For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://, // while spark.yarn.dist.{archives/files} should be resolved to file:// (SPARK-2051). 
files = Option(files) @@ -100,6 +92,21 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) throw new IllegalArgumentException( "You must specify at least 1 executor!\n" + getUsageMessage()) } + isClusterMode = userClass != null + if (isClusterMode) { + for (key <- Seq(amMemKey, amMemOverheadKey)) { + if (sparkConf.getOption(key).isDefined) { + println(s"$key is set but does not apply in cluster mode.") + } + } + } else { + // In cluster mode, the driver and the AM live in the same JVM, so this does not apply + amMemory = Utils.memoryStringToMb(sparkConf.get(amMemKey, "512m")) + if (sparkConf.getOption(driverMemKey) || sparkConf.getOption(driverMemOverheadKey)) { + println(s"$driverMemKey, $driverMemOverheadKey or --driver-memory does not apply" + + s"in client mode, please use $amMemKey and $amMemOverheadKey instead.") + } + } } private def parseArgs(inputArgs: List[String]): Unit = { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 5f0c67f05c9dd..dac5d89deb744 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -56,7 +56,7 @@ private[spark] trait ClientBase extends Logging { protected val amMemoryOverhead = args.amMemoryOverhead // MB protected val executorMemoryOverhead = args.executorMemoryOverhead // MB private val distCacheMgr = new ClientDistributedCacheManager() - private val isLaunchingDriver = args.userClass != null + private val isLaunchingDriver = args.isClusterMode /** * Fail fast if we have requested more resources per container than is available in the cluster. From 2557c5e5d7a4a431094b2fbb6a2d48432a75f93f Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Fri, 19 Dec 2014 11:00:11 +0800 Subject: [PATCH 10/16] missing a single blank --- .../scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index fc71aef27e928..477b536e33758 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -103,7 +103,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) // In cluster mode, the driver and the AM live in the same JVM, so this does not apply amMemory = Utils.memoryStringToMb(sparkConf.get(amMemKey, "512m")) if (sparkConf.getOption(driverMemKey) || sparkConf.getOption(driverMemOverheadKey)) { - println(s"$driverMemKey, $driverMemOverheadKey or --driver-memory does not apply" + + println(s"$driverMemKey, $driverMemOverheadKey or --driver-memory does not apply " + s"in client mode, please use $amMemKey and $amMemOverheadKey instead.") } } From b7acbb266383f6bd6afe6bde936f9de9121b5ab5 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Fri, 19 Dec 2014 11:59:28 +0800 Subject: [PATCH 11/16] incorrect method invoked --- .../scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 477b536e33758..f64a1e79f4aee 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -102,7 +102,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) } else { // In cluster mode, the driver and the AM live in the same JVM, so this does not apply amMemory = Utils.memoryStringToMb(sparkConf.get(amMemKey, "512m")) - if (sparkConf.getOption(driverMemKey) || sparkConf.getOption(driverMemOverheadKey)) { + if (sparkConf.contains(driverMemKey) || sparkConf.contains(driverMemOverheadKey)) { println(s"$driverMemKey, $driverMemOverheadKey or --driver-memory does not apply " + s"in client mode, please use $amMemKey and $amMemOverheadKey instead.") } From 2b2792883b0b12e73fb5a1b957de75a0b53492c7 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Fri, 19 Dec 2014 13:50:08 +0800 Subject: [PATCH 12/16] inaccurate description --- docs/running-on-yarn.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 5db00802f4612..07d378e99e8b1 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -25,7 +25,7 @@ Most of the configs are the same for Spark on YARN as for other deployment modes @@ -105,7 +105,7 @@ Most of the configs are the same for Spark on YARN as for other deployment modes @@ -160,7 +160,7 @@ Most of the configs are the same for Spark on YARN as for other deployment modes From 987b99d20af846500ea2abab9bc55be10fab5e2d Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Tue, 23 Dec 2014 11:19:06 +0800 Subject: [PATCH 13/16] disable --driver-memory in client mode --- .../apache/spark/deploy/yarn/ClientArguments.scala | 12 ++++++++---- .../cluster/YarnClientSchedulerBackend.scala | 2 -- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index f64a1e79f4aee..f47ce824ad204 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -100,12 +100,16 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) } } } else { + for (key <- Seq(driverMemKey, driverMemOverheadKey)) { + if (sparkConf.getOption(key).isDefined) { + println(s"$key is set but does not apply in client mode.") + } + } + if (amMemory != 512) { + println("--driver-memory is set but does not apply in client mode.") + } // In cluster mode, the driver and the AM live in the same JVM, so this does not apply amMemory = Utils.memoryStringToMb(sparkConf.get(amMemKey, "512m")) - if (sparkConf.contains(driverMemKey) || sparkConf.contains(driverMemOverheadKey)) { - println(s"$driverMemKey, $driverMemOverheadKey or --driver-memory does not apply " + - s"in client mode, please use $amMemKey and $amMemOverheadKey instead.") - } } } diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 2923e6729cd6b..efe105b5a445c 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -67,8 +67,6 @@ private[spark] class YarnClientSchedulerBackend( val extraArgs = new ArrayBuffer[String] val optionTuples = // List of (target Client argument, environment variable, Spark property) List( - 
("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"), - ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"), ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"), ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"), ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"), From ddcd592485353f9151f0e40fed320abb67bae1b7 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Thu, 8 Jan 2015 15:03:38 +0800 Subject: [PATCH 14/16] fix the bug produced in rebase and some improvements --- .../spark/deploy/yarn/ClientArguments.scala | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 88f51bc6b9828..53354ea8231f4 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -38,30 +38,29 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) var amMemory: Int = 512 // MB var appName: String = "Spark" var priority = 0 - var isClusterMode = false + def isClusterMode: Boolean = userClass != null + private var driverMemory: Int = 512 // MB private val driverMemKey = "spark.driver.memory" private val driverMemOverheadKey = "spark.yarn.driver.memoryOverhead" private val amMemKey = "spark.yarn.am.memory" private val amMemOverheadKey = "spark.yarn.am.memoryOverhead" - private var isDriverMemSet = false - - parseArgs(args.toList) - private val isDynamicAllocationEnabled = sparkConf.getBoolean("spark.dynamicAllocation.enabled", false) + parseArgs(args.toList) loadEnvironmentArgs() validateArgs() // Additional memory to allocate to containers - val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead", + val amMemoryOverheadConf = if (isClusterMode) driverMemOverheadKey else amMemOverheadKey + val amMemoryOverhead = sparkConf.getInt(amMemoryOverheadConf, math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN)) val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead", math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)) - /** Load any default arguments provided through environment variables and Spark properties. */ + /** Load any default arguments provided through environment variables and Spark properties. */ private def loadEnvironmentArgs(): Unit = { // For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://, // while spark.yarn.dist.{archives/files} should be resolved to file:// (SPARK-2051). 
@@ -99,16 +98,13 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
           println(s"$key is set but does not apply in cluster mode.")
         }
       }
+      amMemory = driverMemory
     } else {
       for (key <- Seq(driverMemKey, driverMemOverheadKey)) {
         if (sparkConf.getOption(key).isDefined) {
           println(s"$key is set but does not apply in client mode.")
         }
       }
-      if (isDriverMemSet) {
-        println("--driver-memory is set but does not apply in client mode.")
-      }
-      // In cluster mode, the driver and the AM live in the same JVM, so this does not apply
       amMemory = Utils.memoryStringToMb(sparkConf.get(amMemKey, "512m"))
     }
   }
@@ -125,7 +121,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
 
       case ("--class") :: value :: tail =>
         userClass = value
-        isClusterMode = true
         args = tail
 
       case ("--args" | "--arg") :: value :: tail =>
@@ -143,8 +138,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
         if (args(0) == "--master-memory") {
          println("--master-memory is deprecated. Use --driver-memory instead.")
         }
-        amMemory = value
-        isDriverMemSet = true
+        driverMemory = value
         args = tail
 
       case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>

From b8410c004b6e71c54c5d1c63d3f4fb391a7f8ce7 Mon Sep 17 00:00:00 2001
From: WangTaoTheTonic
Date: Fri, 9 Jan 2015 09:30:52 +0800
Subject: [PATCH 15/16] minor optimization

---
 .../org/apache/spark/deploy/yarn/ClientArguments.scala | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 53354ea8231f4..0ed5f002401bb 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -94,18 +94,20 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
     }
     if (isClusterMode) {
       for (key <- Seq(amMemKey, amMemOverheadKey)) {
-        if (sparkConf.getOption(key).isDefined) {
+        if (sparkConf.contains(key)) {
           println(s"$key is set but does not apply in cluster mode.")
         }
       }
       amMemory = driverMemory
     } else {
       for (key <- Seq(driverMemKey, driverMemOverheadKey)) {
-        if (sparkConf.getOption(key).isDefined) {
+        if (sparkConf.contains(key)) {
           println(s"$key is set but does not apply in client mode.")
         }
       }
-      amMemory = Utils.memoryStringToMb(sparkConf.get(amMemKey, "512m"))
+      sparkConf.getOption(amMemKey)
+        .map(Utils.memoryStringToMb)
+        .foreach(mem => amMemory = mem)
     }
   }

From d5ceb1b2f181628fe0096202ffb31d95f0afcef8 Mon Sep 17 00:00:00 2001
From: WangTaoTheTonic
Date: Fri, 9 Jan 2015 09:46:59 +0800
Subject: [PATCH 16/16] spark.driver.memory is used in both modes

---
 .../org/apache/spark/deploy/yarn/ClientArguments.scala | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 0ed5f002401bb..ce221a9fa72a6 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -41,7 +41,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
   def isClusterMode: Boolean = userClass != null
 
   private var driverMemory: Int = 512 // MB
-  private val driverMemKey = "spark.driver.memory"
   private val driverMemOverheadKey = "spark.yarn.driver.memoryOverhead"
   private val amMemKey = "spark.yarn.am.memory"
   private val amMemOverheadKey = "spark.yarn.am.memoryOverhead"
@@ -100,10 +99,8 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
       }
       amMemory = driverMemory
     } else {
-      for (key <- Seq(driverMemKey, driverMemOverheadKey)) {
-        if (sparkConf.contains(key)) {
-          println(s"$key is set but does not apply in client mode.")
-        }
+      if (sparkConf.contains(driverMemOverheadKey)) {
+        println(s"$driverMemOverheadKey is set but does not apply in client mode.")
       }
       sparkConf.getOption(amMemKey)
         .map(Utils.memoryStringToMb)
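Taken together, the series leaves AM memory selection as follows; a self-contained sketch of the end-state validateArgs() behavior (illustrative only: a plain Map stands in for SparkConf, and memory values are assumed to be already parsed to MB):

    // Sketch of the post-series resolution, not the patched code itself.
    def resolveAmMemoryMb(
        conf: Map[String, Int],
        isClusterMode: Boolean,
        driverMemoryMb: Int): Int = {
      if (isClusterMode) {
        // Driver and AM share one JVM, so the AM container is sized by spark.driver.memory.
        for (key <- Seq("spark.yarn.am.memory", "spark.yarn.am.memoryOverhead")) {
          if (conf.contains(key)) {
            println(s"$key is set but does not apply in cluster mode.")
          }
        }
        driverMemoryMb
      } else {
        // Client mode: the driver lives outside YARN; spark.yarn.am.memory sizes the AM.
        if (conf.contains("spark.yarn.driver.memoryOverhead")) {
          println("spark.yarn.driver.memoryOverhead is set but does not apply in client mode.")
        }
        conf.getOrElse("spark.yarn.am.memory", 512) // default 512 MB
      }
    }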
[Table rows for the docs/running-on-yarn.md hunks in the patches above, grouped by patch and hunk:]

[PATCH 04/16], docs/running-on-yarn.md @@ -21,6 +21,14 @@:

 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr>
+  <td><code>spark.yarn.am.memory</code></td>
+  <td>512m</td>
+  <td>
+    Amount of memory to use for the ApplicationMaster process on yarn-client mode. Use spark.driver.memory instead on yarn-cluster mode.
+    (e.g. 512m, 2g).
+  </td>
+</tr>
 <tr>
   <td><code>spark.yarn.applicationMaster.waitTries</code></td>
   <td>10</td>

[PATCH 04/16], docs/running-on-yarn.md @@ -88,7 +96,14 @@:

 <tr>
   <td><code>spark.yarn.driver.memoryOverhead</code></td>
   <td>driverMemory * 0.07, with minimum of 384</td>
   <td>
-    The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).
+    The amount of off heap memory (in megabytes) to be allocated for driver with ApplicationMaster on cluster mode. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).
+  </td>
+</tr>
+<tr>
+  <td><code>spark.yarn.am.memoryOverhead</code></td>
+  <td>driverMemory * 0.07, with minimum of 384</td>
+  <td>
+    The amount of off heap memory (in megabytes) to be allocated for ApplicationMaster on client mode. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).
   </td>
 </tr>
[PATCH 05/16], docs/running-on-yarn.md @@ -25,7 +25,7 @@:

 <tr>
   <td><code>spark.yarn.am.memory</code></td>
   <td>512m</td>
   <td>
-    Amount of memory to use for the ApplicationMaster process on yarn-client mode. Use spark.driver.memory instead on yarn-cluster mode.
+    Amount of memory to use for the ApplicationMaster process on yarn-client mode. Use <code>spark.driver.memory</code> instead on yarn-cluster mode.
     (e.g. 512m, 2g).
   </td>
 </tr>

[PATCH 06/16], docs/running-on-yarn.md @@ -25,7 +25,7 @@:

-    Amount of memory to use for the ApplicationMaster process on yarn-client mode. Use <code>spark.driver.memory</code> instead on yarn-cluster mode.
+    Amount of memory to use for the Yarn ApplicationMaster in client mode. In cluster mode, use `spark.driver.memory` instead.
     (e.g. 512m, 2g).

[PATCH 06/16], docs/running-on-yarn.md @@ -96,14 +96,14 @@:

 <tr>
   <td><code>spark.yarn.driver.memoryOverhead</code></td>
   <td>driverMemory * 0.07, with minimum of 384</td>
   <td>
-    The amount of off heap memory (in megabytes) to be allocated for driver with ApplicationMaster on cluster mode. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).
+    The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).
   </td>
 </tr>
 <tr>
   <td><code>spark.yarn.am.memoryOverhead</code></td>
-  <td>driverMemory * 0.07, with minimum of 384</td>
+  <td>AM memory * 0.07, with minimum of 384</td>
   <td>
-    The amount of off heap memory (in megabytes) to be allocated for ApplicationMaster on client mode. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).
+    Same as `spark.yarn.driver.memoryOverhead`, but for the ApplicationMaster in client mode.
   </td>
 </tr>
spark.yarn.driver.memoryOverhead driverMemory * 0.07, with minimum of 384 - The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%). + The amount of off heap memory (in megabytes) to be allocated per driver in cluster mode. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).
spark.yarn.am.memory 512m - Amount of memory to use for the Yarn ApplicationMaster in client mode, in the same format as JVM memory strings (e.g. 512m, 2g). + Amount of memory to use for the YARN Application Master in client mode, in the same format as JVM memory strings (e.g. 512m, 2g). In cluster mode, use `spark.driver.memory` instead.
spark.yarn.am.memoryOverhead AM memory * 0.07, with minimum of 384 - Same as `spark.yarn.driver.memoryOverhead`, but for the ApplicationMaster in client mode. + Same as `spark.yarn.driver.memoryOverhead`, but for the Application Master in client mode.
spark.yarn.am.extraJavaOptions (none) - A string of extra JVM options to pass to the Yarn ApplicationMaster in client mode. + A string of extra JVM options to pass to the YARN Application Master in client mode. In cluster mode, use spark.driver.extraJavaOptions instead.