Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.deploy

import java.io.File
import java.net.{InetAddress, URI}
import java.nio.file.Files

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
Expand Down Expand Up @@ -48,7 +49,7 @@ object PythonRunner {

// Format python file paths before adding them to the PYTHONPATH
val formattedPythonFile = formatPath(pythonFile)
val formattedPyFiles = formatPaths(pyFiles)
val formattedPyFiles = resolvePyFiles(formatPaths(pyFiles))

// Launch a Py4J gateway server for the process to connect to; this will let it see our
// Java system properties and such
Expand Down Expand Up @@ -153,4 +154,30 @@ object PythonRunner {
.map { p => formatPath(p, testWindows) }
}

/**
* Resolves the ".py" files. ".py" file should not be added as is because PYTHONPATH does
* not expect a file. This method creates a temporary directory and puts the ".py" files
* if exist in the given paths.
*/
private def resolvePyFiles(pyFiles: Array[String]): Array[String] = {
lazy val dest = Utils.createTempDir(namePrefix = "localPyFiles")
pyFiles.flatMap { pyFile =>
// In case of client with submit, the python paths should be set before context
// initialization because the context initialization can be done later.
// We will copy the local ".py" files because ".py" file shouldn't be added
// alone but its parent directory in PYTHONPATH. See SPARK-24384.
if (pyFile.endsWith(".py")) {
val source = new File(pyFile)
Copy link
Contributor

@jerryshao jerryshao May 25, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we check if the file is existed or not, also if it is readable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeap

if (source.exists() && source.isFile && source.canRead) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using both exists and isFile is redundant, but no biggie.

Files.copy(source.toPath, new File(dest, source.getName).toPath)
Some(dest.getAbsolutePath)
} else {
// Don't have to add it if it doesn't exist or isn't readable.
None
}
} else {
Some(pyFile)
}
}.distinct
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -271,16 +271,11 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
"PYSPARK_ARCHIVES_PATH" -> pythonPath.map("local:" + _).mkString(File.pathSeparator),
"PYTHONPATH" -> pythonPath.mkString(File.pathSeparator)) ++ extraEnv

val moduleDir =
if (clientMode) {
// In client-mode, .py files added with --py-files are not visible in the driver.
// This is something that the launcher library would have to handle.
tempDir
} else {
val subdir = new File(tempDir, "pyModules")
subdir.mkdir()
subdir
}
val moduleDir = {
val subdir = new File(tempDir, "pyModules")
subdir.mkdir()
subdir
}
val pyModule = new File(moduleDir, "mod1.py")
Files.write(TEST_PYMODULE, pyModule, StandardCharsets.UTF_8)

Expand Down