diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index e0c2e942072c7..36d6569a4187a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -386,9 +386,11 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit private def writeSnapshotFile(version: Long, map: MapType): Unit = { val fileToWrite = snapshotFile(version) + val tempFile = + new Path(fileToWrite.getParent, s"${fileToWrite.getName}.temp-${Random.nextLong}") var output: DataOutputStream = null Utils.tryWithSafeFinally { - output = compressStream(fs.create(fileToWrite, false)) + output = compressStream(fs.create(tempFile, false)) val iter = map.entrySet().iterator() while(iter.hasNext) { val entry = iter.next() @@ -403,6 +405,12 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit } { if (output != null) output.close() } + if (fs.exists(fileToWrite)) { + // Skip rename if the file is alreayd created. + fs.delete(tempFile, true) + } else if (!fs.rename(tempFile, fileToWrite)) { + throw new IOException(s"Failed to rename $tempFile to $fileToWrite") + } logInfo(s"Written snapshot file for version $version of $this at $fileToWrite") }