
Commit a967b00

aarondav authored and pwendell committed
SPARK-1572 Don't kill Executor if PythonRDD fails while computing parent
Previously, if the parent RDD threw any exception other than IOException or FileNotFoundException (which is quite possible for Hadoop input sources), the entire Executor would crash, because the default thread uncaught-exception handler calls System.exit().

This patch fixes two related issues:

1. Always catch exceptions in this reader thread.
2. Don't mask readerException when Python throws an EOFError after worker.shutdownOutput() is called.

Author: Aaron Davidson <[email protected]>

Closes #486 from aarondav/pyspark and squashes the following commits:

fbb11e9 [Aaron Davidson] Make sure FileNotFoundExceptions are handled same as before
b9acb3e [Aaron Davidson] SPARK-1572 Don't kill Executor if PythonRDD fails while computing parent
1 parent a664606 commit a967b00
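For context on the failure mode: as the commit message notes, the Executor installs a default uncaught-exception handler that calls System.exit(), so any exception escaping a helper thread's run() takes down the whole JVM, while an exception caught inside the thread body does not. A minimal standalone sketch of that mechanism (not Spark's actual Executor code; names are illustrative):

object UncaughtExitSketch {
  def main(args: Array[String]): Unit = {
    // A default handler like this is what makes an escaped exception fatal:
    // it runs for any thread whose run() throws, and it exits the process.
    Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      def uncaughtException(t: Thread, e: Throwable): Unit = {
        System.err.println(s"uncaught in ${t.getName}: $e")
        System.exit(1) // kills every thread in the JVM, not just t
      }
    })

    new Thread("stdin writer sketch") {
      override def run(): Unit = {
        try {
          // Any exception that is neither IOException nor FileNotFoundException
          // previously escaped the writer thread's catch block.
          throw new RuntimeException("parent RDD failed")
        } catch {
          case e: Exception =>
            // Catching here keeps the exception away from the handler above,
            // so the JVM (the "executor" in this sketch) survives.
            System.err.println(s"handled inside thread: $e")
        }
      }
    }.start()
  }
}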

File tree

1 file changed (+21, -11 lines)


core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 21 additions & 11 deletions
@@ -24,6 +24,7 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
+import scala.util.Try
 
 import net.razorvine.pickle.{Pickler, Unpickler}
 
@@ -89,16 +90,22 @@ private[spark] class PythonRDD[T: ClassTag](
         dataOut.flush()
         worker.shutdownOutput()
       } catch {
+
         case e: java.io.FileNotFoundException =>
           readerException = e
-          // Kill the Python worker process:
-          worker.shutdownOutput()
+          Try(worker.shutdownOutput()) // kill Python worker process
+
         case e: IOException =>
           // This can happen for legitimate reasons if the Python code stops returning data
-          // before we are done passing elements through, e.g., for take(). Just log a message
-          // to say it happened.
-          logInfo("stdin writer to Python finished early")
-          logDebug("stdin writer to Python finished early", e)
+          // before we are done passing elements through, e.g., for take(). Just log a message to
+          // say it happened (as it could also be hiding a real IOException from a data source).
+          logInfo("stdin writer to Python finished early (may not be an error)", e)
+
+        case e: Exception =>
+          // We must avoid throwing exceptions here, because the thread uncaught exception handler
+          // will kill the whole executor (see Executor).
+          readerException = e
+          Try(worker.shutdownOutput()) // kill Python worker process
       }
     }
   }.start()
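A note on the scala.util.Try idiom introduced above: worker.shutdownOutput() can itself throw while an exception is already being handled, and Try(...) captures any non-fatal exception as a Failure value rather than letting it escape the catch block. A small self-contained sketch (the unconnected Socket is just a stand-in for the Python worker's socket):

import java.net.Socket
import scala.util.Try

object TrySketch extends App {
  val worker = new Socket() // stand-in; not connected, so shutdownOutput() throws
  // Without Try, the SocketException would propagate out of this line;
  // with it, the failure is just a value we can inspect or ignore.
  println(Try(worker.shutdownOutput())) // Failure(java.net.SocketException: ...)
}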
@@ -152,7 +159,7 @@ private[spark] class PythonRDD[T: ClassTag](
           val exLength = stream.readInt()
           val obj = new Array[Byte](exLength)
           stream.readFully(obj)
-          throw new PythonException(new String(obj))
+          throw new PythonException(new String(obj), readerException)
         case SpecialLengths.END_OF_DATA_SECTION =>
           // We've finished the data section of the output, but we can still
           // read some accumulator updates:
@@ -167,10 +174,13 @@ private[spark] class PythonRDD[T: ClassTag](
             Array.empty[Byte]
         }
       } catch {
-        case eof: EOFException => {
+        case e: Exception if readerException != null =>
+          logError("Python worker exited unexpectedly (crashed)", e)
+          logError("Python crash may have been caused by prior exception:", readerException)
+          throw readerException
+
+        case eof: EOFException =>
           throw new SparkException("Python worker exited unexpectedly (crashed)", eof)
-        }
-        case e: Throwable => throw e
       }
     }
 
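The `case e: Exception if readerException != null =>` arm above relies on ordinary Scala pattern guards: arms are tried in order and a guard is evaluated before the arm is selected, so a recorded writer-side failure takes priority over the secondary EOFException the reader actually sees. A standalone sketch of that prioritization (names and exceptions are illustrative):

import java.io.{EOFException, FileNotFoundException}

object GuardSketch extends App {
  @volatile var readerException: Exception = new FileNotFoundException("input split missing")

  try {
    throw new EOFException() // the secondary symptom the reader observes
  } catch {
    // This arm wins because its guard holds: the root cause is surfaced
    // instead of the less informative EOFException.
    case e: Exception if readerException != null =>
      println(s"rethrowing root cause: $readerException")
    case eof: EOFException =>
      println(s"worker crashed: $eof")
  }
}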
@@ -185,7 +195,7 @@ private[spark] class PythonRDD[T: ClassTag](
 }
 
 /** Thrown for exceptions in user Python code. */
-private class PythonException(msg: String) extends Exception(msg)
+private class PythonException(msg: String, cause: Exception) extends RuntimeException(msg, cause)
 
 /**
  * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.
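The constructor change chains the writer-side exception as the cause of the PythonException surfaced to the user; extending RuntimeException(msg, cause) means getCause() and stack traces carry the original failure. A brief sketch of the effect (message strings are made up):

object ChainSketch extends App {
  class PythonException(msg: String, cause: Exception) extends RuntimeException(msg, cause)

  val root     = new java.io.FileNotFoundException("part-00000 not found")
  val surfaced = new PythonException("Python worker traceback", root)

  assert(surfaced.getCause eq root)
  surfaced.printStackTrace() // includes "Caused by: java.io.FileNotFoundException: ..."
}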
