Skip to content

Commit 9cdff16

Browse files
committed
review commit
1 parent 8bc2f4b commit 9cdff16

File tree

2 files changed

+14
-16
lines changed

2 files changed

+14
-16
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
104104
totalExecutorCores = Option(totalExecutorCores)
105105
.getOrElse(defaultProperties.get("spark.cores.max").orNull)
106106
name = Option(name).getOrElse(defaultProperties.get("spark.app.name").orNull)
107-
jars = Option(jars).getOrElse(defaultProperties.get("spark.jars").orNull)
107+
jars = Option(jars).getOrElse(defaultProperties.get("spark.jars").
108+
map(p => Utils.resolveURIs(p)).orNull)
108109

109110
// This supports env vars in older versions of Spark
110111
master = Option(master).getOrElse(System.getenv("MASTER"))
@@ -131,6 +132,13 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
131132
if (name == null && primaryResource != null) {
132133
name = Utils.stripDirectory(primaryResource)
133134
}
135+
136+
if (master.startsWith("yarn")) {
137+
archives = Option(archives).getOrElse(defaultProperties
138+
.get("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)).orNull)
139+
files = Option(files).getOrElse(defaultProperties
140+
.get("spark.yarn.dist.files").map(p => Utils.resolveURIs(p)).orNull)
141+
}
134142
}
135143

136144
/** Ensure that required fields exists. Call this only once all defaults are loaded. */

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -219,21 +219,10 @@ trait ClientBase extends Logging {
219219
}
220220
}
221221

222-
def getArg(arg: String, envVar: String, sysProp: String): String = {
223-
if (arg != null && !arg.isEmpty) {
224-
arg
225-
} else if (System.getenv(envVar) != null && !System.getenv(envVar).isEmpty) {
226-
System.getenv(envVar)
227-
} else {
228-
sparkConf.getOption(sysProp).orNull
229-
}
230-
}
231-
var cachedSecondaryJarLinks = ListBuffer.empty[String]
232-
val fileLists = List((args.addJars, LocalResourceType.FILE, true),
233-
(getArg(args.files, "SPARK_YARN_DIST_FILES", "spark.yarn.dist.files"),
234-
LocalResourceType.FILE, false),
235-
(getArg(args.archives, "SPARK_YARN_DIST_ARCHIVES", "spark.yarn.dist.archives"),
236-
LocalResourceType.ARCHIVE, false))
222+
val cachedSecondaryJarLinks = ListBuffer.empty[String]
223+
val fileLists = List( (args.addJars, LocalResourceType.FILE, true),
224+
(args.files, LocalResourceType.FILE, false),
225+
(args.archives, LocalResourceType.ARCHIVE, false) )
237226
fileLists.foreach { case (flist, resType, addToClasspath) =>
238227
if (flist != null && !flist.isEmpty()) {
239228
flist.split(',').foreach { case file: String =>
@@ -251,6 +240,7 @@ trait ClientBase extends Logging {
251240
}
252241
}
253242
}
243+
logInfo("Prepared Local resources " + localResources)
254244
sparkConf.set(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
255245

256246
UserGroupInformation.getCurrentUser().addCredentials(credentials)

0 commit comments

Comments (0)