From f72987cc7528e55f8ea43177630a14975b5ecf4f Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Fri, 17 Apr 2015 00:15:49 +0800 Subject: [PATCH 01/19] add archives path to PYTHONPATH --- .../main/scala/org/apache/spark/deploy/SparkSubmit.scala | 4 ++++ .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 6 ++++++ .../org/apache/spark/deploy/yarn/ExecutorRunnable.scala | 4 ++++ 3 files changed, 14 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 296a0764b8ba..6664eeb1c4fa 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -328,6 +328,10 @@ object SparkSubmit { } } + if (args.isPython && System.getenv("PYSPARK_ARCHIVES_PATH") != null) { + args.files = mergeFileLists(args.files, System.getenv("PYSPARK_ARCHIVES_PATH")) + } + // If we're running a R app, set the main class to our specific R runner if (args.isR && deployMode == CLIENT) { if (args.primaryResource == SPARKR_SHELL) { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 1091ff54b046..71c7c6dd1146 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -326,6 +326,12 @@ private[spark] class Client( distCacheMgr.setDistFilesEnv(env) distCacheMgr.setDistArchivesEnv(env) + if (System.getenv("PYSPARK_ARCHIVES_PATH") != null) { + val pythonPath = System.getenv("PYSPARK_ARCHIVES_PATH").split(",").map( + p => (new Path(p)).getName).mkString(":") + env("PYTHONPATH") = pythonPath + } + // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.* val amEnvPrefix = "spark.yarn.appMasterEnv." 
sparkConf.getAll diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index b06069c07f45..33770c828583 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -285,6 +285,10 @@ class ExecutorRunnable( YarnSparkHadoopUtil.addPathToEnvironment(env, key, value) } + if (System.getenv("PYTHONPATH") != null) { + env("PYTHONPATH") = System.getenv("PYTHONPATH") + } + // Keep this for backwards compatibility but users should move to the config sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs => YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs) From 9f31dace1fc567db4c12705c9ab7c8fae56a30e5 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Sun, 19 Apr 2015 22:22:45 +0800 Subject: [PATCH 02/19] update code and add comments --- .../main/scala/org/apache/spark/deploy/SparkSubmit.scala | 8 ++++++-- .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 9 +++++++-- .../org/apache/spark/deploy/yarn/ExecutorRunnable.scala | 4 ---- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 6664eeb1c4fa..c668380432de 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -328,8 +328,12 @@ object SparkSubmit { } } - if (args.isPython && System.getenv("PYSPARK_ARCHIVES_PATH") != null) { - args.files = mergeFileLists(args.files, System.getenv("PYSPARK_ARCHIVES_PATH")) + // In yarn mode for a python app, if PYSPARK_ARCHIVES_PATH is in the user environment + // add pyspark archives to files that can be distributed with the job + if (args.isPython && clusterManager == YARN){ + sys.env.get("PYSPARK_ARCHIVES_PATH").map { archives => + args.files = mergeFileLists(args.files, Utils.resolveURIs(archives)) + } } // If we're running a R app, set the main class to our specific R runner diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 71c7c6dd1146..9ffdcd6a8afc 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -326,10 +326,15 @@ private[spark] class Client( distCacheMgr.setDistFilesEnv(env) distCacheMgr.setDistArchivesEnv(env) - if (System.getenv("PYSPARK_ARCHIVES_PATH") != null) { - val pythonPath = System.getenv("PYSPARK_ARCHIVES_PATH").split(",").map( + // If PYSPARK_ARCHIVES_PATH is in the user environment, set PYTHONPATH to be passed + // on to the ApplicationMaster and the executors. 
+ sys.env.get("PYSPARK_ARCHIVES_PATH").map { archives => + // archives will be distributed to each machine's working directory, so strip the + // path prefix + val pythonPath = archives.split(",").map( p => (new Path(p)).getName).mkString(":") env("PYTHONPATH") = pythonPath + sparkConf.setExecutorEnv("PYTHONPATH", pythonPath) } // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.* diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 33770c828583..b06069c07f45 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -285,10 +285,6 @@ class ExecutorRunnable( YarnSparkHadoopUtil.addPathToEnvironment(env, key, value) } - if (System.getenv("PYTHONPATH") != null) { - env("PYTHONPATH") = System.getenv("PYTHONPATH") - } - // Keep this for backwards compatibility but users should move to the config sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs => YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs) From 31e8e06739a14d710d3ad2714fb2cd6777729ed1 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Sun, 19 Apr 2015 22:33:52 +0800 Subject: [PATCH 03/19] update code style --- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 9ffdcd6a8afc..508eac37d6a9 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -331,8 +331,7 @@ private[spark] class Client( sys.env.get("PYSPARK_ARCHIVES_PATH").map { archives => // archives will be distributed to each machine's working directory, so strip the // path prefix - val pythonPath = archives.split(",").map( - p => (new Path(p)).getName).mkString(":") + val pythonPath = archives.split(",").map(p => (new Path(p)).getName).mkString(":") env("PYTHONPATH") = pythonPath sparkConf.setExecutorEnv("PYTHONPATH", pythonPath) } From e0179be997988e9c7b784e9516fddfc8bc7f8973 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Sun, 26 Apr 2015 19:43:28 +0800 Subject: [PATCH 04/19] add zip pyspark archives in build or sparksubmit --- .../org/apache/spark/deploy/SparkSubmit.scala | 40 ++++++++++++++++--- .../scala/org/apache/spark/util/Utils.scala | 35 ++++++++++++++++ project/SparkBuild.scala | 12 +++++- .../org/apache/spark/deploy/yarn/Client.scala | 12 +++--- 4 files changed, 84 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index c668380432de..cd55861e00cc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -39,7 +39,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver} import org.apache.spark.SPARK_VERSION import org.apache.spark.deploy.rest._ -import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils} +import org.apache.spark.util.{Utils, ChildFirstURLClassLoader, MutableURLClassLoader} /** * Whether to submit, kill, or request the status of an application. 
@@ -328,12 +328,40 @@ object SparkSubmit { } } - // In yarn mode for a python app, if PYSPARK_ARCHIVES_PATH is in the user environment - // add pyspark archives to files that can be distributed with the job - if (args.isPython && clusterManager == YARN){ - sys.env.get("PYSPARK_ARCHIVES_PATH").map { archives => - args.files = mergeFileLists(args.files, Utils.resolveURIs(archives)) + // In yarn mode for a python app, add pyspark archives to files + // that can be distributed with the job + if (args.isPython && clusterManager == YARN) { + var pyArchives: String = null + if (sys.env.contains("PYSPARK_ARCHIVES_PATH")) { + pyArchives = sys.env.get("PYSPARK_ARCHIVES_PATH").get + } else { + if (!sys.env.contains("SPARK_HOME")) { + printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.") + } + val pythonPath = new ArrayBuffer[String] + for (sparkHome <- sys.env.get("SPARK_HOME")) { + val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator) + val pyArchivesFile = new File(pyLibPath, "pyspark.zip") + if (!pyArchivesFile.exists()) { + val pySrc = new File(Seq(sparkHome, "python", "pyspark").mkString(File.separator)) + Utils.zipRecursive(pySrc, pyArchivesFile) + } + pythonPath += pyArchivesFile.getAbsolutePath + pythonPath += Seq(pyLibPath, "py4j-0.8.2.1-src.zip").mkString(File.separator) + } + pyArchives = pythonPath.mkString(",") } + + pyArchives = pyArchives.split(",").map( localPath=> { + val localURI = Utils.resolveURI(localPath) + if (localURI.getScheme != "local") { + args.files = mergeFileLists(args.files, localURI.toString) + (new Path(localPath)).getName + } else { + localURI.getPath.toString + } + }).mkString(File.pathSeparator) + sysProps("spark.submit.pyArchives") = pyArchives } // If we're running a R app, set the main class to our specific R runner diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 1029b0f9fce1..6d12f85b8776 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -18,6 +18,7 @@ package org.apache.spark.util import java.io._ +import java.util.zip._ import java.lang.management.ManagementFactory import java.net._ import java.nio.ByteBuffer @@ -1000,6 +1001,40 @@ private[spark] object Utils extends Logging { !fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile()) } + /** + * recursively add files to the zip file + */ + def addFilesToZip(parent: String, source: File, output: ZipOutputStream): Unit = { + if (source.isDirectory()) { + output.putNextEntry(new ZipEntry(parent + source.getName())) + for (file <- source.listFiles()) { + addFilesToZip(parent + source.getName + File.separator, file, output) + } + } else { + val in = new FileInputStream(source) + output.putNextEntry(new ZipEntry(parent + source.getName())) + val buf = new Array[Byte](8192) + var n = 0 + while (n != -1) { + n = in.read(buf) + if (n != -1) { + output.write(buf, 0, n) + } + } + in.close() + } + } + + /** + * zip source file to dest ZipFile + */ + def zipRecursive(source: File, destZipFile: File) = { + val destOutput = new ZipOutputStream(new FileOutputStream(destZipFile)) + addFilesToZip("", source, destOutput) + destOutput.flush() + destOutput.close() + } + /** * Determines if a directory contains any files newer than cutoff seconds. 
* diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 09b4976d10c2..3bd70dc0f6af 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -361,12 +361,20 @@ object PySparkAssembly { // to be included in the assembly. We can't just add "python/" to the assembly's resource dir // list since that will copy unneeded / unwanted files. resourceGenerators in Compile <+= resourceManaged in Compile map { outDir: File => + val src = new File(BuildCommons.sparkHome, "python/pyspark") + + val zipFile = new File(BuildCommons.sparkHome , "python/lib/pyspark.zip") + IO.delete(zipFile) + def entries(f: File):List[File] = + f :: (if (f.isDirectory) IO.listFiles(f).toList.flatMap(entries(_)) else Nil) + IO.zip(entries(src).map( + d => (d, d.getAbsolutePath.substring(src.getParent.length +1))), + zipFile) + val dst = new File(outDir, "pyspark") if (!dst.isDirectory()) { require(dst.mkdirs()) } - - val src = new File(BuildCommons.sparkHome, "python/pyspark") copy(src, dst) } ) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 508eac37d6a9..76f6edf69b46 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -326,14 +326,12 @@ private[spark] class Client( distCacheMgr.setDistFilesEnv(env) distCacheMgr.setDistArchivesEnv(env) - // If PYSPARK_ARCHIVES_PATH is in the user environment, set PYTHONPATH to be passed + // if spark.submit.pyArchives is in sparkConf, set PYTHONPATH to be passed // on to the ApplicationMaster and the executors. - sys.env.get("PYSPARK_ARCHIVES_PATH").map { archives => - // archives will be distributed to each machine's working directory, so strip the - // path prefix - val pythonPath = archives.split(",").map(p => (new Path(p)).getName).mkString(":") - env("PYTHONPATH") = pythonPath - sparkConf.setExecutorEnv("PYTHONPATH", pythonPath) + if (sparkConf.contains("spark.submit.pyArchives")) { + val archives = sparkConf.get("spark.submit.pyArchives") + env("PYTHONPATH") = archives + sparkConf.setExecutorEnv("PYTHONPATH", archives) } // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.* From 0d2baf79374fc2ec9010a43c8a753d2861322085 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Sun, 26 Apr 2015 20:19:56 +0800 Subject: [PATCH 05/19] update import paths --- core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 6d12f85b8776..14ece5debc88 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -18,7 +18,7 @@ package org.apache.spark.util import java.io._ -import java.util.zip._ +import java.util.zip.{ZipOutputStream, ZipEntry} import java.lang.management.ManagementFactory import java.net._ import java.nio.ByteBuffer From 9396346ad988988da3fd4b42aa45266657211cca Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Tue, 28 Apr 2015 00:19:31 +0800 Subject: [PATCH 06/19] put zip to make-distribution.sh --- make-distribution.sh | 1 + project/SparkBuild.scala | 12 ++---------- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/make-distribution.sh b/make-distribution.sh index 738a9c4d6960..c9a26d78239b 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -228,6 +228,7 @@ cp 
"$SPARK_HOME"/conf/*.template "$DISTDIR"/conf cp "$SPARK_HOME/README.md" "$DISTDIR" cp -r "$SPARK_HOME/bin" "$DISTDIR" cp -r "$SPARK_HOME/python" "$DISTDIR" +zip -r "$DISTDIR"/python/lib/pyspark.zip "$SPARK_HOME"/python/lib/pyspark cp -r "$SPARK_HOME/sbin" "$DISTDIR" cp -r "$SPARK_HOME/ec2" "$DISTDIR" diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 3bd70dc0f6af..09b4976d10c2 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -361,20 +361,12 @@ object PySparkAssembly { // to be included in the assembly. We can't just add "python/" to the assembly's resource dir // list since that will copy unneeded / unwanted files. resourceGenerators in Compile <+= resourceManaged in Compile map { outDir: File => - val src = new File(BuildCommons.sparkHome, "python/pyspark") - - val zipFile = new File(BuildCommons.sparkHome , "python/lib/pyspark.zip") - IO.delete(zipFile) - def entries(f: File):List[File] = - f :: (if (f.isDirectory) IO.listFiles(f).toList.flatMap(entries(_)) else Nil) - IO.zip(entries(src).map( - d => (d, d.getAbsolutePath.substring(src.getParent.length +1))), - zipFile) - val dst = new File(outDir, "pyspark") if (!dst.isDirectory()) { require(dst.mkdirs()) } + + val src = new File(BuildCommons.sparkHome, "python/pyspark") copy(src, dst) } ) From 3b1e4c8dccc54e67443648ffa4949ffc3df1e757 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Tue, 28 Apr 2015 01:33:12 +0800 Subject: [PATCH 07/19] address tgravescs's comments --- .../org/apache/spark/deploy/SparkSubmit.scala | 10 ++++-- .../scala/org/apache/spark/util/Utils.scala | 35 ------------------- .../org/apache/spark/deploy/yarn/Client.scala | 20 ++++++----- 3 files changed, 19 insertions(+), 46 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index cd55861e00cc..ac23bc991217 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -343,10 +343,14 @@ object SparkSubmit { val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator) val pyArchivesFile = new File(pyLibPath, "pyspark.zip") if (!pyArchivesFile.exists()) { - val pySrc = new File(Seq(sparkHome, "python", "pyspark").mkString(File.separator)) - Utils.zipRecursive(pySrc, pyArchivesFile) + printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.") } - pythonPath += pyArchivesFile.getAbsolutePath + val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip") + if (!py4jFile.exists()) { + printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " + + "in yarn mode.") + } + pythonPath += Seq(pyLibPath, "pyspark.zip").mkString(File.separator) pythonPath += Seq(pyLibPath, "py4j-0.8.2.1-src.zip").mkString(File.separator) } pyArchives = pythonPath.mkString(",") diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 14ece5debc88..1029b0f9fce1 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -18,7 +18,6 @@ package org.apache.spark.util import java.io._ -import java.util.zip.{ZipOutputStream, ZipEntry} import java.lang.management.ManagementFactory import java.net._ import java.nio.ByteBuffer @@ -1001,40 +1000,6 @@ private[spark] object Utils extends Logging { !fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile()) } - /** - * 
recursively add files to the zip file - */ - def addFilesToZip(parent: String, source: File, output: ZipOutputStream): Unit = { - if (source.isDirectory()) { - output.putNextEntry(new ZipEntry(parent + source.getName())) - for (file <- source.listFiles()) { - addFilesToZip(parent + source.getName + File.separator, file, output) - } - } else { - val in = new FileInputStream(source) - output.putNextEntry(new ZipEntry(parent + source.getName())) - val buf = new Array[Byte](8192) - var n = 0 - while (n != -1) { - n = in.read(buf) - if (n != -1) { - output.write(buf, 0, n) - } - } - in.close() - } - } - - /** - * zip source file to dest ZipFile - */ - def zipRecursive(source: File, destZipFile: File) = { - val destOutput = new ZipOutputStream(new FileOutputStream(destZipFile)) - addFilesToZip("", source, destOutput) - destOutput.flush() - destOutput.close() - } - /** * Determines if a directory contains any files newer than cutoff seconds. * diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 76f6edf69b46..c27453b4703c 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -17,6 +17,7 @@ package org.apache.spark.deploy.yarn +import java.io.File import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException} import java.nio.ByteBuffer @@ -326,14 +327,6 @@ private[spark] class Client( distCacheMgr.setDistFilesEnv(env) distCacheMgr.setDistArchivesEnv(env) - // if spark.submit.pyArchives is in sparkConf, set PYTHONPATH to be passed - // on to the ApplicationMaster and the executors. - if (sparkConf.contains("spark.submit.pyArchives")) { - val archives = sparkConf.get("spark.submit.pyArchives") - env("PYTHONPATH") = archives - sparkConf.setExecutorEnv("PYTHONPATH", archives) - } - // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.* val amEnvPrefix = "spark.yarn.appMasterEnv." sparkConf.getAll @@ -349,6 +342,17 @@ private[spark] class Client( env("SPARK_YARN_USER_ENV") = userEnvs } + // if spark.submit.pyArchives is in sparkConf, append pyArchives to PYTHONPATH + // that can be passed on to the ApplicationMaster and the executors. + if (sparkConf.contains("spark.submit.pyArchives")) { + var pythonPath = sparkConf.get("spark.submit.pyArchives") + if (env.contains("PYTHONPATH")) { + pythonPath = Seq(env.get("PYTHONPATH"), pythonPath).mkString(File.pathSeparator) + } + env("PYTHONPATH") = pythonPath + sparkConf.setExecutorEnv("PYTHONPATH", pythonPath) + } + // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to // executors. 
But we can't just set spark.executor.extraJavaOptions, because the driver's // SparkContext will not let that set spark* system properties, which is expected behavior for From 5192ccafb4aa0bf04b484da576b2d619769913e6 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Tue, 28 Apr 2015 01:35:13 +0800 Subject: [PATCH 08/19] update import path --- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index ac23bc991217..60cea3f48f84 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -39,7 +39,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver} import org.apache.spark.SPARK_VERSION import org.apache.spark.deploy.rest._ -import org.apache.spark.util.{Utils, ChildFirstURLClassLoader, MutableURLClassLoader} +import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils} /** * Whether to submit, kill, or request the status of an application. From f11f84a4d3dde7c3c4358a0f4f20ff37eae1c720 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Thu, 30 Apr 2015 02:50:27 +0800 Subject: [PATCH 09/19] zip pyspark archives --- assembly/pom.xml | 36 +++++++++++++++++++++++++++++++++++- make-distribution.sh | 1 - project/SparkBuild.scala | 12 ++++++++++-- 3 files changed, 45 insertions(+), 4 deletions(-) diff --git a/assembly/pom.xml b/assembly/pom.xml index f1f8b0d3682e..53059eaa34fa 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -92,6 +92,27 @@ true + + + org.apache.maven.plugins + maven-antrun-plugin + + + package + + run + + + + + + + + + + + + org.apache.maven.plugins @@ -196,6 +217,19 @@ maven-assembly-plugin 2.4 + dist package @@ -208,7 +242,7 @@ - + diff --git a/make-distribution.sh b/make-distribution.sh index c9a26d78239b..738a9c4d6960 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -228,7 +228,6 @@ cp "$SPARK_HOME"/conf/*.template "$DISTDIR"/conf cp "$SPARK_HOME/README.md" "$DISTDIR" cp -r "$SPARK_HOME/bin" "$DISTDIR" cp -r "$SPARK_HOME/python" "$DISTDIR" -zip -r "$DISTDIR"/python/lib/pyspark.zip "$SPARK_HOME"/python/lib/pyspark cp -r "$SPARK_HOME/sbin" "$DISTDIR" cp -r "$SPARK_HOME/ec2" "$DISTDIR" diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 09b4976d10c2..3bd70dc0f6af 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -361,12 +361,20 @@ object PySparkAssembly { // to be included in the assembly. We can't just add "python/" to the assembly's resource dir // list since that will copy unneeded / unwanted files. 
resourceGenerators in Compile <+= resourceManaged in Compile map { outDir: File => + val src = new File(BuildCommons.sparkHome, "python/pyspark") + + val zipFile = new File(BuildCommons.sparkHome , "python/lib/pyspark.zip") + IO.delete(zipFile) + def entries(f: File):List[File] = + f :: (if (f.isDirectory) IO.listFiles(f).toList.flatMap(entries(_)) else Nil) + IO.zip(entries(src).map( + d => (d, d.getAbsolutePath.substring(src.getParent.length +1))), + zipFile) + val dst = new File(outDir, "pyspark") if (!dst.isDirectory()) { require(dst.mkdirs()) } - - val src = new File(BuildCommons.sparkHome, "python/pyspark") copy(src, dst) } ) From e6b573be6892ff04d8f00473f81ddda232bcc3d7 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Thu, 30 Apr 2015 03:00:22 +0800 Subject: [PATCH 10/19] address vanzin's comments --- assembly/pom.xml | 15 +-------------- .../org/apache/spark/deploy/SparkSubmit.scala | 8 ++++---- 2 files changed, 5 insertions(+), 18 deletions(-) diff --git a/assembly/pom.xml b/assembly/pom.xml index 53059eaa34fa..764eb44d27a7 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -217,19 +217,6 @@ maven-assembly-plugin 2.4 - dist package @@ -242,7 +229,7 @@ - + diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 60cea3f48f84..3effbca1bbcd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -350,13 +350,13 @@ object SparkSubmit { printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " + "in yarn mode.") } - pythonPath += Seq(pyLibPath, "pyspark.zip").mkString(File.separator) - pythonPath += Seq(pyLibPath, "py4j-0.8.2.1-src.zip").mkString(File.separator) + pythonPath += pyArchivesFile.getAbsolutePath() + pythonPath += py4jFile.getAbsolutePath() } pyArchives = pythonPath.mkString(",") } - pyArchives = pyArchives.split(",").map( localPath=> { + pyArchives = pyArchives.split(",").map { localPath=> val localURI = Utils.resolveURI(localPath) if (localURI.getScheme != "local") { args.files = mergeFileLists(args.files, localURI.toString) @@ -364,7 +364,7 @@ object SparkSubmit { } else { localURI.getPath.toString } - }).mkString(File.pathSeparator) + }.mkString(File.pathSeparator) sysProps("spark.submit.pyArchives") = pyArchives } From 4b8a3edf1bb9c77ebeed534568dfe7062c78dc91 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Thu, 30 Apr 2015 03:08:21 +0800 Subject: [PATCH 11/19] use pyArchivesEnvOpt --- .../src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 3effbca1bbcd..18a536698611 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -332,8 +332,9 @@ object SparkSubmit { // that can be distributed with the job if (args.isPython && clusterManager == YARN) { var pyArchives: String = null - if (sys.env.contains("PYSPARK_ARCHIVES_PATH")) { - pyArchives = sys.env.get("PYSPARK_ARCHIVES_PATH").get + val pyArchivesEnvOpt = sys.env.get("PYSPARK_ARCHIVES_PATH") + if (pyArchivesEnvOpt.isDefined) { + pyArchives = pyArchivesEnvOpt.get } else { if (!sys.env.contains("SPARK_HOME")) { printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.") From 
e7bd971a7d873ad0a862f0d007af0968a54f405a Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Thu, 30 Apr 2015 18:44:50 +0800 Subject: [PATCH 12/19] address vanzin's comments --- .../main/scala/org/apache/spark/deploy/SparkSubmit.scala | 4 ++-- project/SparkBuild.scala | 9 +++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 18a536698611..790957f25044 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -361,9 +361,9 @@ object SparkSubmit { val localURI = Utils.resolveURI(localPath) if (localURI.getScheme != "local") { args.files = mergeFileLists(args.files, localURI.toString) - (new Path(localPath)).getName + new Path(localPath).getName } else { - localURI.getPath.toString + localURI.getPath } }.mkString(File.pathSeparator) sysProps("spark.submit.pyArchives") = pyArchives diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 3bd70dc0f6af..96e16c3cc201 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -364,12 +364,13 @@ object PySparkAssembly { val src = new File(BuildCommons.sparkHome, "python/pyspark") val zipFile = new File(BuildCommons.sparkHome , "python/lib/pyspark.zip") - IO.delete(zipFile) + zipFile.delete() def entries(f: File):List[File] = f :: (if (f.isDirectory) IO.listFiles(f).toList.flatMap(entries(_)) else Nil) - IO.zip(entries(src).map( - d => (d, d.getAbsolutePath.substring(src.getParent.length +1))), - zipFile) + val sources = entries(src).map { d => + (d, d.getAbsolutePath.substring(src.getParent.length +1)) + } + IO.zip(sources, zipFile) val dst = new File(outDir, "pyspark") if (!dst.isDirectory()) { From 9d87c3f6c471fb95a3ef6f49b34fe2e8eaad76c0 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Thu, 30 Apr 2015 18:46:26 +0800 Subject: [PATCH 13/19] update scala style --- project/SparkBuild.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 96e16c3cc201..a3724a46585b 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -365,7 +365,7 @@ object PySparkAssembly { val zipFile = new File(BuildCommons.sparkHome , "python/lib/pyspark.zip") zipFile.delete() - def entries(f: File):List[File] = + def entries(f: File): List[File] = f :: (if (f.isDirectory) IO.listFiles(f).toList.flatMap(entries(_)) else Nil) val sources = entries(src).map { d => (d, d.getAbsolutePath.substring(src.getParent.length +1)) From 20402cd759ebed14ab0317ed2f28c4b13ead5e9c Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Thu, 30 Apr 2015 22:10:41 +0800 Subject: [PATCH 14/19] use ZipEntry --- project/SparkBuild.scala | 36 ++++++++++++++++++++++++++++++------ 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index a3724a46585b..81425b84ce3a 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -354,6 +354,7 @@ object Assembly { object PySparkAssembly { import sbtassembly.Plugin._ import AssemblyKeys._ + import java.util.zip.{ZipOutputStream, ZipEntry} lazy val settings = Seq( unmanagedJars in Compile += { BuildCommons.sparkHome / "python/lib/py4j-0.8.2.1-src.zip" }, @@ -365,12 +366,7 @@ object PySparkAssembly { val zipFile = new File(BuildCommons.sparkHome , "python/lib/pyspark.zip") zipFile.delete() - def entries(f: File): List[File] = - f :: (if 
(f.isDirectory) IO.listFiles(f).toList.flatMap(entries(_)) else Nil) - val sources = entries(src).map { d => - (d, d.getAbsolutePath.substring(src.getParent.length +1)) - } - IO.zip(sources, zipFile) + zipRecursive(src, zipFile) val dst = new File(outDir, "pyspark") if (!dst.isDirectory()) { @@ -380,6 +376,34 @@ object PySparkAssembly { } ) + private def zipRecursive(source: File, destZipFile: File) = { + val destOutput = new ZipOutputStream(new FileOutputStream(destZipFile)) + addFilesToZipStream("", source, destOutput) + destOutput.flush() + destOutput.close() + } + + private def addFilesToZipStream(parent: String, source: File, output: ZipOutputStream): Unit = { + if (source.isDirectory()) { + output.putNextEntry(new ZipEntry(parent + source.getName())) + for (file <- source.listFiles()) { + addFilesToZipStream(parent + source.getName() + File.separator, file, output) + } + } else { + val in = new FileInputStream(source) + output.putNextEntry(new ZipEntry(parent + source.getName())) + val buf = new Array[Byte](8192) + var n = 0 + while (n != -1) { + n = in.read(buf) + if (n != -1) { + output.write(buf, 0, n) + } + } + in.close() + } + } + private def copy(src: File, dst: File): Seq[File] = { src.listFiles().flatMap { f => val child = new File(dst, f.getName()) From f0b4ed85e37ac746785f6742621aa3c80bb01b9a Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Mon, 4 May 2015 00:11:53 +0800 Subject: [PATCH 15/19] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869 --- .../org/apache/spark/deploy/yarn/Client.scala | 366 +++++++++++++----- 1 file changed, 265 insertions(+), 101 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 1091ff54b046..de55ede19c51 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -17,15 +17,20 @@ package org.apache.spark.deploy.yarn +import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream} import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException} import java.nio.ByteBuffer +import java.security.PrivilegedExceptionAction +import java.util.UUID +import java.util.zip.{ZipEntry, ZipOutputStream} import scala.collection.JavaConversions._ -import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer, Map} +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map} import scala.reflect.runtime.universe import scala.util.{Try, Success, Failure} import com.google.common.base.Objects +import com.google.common.io.Files import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.conf.Configuration @@ -33,10 +38,9 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie import org.apache.hadoop.fs._ import org.apache.hadoop.fs.permission.FsPermission import org.apache.hadoop.io.Text -import org.apache.hadoop.mapred.Master import org.apache.hadoop.mapreduce.MRJobConfig import org.apache.hadoop.security.{Credentials, UserGroupInformation} -import org.apache.hadoop.security.token.Token +import org.apache.hadoop.security.token.{TokenIdentifier, Token} import org.apache.hadoop.util.StringUtils import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment @@ -47,14 +51,14 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException import 
org.apache.hadoop.yarn.util.Records -import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException} import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException} import org.apache.spark.util.Utils private[spark] class Client( - val args: ClientArguments, - val hadoopConf: Configuration, - val sparkConf: SparkConf) + val args: ClientArguments, + val hadoopConf: Configuration, + val sparkConf: SparkConf) extends Logging { import Client._ @@ -66,23 +70,19 @@ private[spark] class Client( private val yarnClient = YarnClient.createYarnClient private val yarnConf = new YarnConfiguration(hadoopConf) - private val credentials = UserGroupInformation.getCurrentUser.getCredentials + private var credentials: Credentials = null private val amMemoryOverhead = args.amMemoryOverhead // MB private val executorMemoryOverhead = args.executorMemoryOverhead // MB private val distCacheMgr = new ClientDistributedCacheManager() private val isClusterMode = args.isClusterMode + + private var loginFromKeytab = false private val fireAndForget = isClusterMode && !sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true) def stop(): Unit = yarnClient.stop() - /* ------------------------------------------------------------------------------------- * - | The following methods have much in common in the stable and alpha versions of Client, | - | but cannot be implemented in the parent trait due to subtle API differences across | - | hadoop versions. | - * ------------------------------------------------------------------------------------- */ - /** * Submit an application running our ApplicationMaster to the ResourceManager. * @@ -91,6 +91,8 @@ private[spark] class Client( * available in the alpha API. */ def submitApplication(): ApplicationId = { + // Setup the credentials before doing anything else, so we have don't have issues at any point. + setupCredentials() yarnClient.init(yarnConf) yarnClient.start() @@ -120,8 +122,8 @@ private[spark] class Client( * This uses the YarnClientApplication not available in the Yarn alpha API. */ def createApplicationSubmissionContext( - newApp: YarnClientApplication, - containerContext: ContainerLaunchContext): ApplicationSubmissionContext = { + newApp: YarnClientApplication, + containerContext: ContainerLaunchContext): ApplicationSubmissionContext = { val appContext = newApp.getApplicationSubmissionContext appContext.setApplicationName(args.appName) appContext.setQueue(args.amQueue) @@ -130,7 +132,7 @@ private[spark] class Client( sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt) match { case Some(v) => appContext.setMaxAppAttempts(v) case None => logDebug("spark.yarn.maxAppAttempts is not set. " + - "Cluster's default value will be used.") + "Cluster's default value will be used.") } val capability = Records.newRecord(classOf[Resource]) capability.setMemory(args.amMemory + amMemoryOverhead) @@ -188,9 +190,9 @@ private[spark] class Client( * for preparing resources for launching the ApplicationMaster container. Exposed for testing. */ private[yarn] def copyFileToRemote( - destDir: Path, - srcPath: Path, - replication: Short): Path = { + destDir: Path, + srcPath: Path, + replication: Short): Path = { val destFs = destDir.getFileSystem(hadoopConf) val srcFs = srcPath.getFileSystem(hadoopConf) var destPath = srcPath @@ -222,9 +224,14 @@ private[spark] class Client( // and add them as local resources to the application master. 
val fs = FileSystem.get(hadoopConf) val dst = new Path(fs.getHomeDirectory(), appStagingDir) - val nns = getNameNodesToAccess(sparkConf) + dst - obtainTokensForNamenodes(nns, hadoopConf, credentials) + val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst + YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials) + // Used to keep track of URIs added to the distributed cache. If the same URI is added + // multiple times, YARN will fail to launch containers for the app with an internal + // error. + val distributedUris = new HashSet[String] obtainTokenForHiveMetastore(hadoopConf, credentials) + obtainTokenForHBase(hadoopConf, credentials) val replication = sparkConf.getInt("spark.yarn.submit.file.replication", fs.getDefaultReplication(dst)).toShort @@ -241,6 +248,31 @@ private[spark] class Client( "for alternatives.") } + // If we passed in a keytab, make sure we copy the keytab to the staging directory on + // HDFS, and setup the relevant environment vars, so the AM can login again. + if (loginFromKeytab) { + logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" + + " via the YARN Secure Distributed Cache.") + val localUri = new URI(args.keytab) + val localPath = getQualifiedLocalPath(localUri, hadoopConf) + val destinationPath = copyFileToRemote(dst, localPath, replication) + val destFs = FileSystem.get(destinationPath.toUri(), hadoopConf) + distCacheMgr.addResource( + destFs, hadoopConf, destinationPath, localResources, LocalResourceType.FILE, + sparkConf.get("spark.yarn.keytab"), statCache, appMasterOnly = true) + } + + def addDistributedUri(uri: URI): Boolean = { + val uriStr = uri.toString() + if (distributedUris.contains(uriStr)) { + logWarning(s"Resource $uri added multiple times to distributed cache.") + false + } else { + distributedUris += uriStr + true + } + } + /** * Copy the given main resource to the distributed cache if the scheme is not "local". * Otherwise, set the corresponding key in our SparkConf to handle it downstream. @@ -258,11 +290,13 @@ private[spark] class Client( if (!localPath.isEmpty()) { val localURI = new URI(localPath) if (localURI.getScheme != LOCAL_SCHEME) { - val src = getQualifiedLocalPath(localURI, hadoopConf) - val destPath = copyFileToRemote(dst, src, replication) - val destFs = FileSystem.get(destPath.toUri(), hadoopConf) - distCacheMgr.addResource(destFs, hadoopConf, destPath, - localResources, LocalResourceType.FILE, destName, statCache) + if (addDistributedUri(localURI)) { + val src = getQualifiedLocalPath(localURI, hadoopConf) + val destPath = copyFileToRemote(dst, src, replication) + val destFs = FileSystem.get(destPath.toUri(), hadoopConf) + distCacheMgr.addResource(destFs, hadoopConf, destPath, + localResources, LocalResourceType.FILE, destName, statCache) + } } else if (confKey != null) { // If the resource is intended for local use only, handle this downstream // by setting the appropriate property @@ -271,6 +305,13 @@ private[spark] class Client( } } + createConfArchive().foreach { file => + require(addDistributedUri(file.toURI())) + val destPath = copyFileToRemote(dst, new Path(file.toURI()), replication) + distCacheMgr.addResource(fs, hadoopConf, destPath, localResources, LocalResourceType.ARCHIVE, + LOCALIZED_HADOOP_CONF_DIR, statCache, appMasterOnly = true) + } + /** * Do the same for any additional resources passed in through ClientArguments. 
* Each resource category is represented by a 3-tuple of: @@ -288,13 +329,15 @@ private[spark] class Client( flist.split(',').foreach { file => val localURI = new URI(file.trim()) if (localURI.getScheme != LOCAL_SCHEME) { - val localPath = new Path(localURI) - val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName()) - val destPath = copyFileToRemote(dst, localPath, replication) - distCacheMgr.addResource( - fs, hadoopConf, destPath, localResources, resType, linkname, statCache) - if (addToClasspath) { - cachedSecondaryJarLinks += linkname + if (addDistributedUri(localURI)) { + val localPath = new Path(localURI) + val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName()) + val destPath = copyFileToRemote(dst, localPath, replication) + distCacheMgr.addResource( + fs, hadoopConf, destPath, localResources, resType, linkname, statCache) + if (addToClasspath) { + cachedSecondaryJarLinks += linkname + } } } else if (addToClasspath) { // Resource is intended for local use only and should be added to the class path @@ -310,6 +353,81 @@ private[spark] class Client( localResources } + /** + * Create an archive with the Hadoop config files for distribution. + * + * These are only used by the AM, since executors will use the configuration object broadcast by + * the driver. The files are zipped and added to the job as an archive, so that YARN will explode + * it when distributing to the AM. This directory is then added to the classpath of the AM + * process, just to make sure that everybody is using the same default config. + * + * This follows the order of precedence set by the startup scripts, in which HADOOP_CONF_DIR + * shows up in the classpath before YARN_CONF_DIR. + * + * Currently this makes a shallow copy of the conf directory. If there are cases where a + * Hadoop config directory contains subdirectories, this code will have to be fixed. + */ + private def createConfArchive(): Option[File] = { + val hadoopConfFiles = new HashMap[String, File]() + Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey => + sys.env.get(envKey).foreach { path => + val dir = new File(path) + if (dir.isDirectory()) { + dir.listFiles().foreach { file => + if (file.isFile && !hadoopConfFiles.contains(file.getName())) { + hadoopConfFiles(file.getName()) = file + } + } + } + } + } + + if (!hadoopConfFiles.isEmpty) { + val hadoopConfArchive = File.createTempFile(LOCALIZED_HADOOP_CONF_DIR, ".zip", + new File(Utils.getLocalDir(sparkConf))) + + val hadoopConfStream = new ZipOutputStream(new FileOutputStream(hadoopConfArchive)) + try { + hadoopConfStream.setLevel(0) + hadoopConfFiles.foreach { case (name, file) => + if (file.canRead()) { + hadoopConfStream.putNextEntry(new ZipEntry(name)) + Files.copy(file, hadoopConfStream) + hadoopConfStream.closeEntry() + } + } + } finally { + hadoopConfStream.close() + } + + Some(hadoopConfArchive) + } else { + None + } + } + + /** + * Get the renewal interval for tokens. + */ + private def getTokenRenewalInterval(stagingDirPath: Path): Long = { + // We cannot use the tokens generated above since those have renewer yarn. Trying to renew + // those will fail with an access control issue. So create new tokens with the logged in + // user as renewer. 
+ val creds = new Credentials() + val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + stagingDirPath + YarnSparkHadoopUtil.get.obtainTokensForNamenodes( + nns, hadoopConf, creds, Some(sparkConf.get("spark.yarn.principal"))) + val t = creds.getAllTokens + .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND) + .head + val newExpiration = t.renew(hadoopConf) + val identifier = new DelegationTokenIdentifier() + identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier))) + val interval = newExpiration - identifier.getIssueDate + logInfo(s"Renewal Interval set to $interval") + interval + } + /** * Set up the environment for launching our ApplicationMaster container. */ @@ -317,11 +435,20 @@ private[spark] class Client( logInfo("Setting up the launch environment for our AM container") val env = new HashMap[String, String]() val extraCp = sparkConf.getOption("spark.driver.extraClassPath") - populateClasspath(args, yarnConf, sparkConf, env, extraCp) + populateClasspath(args, yarnConf, sparkConf, env, true, extraCp) env("SPARK_YARN_MODE") = "true" env("SPARK_YARN_STAGING_DIR") = stagingDir env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() - + if (loginFromKeytab) { + val remoteFs = FileSystem.get(hadoopConf) + val stagingDirPath = new Path(remoteFs.getHomeDirectory, stagingDir) + val credentialsFile = "credentials-" + UUID.randomUUID().toString + sparkConf.set( + "spark.yarn.credentials.file", new Path(stagingDirPath, credentialsFile).toString) + logInfo(s"Credentials file set to: $credentialsFile") + val renewalInterval = getTokenRenewalInterval(stagingDirPath) + sparkConf.set("spark.yarn.token.renewal.interval", renewalInterval.toString) + } // Set the environment variables to be passed on to the executors. distCacheMgr.setDistFilesEnv(env) distCacheMgr.setDistArchivesEnv(env) @@ -335,12 +462,23 @@ private[spark] class Client( // Keep this for backwards compatibility but users should move to the config sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs => - // Allow users to specify some environment variables. + // Allow users to specify some environment variables. YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs) // Pass SPARK_YARN_USER_ENV itself to the AM so it can use it to set up executor environments. env("SPARK_YARN_USER_ENV") = userEnvs } + // if spark.submit.pyArchives is in sparkConf, append pyArchives to PYTHONPATH + // that can be passed on to the ApplicationMaster and the executors. + if (sparkConf.contains("spark.submit.pyArchives")) { + var pythonPath = sparkConf.get("spark.submit.pyArchives") + if (env.contains("PYTHONPATH")) { + pythonPath = Seq(env.get("PYTHONPATH"), pythonPath).mkString(File.pathSeparator) + } + env("PYTHONPATH") = pythonPath + sparkConf.setExecutorEnv("PYTHONPATH", pythonPath) + } + // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to // executors. But we can't just set spark.executor.extraJavaOptions, because the driver's // SparkContext will not let that set spark* system properties, which is expected behavior for @@ -384,9 +522,8 @@ private[spark] class Client( * This sets up the launch environment, java options, and the command for launching the AM. 
*/ private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse) - : ContainerLaunchContext = { + : ContainerLaunchContext = { logInfo("Setting up container launch context for our AM") - val appId = newAppResponse.getApplicationId val appStagingDir = getAppStagingDir(appId) val localResources = prepareLocalResources(appStagingDir) @@ -467,6 +604,10 @@ private[spark] class Client( } javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell) } + + sparkConf.getOption("spark.yarn.am.extraLibraryPath").foreach { paths => + prefixEnv = Some(Utils.libraryPathEnvPrefix(Seq(paths))) + } } // For log4j configuration to reference @@ -520,14 +661,14 @@ private[spark] class Client( val amArgs = Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ pyFiles ++ primaryRFile ++ userArgs ++ Seq( - "--executor-memory", args.executorMemory.toString + "m", - "--executor-cores", args.executorCores.toString, - "--num-executors ", args.numExecutors.toString) + "--executor-memory", args.executorMemory.toString + "m", + "--executor-cores", args.executorCores.toString, + "--num-executors ", args.numExecutors.toString) // Command for the ApplicationMaster val commands = prefixEnv ++ Seq( - YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server" - ) ++ + YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server" + ) ++ javaOpts ++ amArgs ++ Seq( "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", @@ -557,6 +698,24 @@ private[spark] class Client( amContainer } + def setupCredentials(): Unit = { + if (args.principal != null) { + require(args.keytab != null, "Keytab must be specified when principal is specified.") + logInfo("Attempting to login to the Kerberos" + + s" using principal: ${args.principal} and keytab: ${args.keytab}") + val f = new File(args.keytab) + // Generate a file name that can be used for the keytab file, that does not conflict + // with any user file. + val keytabFileName = f.getName + "-" + UUID.randomUUID().toString + UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab) + loginFromKeytab = true + sparkConf.set("spark.yarn.keytab", keytabFileName) + sparkConf.set("spark.yarn.principal", args.principal) + logInfo("Successfully logged into the KDC.") + } + credentials = UserGroupInformation.getCurrentUser.getCredentials + } + /** * Report the state of an application until it has exited, either successfully or * due to some failure, then return a pair of the yarn application state (FINISHED, FAILED, @@ -569,9 +728,9 @@ private[spark] class Client( * @return A pair of the yarn application state and the final application state. */ def monitorApplication( - appId: ApplicationId, - returnOnRunning: Boolean = false, - logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = { + appId: ApplicationId, + returnOnRunning: Boolean = false, + logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = { val interval = sparkConf.getLong("spark.yarn.report.interval", 1000) var lastState: YarnApplicationState = null while (true) { @@ -718,6 +877,9 @@ object Client extends Logging { // Distribution-defined classpath to add to processes val ENV_DIST_CLASSPATH = "SPARK_DIST_CLASSPATH" + // Subdirectory where the user's hadoop config files will be placed. + val LOCALIZED_HADOOP_CONF_DIR = "__hadoop_conf__" + /** * Find the user-defined Spark jar if configured, or return the jar containing this * class if not. 
@@ -750,7 +912,7 @@ object Client extends Logging { * classpath specified through the Hadoop and Yarn configurations. */ private[yarn] def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) - : Unit = { + : Unit = { val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf) for (c <- classPathElementsToAdd.flatten) { YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, c.trim) @@ -827,15 +989,23 @@ object Client extends Logging { * @param args Client arguments (when starting the AM) or null (when starting executors). */ private[yarn] def populateClasspath( - args: ClientArguments, - conf: Configuration, - sparkConf: SparkConf, - env: HashMap[String, String], - extraClassPath: Option[String] = None): Unit = { + args: ClientArguments, + conf: Configuration, + sparkConf: SparkConf, + env: HashMap[String, String], + isAM: Boolean, + extraClassPath: Option[String] = None): Unit = { extraClassPath.foreach(addClasspathEntry(_, env)) addClasspathEntry( YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env ) + + if (isAM) { + addClasspathEntry( + YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR + + LOCALIZED_HADOOP_CONF_DIR, env) + } + if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) { val userClassPath = if (args != null) { @@ -863,8 +1033,8 @@ object Client extends Logging { } private def getUserClasspath( - mainJar: Option[String], - secondaryJars: Option[String]): Array[URI] = { + mainJar: Option[String], + secondaryJars: Option[String]): Array[URI] = { val mainUri = mainJar.orElse(Some(APP_JAR)).map(new URI(_)) val secondaryUris = secondaryJars.map(_.split(",")).toSeq.flatten.map(new URI(_)) (mainUri ++ secondaryUris).toArray @@ -883,9 +1053,9 @@ object Client extends Logging { * @param env Map holding the environment variables. */ private def addFileToClasspath( - uri: URI, - fileName: String, - env: HashMap[String, String]): Unit = { + uri: URI, + fileName: String, + env: HashMap[String, String]): Unit = { if (uri != null && uri.getScheme == LOCAL_SCHEME) { addClasspathEntry(uri.getPath, env) } else if (fileName != null) { @@ -901,46 +1071,6 @@ object Client extends Logging { private def addClasspathEntry(path: String, env: HashMap[String, String]): Unit = YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, path) - /** - * Get the list of namenodes the user may access. - */ - private[yarn] def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = { - sparkConf.get("spark.yarn.access.namenodes", "") - .split(",") - .map(_.trim()) - .filter(!_.isEmpty) - .map(new Path(_)) - .toSet - } - - private[yarn] def getTokenRenewer(conf: Configuration): String = { - val delegTokenRenewer = Master.getMasterPrincipal(conf) - logDebug("delegation token renewer is: " + delegTokenRenewer) - if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { - val errorMessage = "Can't get Master Kerberos principal for use as renewer" - logError(errorMessage) - throw new SparkException(errorMessage) - } - delegTokenRenewer - } - - /** - * Obtains tokens for the namenodes passed in and adds them to the credentials. 
- */ - private def obtainTokensForNamenodes( - paths: Set[Path], - conf: Configuration, - creds: Credentials): Unit = { - if (UserGroupInformation.isSecurityEnabled()) { - val delegTokenRenewer = getTokenRenewer(conf) - paths.foreach { dst => - val dstFs = dst.getFileSystem(conf) - logDebug("getting token for namenode: " + dst) - dstFs.addDelegationTokens(delegTokenRenewer, creds) - } - } - } - /** * Obtains token for the Hive metastore and adds them to the credentials. */ @@ -999,6 +1129,41 @@ object Client extends Logging { } } + /** + * Obtain security token for HBase. + */ + def obtainTokenForHBase(conf: Configuration, credentials: Credentials): Unit = { + if (UserGroupInformation.isSecurityEnabled) { + val mirror = universe.runtimeMirror(getClass.getClassLoader) + + try { + val confCreate = mirror.classLoader. + loadClass("org.apache.hadoop.hbase.HBaseConfiguration"). + getMethod("create", classOf[Configuration]) + val obtainToken = mirror.classLoader. + loadClass("org.apache.hadoop.hbase.security.token.TokenUtil"). + getMethod("obtainToken", classOf[Configuration]) + + logDebug("Attempting to fetch HBase security token.") + + val hbaseConf = confCreate.invoke(null, conf) + val token = obtainToken.invoke(null, hbaseConf).asInstanceOf[Token[TokenIdentifier]] + credentials.addToken(token.getService, token) + + logInfo("Added HBase security token to credentials.") + } catch { + case e:java.lang.NoSuchMethodException => + logInfo("HBase Method not found: " + e) + case e:java.lang.ClassNotFoundException => + logDebug("HBase Class not found: " + e) + case e:java.lang.NoClassDefFoundError => + logDebug("HBase Class not found: " + e) + case e:Exception => + logError("Exception when obtaining HBase security token: " + e) + } + } + } + /** * Return whether the two file systems are the same. */ @@ -1052,8 +1217,7 @@ object Client extends Logging { if (isDriver) { conf.getBoolean("spark.driver.userClassPathFirst", false) } else { - conf.getBoolean("spark.executor.userClassPathFirst", - conf.getBoolean("spark.files.userClassPathFirst", false)) + conf.getBoolean("spark.executor.userClassPathFirst", false) } } @@ -1064,4 +1228,4 @@ object Client extends Logging { components.mkString(Path.SEPARATOR) } -} +} \ No newline at end of file From 008850ab9fdc4afc62d4cf9d0133e027920dcbd1 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Mon, 4 May 2015 00:26:08 +0800 Subject: [PATCH 16/19] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869 --- .../org/apache/spark/deploy/yarn/Client.scala | 48 +++++++++---------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index de55ede19c51..b6075c87bae7 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -56,9 +56,9 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, Spar import org.apache.spark.util.Utils private[spark] class Client( - val args: ClientArguments, - val hadoopConf: Configuration, - val sparkConf: SparkConf) + val args: ClientArguments, + val hadoopConf: Configuration, + val sparkConf: SparkConf) extends Logging { import Client._ @@ -122,8 +122,8 @@ private[spark] class Client( * This uses the YarnClientApplication not available in the Yarn alpha API. 
*/ def createApplicationSubmissionContext( - newApp: YarnClientApplication, - containerContext: ContainerLaunchContext): ApplicationSubmissionContext = { + newApp: YarnClientApplication, + containerContext: ContainerLaunchContext): ApplicationSubmissionContext = { val appContext = newApp.getApplicationSubmissionContext appContext.setApplicationName(args.appName) appContext.setQueue(args.amQueue) @@ -190,9 +190,9 @@ private[spark] class Client( * for preparing resources for launching the ApplicationMaster container. Exposed for testing. */ private[yarn] def copyFileToRemote( - destDir: Path, - srcPath: Path, - replication: Short): Path = { + destDir: Path, + srcPath: Path, + replication: Short): Path = { val destFs = destDir.getFileSystem(hadoopConf) val srcFs = srcPath.getFileSystem(hadoopConf) var destPath = srcPath @@ -462,7 +462,7 @@ private[spark] class Client( // Keep this for backwards compatibility but users should move to the config sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs => - // Allow users to specify some environment variables. + // Allow users to specify some environment variables. YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs) // Pass SPARK_YARN_USER_ENV itself to the AM so it can use it to set up executor environments. env("SPARK_YARN_USER_ENV") = userEnvs @@ -522,7 +522,7 @@ private[spark] class Client( * This sets up the launch environment, java options, and the command for launching the AM. */ private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse) - : ContainerLaunchContext = { + : ContainerLaunchContext = { logInfo("Setting up container launch context for our AM") val appId = newAppResponse.getApplicationId val appStagingDir = getAppStagingDir(appId) @@ -661,14 +661,14 @@ private[spark] class Client( val amArgs = Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ pyFiles ++ primaryRFile ++ userArgs ++ Seq( - "--executor-memory", args.executorMemory.toString + "m", - "--executor-cores", args.executorCores.toString, - "--num-executors ", args.numExecutors.toString) + "--executor-memory", args.executorMemory.toString + "m", + "--executor-cores", args.executorCores.toString, + "--num-executors ", args.numExecutors.toString) // Command for the ApplicationMaster val commands = prefixEnv ++ Seq( - YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server" - ) ++ + YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server" + ) ++ javaOpts ++ amArgs ++ Seq( "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", @@ -728,9 +728,9 @@ private[spark] class Client( * @return A pair of the yarn application state and the final application state. 
*/ def monitorApplication( - appId: ApplicationId, - returnOnRunning: Boolean = false, - logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = { + appId: ApplicationId, + returnOnRunning: Boolean = false, + logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = { val interval = sparkConf.getLong("spark.yarn.report.interval", 1000) var lastState: YarnApplicationState = null while (true) { @@ -1085,7 +1085,7 @@ object Client extends Logging { val hiveConf = hiveClass.getMethod("getConf").invoke(hive) val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf") - val hiveConfGet = (param:String) => Option(hiveConfClass + val hiveConfGet = (param: String) => Option(hiveConfClass .getMethod("get", classOf[java.lang.String]) .invoke(hiveConf, param)) @@ -1107,7 +1107,7 @@ object Client extends Logging { val hive2Token = new Token[DelegationTokenIdentifier]() hive2Token.decodeFromUrlString(tokenStr) - credentials.addToken(new Text("hive.server2.delegation.token"),hive2Token) + credentials.addToken(new Text("hive.server2.delegation.token"), hive2Token) logDebug("Added hive.Server2.delegation.token to conf.") hiveClass.getMethod("closeCurrent").invoke(null) } else { @@ -1152,13 +1152,13 @@ object Client extends Logging { logInfo("Added HBase security token to credentials.") } catch { - case e:java.lang.NoSuchMethodException => + case e: java.lang.NoSuchMethodException => logInfo("HBase Method not found: " + e) - case e:java.lang.ClassNotFoundException => + case e: java.lang.ClassNotFoundException => logDebug("HBase Class not found: " + e) - case e:java.lang.NoClassDefFoundError => + case e: java.lang.NoClassDefFoundError => logDebug("HBase Class not found: " + e) - case e:Exception => + case e: Exception => logError("Exception when obtaining HBase security token: " + e) } } From 1c8f66421b2c532d69e7c9de36bbaab981bbb0ce Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Mon, 4 May 2015 00:29:58 +0800 Subject: [PATCH 17/19] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869 --- .../org/apache/spark/deploy/yarn/Client.scala | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index b6075c87bae7..80cd894934df 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -132,7 +132,7 @@ private[spark] class Client( sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt) match { case Some(v) => appContext.setMaxAppAttempts(v) case None => logDebug("spark.yarn.maxAppAttempts is not set. " + - "Cluster's default value will be used.") + "Cluster's default value will be used.") } val capability = Records.newRecord(classOf[Resource]) capability.setMemory(args.amMemory + amMemoryOverhead) @@ -912,7 +912,7 @@ object Client extends Logging { * classpath specified through the Hadoop and Yarn configurations. 
*/ private[yarn] def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) - : Unit = { + : Unit = { val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf) for (c <- classPathElementsToAdd.flatten) { YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, c.trim) @@ -989,12 +989,12 @@ object Client extends Logging { * @param args Client arguments (when starting the AM) or null (when starting executors). */ private[yarn] def populateClasspath( - args: ClientArguments, - conf: Configuration, - sparkConf: SparkConf, - env: HashMap[String, String], - isAM: Boolean, - extraClassPath: Option[String] = None): Unit = { + args: ClientArguments, + conf: Configuration, + sparkConf: SparkConf, + env: HashMap[String, String], + isAM: Boolean, + extraClassPath: Option[String] = None): Unit = { extraClassPath.foreach(addClasspathEntry(_, env)) addClasspathEntry( YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env @@ -1033,8 +1033,8 @@ object Client extends Logging { } private def getUserClasspath( - mainJar: Option[String], - secondaryJars: Option[String]): Array[URI] = { + mainJar: Option[String], + secondaryJars: Option[String]): Array[URI] = { val mainUri = mainJar.orElse(Some(APP_JAR)).map(new URI(_)) val secondaryUris = secondaryJars.map(_.split(",")).toSeq.flatten.map(new URI(_)) (mainUri ++ secondaryUris).toArray @@ -1053,9 +1053,9 @@ object Client extends Logging { * @param env Map holding the environment variables. */ private def addFileToClasspath( - uri: URI, - fileName: String, - env: HashMap[String, String]): Unit = { + uri: URI, + fileName: String, + env: HashMap[String, String]): Unit = { if (uri != null && uri.getScheme == LOCAL_SCHEME) { addClasspathEntry(uri.getPath, env) } else if (fileName != null) { From c2ad0f9c3b44bf2a3dd7536901b8a6ed0456d086 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Mon, 4 May 2015 00:32:07 +0800 Subject: [PATCH 18/19] Update Client.scala --- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 80cd894934df..c46b194240bf 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -1227,5 +1227,5 @@ object Client extends Logging { def buildPath(components: String*): String = { components.mkString(Path.SEPARATOR) } - -} \ No newline at end of file + +} From 66ffa4395582db467be99cdafd64b23ea995a9d5 Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Mon, 4 May 2015 00:33:07 +0800 Subject: [PATCH 19/19] Update Client.scala --- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index c46b194240bf..d21a7393478c 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -1227,5 +1227,5 @@ object Client extends Logging { def buildPath(components: String*): String = { components.mkString(Path.SEPARATOR) } - + }
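For reference, the reflection-based HBase token fetch added in this series (obtainTokenForHBase) can be read in isolation as the sketch below. This is a simplified standalone version, not the exact code in Client.scala: the logging calls and the scala.reflect runtime mirror are omitted, the object name HBaseTokenSketch is invented purely for illustration, and it assumes only that HBaseConfiguration.create(Configuration) and TokenUtil.obtainToken(Configuration) are available whenever the HBase client jars happen to be on the application classpath.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}

// Standalone sketch of the optional-dependency pattern used by obtainTokenForHBase:
// HBase classes are resolved by name at runtime, so a missing HBase client jar
// means the token fetch is skipped rather than failing the submission.
object HBaseTokenSketch {
  def obtainToken(conf: Configuration, credentials: Credentials): Unit = {
    if (UserGroupInformation.isSecurityEnabled) {
      try {
        val loader = getClass.getClassLoader
        // Equivalent of HBaseConfiguration.create(conf), invoked reflectively.
        val hbaseConf = loader
          .loadClass("org.apache.hadoop.hbase.HBaseConfiguration")
          .getMethod("create", classOf[Configuration])
          .invoke(null, conf)
        // Equivalent of TokenUtil.obtainToken(hbaseConf), invoked reflectively.
        val token = loader
          .loadClass("org.apache.hadoop.hbase.security.token.TokenUtil")
          .getMethod("obtainToken", classOf[Configuration])
          .invoke(null, hbaseConf)
          .asInstanceOf[Token[TokenIdentifier]]
        credentials.addToken(token.getService, token)
      } catch {
        // HBase absent or incompatible on this classpath: skip the token quietly.
        case _: ClassNotFoundException | _: NoSuchMethodException | _: NoClassDefFoundError =>
      }
    }
  }
}

Loading the classes by name keeps HBase an optional dependency of the YARN client with no compile-time requirement, mirroring how the Hive metastore token is obtained via mirror.classLoader.loadClass earlier in the same file.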