28 changes: 26 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -158,8 +158,9 @@ object SparkSubmit {
args.files = mergeFileLists(args.files, args.primaryResource)
}
args.files = mergeFileLists(args.files, args.pyFiles)
// Format python file paths properly before adding them to the PYTHONPATH
sysProps("spark.submit.pyFiles") = PythonRunner.formatPaths(args.pyFiles).mkString(",")
if (args.pyFiles != null) {
sysProps("spark.submit.pyFiles") = args.pyFiles
}
}

// Special flag to avoid deprecation warnings at the client
@@ -284,6 +285,29 @@ object SparkSubmit {
sysProps.getOrElseUpdate(k, v)
}

// Resolve paths in certain spark properties
val pathConfigs = Seq(
"spark.jars",
"spark.files",
"spark.yarn.jar",
"spark.yarn.dist.files",
"spark.yarn.dist.archives")
pathConfigs.foreach { config =>
// Replace old URIs with resolved URIs, if they exist
sysProps.get(config).foreach { oldValue =>
sysProps(config) = Utils.resolveURIs(oldValue)
}
}

// Resolve and format python file paths properly before adding them to the PYTHONPATH.
// The resolving part is redundant in the case of --py-files, but necessary if the user
// explicitly sets `spark.submit.pyFiles` in their default properties file.
sysProps.get("spark.submit.pyFiles").foreach { pyFiles =>
val resolvedPyFiles = Utils.resolveURIs(pyFiles)
val formattedPyFiles = PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
sysProps("spark.submit.pyFiles") = formattedPyFiles
}

(childArgs, childClasspath, sysProps, childMainClass)
}

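The hunk above delegates comma-separated path resolution to `Utils.resolveURIs` and leaves PYTHONPATH formatting to `PythonRunner.formatPaths`. A minimal sketch of the same `pathConfigs` loop in isolation, using a hypothetical `resolveURIs` stand-in (the real `Utils.resolveURIs` also handles Windows paths and URI fragments):

```scala
import java.net.URI
import scala.collection.mutable

object ResolvePathsSketch extends App {
  // Hypothetical stand-in for Utils.resolveURIs: resolve each comma-separated
  // entry that has no scheme into a file: URI under the working directory.
  def resolveURIs(paths: String): String = {
    val cwd = System.getProperty("user.dir")
    paths.split(",").filter(_.nonEmpty).map { p =>
      if (new URI(p).getScheme != null) p else s"file:$cwd/$p"
    }.mkString(",")
  }

  // Same shape as the pathConfigs loop in the hunk above: rewrite each
  // path-valued property in place, but only if the user actually set it.
  val sysProps = mutable.Map(
    "spark.jars"  -> "jar1,hdfs:/jar2",
    "spark.files" -> "file:/data.txt")

  val pathConfigs = Seq("spark.jars", "spark.files", "spark.yarn.dist.files")
  pathConfigs.foreach { config =>
    sysProps.get(config).foreach { oldValue =>
      sysProps(config) = resolveURIs(oldValue)
    }
  }

  sysProps.foreach { case (k, v) => println(s"$k = $v") }
}
```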
106 changes: 105 additions & 1 deletion core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -292,7 +292,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {
runSparkSubmit(args)
}

test("spark submit includes jars passed in through --jar") {
test("includes jars passed in through --jars") {
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
@@ -306,6 +306,110 @@ class SparkSubmitSuite extends FunSuite with Matchers {
runSparkSubmit(args)
}

test("resolves command line argument paths correctly") {
val jars = "/jar1,/jar2" // --jars
val files = "hdfs:/file1,file2" // --files
val archives = "file:/archive1,archive2" // --archives
val pyFiles = "py-file1,py-file2" // --py-files

// Test jars and files
val clArgs = Seq(
"--master", "local",
"--class", "org.SomeClass",
"--jars", jars,
"--files", files,
"thejar.jar")
val appArgs = new SparkSubmitArguments(clArgs)
val sysProps = SparkSubmit.createLaunchEnv(appArgs)._3
appArgs.jars should be (Utils.resolveURIs(jars))
appArgs.files should be (Utils.resolveURIs(files))
sysProps("spark.jars") should be (Utils.resolveURIs(jars + ",thejar.jar"))
sysProps("spark.files") should be (Utils.resolveURIs(files))

// Test files and archives (Yarn)
val clArgs2 = Seq(
"--master", "yarn-client",
"--class", "org.SomeClass",
"--files", files,
"--archives", archives,
"thejar.jar"
)
val appArgs2 = new SparkSubmitArguments(clArgs2)
val sysProps2 = SparkSubmit.createLaunchEnv(appArgs2)._3
appArgs2.files should be (Utils.resolveURIs(files))
appArgs2.archives should be (Utils.resolveURIs(archives))
sysProps2("spark.yarn.dist.files") should be (Utils.resolveURIs(files))
sysProps2("spark.yarn.dist.archives") should be (Utils.resolveURIs(archives))

// Test python files
val clArgs3 = Seq(
"--master", "local",
"--py-files", pyFiles,
"mister.py"
)
val appArgs3 = new SparkSubmitArguments(clArgs3)
val sysProps3 = SparkSubmit.createLaunchEnv(appArgs3)._3
appArgs3.pyFiles should be (Utils.resolveURIs(pyFiles))
sysProps3("spark.submit.pyFiles") should be (
PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
}

test("resolves config paths correctly") {
val jars = "/jar1,/jar2" // spark.jars
val files = "hdfs:/file1,file2" // spark.files / spark.yarn.dist.files
val archives = "file:/archive1,archive2" // spark.yarn.dist.archives
val pyFiles = "py-file1,py-file2" // spark.submit.pyFiles

// Test jars and files
val f1 = File.createTempFile("test-submit-jars-files", "")
val writer1 = new PrintWriter(f1)
writer1.println("spark.jars " + jars)
writer1.println("spark.files " + files)
writer1.close()
val clArgs = Seq(
"--master", "local",
"--class", "org.SomeClass",
"--properties-file", f1.getPath,
"thejar.jar"
)
val appArgs = new SparkSubmitArguments(clArgs)
val sysProps = SparkSubmit.createLaunchEnv(appArgs)._3
sysProps("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar"))
sysProps("spark.files") should be(Utils.resolveURIs(files))

// Test files and archives (Yarn)
val f2 = File.createTempFile("test-submit-files-archives", "")
val writer2 = new PrintWriter(f2)
writer2.println("spark.yarn.dist.files " + files)
writer2.println("spark.yarn.dist.archives " + archives)
writer2.close()
val clArgs2 = Seq(
"--master", "yarn-client",
"--class", "org.SomeClass",
"--properties-file", f2.getPath,
"thejar.jar"
)
val appArgs2 = new SparkSubmitArguments(clArgs2)
val sysProps2 = SparkSubmit.createLaunchEnv(appArgs2)._3
sysProps2("spark.yarn.dist.files") should be(Utils.resolveURIs(files))
sysProps2("spark.yarn.dist.archives") should be(Utils.resolveURIs(archives))

// Test python files
val f3 = File.createTempFile("test-submit-python-files", "")
val writer3 = new PrintWriter(f3)
writer3.println("spark.submit.pyFiles " + pyFiles)
writer3.close()
val clArgs3 = Seq(
"--master", "local",
"--properties-file", f3.getPath,
"mister.py"
)
val appArgs3 = new SparkSubmitArguments(clArgs3)
val sysProps3 = SparkSubmit.createLaunchEnv(appArgs3)._3
sysProps3("spark.submit.pyFiles") should be(
PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
}

test("SPARK_CONF_DIR overrides spark-defaults.conf") {
forConfDir(Map("spark.executor.memory" -> "2.3g")) { path =>
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
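The new suites above assert against `PythonRunner.formatPaths` without showing it. A rough sketch of a stand-in with the behavior the assertions imply, namely stripping the `file:` scheme from locally reachable entries so they can go on the PYTHONPATH; this is an assumption, and the real Spark implementation may differ in detail (e.g. Windows drive letters):

```scala
import java.net.URI

object FormatPathsSketch extends App {
  // Hypothetical stand-in for PythonRunner.formatPaths as exercised by the
  // assertions above: keep locally reachable entries, strip the file: scheme,
  // and reject anything that cannot be put on the PYTHONPATH.
  def formatPaths(paths: String): Array[String] = {
    paths.split(",").filter(_.nonEmpty).map { p =>
      val uri = new URI(p)
      uri.getScheme match {
        case null | "local" | "file" => uri.getPath
        case _ =>
          throw new IllegalArgumentException(s"Unsupported scheme for PYTHONPATH entry: $p")
      }
    }
  }

  // After resolution, --py-files "py-file1,py-file2" becomes file: URIs ...
  val cwd = System.getProperty("user.dir")
  val resolved = s"file:$cwd/py-file1,file:$cwd/py-file2"
  // ... and formatting yields plain paths, which is the shape the suite
  // asserts for spark.submit.pyFiles.
  println(formatPaths(resolved).mkString(","))
}
```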
38 changes: 27 additions & 11 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -217,9 +217,14 @@ class UtilsSuite extends FunSuite {

test("resolveURI") {
def assertResolves(before: String, after: String, testWindows: Boolean = false): Unit = {
assume(before.split(",").length == 1)
assert(Utils.resolveURI(before, testWindows) === new URI(after))
assert(Utils.resolveURI(after, testWindows) === new URI(after))
// This should test only single paths
assume(before.split(",").length === 1)
// Repeated invocations of resolveURI should yield the same result
def resolve(uri: String): String = Utils.resolveURI(uri, testWindows).toString
assert(resolve(after) === after)
assert(resolve(resolve(after)) === after)
assert(resolve(resolve(resolve(after))) === after)
// Also test resolveURIs with single paths
assert(new URI(Utils.resolveURIs(before, testWindows)) === new URI(after))
assert(new URI(Utils.resolveURIs(after, testWindows)) === new URI(after))
}
@@ -235,16 +240,27 @@ class UtilsSuite extends FunSuite {
assertResolves("file:/C:/file.txt#alias.txt", "file:/C:/file.txt#alias.txt", testWindows = true)
intercept[IllegalArgumentException] { Utils.resolveURI("file:foo") }
intercept[IllegalArgumentException] { Utils.resolveURI("file:foo:baby") }
}

// Test resolving comma-delimited paths
assert(Utils.resolveURIs("jar1,jar2") === s"file:$cwd/jar1,file:$cwd/jar2")
assert(Utils.resolveURIs("file:/jar1,file:/jar2") === "file:/jar1,file:/jar2")
assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3") ===
s"hdfs:/jar1,file:/jar2,file:$cwd/jar3")
assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3,jar4#jar5") ===
test("resolveURIs with multiple paths") {
def assertResolves(before: String, after: String, testWindows: Boolean = false): Unit = {
assume(before.split(",").length > 1)
assert(Utils.resolveURIs(before, testWindows) === after)
assert(Utils.resolveURIs(after, testWindows) === after)
// Repeated invocations of resolveURIs should yield the same result
def resolve(uri: String): String = Utils.resolveURIs(uri, testWindows)
assert(resolve(after) === after)
assert(resolve(resolve(after)) === after)
assert(resolve(resolve(resolve(after))) === after)
}
val cwd = System.getProperty("user.dir")
assertResolves("jar1,jar2", s"file:$cwd/jar1,file:$cwd/jar2")
assertResolves("file:/jar1,file:/jar2", "file:/jar1,file:/jar2")
assertResolves("hdfs:/jar1,file:/jar2,jar3", s"hdfs:/jar1,file:/jar2,file:$cwd/jar3")
assertResolves("hdfs:/jar1,file:/jar2,jar3,jar4#jar5",
s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4#jar5")
assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3,C:\\pi.py#py.pi", testWindows = true) ===
s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi")
assertResolves("hdfs:/jar1,file:/jar2,jar3,C:\\pi.py#py.pi",
s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi", testWindows = true)
}

test("nonLocalPaths") {
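The reworked `assertResolves` helpers above check that resolution is idempotent. A small sketch of a `resolveURI`-style stand-in that exhibits that property: an unschemed path becomes a `file:` URI rooted at the working directory, and an already-qualified URI passes through unchanged. This is an illustrative assumption; the real `Utils.resolveURI` additionally handles Windows paths and fragments, as the cases above show.

```scala
import java.net.URI

object ResolveURISketch extends App {
  // Hypothetical stand-in for Utils.resolveURI with the property the reworked
  // assertResolves helpers check: resolving an already-resolved URI is a no-op.
  def resolveURI(path: String): URI = {
    val uri = new URI(path)
    if (uri.getScheme != null) uri
    else new URI("file:" + System.getProperty("user.dir") + "/" + path)
  }

  val once  = resolveURI("jar1")
  val twice = resolveURI(once.toString)
  assert(once == twice, s"expected idempotent resolution, got $once vs $twice")
  println(once) // e.g. file:/current/working/dir/jar1
}
```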