From 08f93822de809946e565793adab7c30c8c8c3430 Mon Sep 17 00:00:00 2001 From: Tom Graves Date: Thu, 1 Oct 2015 13:29:47 -0500 Subject: [PATCH 1/5] [SPARK-10901] spark.yarn.user.classpath.first doesn't work --- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 8c53c24a79c4..4d975ce77c02 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -1162,6 +1162,8 @@ object Client extends Logging { userClassPath.foreach { x => addFileToClasspath(sparkConf, x, null, env) } + // add the users application jar if its distributed through dist cache + addFileToClasspath(sparkConf, null, APP_JAR, env) } addFileToClasspath(sparkConf, new URI(sparkJar(sparkConf)), SPARK_JAR, env) populateHadoopClasspath(conf, env) From b860226c563fc86a84d2ed80353c6502852829f9 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Mon, 5 Oct 2015 17:30:12 +0000 Subject: [PATCH 2/5] Rework --- .../org/apache/spark/deploy/yarn/Client.scala | 36 ++++++++++++++----- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 4d975ce77c02..8ffe329f8a73 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -1153,17 +1153,26 @@ object Client extends Logging { } if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) { - val userClassPath = + // in order to properly add the app jar when user classpath is first + // we have to do the mainJar separate in order to send the right thing + // into addFileToClasspath + val mainJar = if (args != null) { - getUserClasspath(Option(args.userJar), Option(args.addJars)) + getMainJarUri(Option(args.userJar)) } else { - getUserClasspath(sparkConf) + getMainJarUri(sparkConf.getOption(CONF_SPARK_USER_JAR)) } - userClassPath.foreach { x => + mainJar.foreach(addFileToClasspath(sparkConf, _, APP_JAR, env)) + + val secondaryJars = + if (args != null) { + getSecondaryJarUris(Option(args.addJars)) + } else { + getSecondaryJarUris(sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS)) + } + secondaryJars.foreach { x => addFileToClasspath(sparkConf, x, null, env) } - // add the users application jar if its distributed through dist cache - addFileToClasspath(sparkConf, null, APP_JAR, env) } addFileToClasspath(sparkConf, new URI(sparkJar(sparkConf)), SPARK_JAR, env) populateHadoopClasspath(conf, env) @@ -1185,11 +1194,22 @@ object Client extends Logging { private def getUserClasspath( mainJar: Option[String], secondaryJars: Option[String]): Array[URI] = { - val mainUri = mainJar.orElse(Some(APP_JAR)).map(new URI(_)) - val secondaryUris = secondaryJars.map(_.split(",")).toSeq.flatten.map(new URI(_)) + val mainUri = getMainJarUri(mainJar) + val secondaryUris = getSecondaryJarUris(secondaryJars) (mainUri ++ secondaryUris).toArray } + private def getMainJarUri(mainJar: Option[String]): Option[URI] = { + mainJar.flatMap { path => + val uri = new URI(path) + if (uri.getScheme == LOCAL_SCHEME) Some(uri) else None + }.orElse(Some(new URI(APP_JAR))) + } + + private def getSecondaryJarUris(secondaryJars: Option[String]): Seq[URI] = { + secondaryJars.map(_.split(",")).toSeq.flatten.map(new URI(_)) + } + /** * Adds the given path to the classpath, handling "local:" URIs correctly. * From cd1bae06853c2a5ddfe8b6e98c7e6db4ccc45fb8 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Mon, 5 Oct 2015 18:07:55 +0000 Subject: [PATCH 3/5] combine getUserClasspath functions --- .../scala/org/apache/spark/deploy/yarn/Client.scala | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 8ffe329f8a73..ec64f09bf43d 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -1187,15 +1187,8 @@ object Client extends Logging { * @param conf Spark configuration. */ def getUserClasspath(conf: SparkConf): Array[URI] = { - getUserClasspath(conf.getOption(CONF_SPARK_USER_JAR), - conf.getOption(CONF_SPARK_YARN_SECONDARY_JARS)) - } - - private def getUserClasspath( - mainJar: Option[String], - secondaryJars: Option[String]): Array[URI] = { - val mainUri = getMainJarUri(mainJar) - val secondaryUris = getSecondaryJarUris(secondaryJars) + val mainUri = getMainJarUri(conf.getOption(CONF_SPARK_USER_JAR)) + val secondaryUris = getSecondaryJarUris(conf.getOption(CONF_SPARK_YARN_SECONDARY_JARS)) (mainUri ++ secondaryUris).toArray } From e627adc4c5e239ff52ec9c1e33fe57dfe8294b0d Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Mon, 5 Oct 2015 18:09:50 +0000 Subject: [PATCH 4/5] fix scalastyle --- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index ec64f09bf43d..491853af3779 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -1156,7 +1156,7 @@ object Client extends Logging { // in order to properly add the app jar when user classpath is first // we have to do the mainJar separate in order to send the right thing // into addFileToClasspath - val mainJar = + val mainJar = if (args != null) { getMainJarUri(Option(args.userJar)) } else { @@ -1193,7 +1193,7 @@ object Client extends Logging { } private def getMainJarUri(mainJar: Option[String]): Option[URI] = { - mainJar.flatMap { path => + mainJar.flatMap { path = val uri = new URI(path) if (uri.getScheme == LOCAL_SCHEME) Some(uri) else None }.orElse(Some(new URI(APP_JAR))) From ac3ffcfca047804a2cbdea47064a6234ae6f9fac Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Mon, 5 Oct 2015 18:25:02 +0000 Subject: [PATCH 5/5] scalastyle again --- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 491853af3779..589d3238db54 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -1193,10 +1193,10 @@ object Client extends Logging { } private def getMainJarUri(mainJar: Option[String]): Option[URI] = { - mainJar.flatMap { path = + mainJar.flatMap { path => val uri = new URI(path) if (uri.getScheme == LOCAL_SCHEME) Some(uri) else None - }.orElse(Some(new URI(APP_JAR))) + }.orElse(Some(new URI(APP_JAR))) } private def getSecondaryJarUris(secondaryJars: Option[String]): Seq[URI] = {