
Commit 9734a92

vitillo authored and zsxwing committed
[SPARK-19677][SS] Committing a delta file atop an existing one should not fail on HDFS
## What changes were proposed in this pull request?

HDFSBackedStateStoreProvider fails to rename files on HDFS but not on the local filesystem. According to the [implementation notes](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html) of `rename()`, the behavior of the local filesystem and HDFS varies:

> Destination exists and is a file
>
> Renaming a file atop an existing file is specified as failing, raising an exception.
>
> - Local FileSystem: the rename succeeds; the destination file is replaced by the source file.
> - HDFS: the rename fails, no exception is raised. Instead the method call simply returns false.

This patch ensures that `rename()` isn't called if the destination file already exists. It's still semantically correct because Structured Streaming requires that rerunning a batch should generate the same output.

## How was this patch tested?

This patch was tested by running `StateStoreSuite`.

Author: Roberto Agostino Vitillo <[email protected]>

Closes #17012 from vitillo/fix_rename.
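As an editorial illustration of the pattern described above (not part of the patch), the sketch below exercises the exists-before-rename check against Hadoop's `FileSystem` API. The `commitFile` helper, object name, and file paths are hypothetical; `exists`, `rename`, and `create` are the standard Hadoop calls.

```scala
import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object IdempotentRenameSketch {
  // Move a temp file to its final name, tolerating an already-existing target.
  // On HDFS, rename() onto an existing file returns false instead of overwriting,
  // so a pre-existing target is treated as an already-committed, equivalent file.
  def commitFile(fs: FileSystem, temp: Path, dest: Path): Unit = {
    if (!fs.exists(dest) && !fs.rename(temp, dest)) {
      throw new IOException(s"Failed to rename $temp to $dest")
    }
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    val temp = new Path("/tmp/temp-delta-sketch")
    val dest = new Path("/tmp/1.delta-sketch")
    fs.create(temp).close()       // stand-in for a freshly written temp delta file
    commitFile(fs, temp, dest)    // first run: target absent, rename happens
    fs.create(temp).close()
    commitFile(fs, temp, dest)    // re-run: target exists, rename is skipped
  }
}
```

Re-running `commitFile` once the target exists is a no-op, which is why skipping the rename is safe when a batch is re-executed.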
1 parent 7c7fc30 commit 9734a92

File tree: 2 files changed, +34 -8 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

Lines changed: 10 additions & 1 deletion
```diff
@@ -274,7 +274,16 @@ private[state] class HDFSBackedStateStoreProvider(
   private def commitUpdates(newVersion: Long, map: MapType, tempDeltaFile: Path): Path = {
     synchronized {
       val finalDeltaFile = deltaFile(newVersion)
-      if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
+
+      // scalastyle:off
+      // Renaming a file atop an existing one fails on HDFS
+      // (http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html).
+      // Hence we should either skip the rename step or delete the target file. Because deleting the
+      // target file will break speculation, skipping the rename step is the only choice. It's still
+      // semantically correct because Structured Streaming requires rerunning a batch should
+      // generate the same output. (SPARK-19677)
+      // scalastyle:on
+      if (!fs.exists(finalDeltaFile) && !fs.rename(tempDeltaFile, finalDeltaFile)) {
         throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
       }
       loadedMaps.put(newVersion, map)
```

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala

Lines changed: 24 additions & 7 deletions
```diff
@@ -210,13 +210,6 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     assert(store1.commit() === 2)
     assert(rowsToSet(store1.iterator()) === Set("a" -> 1, "b" -> 1))
     assert(getDataFromFiles(provider) === Set("a" -> 1, "b" -> 1))
-
-    // Overwrite the version with other data
-    val store2 = provider.getStore(1)
-    put(store2, "c", 1)
-    assert(store2.commit() === 2)
-    assert(rowsToSet(store2.iterator()) === Set("a" -> 1, "c" -> 1))
-    assert(getDataFromFiles(provider) === Set("a" -> 1, "c" -> 1))
   }
 
   test("snapshotting") {
@@ -292,6 +285,15 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     assert(getDataFromFiles(provider, 19) === Set("a" -> 19))
   }
 
+  test("SPARK-19677: Committing a delta file atop an existing one should not fail on HDFS") {
+    val conf = new Configuration()
+    conf.set("fs.fake.impl", classOf[RenameLikeHDFSFileSystem].getName)
+    conf.set("fs.default.name", "fake:///")
+
+    val provider = newStoreProvider(hadoopConf = conf)
+    provider.getStore(0).commit()
+    provider.getStore(0).commit()
+  }
 
   test("corrupted file handling") {
     val provider = newStoreProvider(minDeltasForSnapshot = 5)
@@ -681,6 +683,21 @@ private[state] object StateStoreSuite {
   }
 }
 
+/**
+ * Fake FileSystem that simulates HDFS rename semantic, i.e. renaming a file atop an existing
+ * one should return false.
+ * See hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html
+ */
+class RenameLikeHDFSFileSystem extends RawLocalFileSystem {
+  override def rename(src: Path, dst: Path): Boolean = {
+    if (exists(dst)) {
+      return false
+    } else {
+      return super.rename(src, dst)
+    }
+  }
+}
+
 /**
  * Fake FileSystem to test that the StateStore throws an exception while committing the
  * delta file, when `fs.rename` returns `false`.
```
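For context, a hedged standalone sketch of how the fake filesystem above gets picked up: Hadoop resolves the `fs.<scheme>.impl` configuration key for a path's scheme, which is exactly what the new test relies on. The driver, object name, and paths below are hypothetical, and it assumes `RenameLikeHDFSFileSystem` from the diff above is on the classpath.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Defined in StateStoreSuite.scala (see the diff above).
import org.apache.spark.sql.execution.streaming.state.RenameLikeHDFSFileSystem

object FakeFsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Register the fake filesystem for the "fake" scheme and make it the default,
    // mirroring the configuration used by the new test.
    conf.set("fs.fake.impl", classOf[RenameLikeHDFSFileSystem].getName)
    conf.set("fs.default.name", "fake:///")

    val fs = FileSystem.get(conf) // resolves to RenameLikeHDFSFileSystem
    val dir = new Path("/tmp/statestore-rename-sketch")
    fs.mkdirs(dir)

    val src = new Path(dir, "temp-delta")
    val dst = new Path(dir, "1.delta")
    fs.create(src).close()
    fs.create(dst).close()

    // Renaming atop an existing file returns false, mirroring HDFS semantics.
    assert(!fs.rename(src, dst))
  }
}
```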
