Skip to content

Commit 24c5129

Browse files
andrewor14 (Andrew Or)
authored and committed
[SPARK-3319] [SPARK-3338] Resolve Spark submit config paths
The bulk of this PR is comprised of tests. All changes in functionality are made in `SparkSubmit.scala` (~20 lines). **SPARK-3319.** There is currently a divergence in behavior when the user passes in additional jars through `--jars` and through setting `spark.jars` in the default properties file. The former will happily resolve the paths (e.g. convert `my.jar` to `file:/absolute/path/to/my.jar`), while the latter does not. We should resolve paths consistently in both cases. This also applies to the following pairs of command line arguments and Spark configs: - `--jars` ~ `spark.jars` - `--files` ~ `spark.files` / `spark.yarn.dist.files` - `--archives` ~ `spark.yarn.dist.archives` - `--py-files` ~ `spark.submit.pyFiles` **SPARK-3338.** This PR also fixes the following bug: if the user sets `spark.submit.pyFiles` in his/her properties file, it does not actually get picked up even if `--py-files` is not set. This is simply because the config is overridden by an empty string. Author: Andrew Or <[email protected]> Author: Andrew Or <[email protected]> Closes #2232 from andrewor14/resolve-config-paths and squashes the following commits: fff2869 [Andrew Or] Add spark.yarn.jar da3a1c1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into resolve-config-paths f0fae64 [Andrew Or] Merge branch 'master' of github.com:apache/spark into resolve-config-paths 05e03d6 [Andrew Or] Add tests for resolving both command line and config paths 460117e [Andrew Or] Resolve config paths properly fe039d3 [Andrew Or] Beef up tests to test fixed-pointed-ness of Utils.resolveURI(s)
1 parent 9142c9b commit 24c5129

File tree

3 files changed

+158
-14
lines changed

3 files changed

+158
-14
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,9 @@ object SparkSubmit {
158158
args.files = mergeFileLists(args.files, args.primaryResource)
159159
}
160160
args.files = mergeFileLists(args.files, args.pyFiles)
161-
// Format python file paths properly before adding them to the PYTHONPATH
162-
sysProps("spark.submit.pyFiles") = PythonRunner.formatPaths(args.pyFiles).mkString(",")
161+
if (args.pyFiles != null) {
162+
sysProps("spark.submit.pyFiles") = args.pyFiles
163+
}
163164
}
164165

165166
// Special flag to avoid deprecation warnings at the client
@@ -284,6 +285,29 @@ object SparkSubmit {
284285
sysProps.getOrElseUpdate(k, v)
285286
}
286287

288+
// Resolve paths in certain spark properties
289+
val pathConfigs = Seq(
290+
"spark.jars",
291+
"spark.files",
292+
"spark.yarn.jar",
293+
"spark.yarn.dist.files",
294+
"spark.yarn.dist.archives")
295+
pathConfigs.foreach { config =>
296+
// Replace old URIs with resolved URIs, if they exist
297+
sysProps.get(config).foreach { oldValue =>
298+
sysProps(config) = Utils.resolveURIs(oldValue)
299+
}
300+
}
301+
302+
// Resolve and format python file paths properly before adding them to the PYTHONPATH.
303+
// The resolving part is redundant in the case of --py-files, but necessary if the user
304+
// explicitly sets `spark.submit.pyFiles` in his/her default properties file.
305+
sysProps.get("spark.submit.pyFiles").foreach { pyFiles =>
306+
val resolvedPyFiles = Utils.resolveURIs(pyFiles)
307+
val formattedPyFiles = PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
308+
sysProps("spark.submit.pyFiles") = formattedPyFiles
309+
}
310+
287311
(childArgs, childClasspath, sysProps, childMainClass)
288312
}
289313

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 105 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {
292292
runSparkSubmit(args)
293293
}
294294

295-
test("spark submit includes jars passed in through --jar") {
295+
test("includes jars passed in through --jars") {
296296
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
297297
val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
298298
val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
@@ -306,6 +306,110 @@ class SparkSubmitSuite extends FunSuite with Matchers {
306306
runSparkSubmit(args)
307307
}
308308

309+
test("resolves command line argument paths correctly") {
310+
val jars = "/jar1,/jar2" // --jars
311+
val files = "hdfs:/file1,file2" // --files
312+
val archives = "file:/archive1,archive2" // --archives
313+
val pyFiles = "py-file1,py-file2" // --py-files
314+
315+
// Test jars and files
316+
val clArgs = Seq(
317+
"--master", "local",
318+
"--class", "org.SomeClass",
319+
"--jars", jars,
320+
"--files", files,
321+
"thejar.jar")
322+
val appArgs = new SparkSubmitArguments(clArgs)
323+
val sysProps = SparkSubmit.createLaunchEnv(appArgs)._3
324+
appArgs.jars should be (Utils.resolveURIs(jars))
325+
appArgs.files should be (Utils.resolveURIs(files))
326+
sysProps("spark.jars") should be (Utils.resolveURIs(jars + ",thejar.jar"))
327+
sysProps("spark.files") should be (Utils.resolveURIs(files))
328+
329+
// Test files and archives (Yarn)
330+
val clArgs2 = Seq(
331+
"--master", "yarn-client",
332+
"--class", "org.SomeClass",
333+
"--files", files,
334+
"--archives", archives,
335+
"thejar.jar"
336+
)
337+
val appArgs2 = new SparkSubmitArguments(clArgs2)
338+
val sysProps2 = SparkSubmit.createLaunchEnv(appArgs2)._3
339+
appArgs2.files should be (Utils.resolveURIs(files))
340+
appArgs2.archives should be (Utils.resolveURIs(archives))
341+
sysProps2("spark.yarn.dist.files") should be (Utils.resolveURIs(files))
342+
sysProps2("spark.yarn.dist.archives") should be (Utils.resolveURIs(archives))
343+
344+
// Test python files
345+
val clArgs3 = Seq(
346+
"--master", "local",
347+
"--py-files", pyFiles,
348+
"mister.py"
349+
)
350+
val appArgs3 = new SparkSubmitArguments(clArgs3)
351+
val sysProps3 = SparkSubmit.createLaunchEnv(appArgs3)._3
352+
appArgs3.pyFiles should be (Utils.resolveURIs(pyFiles))
353+
sysProps3("spark.submit.pyFiles") should be (
354+
PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
355+
}
356+
357+
test("resolves config paths correctly") {
358+
val jars = "/jar1,/jar2" // spark.jars
359+
val files = "hdfs:/file1,file2" // spark.files / spark.yarn.dist.files
360+
val archives = "file:/archive1,archive2" // spark.yarn.dist.archives
361+
val pyFiles = "py-file1,py-file2" // spark.submit.pyFiles
362+
363+
// Test jars and files
364+
val f1 = File.createTempFile("test-submit-jars-files", "")
365+
val writer1 = new PrintWriter(f1)
366+
writer1.println("spark.jars " + jars)
367+
writer1.println("spark.files " + files)
368+
writer1.close()
369+
val clArgs = Seq(
370+
"--master", "local",
371+
"--class", "org.SomeClass",
372+
"--properties-file", f1.getPath,
373+
"thejar.jar"
374+
)
375+
val appArgs = new SparkSubmitArguments(clArgs)
376+
val sysProps = SparkSubmit.createLaunchEnv(appArgs)._3
377+
sysProps("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar"))
378+
sysProps("spark.files") should be(Utils.resolveURIs(files))
379+
380+
// Test files and archives (Yarn)
381+
val f2 = File.createTempFile("test-submit-files-archives", "")
382+
val writer2 = new PrintWriter(f2)
383+
writer2.println("spark.yarn.dist.files " + files)
384+
writer2.println("spark.yarn.dist.archives " + archives)
385+
writer2.close()
386+
val clArgs2 = Seq(
387+
"--master", "yarn-client",
388+
"--class", "org.SomeClass",
389+
"--properties-file", f2.getPath,
390+
"thejar.jar"
391+
)
392+
val appArgs2 = new SparkSubmitArguments(clArgs2)
393+
val sysProps2 = SparkSubmit.createLaunchEnv(appArgs2)._3
394+
sysProps2("spark.yarn.dist.files") should be(Utils.resolveURIs(files))
395+
sysProps2("spark.yarn.dist.archives") should be(Utils.resolveURIs(archives))
396+
397+
// Test python files
398+
val f3 = File.createTempFile("test-submit-python-files", "")
399+
val writer3 = new PrintWriter(f3)
400+
writer3.println("spark.submit.pyFiles " + pyFiles)
401+
writer3.close()
402+
val clArgs3 = Seq(
403+
"--master", "local",
404+
"--properties-file", f3.getPath,
405+
"mister.py"
406+
)
407+
val appArgs3 = new SparkSubmitArguments(clArgs3)
408+
val sysProps3 = SparkSubmit.createLaunchEnv(appArgs3)._3
409+
sysProps3("spark.submit.pyFiles") should be(
410+
PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
411+
}
412+
309413
test("SPARK_CONF_DIR overrides spark-defaults.conf") {
310414
forConfDir(Map("spark.executor.memory" -> "2.3g")) { path =>
311415
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -217,9 +217,14 @@ class UtilsSuite extends FunSuite {
217217

218218
test("resolveURI") {
219219
def assertResolves(before: String, after: String, testWindows: Boolean = false): Unit = {
220-
assume(before.split(",").length == 1)
221-
assert(Utils.resolveURI(before, testWindows) === new URI(after))
222-
assert(Utils.resolveURI(after, testWindows) === new URI(after))
220+
// This should test only single paths
221+
assume(before.split(",").length === 1)
222+
// Repeated invocations of resolveURI should yield the same result
223+
def resolve(uri: String): String = Utils.resolveURI(uri, testWindows).toString
224+
assert(resolve(after) === after)
225+
assert(resolve(resolve(after)) === after)
226+
assert(resolve(resolve(resolve(after))) === after)
227+
// Also test resolveURIs with single paths
223228
assert(new URI(Utils.resolveURIs(before, testWindows)) === new URI(after))
224229
assert(new URI(Utils.resolveURIs(after, testWindows)) === new URI(after))
225230
}
@@ -235,16 +240,27 @@ class UtilsSuite extends FunSuite {
235240
assertResolves("file:/C:/file.txt#alias.txt", "file:/C:/file.txt#alias.txt", testWindows = true)
236241
intercept[IllegalArgumentException] { Utils.resolveURI("file:foo") }
237242
intercept[IllegalArgumentException] { Utils.resolveURI("file:foo:baby") }
243+
}
238244

239-
// Test resolving comma-delimited paths
240-
assert(Utils.resolveURIs("jar1,jar2") === s"file:$cwd/jar1,file:$cwd/jar2")
241-
assert(Utils.resolveURIs("file:/jar1,file:/jar2") === "file:/jar1,file:/jar2")
242-
assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3") ===
243-
s"hdfs:/jar1,file:/jar2,file:$cwd/jar3")
244-
assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3,jar4#jar5") ===
245+
test("resolveURIs with multiple paths") {
246+
def assertResolves(before: String, after: String, testWindows: Boolean = false): Unit = {
247+
assume(before.split(",").length > 1)
248+
assert(Utils.resolveURIs(before, testWindows) === after)
249+
assert(Utils.resolveURIs(after, testWindows) === after)
250+
// Repeated invocations of resolveURIs should yield the same result
251+
def resolve(uri: String): String = Utils.resolveURIs(uri, testWindows)
252+
assert(resolve(after) === after)
253+
assert(resolve(resolve(after)) === after)
254+
assert(resolve(resolve(resolve(after))) === after)
255+
}
256+
val cwd = System.getProperty("user.dir")
257+
assertResolves("jar1,jar2", s"file:$cwd/jar1,file:$cwd/jar2")
258+
assertResolves("file:/jar1,file:/jar2", "file:/jar1,file:/jar2")
259+
assertResolves("hdfs:/jar1,file:/jar2,jar3", s"hdfs:/jar1,file:/jar2,file:$cwd/jar3")
260+
assertResolves("hdfs:/jar1,file:/jar2,jar3,jar4#jar5",
245261
s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4#jar5")
246-
assert(Utils.resolveURIs("hdfs:/jar1,file:/jar2,jar3,C:\\pi.py#py.pi", testWindows = true) ===
247-
s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi")
262+
assertResolves("hdfs:/jar1,file:/jar2,jar3,C:\\pi.py#py.pi",
263+
s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi", testWindows = true)
248264
}
249265

250266
test("nonLocalPaths") {

0 commit comments

Comments (0)