
Commit 07f1c54

Marcelo Vanzin authored and Tom Graves committed
[SPARK-13577][YARN] Allow Spark jar to be multiple jars, archive.
In preparation for the demise of assemblies, this change allows the YARN backend to use multiple jars and globs as the "Spark jar". The config option has been renamed to "spark.yarn.jars" to reflect that. A second option, "spark.yarn.archive", was also added; if set, this takes precedence and uploads an archive expected to contain the jar files with the Spark code and its dependencies.

Existing deployments should keep working, mostly. This change drops support for the "SPARK_JAR" environment variable, and also does not fall back to using "jarOfClass" if no configuration is set, falling back to finding files under SPARK_HOME instead. This should be fine since "jarOfClass" probably wouldn't work unless you were using spark-submit anyway.

Tested with the unit tests, and trying the different config options on a YARN cluster.

Author: Marcelo Vanzin <[email protected]>

Closes #11500 from vanzin/SPARK-13577.
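For context, a minimal sketch of how the two new options might be set from application code; the paths are placeholders for illustration only, not part of this commit:

import org.apache.spark.SparkConf

// Hypothetical locations; either option may be used on its own.
val conf = new SparkConf()
  // List individual jars; globs are allowed, and "local:" URIs are kept on the containers.
  .set("spark.yarn.jars", "hdfs:///some/path/*.jar")
  // Or ship a single archive with the jar files at its root. If both options are set,
  // spark.yarn.archive takes precedence.
  .set("spark.yarn.archive", "hdfs:///some/path/spark-libs.zip")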
1 parent 8fff0f9 commit 07f1c54

File tree

8 files changed: +227 −80 lines changed

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 3 additions & 1 deletion
@@ -656,7 +656,9 @@ private[spark] object SparkConf extends Logging {
     "spark.memory.offHeap.enabled" -> Seq(
       AlternateConfig("spark.unsafe.offHeap", "1.6")),
     "spark.rpc.message.maxSize" -> Seq(
-      AlternateConfig("spark.akka.frameSize", "1.6"))
+      AlternateConfig("spark.akka.frameSize", "1.6")),
+    "spark.yarn.jars" -> Seq(
+      AlternateConfig("spark.yarn.jar", "2.0"))
   )
 
   /**
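This AlternateConfig mapping is what keeps old deployments working: reads of the new key fall back to the deprecated one. A rough sketch of the expected behavior, assuming SparkConf's usual deprecation handling (the path is a placeholder):

import org.apache.spark.SparkConf

val conf = new SparkConf()
// Setting the pre-2.0 key still works; SparkConf logs a deprecation warning for it.
conf.set("spark.yarn.jar", "hdfs:///some/path/spark-assembly.jar")

// Reading the new key resolves through the AlternateConfig entry added above.
assert(conf.get("spark.yarn.jars") == "hdfs:///some/path/spark-assembly.jar")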

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 0 additions & 1 deletion
@@ -626,7 +626,6 @@ object SparkSubmit {
     val pathConfigs = Seq(
       "spark.jars",
       "spark.files",
-      "spark.yarn.jar",
       "spark.yarn.dist.files",
       "spark.yarn.dist.archives")
     pathConfigs.foreach { config =>

docs/running-on-yarn.md

Lines changed: 18 additions & 7 deletions
@@ -272,14 +272,25 @@ If you need a reference to the proper location to put log files in the YARN so t
   </td>
 </tr>
 <tr>
-  <td><code>spark.yarn.jar</code></td>
+  <td><code>spark.yarn.jars</code></td>
   <td>(none)</td>
   <td>
-    The location of the Spark jar file, in case overriding the default location is desired.
-    By default, Spark on YARN will use a Spark jar installed locally, but the Spark jar can also be
+    List of libraries containing Spark code to distribute to YARN containers.
+    By default, Spark on YARN will use Spark jars installed locally, but the Spark jars can also be
     in a world-readable location on HDFS. This allows YARN to cache it on nodes so that it doesn't
-    need to be distributed each time an application runs. To point to a jar on HDFS, for example,
-    set this configuration to <code>hdfs:///some/path</code>.
+    need to be distributed each time an application runs. To point to jars on HDFS, for example,
+    set this configuration to <code>hdfs:///some/path</code>. Globs are allowed.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.yarn.archive</code></td>
+  <td>(none)</td>
+  <td>
+    An archive containing needed Spark jars for distribution to the YARN cache. If set, this
+    configuration replaces <code>spark.yarn.jars</code> and the archive is used in all the
+    application's containers. The archive should contain jar files in its root directory.
+    Like with the previous option, the archive can also be hosted on HDFS to speed up file
+    distribution.
   </td>
 </tr>
 <tr>
@@ -288,8 +299,8 @@ If you need a reference to the proper location to put log files in the YARN so t
   <td>
     A comma-separated list of secure HDFS namenodes your Spark application is going to access. For
     example, <code>spark.yarn.access.namenodes=hdfs://nn1.com:8032,hdfs://nn2.com:8032,
-    webhdfs://nn3.com:50070</code>. The Spark application must have access to the namenodes listed
-    and Kerberos must be properly configured to be able to access them (either in the same realm
+    webhdfs://nn3.com:50070</code>. The Spark application must have access to the namenodes listed
+    and Kerberos must be properly configured to be able to access them (either in the same realm
     or in a trusted realm). Spark acquires security tokens for each of the namenodes so that
     the Spark application can access those remote HDFS clusters.
   </td>
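Since the documentation requires the jar files to sit at the root of the archive, here is a hedged sketch of how such an archive could be assembled with plain java.util.zip; the helper name and directory layout are illustrative, not part of this change:

import java.io.{File, FileInputStream, FileOutputStream}
import java.util.zip.{ZipEntry, ZipOutputStream}

/** Zip every jar found in `jarsDir` at the root of `out`, as spark.yarn.archive expects. */
def buildSparkArchive(jarsDir: File, out: File): Unit = {
  val zip = new ZipOutputStream(new FileOutputStream(out))
  try {
    jarsDir.listFiles().filter(_.getName.endsWith(".jar")).foreach { jar =>
      // No subdirectories: each entry is placed at the archive root.
      zip.putNextEntry(new ZipEntry(jar.getName))
      val in = new FileInputStream(jar)
      try {
        val buf = new Array[Byte](8192)
        Iterator.continually(in.read(buf)).takeWhile(_ != -1).foreach(zip.write(buf, 0, _))
      } finally {
        in.close()
      }
      zip.closeEntry()
    }
  } finally {
    zip.close()
  }
}

// Example usage (paths are placeholders):
// buildSparkArchive(new File(sys.env("SPARK_HOME"), "lib"), new File("spark-libs.zip"))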

yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 78 additions & 30 deletions
@@ -423,16 +423,71 @@ private[spark] class Client(
     }
 
     /**
-     * Copy the given main resource to the distributed cache if the scheme is not "local".
+     * Add Spark to the cache. There are two settings that control what files to add to the cache:
+     * - if a Spark archive is defined, use the archive. The archive is expected to contain
+     *   jar files at its root directory.
+     * - if a list of jars is provided, filter the non-local ones, resolve globs, and
+     *   add the found files to the cache.
+     *
+     * Note that the archive cannot be a "local" URI. If none of the above settings are found,
+     * then upload all files found in $SPARK_HOME/jars.
+     *
+     * TODO: currently the code looks in $SPARK_HOME/lib while the work to replace assemblies
+     * with a directory full of jars is ongoing.
+     */
+    val sparkArchive = sparkConf.get(SPARK_ARCHIVE)
+    if (sparkArchive.isDefined) {
+      val archive = sparkArchive.get
+      require(!isLocalUri(archive), s"${SPARK_ARCHIVE.key} cannot be a local URI.")
+      distribute(Utils.resolveURI(archive).toString,
+        resType = LocalResourceType.ARCHIVE,
+        destName = Some(LOCALIZED_LIB_DIR))
+    } else {
+      sparkConf.get(SPARK_JARS) match {
+        case Some(jars) =>
+          // Break the list of jars to upload, and resolve globs.
+          val localJars = new ArrayBuffer[String]()
+          jars.foreach { jar =>
+            if (!isLocalUri(jar)) {
+              val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
+              val pathFs = FileSystem.get(path.toUri(), hadoopConf)
+              pathFs.globStatus(path).filter(_.isFile()).foreach { entry =>
+                distribute(entry.getPath().toUri().toString(),
+                  targetDir = Some(LOCALIZED_LIB_DIR))
+              }
+            } else {
+              localJars += jar
+            }
+          }
+
+          // Propagate the local URIs to the containers using the configuration.
+          sparkConf.set(SPARK_JARS, localJars)
+
+        case None =>
+          // No configuration, so fall back to uploading local jar files.
+          logWarning(s"Neither ${SPARK_JARS.key} nor ${SPARK_ARCHIVE.key} is set, falling back " +
+            "to uploading libraries under SPARK_HOME.")
+          val jarsDir = new File(sparkConf.getenv("SPARK_HOME"), "lib")
+          if (jarsDir.isDirectory()) {
+            jarsDir.listFiles().foreach { f =>
+              if (f.isFile() && f.getName().toLowerCase().endsWith(".jar")) {
+                distribute(f.getAbsolutePath(), targetDir = Some(LOCALIZED_LIB_DIR))
+              }
+            }
+          }
+      }
+    }
+
+    /**
+     * Copy a few resources to the distributed cache if their scheme is not "local".
      * Otherwise, set the corresponding key in our SparkConf to handle it downstream.
      * Each resource is represented by a 3-tuple of:
      *   (1) destination resource name,
      *   (2) local path to the resource,
      *   (3) Spark property key to set if the scheme is not local
      */
     List(
-      (SPARK_JAR_NAME, sparkJar(sparkConf), SPARK_JAR.key),
-      (APP_JAR_NAME, args.userJar, APP_JAR.key),
+      (APP_JAR_NAME, args.userJar, APP_JAR),
       ("log4j.properties", oldLog4jConf.orNull, null)
     ).foreach { case (destName, path, confKey) =>
       if (path != null && !path.trim().isEmpty()) {
@@ -1062,8 +1117,7 @@ object Client extends Logging {
     new Client(args, sparkConf).run()
   }
 
-  // Alias for the Spark assembly jar and the user jar
-  val SPARK_JAR_NAME: String = "__spark__.jar"
+  // Alias for the user jar
   val APP_JAR_NAME: String = "__app__.jar"
 
   // URI scheme that identifies local resources
@@ -1072,8 +1126,6 @@ object Client extends Logging {
   // Staging directory for any temporary jars or files
   val SPARK_STAGING: String = ".sparkStaging"
 
-  // Location of any user-defined Spark jars
-  val ENV_SPARK_JAR = "SPARK_JAR"
 
   // Staging directory is private! -> rwx--------
   val STAGING_DIR_PERMISSION: FsPermission =
@@ -1095,28 +1147,8 @@ object Client extends Logging {
   // Subdirectory where the user's python files (not archives) will be placed.
   val LOCALIZED_PYTHON_DIR = "__pyfiles__"
 
-  /**
-   * Find the user-defined Spark jar if configured, or return the jar containing this
-   * class if not.
-   *
-   * This method first looks in the SparkConf object for the spark.yarn.jar key, and in the
-   * user environment if that is not found (for backwards compatibility).
-   */
-  private def sparkJar(conf: SparkConf): String = {
-    conf.get(SPARK_JAR).getOrElse(
-      if (System.getenv(ENV_SPARK_JAR) != null) {
-        logWarning(
-          s"$ENV_SPARK_JAR detected in the system environment. This variable has been deprecated " +
-            s"in favor of the ${SPARK_JAR.key} configuration variable.")
-        System.getenv(ENV_SPARK_JAR)
-      } else {
-        SparkContext.jarOfClass(this.getClass).getOrElse(throw new SparkException("Could not "
-          + "find jar containing Spark classes. The jar can be defined using the "
-          + s"${SPARK_JAR.key} configuration option. If testing Spark, either set that option "
-          + "or make sure SPARK_PREPEND_CLASSES is not set."))
-      }
-    )
-  }
+  // Subdirectory where Spark libraries will be placed.
+  val LOCALIZED_LIB_DIR = "__spark_libs__"
 
   /**
    * Return the path to the given application's staging directory.
@@ -1236,7 +1268,18 @@ object Client extends Logging {
         addFileToClasspath(sparkConf, conf, x, null, env)
       }
     }
-    addFileToClasspath(sparkConf, conf, new URI(sparkJar(sparkConf)), SPARK_JAR_NAME, env)
+
+    // Add the Spark jars to the classpath, depending on how they were distributed.
+    addClasspathEntry(buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+      LOCALIZED_LIB_DIR, "*"), env)
+    if (!sparkConf.get(SPARK_ARCHIVE).isDefined) {
+      sparkConf.get(SPARK_JARS).foreach { jars =>
+        jars.filter(isLocalUri).foreach { jar =>
+          addClasspathEntry(getClusterPath(sparkConf, jar), env)
+        }
+      }
+    }
+
     populateHadoopClasspath(conf, env)
     sys.env.get(ENV_DIST_CLASSPATH).foreach { cp =>
       addClasspathEntry(getClusterPath(sparkConf, cp), env)
@@ -1392,4 +1435,9 @@ object Client extends Logging {
     components.mkString(Path.SEPARATOR)
   }
 
+  /** Returns whether the URI is a "local:" URI. */
+  def isLocalUri(uri: String): Boolean = {
+    uri.startsWith(s"$LOCAL_SCHEME:")
+  }
+
 }
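The glob handling in prepareLocalResources above relies on Hadoop's FileSystem.globStatus. A standalone sketch of that resolution step using only standard Hadoop APIs; the helper name and path are placeholders, not code from this commit:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Resolve a glob such as "hdfs:///some/path/*.jar" to the individual jar files,
// mirroring how the client expands non-local spark.yarn.jars entries before
// distributing them to the cache.
def resolveJarGlob(glob: String, hadoopConf: Configuration): Seq[String] = {
  val path = new Path(glob)
  val fs = FileSystem.get(path.toUri, hadoopConf)
  val matches = Option(fs.globStatus(path)).getOrElse(Array.empty)
  matches.filter(_.isFile).map(_.getPath.toUri.toString).toSeq
}

// Example: resolveJarGlob("hdfs:///some/path/*.jar", new Configuration())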

yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala

Lines changed: 8 additions & 2 deletions
@@ -72,11 +72,17 @@ package object config {
 
   /* File distribution. */
 
-  private[spark] val SPARK_JAR = ConfigBuilder("spark.yarn.jar")
-    .doc("Location of the Spark jar to use.")
+  private[spark] val SPARK_ARCHIVE = ConfigBuilder("spark.yarn.archive")
+    .doc("Location of archive containing jars files with Spark classes.")
     .stringConf
     .optional
 
+  private[spark] val SPARK_JARS = ConfigBuilder("spark.yarn.jars")
+    .doc("Location of jars containing Spark classes.")
+    .stringConf
+    .toSequence
+    .optional
+
   private[spark] val ARCHIVES_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.archives")
     .stringConf
     .optional
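The .toSequence builder above exposes the property to the client as a Seq[String]. A rough sketch of the equivalent splitting, assuming Spark's usual comma-separated convention; the helper name is made up for illustration:

// Roughly what a sequence-valued config entry does with the raw property string,
// assuming the usual comma-separated convention.
def splitJarsConf(value: String): Seq[String] =
  value.split(",").map(_.trim).filter(_.nonEmpty).toSeq

// splitJarsConf("hdfs:///some/path/a.jar, hdfs:///some/path/b.jar")
//   -> Seq("hdfs:///some/path/a.jar", "hdfs:///some/path/b.jar")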

yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala

Lines changed: 2 additions & 1 deletion
@@ -34,6 +34,7 @@ import org.scalatest.{BeforeAndAfterAll, Matchers}
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
+import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.launcher._
 import org.apache.spark.util.Utils
 
@@ -202,7 +203,7 @@ abstract class BaseYarnClusterSuite
       extraClassPath: Seq[String] = Nil,
       extraConf: Map[String, String] = Map()): String = {
     val props = new Properties()
-    props.put("spark.yarn.jar", "local:" + fakeSparkJar.getAbsolutePath())
+    props.put(SPARK_JARS.key, "local:" + fakeSparkJar.getAbsolutePath())
 
     val testClasspath = new TestClasspathBuilder()
       .buildClassPath(
