@@ -47,8 +47,8 @@ class FileStreamSource(
     fs.makeQualified(new Path(path)) // can contains glob patterns
   }
 
-  private val metadataLog =
-    new FileStreamSourceLog(FileStreamSourceLog.VERSION, sparkSession, metadataPath)
+  private val metadataLog = new FileStreamSourceLog(FileStreamSourceLog.VERSION, sparkSession,
+    metadataPath, sourceOptions.maxFileAgeMs)
   private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
 
   /** Maximum number of new files to be considered in each batch */
@@ -58,9 +58,7 @@ class FileStreamSource(
   // Visible for testing and debugging in production.
   val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)
 
-  metadataLog.allFiles().foreach { entry =>
-    seenFiles.add(entry)
-  }
+  metadataLog.allFiles().foreach { entry => seenFiles.add(entry) }
   seenFiles.purge()
 
   logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = ${sourceOptions.maxFileAgeMs}")
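The value now threaded into FileStreamSourceLog is the same one SeenFilesMap already uses: sourceOptions.maxFileAgeMs, which comes from the user-facing maxFileAge option (the new test below sets it to "5ms"). A minimal usage sketch, assuming a local SparkSession; the temp-directory path and the "7d" value are illustrative, not taken from the patch:

    import java.nio.file.Files
    import org.apache.spark.sql.SparkSession

    object MaxFileAgeUsageSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("maxFileAge-usage").getOrCreate()
        val inputDir = Files.createTempDirectory("stream-input").toString
        // "maxFileAge" is the option behind sourceOptions.maxFileAgeMs; with this patch it also
        // bounds how long entries survive in the source metadata log across compactions.
        val lines = spark.readStream
          .format("text")
          .option("maxFileAge", "7d")
          .load(inputDir)
        lines.printSchema()  // the streaming query itself is omitted; this only shows the option wiring
        spark.stop()
      }
    }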
@@ -32,7 +32,8 @@ import org.apache.spark.sql.internal.SQLConf
 class FileStreamSourceLog(
     metadataLogVersion: String,
     sparkSession: SparkSession,
-    path: String)
+    path: String,
+    maxFileAge: Long)
   extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) {
 
   import CompactibleFileStreamLog._
@@ -51,6 +52,9 @@ class FileStreamSourceLog(
 
   private implicit val formats = Serialization.formats(NoTypeHints)
 
+  // Timestamp of the latest file entry added to the log.
+  private var latestAddTimestamp = -1L
+
   // A fixed size log entry cache to cache the file entries belong to the compaction batch. It is
   // used to avoid scanning the compacted log file to retrieve it's own batch data.
   private val cacheSize = compactInterval
@@ -69,14 +73,22 @@
   }
 
   def compactLogs(logs: Seq[FileEntry]): Seq[FileEntry] = {
-    logs
+    val purgeTime = latestAddTimestamp - maxFileAge
+    logs.filter(_.timestamp > purgeTime)
   }
 
   override def add(batchId: Long, logs: Array[FileEntry]): Boolean = {
     if (super.add(batchId, logs)) {
       if (isCompactionBatch(batchId, compactInterval)) {
         fileEntryCache.put(batchId, logs)
       }
+
+      logs.foreach { entry =>
+        if (entry.timestamp > latestAddTimestamp) {
+          latestAddTimestamp = entry.timestamp
+        }
+      }
+
       true
     } else {
       false
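The purge rule introduced here is simple enough to illustrate outside Spark: at compaction time, any entry whose timestamp is not strictly newer than latestAddTimestamp - maxFileAge is dropped. A standalone sketch of that rule; the Entry case class and the sample values are illustrative stand-ins, not the patch's own types:

    object CompactPurgeSketch {
      // Stand-in for FileEntry: only the timestamp matters for the purge decision.
      case class Entry(path: String, timestamp: Long)

      // Mirrors the patched compactLogs: keep entries newer than latestAddTimestamp - maxFileAge.
      def compact(logs: Seq[Entry], latestAddTimestamp: Long, maxFileAge: Long): Seq[Entry] = {
        val purgeTime = latestAddTimestamp - maxFileAge
        logs.filter(_.timestamp > purgeTime)
      }

      def main(args: Array[String]): Unit = {
        val logs = Seq(Entry("old", 990L), Entry("borderline", 995L), Entry("fresh", 1000L))
        // With maxFileAge = 5 ms and the newest entry at 1000, purgeTime is 995:
        // "old" (990) and "borderline" (995) are dropped, "fresh" (1000) survives.
        println(compact(logs, latestAddTimestamp = 1000L, maxFileAge = 5L).map(_.path))
      }
    }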
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming
 
 import java.io.File
 import java.net.URI
+import java.util.concurrent.TimeUnit
 
 import scala.util.Random
 
@@ -90,8 +91,8 @@ class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext {
       classOf[ExistsThrowsExceptionFileSystem].getName)
     // add the metadata entries as a pre-req
     val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
-    val metadataLog =
-      new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, dir.getAbsolutePath)
+    val metadataLog = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark,
+      dir.getAbsolutePath, TimeUnit.DAYS.toMillis(7))
     assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L, 0))))
 
     val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", StructType(Nil),
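This test now has to supply an explicit maxFileAge because the log constructor requires one; the 7-day value presumably mirrors the file source's default maxFileAge. A quick check of the conversion it relies on:

    import java.util.concurrent.TimeUnit

    object SevenDaysMillis {
      def main(args: Array[String]): Unit = {
        // 7 days * 24 h * 3600 s * 1000 ms = 604,800,000 ms
        val sevenDaysMs: Long = TimeUnit.DAYS.toMillis(7)
        println(sevenDaysMs)  // 604800000
      }
    }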
@@ -353,7 +353,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       AddTextFileData("a\nb", src, tmp),
       CheckAnswer("a", "b"),
 
-      // SLeeps longer than 5ms (maxFileAge)
+      // Sleeps longer than 5ms (maxFileAge)
       // Unfortunately since a lot of file system does not have modification time granularity
       // finer grained than 1 sec, we need to use 1 sec here.
       AssertOnQuery { _ => Thread.sleep(1000); true },
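The 1-second sleeps exist because the maxFileAge comparison is driven by file modification times, and many file systems only record those with 1-second granularity. A throwaway check of that granularity on the local machine, purely illustrative and not part of the patch:

    import java.io.File

    object MtimeGranularityCheck {
      def main(args: Array[String]): Unit = {
        val f = File.createTempFile("mtime-check", ".txt")
        // On coarse-grained file systems this value is typically a multiple of 1000 (whole seconds),
        // which is why a 5 ms maxFileAge needs a ~1000 ms sleep in the tests rather than ~5 ms.
        println(f.lastModified())
        f.delete()
      }
    }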
@@ -899,6 +899,52 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       }
     }
   }
+
+  test("purge aged file entry in FileStreamSourceLog") {
+    withTempDirs { case (src, tmp) =>
+      withSQLConf(
+        SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2"
+      ) {
+        val fileStream = createFileStream(format = "text", path = src.getCanonicalPath,
+          options = Map("maxFileAge" -> "5ms"))
+        val filtered = fileStream.filter($"value" contains "keep")
+
+        def metadataLog(execution: StreamExecution): FileStreamSourceLog = {
+          val _sources = PrivateMethod[Seq[Source]]('sources)
+          val _metadataLog = PrivateMethod[FileStreamSourceLog]('metadataLog)
+          val fileSource = (execution invokePrivate _sources()).head.asInstanceOf[FileStreamSource]
+          fileSource invokePrivate _metadataLog()
+        }
+
+        testStream(filtered)(
+          AddTextFileData("drop1\nkeep2\nkeep3", src, tmp),
+          AssertOnQuery { _ => Thread.sleep(1000); true },
+          CheckAnswer("keep2", "keep3"),
+          AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
+          AssertOnQuery { _ => Thread.sleep(1000); true },
+          CheckAnswer("keep2", "keep3", "keep5", "keep6"),
+          AssertOnQuery { e =>
+            val compactedFileEntries = metadataLog(e).get(1L)
+            assert(compactedFileEntries.isDefined)
+            assert(compactedFileEntries.get.map(_.batchId).sorted === Array(0L, 1L))
+            true
+          },
+          AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
+          AssertOnQuery { _ => Thread.sleep(1000); true },
+          CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9"),
+          AddTextFileData("drop10\nkeep11", src, tmp),
+          AssertOnQuery { _ => Thread.sleep(1000); true },
+          CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9", "keep11"),
+          AssertOnQuery { e =>
+            val compactedFileEntries = metadataLog(e).get(3L)
+            assert(compactedFileEntries.isDefined)
+            assert(compactedFileEntries.get.map(_.batchId).sorted === Array(2L, 3L))
+            true
+          }
+        )
+      }
+    }
+  }
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
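The new test reads the compacted entries back with get(1L) and get(3L) because, with a compact interval of 2, batches 1 and 3 are the compaction batches. A small sketch of that arithmetic, assuming the usual (batchId + 1) % interval rule used by CompactibleFileStreamLog:

    object CompactionBatchSketch {
      // With a compact interval of 2, batches 1 and 3 compact, which is why the test above
      // inspects metadataLog(e).get(1L) and get(3L) for the merged (and purged) entries.
      def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
        (batchId + 1) % compactInterval == 0

      def main(args: Array[String]): Unit = {
        val compacting = (0L to 5L).filter(isCompactionBatch(_, 2)).toList
        println(compacting)  // List(1, 3, 5)
      }
    }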