SparkSubmitArguments.scala
@@ -405,7 +405,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
| --queue QUEUE_NAME The YARN queue to submit to (Default: "default").
| --num-executors NUM Number of executors to launch (Default: 2).
| --archives ARCHIVES Comma separated list of archives to be extracted into the
| working directory of each executor.""".stripMargin
| working directory of each executor.
""".stripMargin
)
SparkSubmit.exitFn()
}
docs/running-on-yarn.md (17 additions, 2 deletions)
@@ -21,6 +21,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes

<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
<td><code>spark.yarn.am.memory</code></td>
<td>512m</td>
<td>
Amount of memory to use for the YARN Application Master in client mode, in the same format as JVM memory strings (e.g. <code>512m</code>, <code>2g</code>).
In cluster mode, use <code>spark.driver.memory</code> instead.
</td>
</tr>
<tr>
<td><code>spark.yarn.am.waitTime</code></td>
<td>100000</td>
@@ -90,7 +98,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
<td><code>spark.yarn.driver.memoryOverhead</code></td>
<td>driverMemory * 0.07, with minimum of 384 </td>
<td>
The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).
The amount of off-heap memory (in megabytes) to be allocated per driver in cluster mode. This is memory that accounts for things like VM overheads, interned strings, and other native overheads. This tends to grow with the container size (typically 6-10%).
</td>
</tr>
<tr>
<td><code>spark.yarn.am.memoryOverhead</code></td>
<td>AM memory * 0.07, with minimum of 384 </td>
Contributor:
It would be nice to add a note to spark.yarn.driver.memoryOverhead saying that it only applies in cluster mode.

This config is a bit different from the others, since memory overhead is purely a YARN concept and doesn't apply in other modes; i.e. there is no existing spark.driver.memoryOverhead. We could potentially just use one config for this. I'm not sure whether that would be more confusing or not, though... @sryza @vanzin @andrewor14, thoughts on that?

Contributor:

I find it more confusing to collapse these two configs. It's immediately intuitive that spark.yarn.am.memoryOverhead is the overhead applied on top of spark.yarn.am.memory, since they share the same prefix, but it's not at all clear that spark.driver.memoryOverhead is related (assuming the user has no expertise in how the different deploy modes are architected).

But yes we should definitely add a comment in spark.yarn.driver.memoryOverhead to say it's only for cluster mode.

<td>
Same as <code>spark.yarn.driver.memoryOverhead</code>, but for the Application Master in client mode.
</td>
</tr>
<tr>
@@ -145,7 +160,7 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
<td><code>spark.yarn.am.extraJavaOptions</code></td>
<td>(none)</td>
<td>
A string of extra JVM options to pass to the Yarn ApplicationMaster in client mode.
A string of extra JVM options to pass to the YARN Application Master in client mode.
In cluster mode, use <code>spark.driver.extraJavaOptions</code> instead.
</td>
</tr>
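As a quick illustration of how the client-mode properties documented above fit together, here is a minimal sketch using SparkConf. The property names come from the table; the application name, the yarn-client master string, and the concrete values are only examples.

```scala
import org.apache.spark.SparkConf

// Minimal sketch of a client-mode configuration using the properties documented above.
// The values (1g, 512) are examples only; the defaults are 512m for the AM heap and
// max(0.07 * AM memory, 384) MB for the AM memory overhead.
object AmMemoryConfExample {
  val conf: SparkConf = new SparkConf()
    .setAppName("am-memory-example")            // hypothetical application name
    .setMaster("yarn-client")
    .set("spark.yarn.am.memory", "1g")          // heap size for the client-mode Application Master
    .set("spark.yarn.am.memoryOverhead", "512") // off-heap overhead for the AM container, in MB
  // In cluster mode these two keys do not apply; use spark.driver.memory and
  // spark.yarn.driver.memoryOverhead instead.
}
```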
Client.scala
@@ -65,7 +65,7 @@ private[spark] class Client(
private val amMemoryOverhead = args.amMemoryOverhead // MB
private val executorMemoryOverhead = args.executorMemoryOverhead // MB
private val distCacheMgr = new ClientDistributedCacheManager()
private val isClusterMode = args.userClass != null
private val isClusterMode = args.isClusterMode


def stop(): Unit = yarnClient.stop()
ClientArguments.scala
@@ -38,23 +38,27 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
var amMemory: Int = 512 // MB
var appName: String = "Spark"
var priority = 0
def isClusterMode: Boolean = userClass != null

private var driverMemory: Int = 512 // MB
private val driverMemOverheadKey = "spark.yarn.driver.memoryOverhead"
private val amMemKey = "spark.yarn.am.memory"
private val amMemOverheadKey = "spark.yarn.am.memoryOverhead"
Contributor:
I don't see this config being used to set anything? I.e., where does it set amMemoryOverhead?

private val isDynamicAllocationEnabled =
sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)
Contributor:
I would prefer you move this up with the other private vals and keep the calls to parseArgs, loadEnvironmentArgs, and validateArgs together, as before.


parseArgs(args.toList)
loadEnvironmentArgs()
validateArgs()
Contributor:
A minor point, but when I refactored this I made sure to keep these three calls (parseArgs ... validateArgs) right after each other so it's easy to follow the order. I would move val isClusterMode after validateArgs().


// Additional memory to allocate to containers
// For now, use driver's memory overhead as our AM container's memory overhead
val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
val amMemoryOverheadConf = if (isClusterMode) driverMemOverheadKey else amMemOverheadKey
val amMemoryOverhead = sparkConf.getInt(amMemoryOverheadConf,
math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN))

val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))

private val isDynamicAllocationEnabled =
sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)

loadEnvironmentArgs()
validateArgs()

/** Load any default arguments provided through environment variables and Spark properties. */
private def loadEnvironmentArgs(): Unit = {
// For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://,
@@ -87,6 +91,21 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
throw new IllegalArgumentException(
"You must specify at least 1 executor!\n" + getUsageMessage())
}
if (isClusterMode) {
for (key <- Seq(amMemKey, amMemOverheadKey)) {
if (sparkConf.contains(key)) {
println(s"$key is set but does not apply in cluster mode.")
Contributor:
I don't think you want to use println here. Also, I'm always a little conflicted about these logs, since they'll show up when you set these values in the default conf file (which would apply to a lot of different jobs unless they explicitly override the conf file). But not a big deal.

Contributor (author):
Since ClientArguments.scala doesn't extend the Logging trait, only println can be used here.
Yep, if the user sets config values that are never used in that mode, we should print a warning.

BTW, spark.driver.memory is used in both modes, so I deleted the message about it.

}
}
amMemory = driverMemory
} else {
if (sparkConf.contains(driverMemOverheadKey)) {
println(s"$driverMemOverheadKey is set but does not apply in client mode.")
}
Contributor:
This warning doesn't make sense. It's a perfectly reasonable thing for YARN users to set the driver memory in client mode.

sparkConf.getOption(amMemKey)
.map(Utils.memoryStringToMb)
.foreach(mem => amMemory = mem)
Contributor:
Unindent; I will fix this when I merge.

}
}

private def parseArgs(inputArgs: List[String]): Unit = {
@@ -118,7 +137,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
if (args(0) == "--master-memory") {
println("--master-memory is deprecated. Use --driver-memory instead.")
}
amMemory = value
driverMemory = value
args = tail

case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
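To summarize the overhead logic in the ClientArguments diff above, here is a simplified, standalone sketch. The object and method names are invented for illustration, and the constant values 0.07 and 384 are assumptions taken from the documented defaults.

```scala
import org.apache.spark.SparkConf

// Simplified sketch of the AM memory-overhead resolution shown above; not the actual Client code.
object AmOverheadSketch {
  val MEMORY_OVERHEAD_FACTOR = 0.07 // assumed, per the documented 7% default
  val MEMORY_OVERHEAD_MIN = 384     // assumed minimum, in MB

  def amMemoryOverhead(sparkConf: SparkConf, isClusterMode: Boolean, amMemoryMb: Int): Int = {
    // In cluster mode the AM runs the driver, so the driver overhead key applies;
    // in client mode the dedicated AM overhead key applies.
    val key =
      if (isClusterMode) "spark.yarn.driver.memoryOverhead" else "spark.yarn.am.memoryOverhead"
    sparkConf.getInt(key, math.max((MEMORY_OVERHEAD_FACTOR * amMemoryMb).toInt, MEMORY_OVERHEAD_MIN))
  }
}
```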
YarnClientSchedulerBackend.scala
@@ -68,8 +68,6 @@ private[spark] class YarnClientSchedulerBackend(
// List of (target Client argument, environment variable, Spark property)
val optionTuples =
List(
("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"),
("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"),
("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"),
("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
Contributor:
wait, why remove these?

Contributor (author):

Since in Client.scala the --driver-memory passed by spark-submit is not used anymore. I thought we had discussed this before.

@andrewor14 OK, I see what you mean; I had a misunderstanding earlier.
To solve this problem, should we just delete
("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory") and
("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory")
in YarnClientSchedulerBackend.scala?

@WangTaoTheTonic that would fix it, but I think in addition to that we should also add a check in ClientArguments itself, in case the user calls into the Client main class and specifies --driver-memory manually.

Contributor:
Ah, OK. Also, it doesn't really make sense to pass the driver memory along in client mode anyway, because by definition the driver has already started by the time YarnClientSchedulerBackend is created.

("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
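Regarding the suggestion above to also guard against --driver-memory being passed directly to the Client main class in client mode, a hypothetical sketch of such a check (not part of this patch; the object, method, and parameter names are invented) could look like the following.

```scala
// Hypothetical sketch of the check suggested in the review above; not part of this patch.
object DriverMemoryArgCheck {
  // In client mode the driver JVM is already running, so a --driver-memory value passed to
  // the Client main class cannot take effect and is worth flagging to the user.
  def warnIfDriverMemoryIgnored(isClusterMode: Boolean, driverMemorySetByUser: Boolean): Unit = {
    if (!isClusterMode && driverMemorySetByUser) {
      // ClientArguments does not extend Logging, so a plain println is used, as noted above.
      println("--driver-memory has no effect in client mode; the driver JVM has already started.")
    }
  }
}
```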