Skip to content

Commit 031207d

Browse files
author
Marcelo Vanzin
committed
[SPARK-4606] Send EOF to child JVM when there's no more data to read.
1 parent bf1a6aa commit 031207d

File tree

2 files changed

+19
-8
lines changed

2 files changed

+19
-8
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,8 @@ private[spark] object SparkSubmitDriverBootstrapper {
151151
val isWindows = Utils.isWindows
152152
val isSubprocess = sys.env.contains("IS_SUBPROCESS")
153153
if (!isWindows) {
154-
val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
154+
val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin",
155+
true)
155156
stdinThread.start()
156157
// Spark submit (JVM) may run as a subprocess, and so this JVM should terminate on
157158
// broken pipe, signaling that the parent process has exited. This is the case if the

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1792,19 +1792,29 @@ private[spark] object Utils extends Logging {
17921792
/**
17931793
* A utility class to redirect the child process's stdout or stderr.
17941794
*/
1795-
private[spark] class RedirectThread(in: InputStream, out: OutputStream, name: String)
1795+
private[spark] class RedirectThread(
1796+
in: InputStream,
1797+
out: OutputStream,
1798+
name: String,
1799+
propagateEof: Boolean = false)
17961800
extends Thread(name) {
17971801

17981802
setDaemon(true)
17991803
override def run() {
18001804
scala.util.control.Exception.ignoring(classOf[IOException]) {
18011805
// FIXME: We copy the stream on the level of bytes to avoid encoding problems.
1802-
val buf = new Array[Byte](1024)
1803-
var len = in.read(buf)
1804-
while (len != -1) {
1805-
out.write(buf, 0, len)
1806-
out.flush()
1807-
len = in.read(buf)
1806+
try {
1807+
val buf = new Array[Byte](1024)
1808+
var len = in.read(buf)
1809+
while (len != -1) {
1810+
out.write(buf, 0, len)
1811+
out.flush()
1812+
len = in.read(buf)
1813+
}
1814+
} finally {
1815+
if (propagateEof) {
1816+
out.close()
1817+
}
18081818
}
18091819
}
18101820
}

0 commit comments

Comments
 (0)