From c9aaff0818b0adf9775c4da45f3099907c969e48 Mon Sep 17 00:00:00 2001
From: Sameer Agarwal
Date: Wed, 6 Apr 2016 23:29:34 -0700
Subject: [PATCH 1/2] Better exception handling while marking tasks as failed

---
 .../org/apache/spark/scheduler/Task.scala     | 14 ++--
 .../scala/org/apache/spark/util/Utils.scala   | 31 ++++++++
 .../datasources/WriterContainer.scala         | 71 +++++++++----------
 3 files changed, 76 insertions(+), 40 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 46c64f61de5fe..c91d8fbfc44a7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -80,10 +80,16 @@ private[spark] abstract class Task[T](
     }
     try {
       runTask(context)
-    } catch { case e: Throwable =>
-      // Catch all errors; run task failure callbacks, and rethrow the exception.
-      context.markTaskFailed(e)
-      throw e
+    } catch {
+      case e: Throwable =>
+        // Catch all errors; run task failure callbacks, and rethrow the exception.
+        try {
+          context.markTaskFailed(e)
+        } catch {
+          case t: Throwable =>
+            e.addSuppressed(t)
+        }
+        throw e
     } finally {
       // Call the task completion callbacks.
       context.markTaskCompleted()
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index c304629bcdbe9..5aa47631a6ab9 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1297,6 +1297,37 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /**
+   * Execute a block of code, then a catch block, but if exceptions happen in
+   * the catch block, do not suppress the original exception.
+   *
+   * This is primarily an issue with `catch { out.close() }` blocks, where
+   * close needs to be called to clean up `out`, but if an exception happened
+   * in `out.write`, it's likely `out` may be corrupted and `out.close` will
+   * fail as well. This would then suppress the original/likely more meaningful
+   * exception from the original `out.write` call.
+   */
+  def tryWithSafeCatchAndFailureCallbacks[T](block: => T)(catchBlock: => Unit): T = {
+    try {
+      block
+    } catch {
+      case cause: Throwable =>
+        // Purposefully not using NonFatal, because for even fatal exceptions
+        // we don't want to have our catchBlock suppress
+        val originalThrowable = cause
+        try {
+          logError("Aborting task", originalThrowable)
+          TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(originalThrowable)
+          catchBlock
+        } catch {
+          case t: Throwable =>
+            logWarning(s"Suppressing exception in catch: " + t.getMessage, t)
+            originalThrowable.addSuppressed(t)
+        }
+        throw originalThrowable
+    }
+  }
+
   /** Default filtering function for finding call sites using `getCallSite`. */
   private def sparkInternalExclusionFunction(className: String): Boolean = {
     // A regular expression to match classes of the internal Spark API's
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 233ac263aaafc..7e0e24dc803a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.UnsafeKVExternalSorter
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{HadoopFsRelation, OutputWriter, OutputWriterFactory}
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 /** A container for all the details required when writing to a table. */
 case class WriteRelation(
@@ -255,19 +255,18 @@ private[sql] class DefaultWriterContainer(
 
     // If anything below fails, we should abort the task.
     try {
-      while (iterator.hasNext) {
-        val internalRow = iterator.next()
-        writer.writeInternal(internalRow)
+      Utils.tryWithSafeCatchAndFailureCallbacks {
+        while (iterator.hasNext) {
+          val internalRow = iterator.next()
+          writer.writeInternal(internalRow)
+        }
+        commitTask()
+      } {
+        abortTask()
       }
-
-      commitTask()
     } catch {
-      case cause: Throwable =>
-        logError("Aborting task.", cause)
-        // call failure callbacks first, so we could have a chance to cleanup the writer.
-        TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
-        abortTask()
-        throw new SparkException("Task failed while writing rows.", cause)
+      case t: Throwable =>
+        throw new SparkException("Task failed while writing rows", t)
     }
 
     def commitTask(): Unit = {
@@ -421,37 +420,37 @@ private[sql] class DynamicPartitionWriterContainer(
     // If anything below fails, we should abort the task.
     var currentWriter: OutputWriter = null
     try {
-      var currentKey: UnsafeRow = null
-      while (sortedIterator.next()) {
-        val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
-        if (currentKey != nextKey) {
-          if (currentWriter != null) {
-            currentWriter.close()
-            currentWriter = null
+      Utils.tryWithSafeCatchAndFailureCallbacks {
+        var currentKey: UnsafeRow = null
+        while (sortedIterator.next()) {
+          val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
+          if (currentKey != nextKey) {
+            if (currentWriter != null) {
+              currentWriter.close()
+              currentWriter = null
+            }
+            currentKey = nextKey.copy()
+            logDebug(s"Writing partition: $currentKey")
+
+            currentWriter = newOutputWriter(currentKey, getPartitionString)
           }
-          currentKey = nextKey.copy()
-          logDebug(s"Writing partition: $currentKey")
-
-          currentWriter = newOutputWriter(currentKey, getPartitionString)
+          currentWriter.writeInternal(sortedIterator.getValue)
+        }
+        if (currentWriter != null) {
+          currentWriter.close()
+          currentWriter = null
         }
-        currentWriter.writeInternal(sortedIterator.getValue)
-      }
-      if (currentWriter != null) {
-        currentWriter.close()
-        currentWriter = null
-      }
-
-      commitTask()
-    } catch {
-      case cause: Throwable =>
-        logError("Aborting task.", cause)
-        // call failure callbacks first, so we could have a chance to cleanup the writer.
-        TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
+
+        commitTask()
+      } {
         if (currentWriter != null) {
           currentWriter.close()
         }
         abortTask()
-        throw new SparkException("Task failed while writing rows.", cause)
+      }
+    } catch {
+      case t: Throwable =>
+        throw new SparkException("Task failed while writing rows", t)
     }
   }
 }

From e41cae8a8a3fb248e643d68d5e9d144292b6ad2f Mon Sep 17 00:00:00 2001
From: Sameer Agarwal
Date: Thu, 7 Apr 2016 18:48:26 -0700
Subject: [PATCH 2/2] CR

---
 .../apache/spark/rdd/PairRDDFunctions.scala |  8 +--
 .../scala/org/apache/spark/util/Utils.scala | 60 ++++++-------------
 .../datasources/WriterContainer.scala       | 12 ++--
 3 files changed, 26 insertions(+), 54 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 296179b75bc43..085829af6eee7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -1111,9 +1111,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
           maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten)
           recordsWritten += 1
         }
-      } {
-        writer.close(hadoopContext)
-      }
+      }(finallyBlock = writer.close(hadoopContext))
       committer.commitTask(hadoopContext)
       outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
         om.setBytesWritten(callback())
@@ -1200,9 +1198,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
           maybeUpdateOutputMetrics(outputMetricsAndBytesWrittenCallback, recordsWritten)
           recordsWritten += 1
         }
-      } {
-        writer.close()
-      }
+      }(finallyBlock = writer.close())
       writer.commit()
       outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
         om.setBytesWritten(callback())
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 5aa47631a6ab9..78e164cff7738 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1260,26 +1260,35 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Execute a block of code, call the failure callbacks before finally block if there is any
-   * exceptions happen. But if exceptions happen in the finally block, do not suppress the original
-   * exception.
+   * Execute a block of code and call the failure callbacks in the catch block. If exceptions occur
+   * in either the catch or the finally block, they are appended to the list of suppressed
+   * exceptions in original exception which is then rethrown.
    *
-   * This is primarily an issue with `finally { out.close() }` blocks, where
-   * close needs to be called to clean up `out`, but if an exception happened
-   * in `out.write`, it's likely `out` may be corrupted and `out.close` will
+   * This is primarily an issue with `catch { abort() }` or `finally { out.close() }` blocks,
+   * where the abort/close needs to be called to clean up `out`, but if an exception happened
+   * in `out.write`, it's likely `out` may be corrupted and `abort` or `out.close` will
    * fail as well. This would then suppress the original/likely more meaningful
   * exception from the original `out.write` call.
   */
-  def tryWithSafeFinallyAndFailureCallbacks[T](block: => T)(finallyBlock: => Unit): T = {
+  def tryWithSafeFinallyAndFailureCallbacks[T](block: => T)
+      (catchBlock: => Unit = (), finallyBlock: => Unit = ()): T = {
     var originalThrowable: Throwable = null
     try {
       block
     } catch {
-      case t: Throwable =>
+      case cause: Throwable =>
         // Purposefully not using NonFatal, because even fatal exceptions
         // we don't want to have our finallyBlock suppress
-        originalThrowable = t
-        TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(t)
+        originalThrowable = cause
+        try {
+          logError("Aborting task", originalThrowable)
+          TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(originalThrowable)
+          catchBlock
+        } catch {
+          case t: Throwable =>
+            originalThrowable.addSuppressed(t)
+            logWarning(s"Suppressing exception in catch: " + t.getMessage, t)
+        }
         throw originalThrowable
     } finally {
       try {
@@ -1297,37 +1306,6 @@ private[spark] object Utils extends Logging {
     }
   }
 
-  /**
-   * Execute a block of code, then a catch block, but if exceptions happen in
-   * the catch block, do not suppress the original exception.
-   *
-   * This is primarily an issue with `catch { out.close() }` blocks, where
-   * close needs to be called to clean up `out`, but if an exception happened
-   * in `out.write`, it's likely `out` may be corrupted and `out.close` will
-   * fail as well. This would then suppress the original/likely more meaningful
-   * exception from the original `out.write` call.
-   */
-  def tryWithSafeCatchAndFailureCallbacks[T](block: => T)(catchBlock: => Unit): T = {
-    try {
-      block
-    } catch {
-      case cause: Throwable =>
-        // Purposefully not using NonFatal, because for even fatal exceptions
-        // we don't want to have our catchBlock suppress
-        val originalThrowable = cause
-        try {
-          logError("Aborting task", originalThrowable)
-          TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(originalThrowable)
-          catchBlock
-        } catch {
-          case t: Throwable =>
-            logWarning(s"Suppressing exception in catch: " + t.getMessage, t)
-            originalThrowable.addSuppressed(t)
-        }
-        throw originalThrowable
-    }
-  }
-
   /** Default filtering function for finding call sites using `getCallSite`. */
   private def sparkInternalExclusionFunction(className: String): Boolean = {
     // A regular expression to match classes of the internal Spark API's
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 7e0e24dc803a3..6508c2441af63 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -255,15 +255,13 @@ private[sql] class DefaultWriterContainer(
 
     // If anything below fails, we should abort the task.
     try {
-      Utils.tryWithSafeCatchAndFailureCallbacks {
+      Utils.tryWithSafeFinallyAndFailureCallbacks {
         while (iterator.hasNext) {
           val internalRow = iterator.next()
           writer.writeInternal(internalRow)
         }
         commitTask()
-      } {
-        abortTask()
-      }
+      }(catchBlock = abortTask())
     } catch {
       case t: Throwable =>
         throw new SparkException("Task failed while writing rows", t)
     }
 
     def commitTask(): Unit = {
@@ -420,7 +418,7 @@ private[sql] class DynamicPartitionWriterContainer(
     // If anything below fails, we should abort the task.
     var currentWriter: OutputWriter = null
     try {
-      Utils.tryWithSafeCatchAndFailureCallbacks {
+      Utils.tryWithSafeFinallyAndFailureCallbacks {
         var currentKey: UnsafeRow = null
         while (sortedIterator.next()) {
           val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
@@ -442,12 +440,12 @@ private[sql] class DynamicPartitionWriterContainer(
         }
 
         commitTask()
-      } {
+      }(catchBlock = {
         if (currentWriter != null) {
           currentWriter.close()
         }
         abortTask()
-      }
+      })
     } catch {
       case t: Throwable =>
         throw new SparkException("Task failed while writing rows", t)
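
For reference, the control flow that `tryWithSafeFinallyAndFailureCallbacks` arrives at after [PATCH 2/2] can be illustrated with a small, self-contained Scala sketch. This is a simplified approximation rather than the Spark implementation: it omits the `TaskContextImpl.markTaskFailed` failure callbacks and the logging, and `FlakyWriter`, `write`, and `close` are made-up placeholder names used only to show how a cleanup failure becomes a suppressed exception instead of masking the original error.

object SuppressedExceptionSketch {
  // Simplified sketch: run `block`; if it fails, run `catchBlock` and attach any
  // secondary failure to the original exception via addSuppressed; always run
  // `finallyBlock`, again without letting its failure mask the original one.
  def tryWithSafeFinallyAndFailureCallbacks[T](block: => T)
      (catchBlock: => Unit = (), finallyBlock: => Unit = ()): T = {
    var original: Throwable = null
    try {
      block
    } catch {
      case cause: Throwable =>
        original = cause
        try {
          catchBlock
        } catch {
          case t: Throwable => original.addSuppressed(t)
        }
        throw original
    } finally {
      try {
        finallyBlock
      } catch {
        // If `block` already failed, keep that failure primary; otherwise the
        // finally-block exception propagates on its own.
        case t: Throwable if original != null => original.addSuppressed(t)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical writer whose write and close both fail.
    class FlakyWriter {
      def write(row: String): Unit = throw new RuntimeException("write failed")
      def close(): Unit = throw new RuntimeException("close failed")
    }
    val writer = new FlakyWriter
    try {
      tryWithSafeFinallyAndFailureCallbacks {
        writer.write("row")
      }(finallyBlock = writer.close())
    } catch {
      case e: Throwable =>
        println("primary: " + e.getMessage)
        e.getSuppressed.foreach(s => println("suppressed: " + s.getMessage))
    }
  }
}

Running the sketch prints `write failed` as the primary failure with `close failed` attached as a suppressed exception, which is the property the two patches are after: abort/close failures in the catch and finally paths no longer hide the original write error.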