@@ -1116,9 +1116,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
           maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
           recordsWritten += 1
         }
-      } {
-        writer.close(hadoopContext)
-      }
+      }(finallyBlock = writer.close(hadoopContext))
       committer.commitTask(hadoopContext)
       bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
       outputMetrics.setRecordsWritten(recordsWritten)
@@ -1202,9 +1200,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
           maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
           recordsWritten += 1
         }
-      } {
-        writer.close()
-      }
+      }(finallyBlock = writer.close())
       writer.commit()
       bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
       outputMetrics.setRecordsWritten(recordsWritten)
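The change in both hunks is the same: the cleanup used to be passed as a bare second argument block (`} { writer.close(...) }`), and is now passed as a named by-name argument, `finallyBlock = writer.close(...)`. The name matters because the helper's second parameter list (see the Utils.scala hunk below) now carries two defaulted by-name parameters, so the call site has to say which block it is supplying. A minimal sketch of that call shape, with hypothetical names (`runWith`, `onError`, `onFinally`) rather than Spark's:

object NamedBlockArgs {
  // Hypothetical helper whose second parameter list mirrors the (catchBlock, finallyBlock) shape.
  def runWith[T](body: => T)(onError: => Unit = (), onFinally: => Unit = ()): T = {
    try {
      body
    } catch {
      case e: Throwable => onError; throw e
    } finally {
      onFinally
    }
  }

  def main(args: Array[String]): Unit = {
    // Naming the argument skips the defaulted onError block and makes the intent explicit.
    val n = runWith { 1 + 1 }(onFinally = println("closing the writer"))
    println(s"result = $n")

    try {
      runWith { sys.error("write failed") }(onError = println("aborting the task"))
    } catch {
      case e: RuntimeException => println(s"rethrown: ${e.getMessage}")
    }
  }
}

Because both parameters default to a no-op, a caller can supply only the block it needs, which is exactly what the two hunks above do with `finallyBlock`.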
14 changes: 10 additions & 4 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -87,10 +87,16 @@ private[spark] abstract class Task[T](
     }
     try {
       (runTask(context), context.collectAccumulators())
-    } catch { case e: Throwable =>
-      // Catch all errors; run task failure callbacks, and rethrow the exception.
-      context.markTaskFailed(e)
-      throw e
+    } catch {
+      case e: Throwable =>
+        // Catch all errors; run task failure callbacks, and rethrow the exception.
+        try {
+          context.markTaskFailed(e)
+        } catch {
+          case t: Throwable =>
+            e.addSuppressed(t)
+        }
+        throw e
     } finally {
       // Call the task completion callbacks.
       context.markTaskCompleted()
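The extra try/catch means a failure inside the failure callback no longer replaces the original task error; it is attached to it with `Throwable.addSuppressed` and still surfaces with the rethrown exception. A small, self-contained illustration of that JVM contract (the names here are illustrative, not Spark's):

object SuppressedDemo {
  def main(args: Array[String]): Unit = {
    val primary = new RuntimeException("task body failed")
    try {
      // Stand-in for a failure callback that itself blows up while handling `primary`.
      throw new IllegalStateException("failure callback failed too")
    } catch {
      case secondary: Throwable => primary.addSuppressed(secondary)
    }
    // The original error is still the one reported; the callback error rides along with it.
    println(primary.getMessage)
    primary.getSuppressed.foreach(s => println(s"suppressed: ${s.getMessage}"))
  }
}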
29 changes: 19 additions & 10 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1259,26 +1259,35 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Execute a block of code, call the failure callbacks before finally block if there is any
-   * exceptions happen. But if exceptions happen in the finally block, do not suppress the original
-   * exception.
+   * Execute a block of code and call the failure callbacks in the catch block. If exceptions occur
+   * in either the catch or the finally block, they are appended to the list of suppressed
+   * exceptions in original exception which is then rethrown.
    *
-   * This is primarily an issue with `finally { out.close() }` blocks, where
-   * close needs to be called to clean up `out`, but if an exception happened
-   * in `out.write`, it's likely `out` may be corrupted and `out.close` will
+   * This is primarily an issue with `catch { abort() }` or `finally { out.close() }` blocks,
+   * where the abort/close needs to be called to clean up `out`, but if an exception happened
+   * in `out.write`, it's likely `out` may be corrupted and `abort` or `out.close` will
    * fail as well. This would then suppress the original/likely more meaningful
    * exception from the original `out.write` call.
    */
-  def tryWithSafeFinallyAndFailureCallbacks[T](block: => T)(finallyBlock: => Unit): T = {
+  def tryWithSafeFinallyAndFailureCallbacks[T](block: => T)
+      (catchBlock: => Unit = (), finallyBlock: => Unit = ()): T = {
     var originalThrowable: Throwable = null
     try {
       block
     } catch {
-      case t: Throwable =>
+      case cause: Throwable =>
         // Purposefully not using NonFatal, because even fatal exceptions
         // we don't want to have our finallyBlock suppress
-        originalThrowable = t
-        TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(t)
+        originalThrowable = cause
+        try {
+          logError("Aborting task", originalThrowable)
+          TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(originalThrowable)
+          catchBlock
+        } catch {
+          case t: Throwable =>
+            originalThrowable.addSuppressed(t)
+            logWarning(s"Suppressing exception in catch: " + t.getMessage, t)
+        }
         throw originalThrowable
     } finally {
       try {
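The hunk is cut off just as the `finally` handling begins. For context, here is a self-contained sketch of the same pattern with the Spark-specific parts (TaskContext callbacks, logging) left out and simplified names; it shows how failures from both the catch block and the finally block end up as suppressed exceptions on the original error instead of replacing it. This is an illustration of the technique, not the actual Utils implementation:

object SafeCallbacksSketch {
  // Simplified stand-in for Utils.tryWithSafeFinallyAndFailureCallbacks.
  def tryWithCallbacks[T](block: => T)(catchBlock: => Unit = (), finallyBlock: => Unit = ()): T = {
    var original: Throwable = null
    try {
      block
    } catch {
      case cause: Throwable =>
        original = cause
        try {
          catchBlock // failure-only cleanup, e.g. aborting the task
        } catch {
          case t: Throwable => original.addSuppressed(t)
        }
        throw original
    } finally {
      try {
        finallyBlock // always-run cleanup, e.g. closing the writer
      } catch {
        // Only swallow the cleanup failure when there is already a primary error to attach it to.
        case t: Throwable if original != null => original.addSuppressed(t)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    try {
      tryWithCallbacks { sys.error("write failed") }(
        catchBlock = sys.error("abort failed"),
        finallyBlock = sys.error("close failed"))
    } catch {
      case e: Throwable =>
        println(e.getMessage) // write failed
        e.getSuppressed.foreach(s => println(s"suppressed: ${s.getMessage}"))
    }
  }
}

The caller sees the write failure as the primary exception, with the abort and close failures listed under getSuppressed, which is the behavior the writer-container changes below rely on.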
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.UnsafeKVExternalSorter
 import org.apache.spark.sql.sources.{HadoopFsRelation, OutputWriter, OutputWriterFactory}
 import org.apache.spark.sql.types.{StructType, StringType}
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 
 private[sql] abstract class BaseWriterContainer(
@@ -257,19 +257,16 @@ private[sql] class DefaultWriterContainer(
 
     // If anything below fails, we should abort the task.
     try {
-      while (iterator.hasNext) {
-        val internalRow = iterator.next()
-        writer.writeInternal(internalRow)
-      }
-
-      commitTask()
+      Utils.tryWithSafeFinallyAndFailureCallbacks {
+        while (iterator.hasNext) {
+          val internalRow = iterator.next()
+          writer.writeInternal(internalRow)
+        }
+        commitTask()
+      }(catchBlock = abortTask())
     } catch {
-      case cause: Throwable =>
-        logError("Aborting task.", cause)
-        // call failure callbacks first, so we could have a chance to cleanup the writer.
-        TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
-        abortTask()
-        throw new SparkException("Task failed while writing rows.", cause)
+      case t: Throwable =>
+        throw new SparkException("Task failed while writing rows", t)
     }
 
     def commitTask(): Unit = {
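One subtlety in `}(catchBlock = abortTask())`: this does not call `abortTask()` at the call site. `catchBlock` is a by-name parameter, so the expression runs only if the wrapped block fails. A minimal sketch of that evaluation timing (the helper and names are made up for illustration):

object ByNameTiming {
  // By-name parameters (`: => Unit`) are evaluated where they are used, not where they are passed.
  def onFailure[T](body: => T)(cleanup: => Unit): T =
    try body catch { case e: Throwable => cleanup; throw e }

  def main(args: Array[String]): Unit = {
    // Succeeds: the cleanup expression is never evaluated, so nothing extra is printed.
    onFailure { println("writing rows") }(println("aborting task"))

    // Fails: only now does the cleanup expression run, just before the error is rethrown.
    try {
      onFailure { sys.error("write failed") }(println("aborting task"))
    } catch {
      case e: Throwable => println(s"caught: ${e.getMessage}")
    }
  }
}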
@@ -343,81 +340,81 @@ private[sql] class DynamicPartitionWriterContainer(
     // If anything below fails, we should abort the task.
     var currentWriter: OutputWriter = null
     try {
-      // This will be filled in if we have to fall back on sorting.
-      var sorter: UnsafeKVExternalSorter = null
-      while (iterator.hasNext && sorter == null) {
-        val inputRow = iterator.next()
-        val currentKey = getPartitionKey(inputRow)
-        currentWriter = outputWriters.get(currentKey)
-
-        if (currentWriter == null) {
-          if (outputWriters.size < maxOpenFiles) {
-            currentWriter = newOutputWriter(currentKey)
-            outputWriters.put(currentKey.copy(), currentWriter)
-            currentWriter.writeInternal(getOutputRow(inputRow))
-          } else {
-            logInfo(s"Maximum partitions reached, falling back on sorting.")
-            sorter = new UnsafeKVExternalSorter(
-              StructType.fromAttributes(partitionColumns),
-              StructType.fromAttributes(dataColumns),
-              SparkEnv.get.blockManager,
-              TaskContext.get().taskMemoryManager().pageSizeBytes)
-            sorter.insertKV(currentKey, getOutputRow(inputRow))
-          }
-        } else {
-          currentWriter.writeInternal(getOutputRow(inputRow))
-        }
-      }
-      // current writer is included in outputWriters
-      currentWriter = null
-
-      // If the sorter is not null that means that we reached the maxFiles above and need to finish
-      // using external sort.
-      if (sorter != null) {
-        while (iterator.hasNext) {
-          val currentRow = iterator.next()
-          sorter.insertKV(getPartitionKey(currentRow), getOutputRow(currentRow))
-        }
-
-        logInfo(s"Sorting complete. Writing out partition files one at a time.")
-
-        val sortedIterator = sorter.sortedIterator()
-        var currentKey: InternalRow = null
-        while (sortedIterator.next()) {
-          if (currentKey != sortedIterator.getKey) {
-            if (currentWriter != null) {
-              currentWriter.close()
-              currentWriter = null
-            }
-            currentKey = sortedIterator.getKey.copy()
-            logDebug(s"Writing partition: $currentKey")
-
-            // Either use an existing file from before, or open a new one.
-            currentWriter = outputWriters.remove(currentKey)
-            if (currentWriter == null) {
-              currentWriter = newOutputWriter(currentKey)
-            }
-          }
-
-          currentWriter.writeInternal(sortedIterator.getValue)
-        }
-        if (currentWriter != null) {
-          currentWriter.close()
-          currentWriter = null
-        }
-      }
-
-      commitTask()
-    } catch {
-      case cause: Throwable =>
-        logError("Aborting task.", cause)
-        // call failure callbacks first, so we could have a chance to cleanup the writer.
-        TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
-        abortTask()
-        throw new SparkException("Task failed while writing rows.", cause)
-    }
+      Utils.tryWithSafeFinallyAndFailureCallbacks {
+        // This will be filled in if we have to fall back on sorting.
+        var sorter: UnsafeKVExternalSorter = null
+        while (iterator.hasNext && sorter == null) {
+          val inputRow = iterator.next()
+          val currentKey = getPartitionKey(inputRow)
+          currentWriter = outputWriters.get(currentKey)
+
+          if (currentWriter == null) {
+            if (outputWriters.size < maxOpenFiles) {
+              currentWriter = newOutputWriter(currentKey)
+              outputWriters.put(currentKey.copy(), currentWriter)
+              currentWriter.writeInternal(getOutputRow(inputRow))
+            } else {
+              logInfo(s"Maximum partitions reached, falling back on sorting.")
+              sorter = new UnsafeKVExternalSorter(
+                StructType.fromAttributes(partitionColumns),
+                StructType.fromAttributes(dataColumns),
+                SparkEnv.get.blockManager,
+                TaskContext.get().taskMemoryManager().pageSizeBytes)
+              sorter.insertKV(currentKey, getOutputRow(inputRow))
+            }
+          } else {
+            currentWriter.writeInternal(getOutputRow(inputRow))
+          }
+        }
+        // current writer is included in outputWriters
+        currentWriter = null
+
+        // If the sorter is not null that means that we reached the maxFiles above and need to
+        // finish using external sort.
+        if (sorter != null) {
+          while (iterator.hasNext) {
+            val currentRow = iterator.next()
+            sorter.insertKV(getPartitionKey(currentRow), getOutputRow(currentRow))
+          }
+
+          logInfo(s"Sorting complete. Writing out partition files one at a time.")
+
+          val sortedIterator = sorter.sortedIterator()
+          var currentKey: InternalRow = null
+          while (sortedIterator.next()) {
+            if (currentKey != sortedIterator.getKey) {
+              if (currentWriter != null) {
+                currentWriter.close()
+                currentWriter = null
+              }
+              currentKey = sortedIterator.getKey.copy()
+              logDebug(s"Writing partition: $currentKey")
+
+              // Either use an existing file from before, or open a new one.
+              currentWriter = outputWriters.remove(currentKey)
+              if (currentWriter == null) {
+                currentWriter = newOutputWriter(currentKey)
+              }
+            }
+
+            currentWriter.writeInternal(sortedIterator.getValue)
+          }
+          if (currentWriter != null) {
+            currentWriter.close()
+            currentWriter = null
+          }
+        }
+
+        commitTask()
+      }(catchBlock = {
+        if (currentWriter != null) {
+          currentWriter.close()
+        }
+        abortTask()
+      })
+    } catch {
+      case t: Throwable =>
+        throw new SparkException("Task failed while writing rows", t)
+    }
 
     /** Open and returns a new OutputWriter given a partition key. */
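Beyond the error handling, the block being re-wrapped here also spells out the write strategy: keep at most `maxOpenFiles` partition writers open, and once that limit is hit, fall back to sorting the remaining rows by partition key and writing each partition sequentially with a single open writer. A compact sketch of that fallback idea, using an in-memory buffer and plain strings in place of `UnsafeKVExternalSorter` and `OutputWriter` (all names hypothetical):

import scala.collection.mutable

object PartitionedWriteSketch {
  // Toy stand-in for OutputWriter: one per partition key.
  final class Writer(val key: String) {
    def write(row: String): Unit = println(s"[$key] $row")
    def close(): Unit = println(s"[$key] closed")
  }

  def writeRows(rows: Iterator[(String, String)], maxOpenFiles: Int): Unit = {
    val openWriters = mutable.Map.empty[String, Writer]
    var spill: mutable.ArrayBuffer[(String, String)] = null

    // Fast path: keep one writer per partition until the open-writer limit is reached.
    while (rows.hasNext && spill == null) {
      val (key, row) = rows.next()
      openWriters.get(key) match {
        case Some(w) => w.write(row)
        case None if openWriters.size < maxOpenFiles =>
          val w = new Writer(key); openWriters(key) = w; w.write(row)
        case None =>
          // Too many partitions: from here on, buffer rows (in place of spilling to a sorter).
          spill = mutable.ArrayBuffer(key -> row)
      }
    }

    // Fallback: sort the remaining rows by key and write each partition with one open writer,
    // reusing a writer from the fast path when that partition was already started.
    if (spill != null) {
      rows.foreach(spill += _)
      var current: Writer = null
      for ((key, row) <- spill.sortBy(_._1)) {
        if (current == null || current.key != key) {
          if (current != null) current.close()
          current = openWriters.remove(key).getOrElse(new Writer(key))
        }
        current.write(row)
      }
      if (current != null) current.close()
    }
    openWriters.values.foreach(_.close())
  }

  def main(args: Array[String]): Unit = {
    val rows = Iterator("a" -> "1", "b" -> "2", "c" -> "3", "a" -> "4", "d" -> "5", "c" -> "6")
    writeRows(rows, maxOpenFiles = 2)
  }
}

The real code does the same with UnsafeKVExternalSorter spilling to disk, and the new wrapping ensures that a failure closes the in-flight writer and aborts the task before the error is rethrown.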