Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -181,21 +181,26 @@ private[spark] object ReliableCheckpointRDD extends Logging {
serializeStream.writeAll(iterator)
} {
serializeStream.close()
fileOutputStream.close()
}

if (!fs.rename(tempOutputPath, finalOutputPath)) {
if (!fs.exists(finalOutputPath)) {
logInfo(s"Deleting tempOutputPath $tempOutputPath")
fs.delete(tempOutputPath, false)
throw new IOException("Checkpoint failed: failed to save output of task: " +
s"${ctx.attemptNumber()} and final output path does not exist: $finalOutputPath")
} else {
// Some other copy of this task must've finished before us and renamed it
logInfo(s"Final output path $finalOutputPath already exists; not overwriting it")
if (!fs.delete(tempOutputPath, false)) {
logWarning(s"Error deleting ${tempOutputPath}")
try {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that this doesn't encompass the span of usage for fs -- better to just call fs.close() at the end and not worry about manually closing in an error case? or expand the try-finally?

Actually, I am not sure we are supposed to call FileSystem.close() because they are shared instances, cached and reused across the whole application.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed with @srowen , FileSystem is a cached object, closing it means removed it from cache. I don't think we need to call this explicitly. Because by default it is designed to be shared.

if (!fs.rename(tempOutputPath, finalOutputPath)) {
if (!fs.exists(finalOutputPath)) {
logInfo(s"Deleting tempOutputPath $tempOutputPath")
fs.delete(tempOutputPath, false)
throw new IOException("Checkpoint failed: failed to save output of task: " +
s"${ctx.attemptNumber()} and final output path does not exist: $finalOutputPath")
} else {
// Some other copy of this task must've finished before us and renamed it
logInfo(s"Final output path $finalOutputPath already exists; not overwriting it")
if (!fs.delete(tempOutputPath, false)) {
logWarning(s"Error deleting ${tempOutputPath}")
}
}
}
} finally {
fs.close()
}
}

Expand All @@ -216,6 +221,8 @@ private[spark] object ReliableCheckpointRDD extends Logging {
serializeStream.writeObject(partitioner)
} {
serializeStream.close()
fileOutputStream.close()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto, this is OK if serializeStream.close() doesn't actually close the underlying stream (?) but not sure about the next line.

fs.close()
}
logDebug(s"Written partitioner to $partitionerFilePath")
} catch {
Expand Down Expand Up @@ -248,6 +255,7 @@ private[spark] object ReliableCheckpointRDD extends Logging {
}
} {
fileInputStream.close()
fs.close()
}

logDebug(s"Read partitioner from $partitionerFilePath")
Expand Down Expand Up @@ -279,8 +287,13 @@ private[spark] object ReliableCheckpointRDD extends Logging {

// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener(context => deserializeStream.close())

deserializeStream.asIterator.asInstanceOf[Iterator[T]]
Utils.tryWithSafeFinally {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you can close it here, right? you're returning an iterator on the stream

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code will introduce issue, deserializaStream should be called after finished, the code here will close this stream prematurely.

Also please look at L289, it already takes care of close after the task is finished.

deserializeStream.asIterator.asInstanceOf[Iterator[T]]
} {
deserializeStream.close()
fileInputStream.close()
fs.close()
}
}

}