diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 81e80629092a..4a2aac43b333 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -646,15 +646,15 @@ class RocksDB( // is enabled. if (shouldForceSnapshot.get()) { uploadSnapshot() + shouldForceSnapshot.set(false) + } + + // ensure that changelog files are always written + try { + assert(changelogWriter.isDefined) + changelogWriter.foreach(_.commit()) + } finally { changelogWriter = None - changelogWriter.foreach(_.abort()) - } else { - try { - assert(changelogWriter.isDefined) - changelogWriter.foreach(_.commit()) - } finally { - changelogWriter = None - } } } else { assert(changelogWriter.isEmpty) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 7ac574db98d4..e0ed27f92004 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -808,6 +808,47 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared } } + testWithChangelogCheckpointingEnabled("RocksDB: ensure that changelog files are written " + + "and snapshots uploaded optionally with changelog format v2") { + withTempDir { dir => + val remoteDir = Utils.createTempDir().toString + val conf = dbConf.copy(minDeltasForSnapshot = 5, compactOnCommit = false) + new File(remoteDir).delete() // to make sure that the directory gets created + withDB(remoteDir, conf = conf, useColumnFamilies = true) { db => + db.createColFamilyIfAbsent("test") + db.load(0) + db.put("a", "1") + db.put("b", "2") + db.commit() + assert(changelogVersionsPresent(remoteDir) == Seq(1)) + assert(snapshotVersionsPresent(remoteDir) == Seq(1)) + + db.load(1) + db.put("a", "3") + db.put("c", "4") + db.commit() + + assert(changelogVersionsPresent(remoteDir) == Seq(1, 2)) + assert(snapshotVersionsPresent(remoteDir) == Seq(1)) + + db.removeColFamilyIfExists("test") + db.load(2) + db.remove("a") + db.put("d", "5") + db.commit() + assert(changelogVersionsPresent(remoteDir) == Seq(1, 2, 3)) + assert(snapshotVersionsPresent(remoteDir) == Seq(1, 3)) + + db.load(3) + db.put("e", "6") + db.remove("b") + db.commit() + assert(changelogVersionsPresent(remoteDir) == Seq(1, 2, 3, 4)) + assert(snapshotVersionsPresent(remoteDir) == Seq(1, 3)) + } + } + } + test("RocksDB: ensure merge operation correctness") { withTempDir { dir => val remoteDir = Utils.createTempDir().toString