
Commit 6c94d79

Various cleanups in ClientBase and ClientArguments
1 parent ef7069a · commit 6c94d79

File tree

5 files changed: +122, -121 lines


yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 0 additions & 2 deletions
@@ -50,8 +50,6 @@ private[spark] class Client(

   /** Submit an application running our ApplicationMaster to the ResourceManager. */
   override def submitApplication(): ApplicationId = {
-    validateArgs()
-
     // Initialize and start the client service.
     init(yarnConf)
     start()
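
With validateArgs() removed here, validation now runs when ClientArguments is constructed (see the next file), so bad arguments fail before the YARN client service is even initialized. A minimal sketch of that fail-fast behavior, assuming code inside the org.apache.spark package (the class becomes private[spark] below):

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.ClientArguments

val sparkConf = new SparkConf()

// parseArgs(), loadDefaultArgs() and validateArgs() all run in the constructor,
// so an invalid executor count is rejected here rather than at submission time.
try {
  new ClientArguments(Array("--num-executors", "0"), sparkConf)
} catch {
  case e: IllegalArgumentException =>
    println(s"Rejected before submission: ${e.getMessage}")
}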

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala

Lines changed: 36 additions & 19 deletions
@@ -17,15 +17,14 @@

 package org.apache.spark.deploy.yarn

-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable.ArrayBuffer

 import org.apache.spark.SparkConf
-import org.apache.spark.scheduler.InputFormatInfo
 import org.apache.spark.util.{Utils, IntParam, MemoryParam}


 // TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
-class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
+private[spark] class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
   var addJars: String = null
   var files: String = null
   var archives: String = null
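
Narrowing the class to private[spark] trims the public API surface without breaking internal callers: it stays visible to everything under the org.apache.spark package and becomes invisible outside it. A small illustration of the modifier, with hypothetical package names:

package org.apache.spark.deploy.yarn {
  private[spark] class Internal // visible anywhere under org.apache.spark
}

package org.apache.spark.examples {
  object Fine { new org.apache.spark.deploy.yarn.Internal } // compiles
}

package com.example {
  // object Broken { new org.apache.spark.deploy.yarn.Internal } // would not compile
}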
@@ -35,28 +34,48 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
   var executorMemory = 1024 // MB
   var executorCores = 1
   var numExecutors = 2
-  var amQueue = sparkConf.get("QUEUE", "default")
+  var amQueue = sparkConf.get("spark.yarn.queue", "default")
   var amMemory: Int = 512 // MB
   var appName: String = "Spark"
   var priority = 0

-  parseArgs(args.toList)
+  // Additional memory to allocate to containers
+  val memoryOverhead = sparkConf.getInt(
+    "spark.yarn.driver.memoryOverhead", YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)

-  // env variable SPARK_YARN_DIST_ARCHIVES/SPARK_YARN_DIST_FILES set in yarn-client then
-  // it should default to hdfs://
-  files = Option(files).getOrElse(sys.env.get("SPARK_YARN_DIST_FILES").orNull)
-  archives = Option(archives).getOrElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES").orNull)
+  parseArgs(args.toList)
+  loadDefaultArgs()
+  validateArgs()
+
+  private def loadDefaultArgs(): Unit = {
+    // For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://,
+    // while spark.yarn.dist.{archives/files} should be resolved to file:// (SPARK-2051).
+    files = Option(files).orElse(sys.env.get("SPARK_YARN_DIST_FILES")).orNull
+    files = Option(files)
+      .orElse(sparkConf.getOption("spark.yarn.dist.files"))
+      .map(p => Utils.resolveURIs(p))
+      .orNull
+    archives = Option(archives).orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES")).orNull
+    archives = Option(archives)
+      .orElse(sparkConf.getOption("spark.yarn.dist.archives"))
+      .map(p => Utils.resolveURIs(p))
+      .orNull
+  }

-  // spark.yarn.dist.archives/spark.yarn.dist.files defaults to use file:// if not specified,
-  // for both yarn-client and yarn-cluster
-  files = Option(files).getOrElse(sparkConf.getOption("spark.yarn.dist.files").
-    map(p => Utils.resolveURIs(p)).orNull)
-  archives = Option(archives).getOrElse(sparkConf.getOption("spark.yarn.dist.archives").
-    map(p => Utils.resolveURIs(p)).orNull)
+  private def validateArgs(): Unit = {
+    Map[Boolean, String](
+      (numExecutors <= 0) -> "You must specify at least 1 executor!",
+      (amMemory <= memoryOverhead) -> s"AM memory must be > $memoryOverhead MB",
+      (executorMemory <= memoryOverhead) -> s"Executor memory must be > $memoryOverhead MB"
+    ).foreach { case (errorCondition, errorMessage) =>
+      if (errorCondition) {
+        throw new IllegalArgumentException(errorMessage + "\n" + getUsageMessage())
+      }
+    }
+  }

   private def parseArgs(inputArgs: List[String]): Unit = {
     val userArgsBuffer: ArrayBuffer[String] = new ArrayBuffer[String]()
-
     var args = inputArgs

     while (!args.isEmpty) {
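
The new loadDefaultArgs() encodes a three-level precedence with Option#orElse: an explicit command-line flag wins, then the legacy environment variable, then the Spark conf entry, and only the conf value is passed through URI resolution (the SPARK-2051 distinction). A standalone sketch of that chain, with hypothetical names and "resolved:" standing in for Utils.resolveURIs:

// Precedence: --files flag, then SPARK_YARN_DIST_FILES, then spark.yarn.dist.files.
def pickFiles(flag: Option[String], env: Option[String], conf: Option[String]): String =
  flag
    .orElse(env)                           // env value kept raw (hdfs:// default)
    .orElse(conf.map(p => s"resolved:$p")) // conf value URI-resolved (file:// default)
    .orNull

assert(pickFiles(Some("f.txt"), Some("e.txt"), Some("c.txt")) == "f.txt")
assert(pickFiles(None, Some("e.txt"), Some("c.txt")) == "e.txt")
assert(pickFiles(None, None, Some("c.txt")) == "resolved:c.txt")
assert(pickFiles(None, None, None) == null)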
@@ -138,10 +157,8 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
     userArgs = userArgsBuffer.readOnly
   }

-
-  def getUsageMessage(unknownParam: Any = null): String = {
+  private def getUsageMessage(unknownParam: List[String] = null): String = {
     val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
-
     message +
       "Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
       "Options:\n" +
