
Commit 47c0655

More work to make spark-submit work with Python:
- Launch Py4J gateway server in-process and execute Python main class
- Redirect its output to PythonRunner
- Various misc fixes to messages and error reporting in SparkSubmit
1 parent d4375bd commit 47c0655
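
As background for the gateway change above: the new PythonRunner (added below) starts an in-process py4j.GatewayServer on an ephemeral port and hands that port to the Python child process through the PYSPARK_GATEWAY_PORT environment variable. A minimal, standalone sketch of that handoff; the wrapper object and the println are illustrative only, the real code lives in PythonRunner.main:

    import py4j.GatewayServer

    object GatewayHandoffSketch {
      def main(args: Array[String]): Unit = {
        // entryPoint = null: the Python side only needs access to JVM classes and system
        // properties; port = 0: let py4j pick a free ephemeral port
        val gatewayServer = new GatewayServer(null, 0)
        gatewayServer.start()
        // PythonRunner exports this value as PYSPARK_GATEWAY_PORT for the child process
        println("PYSPARK_GATEWAY_PORT=" + gatewayServer.getListeningPort)
      }
    }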

File tree

10 files changed, +218 −104 lines


assembly/pom.xml

Lines changed: 0 additions & 13 deletions
@@ -40,14 +40,6 @@
     <deb.user>root</deb.user>
   </properties>
 
-  <repositories>
-    <!-- A repository in the local filesystem for the Py4J JAR, which is not in Maven central -->
-    <repository>
-      <id>lib</id>
-      <url>file://${project.basedir}/lib</url>
-    </repository>
-  </repositories>
-
   <dependencies>
     <dependency>
       <groupId>org.apache.spark</groupId>
@@ -84,11 +76,6 @@
       <artifactId>spark-sql_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
-    <dependency>
-      <groupId>net.sf.py4j</groupId>
-      <artifactId>py4j</artifactId>
-      <version>0.8.1</version>
-    </dependency>
   </dependencies>
 
   <build>

core/pom.xml

Lines changed: 5 additions & 0 deletions
@@ -247,6 +247,11 @@
       <artifactId>pyrolite</artifactId>
       <version>2.0.1</version>
     </dependency>
+    <dependency>
+      <groupId>net.sf.py4j</groupId>
+      <artifactId>py4j</artifactId>
+      <version>0.8.1</version>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

Lines changed: 2 additions & 2 deletions
@@ -78,7 +78,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
 
       // Create and start the worker
-      val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker"))
+      val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.worker"))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars)
       val worker = pb.start()
@@ -151,7 +151,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 
     try {
       // Create and start the daemon
-      val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon"))
+      val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.daemon"))
       val workerEnv = pb.environment()
      workerEnv.putAll(envVars)
       daemon = pb.start()
core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala

Lines changed: 73 additions & 0 deletions
@@ -0,0 +1,73 @@
+package org.apache.spark.deploy
+
+import java.io.{IOException, File, InputStream, OutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConversions._
+
+import org.apache.spark.SparkContext
+
+/**
+ * A main class used by spark-submit to launch Python applications. It executes python as a
+ * subprocess and then has it connect back to the JVM to access system properties, etc.
+ */
+object PythonRunner {
+  def main(args: Array[String]) {
+    val primaryResource = args(0)
+    val pyFiles = args(1)
+    val otherArgs = args.slice(2, args.length)
+
+    val pythonExec = sys.env.get("PYSPARK_PYTHON").getOrElse("python") // TODO: get this from conf
+
+    // Launch a Py4J gateway server for the process to connect to; this will let it see our
+    // Java system properties and such
+    val gatewayServer = new py4j.GatewayServer(null, 0)
+    gatewayServer.start()
+
+    // Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
+    // python directories in SPARK_HOME (if set), and any files in the pyFiles argument
+    val sparkJar = SparkContext.jarOfObject(this).get
+    val pathSeparator: String = System.getProperty("path.separator")
+    val pythonPath = new ArrayBuffer[String]
+    pythonPath += sparkJar
+    pythonPath ++= pyFiles.split(",").filter(_ != "")
+    for (sparkHome <- sys.env.get("SPARK_HOME")) {
+      pythonPath += Seq(sparkHome, "python").mkString(File.separator)
+      pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.8.1-src.zip").mkString(File.separator)
+    }
+    for (oldPythonPath <- sys.env.get("PYTHONPATH")) {
+      pythonPath ++= oldPythonPath.split(pathSeparator)
+    }
+
+    // Launch Python process
+    val builder = new ProcessBuilder(Seq(pythonExec, "-u", primaryResource) ++ otherArgs)
+    val env = builder.environment()
+    env.put("PYTHONPATH", pythonPath.mkString(pathSeparator))
+    env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
+    builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
+    val process = builder.start()
+
+    new RedirectThread(process.getInputStream, System.out, "redirect output").start()
+
+    System.exit(process.waitFor())
+  }
+
+  /**
+   * A utility class to redirect the child process's stdout or stderr
+   */
+  class RedirectThread(in: InputStream, out: OutputStream, name: String) extends Thread(name) {
+    setDaemon(true)
+    override def run() {
+      scala.util.control.Exception.ignoring(classOf[IOException]) {
+        // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
+        val buf = new Array[Byte](1024)
+        var len = in.read(buf)
+        while (len != -1) {
+          out.write(buf, 0, len)
+          out.flush()
+          len = in.read(buf)
+        }
+      }
+    }
+  }
+}
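
As wired up in SparkSubmit (next file), PythonRunner receives the primary .py file as args(0), the comma-separated --py-files list (possibly empty) as args(1), and the remaining application arguments after that. A hypothetical invocation, with made-up file names, would therefore look like:

    object PythonRunnerInvocationSketch {
      def main(unused: Array[String]): Unit = {
        val childArgs = Array(
          "/path/to/my_app.py",    // args(0): primary resource, executed by the Python interpreter
          "deps.zip,helpers.egg",  // args(1): --py-files entries, prepended to PYTHONPATH ("" if none)
          "--input", "data.txt")   // everything else is forwarded to the script unchanged
        org.apache.spark.deploy.PythonRunner.main(childArgs)
      }
    }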

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 65 additions & 41 deletions
@@ -60,11 +60,11 @@ object SparkSubmit {
   private[spark] var exitFn: () => Unit = () => System.exit(-1)
 
   private[spark] def printErrorAndExit(str: String) = {
-    printStream.println("error: " + str)
-    printStream.println("run with --help for more information or --verbose for debugging output")
+    printStream.println("Error: " + str)
+    printStream.println("Run with --help for usage help or --verbose for debug output")
     exitFn()
   }
-  private[spark] def printWarning(str: String) = printStream.println("warning: " + str)
+  private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)
 
   /**
    * @return
@@ -83,7 +83,7 @@ object SparkSubmit {
     } else if (appArgs.master.startsWith("mesos")) {
       clusterManager = MESOS
     } else {
-      printErrorAndExit("master must start with yarn, mesos, spark, or local")
+      printErrorAndExit("Master must start with yarn, mesos, spark, or local")
     }
 
     // Because "yarn-cluster" and "yarn-client" encapsulate both the master
@@ -116,9 +116,20 @@ object SparkSubmit {
     var childMainClass = ""
 
     if (clusterManager == MESOS && deployOnCluster) {
-      printErrorAndExit("Mesos does not support running the driver on the cluster")
+      printErrorAndExit("Cannot run driver on the cluster in Mesos")
     }
 
+    // If we're running a Python app, set the Java class to run to be our PythonRunner, add
+    // Python files to deployment list, and pass the main file and Python path to PythonRunner
+    if (appArgs.isPython) {
+      appArgs.mainClass = "org.apache.spark.deploy.PythonRunner"
+      appArgs.files = mergeFileLists(appArgs.files, appArgs.pyFiles, appArgs.primaryResource)
+      val pyFiles = Option(appArgs.pyFiles).getOrElse("")
+      appArgs.childArgs = ArrayBuffer(appArgs.primaryResource, pyFiles) ++ appArgs.childArgs
+      appArgs.primaryResource = RESERVED_JAR_NAME
+    }
+
+    // If we're deploying into YARN, use yarn.Client as a wrapper around the user class
     if (!deployOnCluster) {
       childMainClass = appArgs.mainClass
       if (appArgs.primaryResource != RESERVED_JAR_NAME) {
@@ -130,8 +141,8 @@ object SparkSubmit {
       childArgs += ("--class", appArgs.mainClass)
     }
 
+    // Make sure YARN is included in our build if we're trying to use it
     if (clusterManager == YARN) {
-      // The choice of class is arbitrary, could use any spark-yarn class
       if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
         val msg = "Could not load YARN classes. This copy of Spark may not have been compiled " +
           "with YARN support."
@@ -142,38 +153,39 @@ object SparkSubmit {
     // Special flag to avoid deprecation warnings at the client
     sysProps("SPARK_SUBMIT") = "true"
 
+    // A list of rules to map each argument to system properties or command-line options in
+    // each deploy mode; we iterate through these below
     val options = List[OptionAssigner](
-      new OptionAssigner(appArgs.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
-      new OptionAssigner(appArgs.driverExtraClassPath, STANDALONE | YARN, true,
+      OptionAssigner(appArgs.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
+      OptionAssigner(appArgs.driverExtraClassPath, STANDALONE | YARN, true,
        sysProp = "spark.driver.extraClassPath"),
-      new OptionAssigner(appArgs.driverExtraJavaOptions, STANDALONE | YARN, true,
+      OptionAssigner(appArgs.driverExtraJavaOptions, STANDALONE | YARN, true,
        sysProp = "spark.driver.extraJavaOptions"),
-      new OptionAssigner(appArgs.driverExtraLibraryPath, STANDALONE | YARN, true,
+      OptionAssigner(appArgs.driverExtraLibraryPath, STANDALONE | YARN, true,
        sysProp = "spark.driver.extraLibraryPath"),
-      new OptionAssigner(appArgs.driverMemory, YARN, true, clOption = "--driver-memory"),
-      new OptionAssigner(appArgs.name, YARN, true, clOption = "--name"),
-      new OptionAssigner(appArgs.queue, YARN, true, clOption = "--queue"),
-      new OptionAssigner(appArgs.queue, YARN, false, sysProp = "spark.yarn.queue"),
-      new OptionAssigner(appArgs.numExecutors, YARN, true, clOption = "--num-executors"),
-      new OptionAssigner(appArgs.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
-      new OptionAssigner(appArgs.executorMemory, YARN, true, clOption = "--executor-memory"),
-      new OptionAssigner(appArgs.executorMemory, STANDALONE | MESOS | YARN, false,
+      OptionAssigner(appArgs.driverMemory, YARN, true, clOption = "--driver-memory"),
+      OptionAssigner(appArgs.name, YARN, true, clOption = "--name"),
+      OptionAssigner(appArgs.queue, YARN, true, clOption = "--queue"),
+      OptionAssigner(appArgs.queue, YARN, false, sysProp = "spark.yarn.queue"),
+      OptionAssigner(appArgs.numExecutors, YARN, true, clOption = "--num-executors"),
+      OptionAssigner(appArgs.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
+      OptionAssigner(appArgs.executorMemory, YARN, true, clOption = "--executor-memory"),
+      OptionAssigner(appArgs.executorMemory, STANDALONE | MESOS | YARN, false,
        sysProp = "spark.executor.memory"),
-      new OptionAssigner(appArgs.driverMemory, STANDALONE, true, clOption = "--memory"),
-      new OptionAssigner(appArgs.driverCores, STANDALONE, true, clOption = "--cores"),
-      new OptionAssigner(appArgs.executorCores, YARN, true, clOption = "--executor-cores"),
-      new OptionAssigner(appArgs.executorCores, YARN, false, sysProp = "spark.executor.cores"),
-      new OptionAssigner(appArgs.totalExecutorCores, STANDALONE | MESOS, false,
+      OptionAssigner(appArgs.driverMemory, STANDALONE, true, clOption = "--memory"),
+      OptionAssigner(appArgs.driverCores, STANDALONE, true, clOption = "--cores"),
+      OptionAssigner(appArgs.executorCores, YARN, true, clOption = "--executor-cores"),
+      OptionAssigner(appArgs.executorCores, YARN, false, sysProp = "spark.executor.cores"),
+      OptionAssigner(appArgs.totalExecutorCores, STANDALONE | MESOS, false,
        sysProp = "spark.cores.max"),
-      new OptionAssigner(appArgs.files, YARN, false, sysProp = "spark.yarn.dist.files"),
-      new OptionAssigner(appArgs.files, YARN, true, clOption = "--files"),
-      new OptionAssigner(appArgs.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
-      new OptionAssigner(appArgs.archives, YARN, true, clOption = "--archives"),
-      new OptionAssigner(appArgs.jars, YARN, true, clOption = "--addJars"),
-      new OptionAssigner(appArgs.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
-      new OptionAssigner(appArgs.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars"),
-      new OptionAssigner(appArgs.name, LOCAL | STANDALONE | MESOS, false,
-        sysProp = "spark.app.name")
+      OptionAssigner(appArgs.files, YARN, false, sysProp = "spark.yarn.dist.files"),
+      OptionAssigner(appArgs.files, YARN, true, clOption = "--files"),
+      OptionAssigner(appArgs.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
+      OptionAssigner(appArgs.archives, YARN, true, clOption = "--archives"),
+      OptionAssigner(appArgs.jars, YARN, true, clOption = "--addJars"),
+      OptionAssigner(appArgs.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
+      OptionAssigner(appArgs.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars"),
+      OptionAssigner(appArgs.name, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.app.name")
     )
 
     // For client mode make any added jars immediately visible on the classpath
@@ -183,9 +195,10 @@ object SparkSubmit {
       }
     }
 
+    // Map all arguments to command-line options or system properties for our chosen mode
     for (opt <- options) {
       if (opt.value != null && deployOnCluster == opt.deployOnCluster &&
-        (clusterManager & opt.clusterManager) != 0) {
+          (clusterManager & opt.clusterManager) != 0) {
         if (opt.clOption != null) {
           childArgs += (opt.clOption, opt.value)
         } else if (opt.sysProp != null) {
@@ -230,8 +243,8 @@ object SparkSubmit {
   }
 
   private def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String],
-      sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false) {
-
+      sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false)
+  {
     if (verbose) {
       printStream.println(s"Main class:\n$childMainClass")
       printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
@@ -273,15 +286,26 @@ object SparkSubmit {
     val url = localJarFile.getAbsoluteFile.toURI.toURL
     loader.addURL(url)
   }
+
+  /**
+   * Merge a sequence of comma-separated file lists, some of which may be null to indicate
+   * no files, into a single comma-separated string.
+   */
+  private[spark] def mergeFileLists(lists: String*): String = {
+    val merged = lists.filter(_ != null)
+      .flatMap(_.split(","))
+      .mkString(",")
+    if (merged == "") null else merged
+  }
 }
 
 /**
 * Provides an indirection layer for passing arguments as system properties or flags to
 * the user's driver program or to downstream launcher tools.
 */
-private[spark] class OptionAssigner(val value: String,
-  val clusterManager: Int,
-  val deployOnCluster: Boolean,
-  val clOption: String = null,
-  val sysProp: String = null
-) { }
+private[spark] case class OptionAssigner(
+  value: String,
+  clusterManager: Int,
+  deployOnCluster: Boolean,
+  clOption: String = null,
+  sysProp: String = null)

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 23 additions & 13 deletions
@@ -79,7 +79,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
   }
 
   /** Fill in any undefined values based on the current properties file or built-in defaults. */
-  private def loadDefaults() = {
+  private def loadDefaults(): Unit = {
 
     // Use common defaults file, if not specified by user
     if (propertiesFile == null) {
@@ -112,16 +112,25 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
 
     // Try to set main class from JAR if no --class argument is given
     if (mainClass == null && !isPython && primaryResource != null) {
-      val jar = new JarFile(primaryResource)
-      // Note that this might still return null if no main-class is set; we catch that later
-      mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
+      try {
+        val jar = new JarFile(primaryResource)
+        // Note that this might still return null if no main-class is set; we catch that later
+        mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
+      } catch {
+        case e: Exception =>
+          SparkSubmit.printErrorAndExit("Failed to read JAR: " + primaryResource)
+          return
+      }
     }
 
     // Global defaults. These should be keep to minimum to avoid confusing behavior.
     master = Option(master).getOrElse("local[*]")
 
     // Set name from main class if not given
-    name = Option(name).orElse(Option(mainClass)).getOrElse(new File(primaryResource).getName)
+    name = Option(name).orElse(Option(mainClass)).orNull
+    if (name == null && primaryResource != null) {
+      name = Utils.stripDirectory(primaryResource)
+    }
   }
 
   /** Ensure that required fields exists. Call this only once all defaults are loaded. */
@@ -133,7 +142,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
       SparkSubmit.printErrorAndExit("Must specify a primary resource (JAR or Python file)")
     }
     if (mainClass == null && !isPython) {
-      SparkSubmit.printErrorAndExit("Must specify a main class with --class")
+      SparkSubmit.printErrorAndExit("No main class set in JAR; please specify one with --class")
     }
     if (pyFiles != null && !isPython) {
       SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script")
@@ -165,6 +174,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
    |  queue                   $queue
    |  numExecutors            $numExecutors
    |  files                   $files
+    |  pyFiles                 $pyFiles
    |  archives                $archives
    |  mainClass               $mainClass
    |  primaryResource         $primaryResource
@@ -309,15 +319,15 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     """Usage: spark-submit [options] <app jar | python file> [app options]
       |Options:
       |  --master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.
-      |  --deploy-mode DEPLOY_MODE   Where to run the driver program: either 'client' to run
-      |                              on the local machine, or 'cluster' to run inside cluster.
-      |  --class CLASS_NAME          Your application's main class (for Java apps).
+      |  --deploy-mode DEPLOY_MODE   Where to run the driver program: either "client" to run
+      |                              on the local machine, or "cluster" to run inside cluster.
+      |  --class CLASS_NAME          Your application's main class (for Java / Scala apps).
       |  --name NAME                 A name of your application.
       |  --jars JARS                 Comma-separated list of local jars to include on the driver
       |                              and executor classpaths. Doesn't work for drivers in
-      |                              standalone mode with 'cluster' deploy mode.
-      |  --py-files PY_FILES         Comma-separated list of files to place on the PYTHONPATH
-      |                              for Python apps. Can be .py, .zip, or .egg files.
+      |                              standalone mode with "cluster" deploy mode.
+      |  --py-files PY_FILES         Comma-separated list of .zip or .egg files to place on the
+      |                              PYTHONPATH for Python apps.
      |  --files FILES               Comma-separated list of files to be placed in the working
      |                              directory of each executor.
      |  --properties-file FILE      Path to a file from which to load extra properties. If not
@@ -341,7 +351,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
      |
      | YARN-only:
      |  --executor-cores NUM        Number of cores per executor (Default: 1).
-      |  --queue QUEUE_NAME          The YARN queue to submit to (Default: 'default').
+      |  --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").
      |  --num-executors NUM         Number of executors to launch (Default: 2).
      |  --archives ARCHIVES         Comma separated list of archives to be extracted into the
      |                              working directory of each executor.""".stripMargin
