diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 5ebc083a7da92..b5fb866a61e66 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -47,8 +47,8 @@ class FileStreamSource(
     fs.makeQualified(new Path(path))  // can contains glob patterns
   }
 
-  private val metadataLog =
-    new FileStreamSourceLog(FileStreamSourceLog.VERSION, sparkSession, metadataPath)
+  private val metadataLog = new FileStreamSourceLog(FileStreamSourceLog.VERSION, sparkSession,
+    metadataPath, sourceOptions.maxFileAgeMs)
   private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
 
   /** Maximum number of new files to be considered in each batch */
@@ -58,9 +58,7 @@ class FileStreamSource(
   // Visible for testing and debugging in production.
   val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)
 
-  metadataLog.allFiles().foreach { entry =>
-    seenFiles.add(entry)
-  }
+  metadataLog.allFiles().foreach { entry => seenFiles.add(entry) }
   seenFiles.purge()
 
   logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = ${sourceOptions.maxFileAgeMs}")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
index 4681f2ba08c84..0ebe099d96603 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -32,7 +32,8 @@ import org.apache.spark.sql.internal.SQLConf
 class FileStreamSourceLog(
     metadataLogVersion: String,
     sparkSession: SparkSession,
-    path: String)
+    path: String,
+    maxFileAge: Long)
   extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) {
 
   import CompactibleFileStreamLog._
@@ -51,6 +52,9 @@ class FileStreamSourceLog(
 
   private implicit val formats = Serialization.formats(NoTypeHints)
 
+  // Timestamp to track the latest file entry been added to log.
+  private var latestAddTimestamp = -1L
+
   // A fixed size log entry cache to cache the file entries belong to the compaction batch. It is
   // used to avoid scanning the compacted log file to retrieve it's own batch data.
   private val cacheSize = compactInterval
@@ -69,7 +73,8 @@ class FileStreamSourceLog(
   }
 
   def compactLogs(logs: Seq[FileEntry]): Seq[FileEntry] = {
-    logs
+    val purgeTime = latestAddTimestamp - maxFileAge
+    logs.filter(_.timestamp > purgeTime)
   }
 
   override def add(batchId: Long, logs: Array[FileEntry]): Boolean = {
@@ -77,6 +82,13 @@
       if (isCompactionBatch(batchId, compactInterval)) {
         fileEntryCache.put(batchId, logs)
       }
+
+      logs.foreach { entry =>
+        if (entry.timestamp > latestAddTimestamp) {
+          latestAddTimestamp = entry.timestamp
+        }
+      }
+
       true
     } else {
       false
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
index 0795a0527f13a..5eb48de59960f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming
 
 import java.io.File
 import java.net.URI
+import java.util.concurrent.TimeUnit
 
 import scala.util.Random
 
@@ -90,8 +91,8 @@ class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext {
       classOf[ExistsThrowsExceptionFileSystem].getName)
     // add the metadata entries as a pre-req
    val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
-    val metadataLog =
-      new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, dir.getAbsolutePath)
+    val metadataLog = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark,
+      dir.getAbsolutePath, TimeUnit.DAYS.toMillis(7))
     assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L, 0))))
 
     val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", StructType(Nil),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 55c95ae285c1b..9784c3c6665a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -353,7 +353,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         AddTextFileData("a\nb", src, tmp),
         CheckAnswer("a", "b"),
 
-        // SLeeps longer than 5ms (maxFileAge)
+        // Sleeps longer than 5ms (maxFileAge)
         // Unfortunately since a lot of file system does not have modification time granularity
         // finer grained than 1 sec, we need to use 1 sec here.
         AssertOnQuery { _ => Thread.sleep(1000); true },
@@ -899,6 +899,52 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       }
     }
   }
+
+  test("purge aged file entry in FileStreamSourceLog") {
+    withTempDirs { case (src, tmp) =>
+      withSQLConf(
+        SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2"
+      ) {
+        val fileStream = createFileStream(format = "text", path = src.getCanonicalPath,
+          options = Map("maxFileAge" -> "5ms"))
+        val filtered = fileStream.filter($"value" contains "keep")
+
+        def metadataLog(execution: StreamExecution): FileStreamSourceLog = {
+          val _sources = PrivateMethod[Seq[Source]]('sources)
+          val _metadataLog = PrivateMethod[FileStreamSourceLog]('metadataLog)
+          val fileSource = (execution invokePrivate _sources()).head.asInstanceOf[FileStreamSource]
+          fileSource invokePrivate _metadataLog()
+        }
+
+        testStream(filtered)(
+          AddTextFileData("drop1\nkeep2\nkeep3", src, tmp),
+          AssertOnQuery { _ => Thread.sleep(1000); true },
+          CheckAnswer("keep2", "keep3"),
+          AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
+          AssertOnQuery { _ => Thread.sleep(1000); true },
+          CheckAnswer("keep2", "keep3", "keep5", "keep6"),
+          AssertOnQuery { e =>
+            val compactedFileEntries = metadataLog(e).get(1L)
+            assert(compactedFileEntries.isDefined)
+            assert(compactedFileEntries.get.map(_.batchId).sorted === Array(0L, 1L))
+            true
+          },
+          AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
+          AssertOnQuery { _ => Thread.sleep(1000); true },
+          CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9"),
+          AddTextFileData("drop10\nkeep11", src, tmp),
+          AssertOnQuery { _ => Thread.sleep(1000); true },
+          CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9", "keep11"),
+          AssertOnQuery { e =>
+            val compactedFileEntries = metadataLog(e).get(3L)
+            assert(compactedFileEntries.isDefined)
+            assert(compactedFileEntries.get.map(_.batchId).sorted === Array(2L, 3L))
+            true
+          }
+        )
+      }
+    }
+  }
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
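
The behavioral change in this patch, in short: FileStreamSourceLog now remembers the newest entry timestamp it has seen across add() calls, and compactLogs() drops any entry older than maxFileAge relative to that timestamp whenever a compaction batch is written. The standalone Scala sketch below only illustrates that aging rule; CompactLogsSketch and its Entry class are hypothetical stand-ins for FileEntry and the log's internal state, not Spark's actual API.

// Standalone sketch of the aging rule added in this patch (illustrative only;
// Entry and the mutable state below are stand-ins for Spark's FileEntry and
// FileStreamSourceLog internals, not the real classes).
object CompactLogsSketch {
  case class Entry(path: String, timestamp: Long, batchId: Long)

  val maxFileAge: Long = 5L           // e.g. the test's maxFileAge = "5ms"
  var latestAddTimestamp: Long = -1L  // bumped on every add(), as in the patch

  def add(entries: Seq[Entry]): Unit =
    entries.foreach { e =>
      if (e.timestamp > latestAddTimestamp) latestAddTimestamp = e.timestamp
    }

  // Compaction keeps only entries newer than (latestAddTimestamp - maxFileAge).
  def compactLogs(logs: Seq[Entry]): Seq[Entry] = {
    val purgeTime = latestAddTimestamp - maxFileAge
    logs.filter(_.timestamp > purgeTime)
  }

  def main(args: Array[String]): Unit = {
    val batch0 = Seq(Entry("file1", 100L, 0L), Entry("file2", 103L, 0L))
    val batch1 = Seq(Entry("file3", 1100L, 1L))
    add(batch0)
    add(batch1)
    // purgeTime = 1100 - 5 = 1095, so the two batch-0 entries are dropped.
    println(compactLogs(batch0 ++ batch1))  // List(Entry(file3,1100,1))
  }
}

With the new test's settings (maxFileAge = 5ms, one-second sleeps between batches, compact interval 2), this rule is why the compaction for batch 3 is expected to retain only entries from batches 2 and 3: everything earlier has aged out by then.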