
Commit b142157

HyukjinKwon authored and Marcelo Vanzin committed
[SPARK-24384][PYTHON][SPARK SUBMIT] Add .py files correctly into PythonRunner in submit with client mode in spark-submit
## What changes were proposed in this pull request?

In client mode specifically, before context initialization, a `.py` file passed via `--py-files` cannot be imported when the application itself is a Python file. See below:

```
$ cat /home/spark/tmp.py
def testtest():
    return 1
```

This works:

```
$ cat app.py
import pyspark
pyspark.sql.SparkSession.builder.getOrCreate()
import tmp
print("************************%s" % tmp.testtest())

$ ./bin/spark-submit --master yarn --deploy-mode client --py-files /home/spark/tmp.py app.py
...
************************1
```

but this doesn't:

```
$ cat app.py
import pyspark
import tmp
pyspark.sql.SparkSession.builder.getOrCreate()
print("************************%s" % tmp.testtest())

$ ./bin/spark-submit --master yarn --deploy-mode client --py-files /home/spark/tmp.py app.py
Traceback (most recent call last):
  File "/home/spark/spark/app.py", line 2, in <module>
    import tmp
ImportError: No module named tmp
```

### How did it happen?

In client mode specifically, the paths are added into PythonRunner as they are:

https://github.com/apache/spark/blob/628c7b517969c4a7ccb26ea67ab3dd61266073ca/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L430
https://github.com/apache/spark/blob/628c7b517969c4a7ccb26ea67ab3dd61266073ca/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala#L49-L88

The problem is that a `.py` file should not be added as is, because `PYTHONPATH` expects a directory or an archive such as a zip or egg.

### How does this PR fix it?

We should not simply add the file's parent directory, because other files in that directory would then also end up on the `PYTHONPATH` in client mode before context initialization. Instead, we copy the `.py` files into a temporary directory and add that directory to `PYTHONPATH`.

## How was this patch tested?

Unit tests were added, and the change was manually tested with spark-submit in both standalone and YARN client modes.
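The root cause described above can be reproduced with plain Python, independent of Spark: an entry on `sys.path` (which `PYTHONPATH` feeds into) must be a directory or an archive, so pointing it at a bare `.py` file does nothing. A minimal sketch, assuming nothing from Spark (the `tmp_mod` module name is made up for illustration, mirroring `/home/spark/tmp.py`):

```python
import os
import sys
import tempfile

# Create a throwaway module file, mirroring /home/spark/tmp.py above.
work_dir = tempfile.mkdtemp()
mod_path = os.path.join(work_dir, "tmp_mod.py")
with open(mod_path, "w") as f:
    f.write("def testtest():\n    return 1\n")

# Putting the .py file itself on sys.path does NOT make it importable.
sys.path.append(mod_path)
try:
    import tmp_mod
    importable_as_file = True
except ImportError:
    importable_as_file = False

# Putting its containing directory on sys.path does.
sys.path.append(work_dir)
import tmp_mod

print(importable_as_file, tmp_mod.testtest())  # prints: False 1
```

This is exactly why adding a `--py-files` entry to `PYTHONPATH` verbatim silently fails for bare `.py` files while working for zips and directories.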
Author: hyukjinkwon <[email protected]>

Closes apache#21426 from HyukjinKwon/SPARK-24384.

1 parent 1e46f92 · commit b142157

File tree: 2 files changed, +33 −11 lines


core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala

Lines changed: 28 additions & 1 deletion

```diff
@@ -19,6 +19,7 @@ package org.apache.spark.deploy
 
 import java.io.File
 import java.net.{InetAddress, URI}
+import java.nio.file.Files
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -48,7 +49,7 @@ object PythonRunner {
 
     // Format python file paths before adding them to the PYTHONPATH
     val formattedPythonFile = formatPath(pythonFile)
-    val formattedPyFiles = formatPaths(pyFiles)
+    val formattedPyFiles = resolvePyFiles(formatPaths(pyFiles))
 
     // Launch a Py4J gateway server for the process to connect to; this will let it see our
     // Java system properties and such
@@ -153,4 +154,30 @@ object PythonRunner {
       .map { p => formatPath(p, testWindows) }
   }
 
+  /**
+   * Resolves the ".py" files. ".py" file should not be added as is because PYTHONPATH does
+   * not expect a file. This method creates a temporary directory and puts the ".py" files
+   * if exist in the given paths.
+   */
+  private def resolvePyFiles(pyFiles: Array[String]): Array[String] = {
+    lazy val dest = Utils.createTempDir(namePrefix = "localPyFiles")
+    pyFiles.flatMap { pyFile =>
+      // In case of client with submit, the python paths should be set before context
+      // initialization because the context initialization can be done later.
+      // We will copy the local ".py" files because ".py" file shouldn't be added
+      // alone but its parent directory in PYTHONPATH. See SPARK-24384.
+      if (pyFile.endsWith(".py")) {
+        val source = new File(pyFile)
+        if (source.exists() && source.isFile && source.canRead) {
+          Files.copy(source.toPath, new File(dest, source.getName).toPath)
+          Some(dest.getAbsolutePath)
+        } else {
+          // Don't have to add it if it doesn't exist or isn't readable.
+          None
+        }
+      } else {
+        Some(pyFile)
+      }
+    }.distinct
+  }
 }
```
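To make the new helper's behavior concrete, here is a rough Python re-sketch of `resolvePyFiles` (the name `resolve_py_files` and its shape are mine for illustration, not Spark's API): bare `.py` files are copied into one lazily created temporary directory, and that directory is what ends up on the path; archives and directories pass through untouched; missing or unreadable files are dropped; duplicates are removed like Scala's `.distinct`.

```python
import os
import shutil
import tempfile


def resolve_py_files(py_files):
    """Illustrative mimic of PythonRunner.resolvePyFiles: copy bare .py files
    into one temp directory and return that directory in their place."""
    dest = None  # created lazily, like the Scala `lazy val dest`
    resolved = []
    for py_file in py_files:
        if py_file.endswith(".py"):
            if os.path.isfile(py_file) and os.access(py_file, os.R_OK):
                if dest is None:
                    dest = tempfile.mkdtemp(prefix="localPyFiles")
                shutil.copy(py_file, dest)
                resolved.append(dest)
            # else: skip files that don't exist or aren't readable
        else:
            resolved.append(py_file)
    # de-duplicate while preserving order, like Scala's `.distinct`
    return list(dict.fromkeys(resolved))
```

A directory produced this way can be put on `PYTHONPATH` before context initialization, after which `import tmp` in the motivating example succeeds.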

resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala

Lines changed: 5 additions & 10 deletions

```diff
@@ -271,16 +271,11 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
       "PYSPARK_ARCHIVES_PATH" -> pythonPath.map("local:" + _).mkString(File.pathSeparator),
       "PYTHONPATH" -> pythonPath.mkString(File.pathSeparator)) ++ extraEnv
 
-    val moduleDir =
-      if (clientMode) {
-        // In client-mode, .py files added with --py-files are not visible in the driver.
-        // This is something that the launcher library would have to handle.
-        tempDir
-      } else {
-        val subdir = new File(tempDir, "pyModules")
-        subdir.mkdir()
-        subdir
-      }
+    val moduleDir = {
+      val subdir = new File(tempDir, "pyModules")
+      subdir.mkdir()
+      subdir
+    }
     val pyModule = new File(moduleDir, "mod1.py")
     Files.write(TEST_PYMODULE, pyModule, StandardCharsets.UTF_8)
 
```