@@ -38,30 +38,29 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
3838 var amMemory : Int = 512 // MB
3939 var appName : String = " Spark"
4040 var priority = 0
41- var isClusterMode = false
41+ def isClusterMode : Boolean = userClass != null
4242
43+ private var driverMemory : Int = 512 // MB
4344 private val driverMemKey = " spark.driver.memory"
4445 private val driverMemOverheadKey = " spark.yarn.driver.memoryOverhead"
4546 private val amMemKey = " spark.yarn.am.memory"
4647 private val amMemOverheadKey = " spark.yarn.am.memoryOverhead"
47- private var isDriverMemSet = false
48-
49- parseArgs(args.toList)
50-
5148 private val isDynamicAllocationEnabled =
5249 sparkConf.getBoolean(" spark.dynamicAllocation.enabled" , false )
5350
51+ parseArgs(args.toList)
5452 loadEnvironmentArgs()
5553 validateArgs()
5654
5755 // Additional memory to allocate to containers
58- val amMemoryOverhead = sparkConf.getInt(" spark.yarn.driver.memoryOverhead" ,
56+ val amMemoryOverheadConf = if (isClusterMode) driverMemOverheadKey else amMemOverheadKey
57+ val amMemoryOverhead = sparkConf.getInt(amMemoryOverheadConf,
5958 math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN ))
6059
6160 val executorMemoryOverhead = sparkConf.getInt(" spark.yarn.executor.memoryOverhead" ,
6261 math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN ))
6362
64- /** Load any default arguments provided through environment variables and Spark properties. */
63+ /** Load any default arguments provided through environment variables and Spark properties. */
6564 private def loadEnvironmentArgs (): Unit = {
6665 // For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://,
6766 // while spark.yarn.dist.{archives/files} should be resolved to file:// (SPARK-2051).
@@ -99,16 +98,13 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
9998 println(s " $key is set but does not apply in cluster mode. " )
10099 }
101100 }
101+ amMemory = driverMemory
102102 } else {
103103 for (key <- Seq (driverMemKey, driverMemOverheadKey)) {
104104 if (sparkConf.getOption(key).isDefined) {
105105 println(s " $key is set but does not apply in client mode. " )
106106 }
107107 }
108- if (isDriverMemSet) {
109- println(" --driver-memory is set but does not apply in client mode." )
110- }
111- // In cluster mode, the driver and the AM live in the same JVM, so this does not apply
112108 amMemory = Utils .memoryStringToMb(sparkConf.get(amMemKey, " 512m" ))
113109 }
114110 }
@@ -125,7 +121,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
125121
126122 case (" --class" ) :: value :: tail =>
127123 userClass = value
128- isClusterMode = true
129124 args = tail
130125
131126 case (" --args" | " --arg" ) :: value :: tail =>
@@ -143,8 +138,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
143138 if (args(0 ) == " --master-memory" ) {
144139 println(" --master-memory is deprecated. Use --driver-memory instead." )
145140 }
146- amMemory = value
147- isDriverMemSet = true
141+ driverMemory = value
148142 args = tail
149143
150144 case (" --num-workers" | " --num-executors" ) :: IntParam (value) :: tail =>
0 commit comments