
Commit f97daa2

Fix Windows spark shell stdin issue
It turns out that on Windows, java.lang.Process reads directly from the parent process' stdin. This means we should not spawn a thread that also tries to redirect System.in to the subprocess: the redirect is futile, and the thread merely contends with the subprocess for reads from System.in. This, however, raises the question of when to terminate the JVM in the PySpark shell, where the JVM is itself a Python subprocess. We previously relied on the Java process killing itself on broken pipe, but that mechanism is no longer available on Windows because we no longer read System.in and therefore never see the EOF. Instead, in this environment we rely on Python's shutdown hook (atexit in java_gateway.py) to kill the child process.
1 parent: 83ebe60
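
To make the new termination path concrete, here is a minimal Python sketch of the idea the commit describes for Windows: the parent Python process registers an atexit hook that kills its child, instead of relying on the child noticing EOF on a redirected stdin. This is illustrative only, not code from the patch; the helper name and the sleeping stand-in child are hypothetical (the real launch logic lives in python/pyspark/java_gateway.py).

    import atexit
    import subprocess
    import sys

    def launch_child():
        # Hypothetical stand-in for the JVM that the PySpark shell launches;
        # a sleeping Python child is used so the sketch runs anywhere.
        proc = subprocess.Popen(
            [sys.executable, "-c", "import time; time.sleep(3600)"],
            stdin=subprocess.PIPE)
        # Best-effort cleanup: when this parent exits normally, kill the child
        # so it does not linger. This mirrors the Windows case, where the child
        # cannot detect the parent's exit via EOF/broken pipe on stdin.
        atexit.register(lambda: proc.kill())
        return proc

    if __name__ == "__main__":
        child = launch_child()
        print("child pid %d will be killed when this process exits" % child.pid)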

2 files changed: +21 −6 lines

core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala

Lines changed: 13 additions & 6 deletions
@@ -133,17 +133,24 @@ private[spark] object SparkSubmitDriverBootstrapper {
     val process = builder.start()

     // Redirect stdin, stdout, and stderr to/from the child JVM
-    val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
     val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout")
     val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr")
-    stdinThread.start()
     stdoutThread.start()
     stderrThread.start()

-    // Terminate on broken pipe, which signals that the parent process has exited. This is
-    // important for the PySpark shell, where Spark submit itself is a python subprocess.
-    stdinThread.join()
-    process.destroy()
+    // In Windows, the subprocess reads directly from our stdin, so we should avoid spawning
+    // a thread that also reads from stdin and contends with the subprocess.
+    if (Utils.isWindows) {
+      // For the PySpark shell, the termination of this process is handled in java_gateway.py
+      process.waitFor()
+    } else {
+      // Terminate on broken pipe, which signals that the parent process has exited. This is
+      // important for the PySpark shell, where Spark submit itself is a python subprocess.
+      val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
+      stdinThread.start()
+      stdinThread.join()
+      process.destroy()
+    }
   }

 }
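
For contrast with the Windows branch above, here is a small Python sketch (not code from the patch) of the mechanism the else branch keeps relying on in UNIX-like environments: forward our own stdin to the child, and treat EOF or a broken pipe, which signals that our parent has exited, as the cue to destroy the child. The helper name and the `cat` example command are illustrative only.

    import subprocess
    import sys

    def run_and_forward_stdin(command):
        # Rough analogue of the non-Windows branch: forward our stdin to the
        # child and treat EOF (parent gone / stdin closed) as the signal to
        # tear the child down, like stdinThread.join() + process.destroy().
        proc = subprocess.Popen(command, stdin=subprocess.PIPE)
        try:
            while True:
                chunk = sys.stdin.buffer.read(4096)
                if not chunk:          # EOF: our own parent has exited
                    break
                proc.stdin.write(chunk)
                proc.stdin.flush()
        except BrokenPipeError:
            pass                        # child closed its stdin first
        finally:
            proc.kill()                 # analogue of process.destroy()

    if __name__ == "__main__":
        # Example: forward stdin to `cat` until EOF, then kill it.
        run_and_forward_stdin(["cat"])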

python/pyspark/java_gateway.py

Lines changed: 8 additions & 0 deletions
@@ -15,6 +15,7 @@
 # limitations under the License.
 #

+import atexit
 import os
 import sys
 import signal
@@ -69,6 +70,13 @@ def preexec_func():
             error_msg += "--------------------------------------------------------------\n"
             raise Exception(error_msg)

+        # Ensure the Java subprocess does not linger after python exits. Note that this is best
+        # effort and intended mainly for Windows. In UNIX-based systems, the child process can kill
+        # itself on broken pipe (i.e. when the parent process' stdin sends an EOF). In Windows,
+        # however, this is not possible because java.lang.Process reads directly from the parent
+        # process' stdin, contending with any opportunity to read an EOF from the parent.
+        atexit.register(lambda: proc.kill())
+
         # Create a thread to echo output from the GatewayServer, which is required
         # for Java log output to show up:
         class EchoOutputThread(Thread):
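
One illustrative note on the "best effort" wording in the comment above (this is general Python behaviour, not part of the patch): atexit hooks run on normal interpreter shutdown, but they are skipped if the parent dies abruptly, in which case the registered proc.kill() never fires and the JVM can still be orphaned. The SIMULATE_HARD_KILL variable below is purely for the demo.

    import atexit
    import os

    atexit.register(lambda: print("atexit hook ran"))

    if os.environ.get("SIMULATE_HARD_KILL"):
        # os._exit() bypasses atexit, much like the parent being killed with
        # SIGKILL or TerminateProcess: no cleanup hook runs in that case.
        os._exit(1)

    # Normal shutdown path: the hook above runs and prints its message.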
