Skip to content

Commit c0b4655

Browse files
committed
Fix a potential issue that may generate partial snapshot files
1 parent da8c59b commit c0b4655

File tree

1 file changed

+9
-1
lines changed

1 file changed

+9
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -386,9 +386,11 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
386386

387387
private def writeSnapshotFile(version: Long, map: MapType): Unit = {
388388
val fileToWrite = snapshotFile(version)
389+
val tempFile =
390+
new Path(fileToWrite.getParent, s"${fileToWrite.getName}.temp-${Random.nextLong}")
389391
var output: DataOutputStream = null
390392
Utils.tryWithSafeFinally {
391-
output = compressStream(fs.create(fileToWrite, false))
393+
output = compressStream(fs.create(tempFile, false))
392394
val iter = map.entrySet().iterator()
393395
while(iter.hasNext) {
394396
val entry = iter.next()
@@ -403,6 +405,12 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
403405
} {
404406
if (output != null) output.close()
405407
}
408+
if (fs.exists(fileToWrite)) {
409+
// Skip rename if the file is alreayd created.
410+
fs.delete(tempFile, true)
411+
} else if (!fs.rename(tempFile, fileToWrite)) {
412+
throw new IOException(s"Failed to rename $tempFile to $fileToWrite")
413+
}
406414
logInfo(s"Written snapshot file for version $version of $this at $fileToWrite")
407415
}
408416

0 commit comments

Comments
 (0)