Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,8 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
import Options.Rename._
fc.rename(srcPath, dstPath, if (overwriteIfPossible) OVERWRITE else NONE)
// TODO: this is a workaround of HADOOP-16255 - remove this when HADOOP-16255 is resolved
mayRemoveCrcFile(srcPath)
}


Expand All @@ -345,5 +347,17 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
case _: LocalFs | _: RawLocalFs => true // LocalFs = RawLocalFs + ChecksumFs
case _ => false
}

private def mayRemoveCrcFile(path: Path): Unit = {
try {
val checksumFile = new Path(path.getParent, s".${path.getName}.crc")
if (exists(checksumFile)) {
// checksum file exists, deleting it
delete(checksumFile)
}
} catch {
case NonFatal(_) => // ignore, we are removing crc file as "best-effort"
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,22 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite {
assert(fm.exists(path))
fm.createAtomic(path, overwriteIfPossible = true).close() // should not throw exception

// crc file should not be leaked when origin file doesn't exist.
// The implementation of Hadoop filesystem may filter out checksum file, so
// listing files from local filesystem.
val fileNames = new File(path.getParent.toString).listFiles().toSeq
.filter(p => p.isFile).map(p => p.getName)
val crcFiles = fileNames.filter(n => n.startsWith(".") && n.endsWith(".crc"))
val originFileNamesForExistingCrcFiles = crcFiles.map { name =>
// remove first "." and last ".crc"
name.substring(1, name.length - 4)
}

// Check all origin files exist for all crc files.
assert(originFileNamesForExistingCrcFiles.toSet.subsetOf(fileNames.toSet),
s"Some of origin files for crc files don't exist - crc files: $crcFiles / " +
s"expected origin files: $originFileNamesForExistingCrcFiles / actual files: $fileNames")

// Open and delete
fm.open(path).close()
fm.delete(path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.scalatest.concurrent.Waiters._
import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.UninterruptibleThread

Expand Down Expand Up @@ -59,6 +60,21 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}

test("HDFSMetadataLog: purge") {
testPurge()
}

Seq(
classOf[FileSystemBasedCheckpointFileManager],
classOf[FileContextBasedCheckpointFileManager]
).map(_.getCanonicalName).foreach { cls =>
test(s"HDFSMetadataLog: purge - explicit file manager - $cls") {
withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> cls) {
testPurge()
}
}
}

private def testPurge(): Unit = {
withTempDir { temp =>
val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
assert(metadataLog.add(0, "batch0"))
Expand All @@ -75,12 +91,16 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
assert(metadataLog.get(2).isDefined)
assert(metadataLog.getLatest().get._1 == 2)

// There should be exactly one file, called "2", in the metadata directory.
// There should be at most two files, called "2", and optionally crc file,
// in the metadata directory.
// This check also tests for regressions of SPARK-17475
val allFiles = new File(metadataLog.metadataPath.toString).listFiles()
.filter(!_.getName.startsWith(".")).toSeq
assert(allFiles.size == 1)
assert(allFiles(0).getName() == "2")
val allFiles = new File(metadataLog.metadataPath.toString).listFiles().toSeq
assert(allFiles.size <= 2)
assert(allFiles.exists(_.getName == "2"))
if (allFiles.size == 2) {
// there's possibly crc file being left as well
assert(allFiles.exists(_.getName == ".2.crc"))
}
}
}

Expand Down