File tree Expand file tree Collapse file tree 2 files changed +10
-1
lines changed
main/scala/org/apache/spark/sql/execution/streaming/state
test/scala/org/apache/spark/sql/execution/streaming/state Expand file tree Collapse file tree 2 files changed +10
-1
lines changed Original file line number Diff line number Diff line change @@ -283,7 +283,9 @@ private[state] class HDFSBackedStateStoreProvider(
283283 // semantically correct because Structured Streaming requires rerunning a batch should
284284 // generate the same output. (SPARK-19677)
285285 // scalastyle:on
286- if (! fs.exists(finalDeltaFile) && ! fs.rename(tempDeltaFile, finalDeltaFile)) {
286+ if (fs.exists(finalDeltaFile)) {
287+ fs.delete(tempDeltaFile, true )
288+ } else if (! fs.rename(tempDeltaFile, finalDeltaFile)) {
287289 throw new IOException (s " Failed to rename $tempDeltaFile to $finalDeltaFile" )
288290 }
289291 loadedMaps.put(newVersion, map)
Original file line number Diff line number Diff line change @@ -20,9 +20,11 @@ package org.apache.spark.sql.execution.streaming.state
2020import java .io .{File , IOException }
2121import java .net .URI
2222
23+ import scala .collection .JavaConverters ._
2324import scala .collection .mutable
2425import scala .util .Random
2526
27+ import org .apache .commons .io .FileUtils
2628import org .apache .hadoop .conf .Configuration
2729import org .apache .hadoop .fs .{FileStatus , Path , RawLocalFileSystem }
2830import org .scalatest .{BeforeAndAfter , PrivateMethodTester }
@@ -293,6 +295,11 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
293295 val provider = newStoreProvider(hadoopConf = conf)
294296 provider.getStore(0 ).commit()
295297 provider.getStore(0 ).commit()
298+
299+ // Verify we don't leak temp files
300+ val tempFiles = FileUtils .listFiles(new File (provider.id.checkpointLocation),
301+ null , true ).asScala.filter(_.getName.startsWith(" temp-" ))
302+ assert(tempFiles.isEmpty)
296303 }
297304
298305 test(" corrupted file handling" ) {
You can’t perform that action at this time.
0 commit comments