From 9b2b612cc409d40b281567ef8b02d878d841af49 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Fri, 9 Oct 2015 19:48:47 +0800 Subject: [PATCH 1/6] init commit --- .../org/apache/spark/deploy/yarn/Client.scala | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index eb3b7fb885087..9bb424ae70654 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -537,6 +537,7 @@ private[spark] class Client( val env = new HashMap[String, String]() val extraCp = sparkConf.getOption("spark.driver.extraClassPath") populateClasspath(args, yarnConf, sparkConf, env, true, extraCp) + logInfo("Environment.CLASSPATH: " + env.get(Environment.CLASSPATH.name)) env("SPARK_YARN_MODE") = "true" env("SPARK_YARN_STAGING_DIR") = stagingDir env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() @@ -1164,7 +1165,7 @@ object Client extends Logging { } else { getMainJarUri(sparkConf.getOption(CONF_SPARK_USER_JAR)) } - mainJar.foreach(addFileToClasspath(sparkConf, _, APP_JAR, env)) + mainJar.foreach(addFileToClasspath(sparkConf, conf, _, APP_JAR, env)) val secondaryJars = if (args != null) { @@ -1173,10 +1174,10 @@ object Client extends Logging { getSecondaryJarUris(sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS)) } secondaryJars.foreach { x => - addFileToClasspath(sparkConf, x, null, env) + addFileToClasspath(sparkConf, conf, x, null, env) } } - addFileToClasspath(sparkConf, new URI(sparkJar(sparkConf)), SPARK_JAR, env) + addFileToClasspath(sparkConf, conf, new URI(sparkJar(sparkConf)), SPARK_JAR, env) populateHadoopClasspath(conf, env) sys.env.get(ENV_DIST_CLASSPATH).foreach { cp => addClasspathEntry(getClusterPath(sparkConf, cp), env) @@ -1213,13 +1214,15 @@ object Client extends Logging { * * If not a "local:" file and no alternate name, the environment is not modified. * - * @param conf Spark configuration. - * @param uri URI to add to classpath (optional). - * @param fileName Alternate name for the file (optional). - * @param env Map holding the environment variables. + * @param conf Spark configuration. + * @param hadoopConf Hadoop configuration. + * @param uri URI to add to classpath (optional). + * @param fileName Alternate name for the file (optional). + * @param env Map holding the environment variables. */ private def addFileToClasspath( conf: SparkConf, + hadoopConf: Configuration, uri: URI, fileName: String, env: HashMap[String, String]): Unit = { @@ -1228,6 +1231,11 @@ object Client extends Logging { } else if (fileName != null) { addClasspathEntry(buildPath( YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env) + } else { + val localPath = getQualifiedLocalPath(localURI, hadoopConf) + val linkName = Option(localURI.getFragment()).getOrElse(localPath.getName()) + addClasspathEntry(buildPath( + YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), linkName), env) } } From 3023e1e3741449e42a0211255b7e63c7d8b84e26 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Fri, 9 Oct 2015 19:53:12 +0800 Subject: [PATCH 2/6] update comments --- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 9bb424ae70654..036d23e57b9fa 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -1212,7 +1212,7 @@ object Client extends Logging { * If an alternate name for the file is given, and it's not a "local:" file, the alternate * name will be added to the classpath (relative to the job's work directory). * - * If not a "local:" file and no alternate name, the environment is not modified. + * If not a "local:" file and no alternate name, the linkName will be added to the classpath. * * @param conf Spark configuration. * @param hadoopConf Hadoop configuration. From 4c5290a3a44665eb60bee605ee79591997b1a6d4 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Fri, 9 Oct 2015 22:46:44 +0800 Subject: [PATCH 3/6] remove debug info --- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 036d23e57b9fa..68f283ec3da35 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -537,7 +537,6 @@ private[spark] class Client( val env = new HashMap[String, String]() val extraCp = sparkConf.getOption("spark.driver.extraClassPath") populateClasspath(args, yarnConf, sparkConf, env, true, extraCp) - logInfo("Environment.CLASSPATH: " + env.get(Environment.CLASSPATH.name)) env("SPARK_YARN_MODE") = "true" env("SPARK_YARN_STAGING_DIR") = stagingDir env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() @@ -1165,7 +1164,7 @@ object Client extends Logging { } else { getMainJarUri(sparkConf.getOption(CONF_SPARK_USER_JAR)) } - mainJar.foreach(addFileToClasspath(sparkConf, conf, _, APP_JAR, env)) + mainJar.foreach(addFileToClasspath(sparkConf, conf, _, APP_JAR, env)) val secondaryJars = if (args != null) { From 812e169c2294b7c82f09de09e8cbd3a4fff27518 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Sat, 10 Oct 2015 22:46:30 +0800 Subject: [PATCH 4/6] fix minor bug --- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 68f283ec3da35..5983ed38201a0 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -1231,8 +1231,8 @@ object Client extends Logging { addClasspathEntry(buildPath( YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env) } else { - val localPath = getQualifiedLocalPath(localURI, hadoopConf) - val linkName = Option(localURI.getFragment()).getOrElse(localPath.getName()) + val localPath = getQualifiedLocalPath(uri, hadoopConf) + val linkName = Option(uri.getFragment()).getOrElse(localPath.getName()) addClasspathEntry(buildPath( YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), linkName), env) } From 6f395d62684f8421faea0ffba7f575a6e24b7bdd Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Sat, 10 Oct 2015 22:48:05 +0800 Subject: [PATCH 5/6] fix minor bug --- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 5983ed38201a0..efe0cd0fe570f 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -1230,7 +1230,7 @@ object Client extends Logging { } else if (fileName != null) { addClasspathEntry(buildPath( YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env) - } else { + } else if (uri != null){ val localPath = getQualifiedLocalPath(uri, hadoopConf) val linkName = Option(uri.getFragment()).getOrElse(localPath.getName()) addClasspathEntry(buildPath( From d4049a7d71b94086d9c2b2ce95a31489eed07a6a Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Tue, 13 Oct 2015 20:01:55 +0800 Subject: [PATCH 6/6] fix minor nit --- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index efe0cd0fe570f..6056786d40f5e 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -1230,7 +1230,7 @@ object Client extends Logging { } else if (fileName != null) { addClasspathEntry(buildPath( YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env) - } else if (uri != null){ + } else if (uri != null) { val localPath = getQualifiedLocalPath(uri, hadoopConf) val linkName = Option(uri.getFragment()).getOrElse(localPath.getName()) addClasspathEntry(buildPath(