diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index e0c2e942072c7..7ef639caf3e41 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -451,6 +451,10 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit } catch { case _: FileNotFoundException => None + case e: IOException => + logWarning(s"Corrupted snapshot file for version $version of $this: $fileToRead", e) + fs.delete(fileToRead, true) + None } finally { if (input != null) input.close() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index c843b65020d8c..ce647211536b0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -164,12 +164,10 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] val snapshotVersion = (0 to 10).find( version => fileExists(provider, version, isSnapshot = true)).getOrElse(fail("snapshot file not found")) - // Corrupt snapshot file and verify that it throws error + // Corrupt snapshot file and verify that it doesn't throw an error assert(getData(provider, snapshotVersion) === Set("a" -> snapshotVersion)) corruptFile(provider, snapshotVersion, isSnapshot = true) - intercept[Exception] { - getData(provider, snapshotVersion) - } + getData(provider, snapshotVersion) // Corrupt delta file and verify that it throws error assert(getData(provider, snapshotVersion - 1) === Set("a" -> (snapshotVersion - 1)))