Skip to content

Commit e497472

Browse files
jerryshaoMarcelo Vanzin
authored andcommitted
[SPARK-19306][CORE] Fix inconsistent state in DiskBlockObject when expection occurred
## What changes were proposed in this pull request? In `DiskBlockObjectWriter`, when some errors happened during writing, it will call `revertPartialWritesAndClose`, if this method again failed due to some issues like out of disk, it will throw exception without resetting the state of this writer, also skipping the revert. So here propose to fix this issue to offer user a chance to recover from such issue. ## How was this patch tested? Existing test. Author: jerryshao <[email protected]> Closes #16657 from jerryshao/SPARK-19306.
1 parent 5b258b8 commit e497472

File tree

1 file changed

+25
-19
lines changed

1 file changed

+25
-19
lines changed

core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -128,16 +128,19 @@ private[spark] class DiskBlockObjectWriter(
128128
*/
129129
private def closeResources(): Unit = {
130130
if (initialized) {
131-
mcs.manualClose()
132-
channel = null
133-
mcs = null
134-
bs = null
135-
fos = null
136-
ts = null
137-
objOut = null
138-
initialized = false
139-
streamOpen = false
140-
hasBeenClosed = true
131+
Utils.tryWithSafeFinally {
132+
mcs.manualClose()
133+
} {
134+
channel = null
135+
mcs = null
136+
bs = null
137+
fos = null
138+
ts = null
139+
objOut = null
140+
initialized = false
141+
streamOpen = false
142+
hasBeenClosed = true
143+
}
141144
}
142145
}
143146

@@ -199,26 +202,29 @@ private[spark] class DiskBlockObjectWriter(
199202
def revertPartialWritesAndClose(): File = {
200203
// Discard current writes. We do this by flushing the outstanding writes and then
201204
// truncating the file to its initial position.
202-
try {
205+
Utils.tryWithSafeFinally {
203206
if (initialized) {
204207
writeMetrics.decBytesWritten(reportedPosition - committedPosition)
205208
writeMetrics.decRecordsWritten(numRecordsWritten)
206209
streamOpen = false
207210
closeResources()
208211
}
209-
210-
val truncateStream = new FileOutputStream(file, true)
212+
} {
213+
var truncateStream: FileOutputStream = null
211214
try {
215+
truncateStream = new FileOutputStream(file, true)
212216
truncateStream.getChannel.truncate(committedPosition)
213-
file
217+
} catch {
218+
case e: Exception =>
219+
logError("Uncaught exception while reverting partial writes to file " + file, e)
214220
} finally {
215-
truncateStream.close()
221+
if (truncateStream != null) {
222+
truncateStream.close()
223+
truncateStream = null
224+
}
216225
}
217-
} catch {
218-
case e: Exception =>
219-
logError("Uncaught exception while reverting partial writes to file " + file, e)
220-
file
221226
}
227+
file
222228
}
223229

224230
/**

0 commit comments

Comments
 (0)