Skip to content

Commit c40f8a1

Browse files
committed
[SPARK-28025][SS] Fix FileContextBasedCheckpointFileManager leaking crc files
This PR fixes the leak of crc files from CheckpointFileManager when FileContextBasedCheckpointFileManager is used. Spark hits a Hadoop bug, [HADOOP-16255](https://issues.apache.org/jira/browse/HADOOP-16255), which seems to be a long-standing issue. The cause is that there are two `renameInternal` methods: ``` public void renameInternal(Path src, Path dst) public void renameInternal(final Path src, final Path dst, boolean overwrite) ``` Both should be overridden to handle all cases, but ChecksumFs only overrides the method with 2 params, so when the latter is called, FilterFs.renameInternal(...) is invoked instead, and it performs the rename with RawLocalFs as the underlying filesystem. The bug is related to FileContext, so FileSystemBasedCheckpointFileManager is not affected. [SPARK-17475](https://issues.apache.org/jira/browse/SPARK-17475) worked around this bug, but [SPARK-23966](https://issues.apache.org/jira/browse/SPARK-23966) seems to have introduced a regression. This PR deletes the crc file on a "best-effort" basis when renaming, since failing to delete a crc file is not critical enough to fail the task. This PR prevents crc files from being left behind even after batches are purged. Too many files in the same directory often hurt performance, and each crc file occupies more space than its own size, so crc files can take up a nontrivial amount of space when batches go up to 100000+. No. Some unit tests are modified to check for leakage of crc files. Closes #25488 from HeartSaVioR/SPARK-28025. Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]> Signed-off-by: Shixiong Zhu <[email protected]>
1 parent e468576 commit c40f8a1

File tree

3 files changed

+55
-5
lines changed

3 files changed

+55
-5
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,8 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
329329
/**
 * Renames `srcPath` to `dstPath` through `fc.rename`, then makes a best-effort attempt to
 * remove the checksum (crc) file that belongs to `srcPath`.
 *
 * @param srcPath path of the file to rename
 * @param dstPath path the file is renamed to
 * @param overwriteIfPossible when true the rename is issued with the `OVERWRITE` option,
 *                            otherwise with `NONE`
 */
override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
330330
import Options.Rename._
331331
fc.rename(srcPath, dstPath, if (overwriteIfPossible) OVERWRITE else NONE)
332+
// The crc file of the source is cleaned up separately after the rename; failures there are
// swallowed by mayRemoveCrcFile, so they never fail the rename itself.
// TODO: this is a workaround of HADOOP-16255 - remove this when HADOOP-16255 is resolved
333+
mayRemoveCrcFile(srcPath)
332334
}
333335

334336

@@ -345,5 +347,17 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
345347
case _: LocalFs | _: RawLocalFs => true // LocalFs = RawLocalFs + ChecksumFs
346348
case _ => false
347349
}
350+
351+
/**
 * Best-effort removal of the checksum file associated with `path`.
 *
 * The checksum file is looked up in the parent directory of `path` under the name
 * `.<file name>.crc`. It is deleted only if `exists` reports it; any non-fatal exception
 * raised while building the path, checking existence or deleting is ignored.
 *
 * @param path the file whose companion crc file should be removed
 */
private def mayRemoveCrcFile(path: Path): Unit = {
352+
try {
353+
// Hidden sibling file: same directory, name wrapped as ".<name>.crc".
val checksumFile = new Path(path.getParent, s".${path.getName}.crc")
354+
if (exists(checksumFile)) {
355+
// checksum file exists, deleting it
356+
delete(checksumFile)
357+
}
358+
} catch {
359+
case NonFatal(_) => // ignore, we are removing crc file as "best-effort"
360+
}
361+
}
348362
}
349363

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,22 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite {
7878
assert(fm.exists(path))
7979
fm.createAtomic(path, overwriteIfPossible = true).close() // should not throw exception
8080

81+
// crc file should not be leaked when origin file doesn't exist.
82+
// The implementation of Hadoop filesystem may filter out checksum file, so
83+
// listing files from local filesystem.
84+
val fileNames = new File(path.getParent.toString).listFiles().toSeq
85+
.filter(p => p.isFile).map(p => p.getName)
86+
val crcFiles = fileNames.filter(n => n.startsWith(".") && n.endsWith(".crc"))
87+
val originFileNamesForExistingCrcFiles = crcFiles.map { name =>
88+
// remove first "." and last ".crc"
89+
name.substring(1, name.length - 4)
90+
}
91+
92+
// Check all origin files exist for all crc files.
93+
assert(originFileNamesForExistingCrcFiles.toSet.subsetOf(fileNames.toSet),
94+
s"Some of origin files for crc files don't exist - crc files: $crcFiles / " +
95+
s"expected origin files: $originFileNamesForExistingCrcFiles / actual files: $fileNames")
96+
8197
// Open and delete
8298
fm.open(path).close()
8399
fm.delete(path)

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import org.scalatest.concurrent.Waiters._
2626
import org.scalatest.time.SpanSugar._
2727

2828
import org.apache.spark.SparkFunSuite
29+
import org.apache.spark.sql.internal.SQLConf
2930
import org.apache.spark.sql.test.SharedSQLContext
3031
import org.apache.spark.util.UninterruptibleThread
3132

@@ -59,6 +60,21 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
5960
}
6061

6162
// Runs the shared purge scenario from `testPurge` without setting any SQLConf override
// inside this test body.
test("HDFSMetadataLog: purge") {
63+
testPurge()
64+
}
65+
66+
// Registers one extra purge test per checkpoint file manager class listed below. Each test
// sets the conf key `SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key` to the
// class's canonical name for the duration of `testPurge`.
Seq(
67+
classOf[FileSystemBasedCheckpointFileManager],
68+
classOf[FileContextBasedCheckpointFileManager]
69+
).map(_.getCanonicalName).foreach { cls =>
70+
// The class name is part of the test name so each registered test is distinguishable.
test(s"HDFSMetadataLog: purge - explicit file manager - $cls") {
71+
withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> cls) {
72+
testPurge()
73+
}
74+
}
75+
}
76+
77+
private def testPurge(): Unit = {
6278
withTempDir { temp =>
6379
val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
6480
assert(metadataLog.add(0, "batch0"))
@@ -75,12 +91,16 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
7591
assert(metadataLog.get(2).isDefined)
7692
assert(metadataLog.getLatest().get._1 == 2)
7793

78-
// There should be exactly one file, called "2", in the metadata directory.
94+
// There should be at most two files, called "2", and optionally crc file,
95+
// in the metadata directory.
7996
// This check also tests for regressions of SPARK-17475
80-
val allFiles = new File(metadataLog.metadataPath.toString).listFiles()
81-
.filter(!_.getName.startsWith(".")).toSeq
82-
assert(allFiles.size == 1)
83-
assert(allFiles(0).getName() == "2")
97+
val allFiles = new File(metadataLog.metadataPath.toString).listFiles().toSeq
98+
assert(allFiles.size <= 2)
99+
assert(allFiles.exists(_.getName == "2"))
100+
if (allFiles.size == 2) {
101+
// there's possibly crc file being left as well
102+
assert(allFiles.exists(_.getName == ".2.crc"))
103+
}
84104
}
85105
}
86106

0 commit comments

Comments
 (0)