From fe039d35855b4af1635ff7046adccbe639884ed1 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Fri, 29 Aug 2014 18:58:07 -0700
Subject: [PATCH 1/4] Beef up tests to test fixed-pointed-ness of Utils.resolveURI(s)

---
 .../org/apache/spark/util/UtilsSuite.scala | 38 +++++++++++++------
 1 file changed, 27 insertions(+), 11 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 70d423ba8a04d..856e85f435586 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -204,9 +204,14 @@ class UtilsSuite extends FunSuite {
 
   test("resolveURI") {
     def assertResolves(before: String, after: String, testWindows: Boolean = false): Unit = {
-      assume(before.split(",").length == 1)
-      assert(Utils.resolveURI(before, testWindows) === new URI(after))
-      assert(Utils.resolveURI(after, testWindows) === new URI(after))
+      // This should test only single paths
+      assume(before.split(",").length === 1)
+      // Repeated invocations of resolveURI should yield the same result
+      def resolve(uri: String): String = Utils.resolveURI(uri, testWindows).toString
+      assert(resolve(after) === after)
+      assert(resolve(resolve(after)) === after)
+      assert(resolve(resolve(resolve(after))) === after)
+      // Also test resolveURIs with single paths
       assert(new URI(Utils.resolveURIs(before, testWindows)) === new URI(after))
       assert(new URI(Utils.resolveURIs(after, testWindows)) === new URI(after))
     }
@@ -222,16 +227,27 @@ class UtilsSuite extends FunSuite {
     assertResolves("file:/C:/file.txt#alias.txt", "file:/C:/file.txt#alias.txt", testWindows = true)
     intercept[IllegalArgumentException] { Utils.resolveURI("file:foo") }
     intercept[IllegalArgumentException] { Utils.resolveURI("file:foo:baby") }
+  }
 
-    // Test resolving comma-delimited paths
-    assert(Utils.resolveURIs("jar1,jar2") === s"file:$cwd/jar1,file:$cwd/jar2")
-    assert(Utils.resolveURIs("file:/jar1,file:/jar2") === "file:/jar1,file:/jar2")
-    assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3") ===
-      s"hdfs:/jar1,file:/jar2,file:$cwd/jar3")
-    assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3,jar4#jar5") ===
+  test("resolveURIs with multiple paths") {
+    def assertResolves(before: String, after: String, testWindows: Boolean = false): Unit = {
+      assume(before.split(",").length > 1)
+      assert(Utils.resolveURIs(before, testWindows) === after)
+      assert(Utils.resolveURIs(after, testWindows) === after)
+      // Repeated invocations of resolveURIs should yield the same result
+      def resolve(uri: String): String = Utils.resolveURIs(uri, testWindows)
+      assert(resolve(after) === after)
+      assert(resolve(resolve(after)) === after)
+      assert(resolve(resolve(resolve(after))) === after)
+    }
+    val cwd = System.getProperty("user.dir")
+    assertResolves("jar1,jar2", s"file:$cwd/jar1,file:$cwd/jar2")
+    assertResolves("file:/jar1,file:/jar2", "file:/jar1,file:/jar2")
+    assertResolves("hdfs:/jar1,file:/jar2,jar3", s"hdfs:/jar1,file:/jar2,file:$cwd/jar3")
+    assertResolves("hdfs:/jar1,file:/jar2,jar3,jar4#jar5",
       s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4#jar5")
-    assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3,C:\\pi.py#py.pi", testWindows = true) ===
-      s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi")
+    assertResolves("hdfs:/jar1,file:/jar2,jar3,C:\\pi.py#py.pi",
      s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi", testWindows = true)
   }
 
   test("nonLocalPaths") {
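Note on what the subject calls "fixed-pointed-ness": Utils.resolveURI and Utils.resolveURIs are expected to be idempotent, so resolving an already-resolved URI must return it unchanged, which is why the assertions above apply resolve up to three times. A minimal standalone sketch of the property under test, where resolve is a simplified stand-in and not Spark's actual implementation:

    import java.net.URI

    object FixedPointSketch {
      // Simplified stand-in for Utils.resolveURI: give bare paths a file:
      // scheme, leave URIs that already have a scheme untouched.
      def resolve(path: String): String = {
        val uri = new URI(path)
        if (uri.getScheme != null) uri.toString
        else new URI("file", new java.io.File(path).getAbsolutePath, null).toString
      }

      def main(args: Array[String]): Unit = {
        Seq("jar1", "/jar2", "hdfs:/jar3", "file:/jar4").foreach { in =>
          val once = resolve(in)
          // Idempotence: resolving an already-resolved URI is a no-op
          assert(resolve(once) == once, s"not a fixed point: $in")
          assert(resolve(resolve(once)) == once)
        }
        println("resolve is a fixed point on already-resolved URIs")
      }
    }
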
From 460117e9744b173f4209f28a6c7dde2fcc519efd Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Mon, 1 Sep 2014 15:58:16 -0700
Subject: [PATCH 2/4] Resolve config paths properly

---
 .../org/apache/spark/deploy/SparkSubmit.scala | 27 +++++++++++++++++--
 1 file changed, 25 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 0fdb5ae3c2e40..f56977924214e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -158,8 +158,9 @@ object SparkSubmit {
       args.files = mergeFileLists(args.files, args.primaryResource)
     }
     args.files = mergeFileLists(args.files, args.pyFiles)
-    // Format python file paths properly before adding them to the PYTHONPATH
-    sysProps("spark.submit.pyFiles") = PythonRunner.formatPaths(args.pyFiles).mkString(",")
+    if (args.pyFiles != null) {
+      sysProps("spark.submit.pyFiles") = args.pyFiles
+    }
   }
 
   // Special flag to avoid deprecation warnings at the client
@@ -283,6 +284,28 @@ object SparkSubmit {
       sysProps.getOrElseUpdate(k, v)
     }
 
+    // Resolve paths in certain spark properties
+    val pathConfigs = Seq(
+      "spark.jars",
+      "spark.files",
+      "spark.yarn.dist.files",
+      "spark.yarn.dist.archives")
+    pathConfigs.foreach { config =>
+      // Replace old URIs with resolved URIs, if they exist
+      sysProps.get(config).foreach { oldValue =>
+        sysProps(config) = Utils.resolveURIs(oldValue)
+      }
+    }
+
+    // Resolve and format python file paths properly before adding them to the PYTHONPATH.
+    // The resolving part is redundant in the case of --py-files, but necessary if the user
+    // explicitly sets `spark.submit.pyFiles` in his/her default properties file.
+    sysProps.get("spark.submit.pyFiles").foreach { pyFiles =>
+      val resolvedPyFiles = Utils.resolveURIs(pyFiles)
+      val formattedPyFiles = PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
+      sysProps("spark.submit.pyFiles") = formattedPyFiles
+    }
+
     (childArgs, childClasspath, sysProps, childMainClass)
   }
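The pathConfigs loop introduced above rewrites each path-like property in place, and only when it is actually set, so values arriving via a properties file get the same URI resolution as values from command-line flags. A compact runnable sketch of that pattern, where resolveURIs is a simplified stand-in for Utils.resolveURIs, not Spark's implementation:

    import java.net.URI
    import scala.collection.mutable

    object ResolvePathConfigsSketch {
      // Simplified stand-in: resolve each comma-separated path on its own
      // and re-join the results; bare paths gain a file: scheme.
      def resolveURIs(paths: String): String =
        paths.split(",").map { p =>
          if (new URI(p).getScheme != null) p
          else "file:" + new java.io.File(p).getAbsolutePath
        }.mkString(",")

      def main(args: Array[String]): Unit = {
        val sysProps = mutable.Map(
          "spark.jars" -> "jar1,hdfs:/jar2",
          "spark.app.name" -> "demo") // not a path config; left alone
        val pathConfigs = Seq("spark.jars", "spark.files")
        // Same shape as the patch: rewrite each path config only if it is set
        pathConfigs.foreach { config =>
          sysProps.get(config).foreach { oldValue =>
            sysProps(config) = resolveURIs(oldValue)
          }
        }
        println(sysProps("spark.jars")) // e.g. file:/<cwd>/jar1,hdfs:/jar2
      }
    }
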
From 05e03d649980d5025a1ba53c4c9c27b51e5224cc Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Mon, 1 Sep 2014 15:58:51 -0700
Subject: [PATCH 3/4] Add tests for resolving both command line and config paths

---
 .../spark/deploy/SparkSubmitSuite.scala | 108 +++++++++++++++++-
 1 file changed, 106 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 7e1ef80c84561..499b1185f2865 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy
 
-import java.io.{File, OutputStream, PrintStream}
+import java.io.{File, OutputStream, PrintWriter, PrintStream}
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -291,7 +291,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {
     runSparkSubmit(args)
   }
 
-  test("spark submit includes jars passed in through --jar") {
+  test("includes jars passed in through --jars") {
     val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
     val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
     val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
@@ -305,6 +305,110 @@ class SparkSubmitSuite extends FunSuite with Matchers {
     runSparkSubmit(args)
   }
 
+  test("resolves command line argument paths correctly") {
+    val jars = "/jar1,/jar2"                 // --jars
+    val files = "hdfs:/file1,file2"          // --files
+    val archives = "file:/archive1,archive2" // --archives
+    val pyFiles = "py-file1,py-file2"        // --py-files
+
+    // Test jars and files
+    val clArgs = Seq(
+      "--master", "local",
+      "--class", "org.SomeClass",
+      "--jars", jars,
+      "--files", files,
+      "thejar.jar")
+    val appArgs = new SparkSubmitArguments(clArgs)
+    val sysProps = SparkSubmit.createLaunchEnv(appArgs)._3
+    appArgs.jars should be (Utils.resolveURIs(jars))
+    appArgs.files should be (Utils.resolveURIs(files))
+    sysProps("spark.jars") should be (Utils.resolveURIs(jars + ",thejar.jar"))
+    sysProps("spark.files") should be (Utils.resolveURIs(files))
+
+    // Test files and archives (Yarn)
+    val clArgs2 = Seq(
+      "--master", "yarn-client",
+      "--class", "org.SomeClass",
+      "--files", files,
+      "--archives", archives,
+      "thejar.jar"
+    )
+    val appArgs2 = new SparkSubmitArguments(clArgs2)
+    val sysProps2 = SparkSubmit.createLaunchEnv(appArgs2)._3
+    appArgs2.files should be (Utils.resolveURIs(files))
+    appArgs2.archives should be (Utils.resolveURIs(archives))
+    sysProps2("spark.yarn.dist.files") should be (Utils.resolveURIs(files))
+    sysProps2("spark.yarn.dist.archives") should be (Utils.resolveURIs(archives))
+
+    // Test python files
+    val clArgs3 = Seq(
+      "--master", "local",
+      "--py-files", pyFiles,
+      "mister.py"
+    )
+    val appArgs3 = new SparkSubmitArguments(clArgs3)
+    val sysProps3 = SparkSubmit.createLaunchEnv(appArgs3)._3
+    appArgs3.pyFiles should be (Utils.resolveURIs(pyFiles))
+    sysProps3("spark.submit.pyFiles") should be (
+      PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
+  }
+
+  test("resolves config paths correctly") {
+    val jars = "/jar1,/jar2"                 // spark.jars
+    val files = "hdfs:/file1,file2"          // spark.files / spark.yarn.dist.files
+    val archives = "file:/archive1,archive2" // spark.yarn.dist.archives
+    val pyFiles = "py-file1,py-file2"        // spark.submit.pyFiles
+
+    // Test jars and files
+    val f1 = File.createTempFile("test-submit-jars-files", "")
+    val writer1 = new PrintWriter(f1)
+    writer1.println("spark.jars " + jars)
+    writer1.println("spark.files " + files)
+    writer1.close()
+    val clArgs = Seq(
+      "--master", "local",
+      "--class", "org.SomeClass",
+      "--properties-file", f1.getPath,
+      "thejar.jar"
+    )
+    val appArgs = new SparkSubmitArguments(clArgs)
+    val sysProps = SparkSubmit.createLaunchEnv(appArgs)._3
+    sysProps("spark.jars") should be (Utils.resolveURIs(jars + ",thejar.jar"))
+    sysProps("spark.files") should be (Utils.resolveURIs(files))
+
+    // Test files and archives (Yarn)
+    val f2 = File.createTempFile("test-submit-files-archives", "")
+    val writer2 = new PrintWriter(f2)
+    writer2.println("spark.yarn.dist.files " + files)
+    writer2.println("spark.yarn.dist.archives " + archives)
+    writer2.close()
+    val clArgs2 = Seq(
+      "--master", "yarn-client",
+      "--class", "org.SomeClass",
+      "--properties-file", f2.getPath,
+      "thejar.jar"
+    )
+    val appArgs2 = new SparkSubmitArguments(clArgs2)
+    val sysProps2 = SparkSubmit.createLaunchEnv(appArgs2)._3
+    sysProps2("spark.yarn.dist.files") should be (Utils.resolveURIs(files))
+    sysProps2("spark.yarn.dist.archives") should be (Utils.resolveURIs(archives))
+
+    // Test python files
+    val f3 = File.createTempFile("test-submit-python-files", "")
+    val writer3 = new PrintWriter(f3)
+    writer3.println("spark.submit.pyFiles " + pyFiles)
+    writer3.close()
+    val clArgs3 = Seq(
+      "--master", "local",
+      "--properties-file", f3.getPath,
+      "mister.py"
+    )
+    val appArgs3 = new SparkSubmitArguments(clArgs3)
+    val sysProps3 = SparkSubmit.createLaunchEnv(appArgs3)._3
+    sysProps3("spark.submit.pyFiles") should be (
+      PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
+  }
+
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
   def runSparkSubmit(args: Seq[String]): String = {
     val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
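The config-path test above drives the --properties-file route by writing spark-defaults-style files by hand. A small self-contained sketch of that round trip, assuming only the JDK (file names and keys here are illustrative): "key value" pairs separated by whitespace are also valid java.util.Properties syntax, so files written this way can be parsed back directly.

    import java.io.{File, FileInputStream, PrintWriter}
    import java.util.Properties
    import scala.collection.JavaConverters._

    object PropertiesFileSketch {
      def main(args: Array[String]): Unit = {
        // Write a spark-defaults-style file, as the tests above do
        val f = File.createTempFile("test-submit", ".conf")
        val writer = new PrintWriter(f)
        writer.println("spark.jars /jar1,/jar2")
        writer.println("spark.files hdfs:/file1,file2")
        writer.close()

        // Read it back with java.util.Properties and print each pair
        val props = new Properties()
        val in = new FileInputStream(f)
        try props.load(in) finally in.close()
        props.stringPropertyNames.asScala.toSeq.sorted.foreach { key =>
          println(s"$key -> ${props.getProperty(key)}")
        }
        f.delete()
      }
    }
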
sysProps3("spark.submit.pyFiles") should be ( + PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(",")) + } + // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly. def runSparkSubmit(args: Seq[String]): String = { val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) From fff2869e9536f02ce3ddc81186b5594ca16dad5d Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 30 Oct 2014 12:34:24 -0700 Subject: [PATCH 4/4] Add spark.yarn.jar --- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index ca7a6766d97f9..0379adeb07b83 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -289,6 +289,7 @@ object SparkSubmit { val pathConfigs = Seq( "spark.jars", "spark.files", + "spark.yarn.jar", "spark.yarn.dist.files", "spark.yarn.dist.archives") pathConfigs.foreach { config =>