From 139b0db4ad8997c01cabb225383e627ab4779b79 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 12 Feb 2015 14:28:55 -0800 Subject: [PATCH 1/2] capture the exception in python write thread --- core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index b89effc16d36d..a0992b330443d 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -202,7 +202,7 @@ private[spark] class PythonRDD( this.interrupt() } - override def run(): Unit = Utils.logUncaughtExceptions { + override def run(): Unit = Utils.tryLog { try { val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize) val dataOut = new DataOutputStream(stream) From eb0ceffa87ec1b3ca2b78b8d4a938899ad37c0e5 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 16 Feb 2015 16:09:16 -0800 Subject: [PATCH 2/2] Update PythonRDD.scala --- .../main/scala/org/apache/spark/api/python/PythonRDD.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index a0992b330443d..252721192904f 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -202,7 +202,7 @@ private[spark] class PythonRDD( this.interrupt() } - override def run(): Unit = Utils.tryLog { + override def run(): Unit = Utils.logUncaughtExceptions { try { val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize) val dataOut = new DataOutputStream(stream) @@ -248,13 +248,13 @@ private[spark] class PythonRDD( } catch { case e: Exception if context.isCompleted || context.isInterrupted => logDebug("Exception thrown after task completion (likely due to cleanup)", e) - worker.shutdownOutput() + Utils.tryLog(worker.shutdownOutput()) case e: Exception => // We must avoid throwing exceptions here, because the thread uncaught exception handler // will kill the whole executor (see org.apache.spark.executor.Executor). _exception = e - worker.shutdownOutput() + Utils.tryLog(worker.shutdownOutput()) } finally { // Release memory used by this thread for shuffles env.shuffleMemoryManager.releaseMemoryForThisThread()