@@ -283,7 +283,9 @@ private[state] class HDFSBackedStateStoreProvider(
// semantically correct because Structured Streaming requires rerunning a batch should
// generate the same output. (SPARK-19677)
// scalastyle:on
-      if (!fs.exists(finalDeltaFile) && !fs.rename(tempDeltaFile, finalDeltaFile)) {
+      if (fs.exists(finalDeltaFile)) {
+        fs.delete(tempDeltaFile, true)
+      } else if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
Member:
If the file exists, it is deleted, but no new file is renamed to it -- is that right?

Contributor Author @gf53520, Mar 1, 2017:
When a streaming job restarts, the finalDeltaFile generated by the first batch after restart is identical to the finalDeltaFile generated by the last batch before restart. So there is no need to rename the temp file into an identical final file here.

Member:
I guess my point is that, after this change, the file may not exist after this block executes. Before, it always existed afterwards. I wasn't sure that behavior change was intended, because the purpose seems to be only to delete the temp file.

Contributor Author:

Yes. This PR just wants to delete the needless temp file; the delta file must still exist.

throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
}
loadedMaps.put(newVersion, map)
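The commit pattern under discussion can be sketched in isolation. This is a minimal, hypothetical Scala sketch using `java.nio.file` instead of Hadoop's `FileSystem`, and `finalizeDelta` is an illustrative name, not Spark's API: if the final delta file already exists (a batch re-run after restart), the temp copy is redundant because re-running a batch must produce the same output, so it is simply discarded; otherwise the temp file is renamed into place.

```scala
import java.io.IOException
import java.nio.file.{Files, Path}

// Hypothetical sketch of the idempotent-commit pattern discussed above.
// If the final delta file already exists (e.g. the batch is re-run after a
// restart), the temp file holds the same contents and is just cleaned up;
// otherwise the temp file is renamed into place.
def finalizeDelta(tempDelta: Path, finalDelta: Path): Unit = {
  if (Files.exists(finalDelta)) {
    // Re-run of an already-committed batch: discard the redundant temp file.
    Files.deleteIfExists(tempDelta)
  } else {
    try Files.move(tempDelta, finalDelta)
    catch {
      case e: IOException =>
        throw new IOException(s"Failed to rename $tempDelta to $finalDelta", e)
    }
  }
}
```

Either way, the final delta file exists and no temp file is left behind, which is the invariant the reviewer is asking about.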
@@ -20,9 +20,11 @@ package org.apache.spark.sql.execution.streaming.state
import java.io.{File, IOException}
import java.net.URI

+import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Random

+import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
@@ -293,6 +295,11 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
val provider = newStoreProvider(hadoopConf = conf)
provider.getStore(0).commit()
provider.getStore(0).commit()

+    // Verify we don't leak temp files
+    val tempFiles = FileUtils.listFiles(new File(provider.id.checkpointLocation),
+      null, true).asScala.filter(_.getName.startsWith("temp-"))
+    assert(tempFiles.isEmpty)
}

test("corrupted file handling") {