diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala index e0a29b48314fb..28a90052dcba3 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala @@ -181,21 +181,26 @@ private[spark] object ReliableCheckpointRDD extends Logging { serializeStream.writeAll(iterator) } { serializeStream.close() + fileOutputStream.close() } - if (!fs.rename(tempOutputPath, finalOutputPath)) { - if (!fs.exists(finalOutputPath)) { - logInfo(s"Deleting tempOutputPath $tempOutputPath") - fs.delete(tempOutputPath, false) - throw new IOException("Checkpoint failed: failed to save output of task: " + - s"${ctx.attemptNumber()} and final output path does not exist: $finalOutputPath") - } else { - // Some other copy of this task must've finished before us and renamed it - logInfo(s"Final output path $finalOutputPath already exists; not overwriting it") - if (!fs.delete(tempOutputPath, false)) { - logWarning(s"Error deleting ${tempOutputPath}") + try { + if (!fs.rename(tempOutputPath, finalOutputPath)) { + if (!fs.exists(finalOutputPath)) { + logInfo(s"Deleting tempOutputPath $tempOutputPath") + fs.delete(tempOutputPath, false) + throw new IOException("Checkpoint failed: failed to save output of task: " + + s"${ctx.attemptNumber()} and final output path does not exist: $finalOutputPath") + } else { + // Some other copy of this task must've finished before us and renamed it + logInfo(s"Final output path $finalOutputPath already exists; not overwriting it") + if (!fs.delete(tempOutputPath, false)) { + logWarning(s"Error deleting ${tempOutputPath}") + } } } + } finally { + fs.close() } } @@ -216,6 +221,8 @@ private[spark] object ReliableCheckpointRDD extends Logging { serializeStream.writeObject(partitioner) } { serializeStream.close() + fileOutputStream.close() + fs.close() } logDebug(s"Written partitioner to $partitionerFilePath") } catch { @@ -248,6 +255,7 @@ private[spark] object ReliableCheckpointRDD extends Logging { } } { fileInputStream.close() + fs.close() } logDebug(s"Read partitioner from $partitionerFilePath") @@ -279,8 +287,13 @@ private[spark] object ReliableCheckpointRDD extends Logging { // Register an on-task-completion callback to close the input stream. context.addTaskCompletionListener(context => deserializeStream.close()) - - deserializeStream.asIterator.asInstanceOf[Iterator[T]] + Utils.tryWithSafeFinally { + deserializeStream.asIterator.asInstanceOf[Iterator[T]] + } { + deserializeStream.close() + fileInputStream.close() + fs.close() + } } }