Skip to content

Commit dba3140

Browse files
mengxr
authored and tdas committed
[SPARK-1870] Make spark-submit --jars work in yarn-cluster mode.
Sent secondary jars to distributed cache of all containers and add the cached jars to classpath before executors start. Tested on a YARN cluster (CDH-5.0). `spark-submit --jars` also works in standalone server and `yarn-client`. Thanks to @andrewor14 for testing! I removed "Doesn't work for drivers in standalone mode with "cluster" deploy mode." from `spark-submit`'s help message, though we haven't tested mesos yet. CC: @dbtsai @sryza Author: Xiangrui Meng <[email protected]> Closes #848 from mengxr/yarn-classpath and squashes the following commits: 23e7df4 [Xiangrui Meng] rename spark.jar to __spark__.jar and app.jar to __app__.jar to avoid conflicts; append $CWD/ and $CWD/* to the classpath; remove unused methods a40f6ed [Xiangrui Meng] standalone -> cluster 65e04ad [Xiangrui Meng] update spark-submit help message and add a comment for yarn-client 11e5354 [Xiangrui Meng] minor changes 3e7e1c4 [Xiangrui Meng] use sparkConf instead of hadoop conf dc3c825 [Xiangrui Meng] add secondary jars to classpath in yarn
1 parent 2a948e7 commit dba3140

File tree

3 files changed

+19
-55
lines changed

3 files changed

+19
-55
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -326,8 +326,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
326326
| --class CLASS_NAME Your application's main class (for Java / Scala apps).
327327
| --name NAME A name of your application.
328328
| --jars JARS Comma-separated list of local jars to include on the driver
329-
| and executor classpaths. Doesn't work for drivers in
330-
| standalone mode with "cluster" deploy mode.
329+
| and executor classpaths.
331330
| --py-files PY_FILES Comma-separated list of .zip or .egg files to place on the
332331
| PYTHONPATH for Python apps.
333332
| --files FILES Comma-separated list of files to be placed in the working

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala

Lines changed: 17 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ import org.apache.spark.{Logging, SparkConf, SparkContext}
4444
* Client submits an application to the YARN ResourceManager.
4545
*
4646
* Depending on the deployment mode this will launch one of two application master classes:
47-
* 1. In standalone mode, it will launch an [[org.apache.spark.deploy.yarn.ApplicationMaster]]
47+
* 1. In cluster mode, it will launch an [[org.apache.spark.deploy.yarn.ApplicationMaster]]
4848
* which launches a driver program inside of the cluster.
4949
* 2. In client mode, it will launch an [[org.apache.spark.deploy.yarn.ExecutorLauncher]] to
5050
* request executors on behalf of a driver running outside of the cluster.
@@ -220,10 +220,11 @@ trait ClientBase extends Logging {
220220
}
221221
}
222222

223+
var cachedSecondaryJarLinks = ListBuffer.empty[String]
223224
val fileLists = List( (args.addJars, LocalResourceType.FILE, true),
224225
(args.files, LocalResourceType.FILE, false),
225226
(args.archives, LocalResourceType.ARCHIVE, false) )
226-
fileLists.foreach { case (flist, resType, appMasterOnly) =>
227+
fileLists.foreach { case (flist, resType, addToClasspath) =>
227228
if (flist != null && !flist.isEmpty()) {
228229
flist.split(',').foreach { case file: String =>
229230
val localURI = new URI(file.trim())
@@ -232,11 +233,15 @@ trait ClientBase extends Logging {
232233
val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
233234
val destPath = copyRemoteFile(dst, localPath, replication)
234235
distCacheMgr.addResource(fs, conf, destPath, localResources, resType,
235-
linkname, statCache, appMasterOnly)
236+
linkname, statCache)
237+
if (addToClasspath) {
238+
cachedSecondaryJarLinks += linkname
239+
}
236240
}
237241
}
238242
}
239243
}
244+
sparkConf.set(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
240245

241246
UserGroupInformation.getCurrentUser().addCredentials(credentials)
242247
localResources
@@ -374,11 +379,12 @@ trait ClientBase extends Logging {
374379
}
375380

376381
object ClientBase {
377-
val SPARK_JAR: String = "spark.jar"
378-
val APP_JAR: String = "app.jar"
382+
val SPARK_JAR: String = "__spark__.jar"
383+
val APP_JAR: String = "__app__.jar"
379384
val LOG4J_PROP: String = "log4j.properties"
380385
val LOG4J_CONF_ENV_KEY: String = "SPARK_LOG4J_CONF"
381386
val LOCAL_SCHEME = "local"
387+
val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"
382388

383389
def getSparkJar = sys.env.get("SPARK_JAR").getOrElse(SparkContext.jarOfClass(this.getClass).head)
384390

@@ -479,66 +485,25 @@ object ClientBase {
479485

480486
extraClassPath.foreach(addClasspathEntry)
481487

482-
addClasspathEntry(Environment.PWD.$())
488+
val cachedSecondaryJarLinks =
489+
sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",")
483490
// Normally the users app.jar is last in case conflicts with spark jars
484491
if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) {
485492
addPwdClasspathEntry(APP_JAR)
493+
cachedSecondaryJarLinks.foreach(addPwdClasspathEntry)
486494
addPwdClasspathEntry(SPARK_JAR)
487495
ClientBase.populateHadoopClasspath(conf, env)
488496
} else {
489497
addPwdClasspathEntry(SPARK_JAR)
490498
ClientBase.populateHadoopClasspath(conf, env)
491499
addPwdClasspathEntry(APP_JAR)
500+
cachedSecondaryJarLinks.foreach(addPwdClasspathEntry)
492501
}
502+
// Append all class files and jar files under the working directory to the classpath.
503+
addClasspathEntry(Environment.PWD.$())
493504
addPwdClasspathEntry("*")
494505
}
495506

496-
/**
497-
* Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly
498-
* to the classpath.
499-
*/
500-
private def addUserClasspath(args: ClientArguments, env: HashMap[String, String]) = {
501-
if (args != null) {
502-
addClasspathEntry(args.userJar, APP_JAR, env)
503-
}
504-
505-
if (args != null && args.addJars != null) {
506-
args.addJars.split(",").foreach { case file: String =>
507-
addClasspathEntry(file, null, env)
508-
}
509-
}
510-
}
511-
512-
/**
513-
* Adds the given path to the classpath, handling "local:" URIs correctly.
514-
*
515-
* If an alternate name for the file is given, and it's not a "local:" file, the alternate
516-
* name will be added to the classpath (relative to the job's work directory).
517-
*
518-
* If not a "local:" file and no alternate name, the environment is not modified.
519-
*
520-
* @param path Path to add to classpath (optional).
521-
* @param fileName Alternate name for the file (optional).
522-
* @param env Map holding the environment variables.
523-
*/
524-
private def addClasspathEntry(path: String, fileName: String,
525-
env: HashMap[String, String]) : Unit = {
526-
if (path != null) {
527-
scala.util.control.Exception.ignoring(classOf[URISyntaxException]) {
528-
val localPath = getLocalPath(path)
529-
if (localPath != null) {
530-
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, localPath,
531-
File.pathSeparator)
532-
return
533-
}
534-
}
535-
}
536-
if (fileName != null) {
537-
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
538-
Environment.PWD.$() + Path.SEPARATOR + fileName, File.pathSeparator);
539-
}
540-
}
541-
542507
/**
543508
* Returns the local path if the URI is a "local:" URI, or null otherwise.
544509
*/

yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ private[spark] class YarnClientSchedulerBackend(
5252
val argsArrayBuf = new ArrayBuffer[String]()
5353
argsArrayBuf += (
5454
"--class", "notused",
55-
"--jar", null,
55+
"--jar", null, // The primary jar will be added dynamically in SparkContext.
5656
"--args", hostport,
5757
"--am-class", classOf[ExecutorLauncher].getName
5858
)

0 commit comments

Comments (0)