@@ -59,7 +59,7 @@ class FileStreamSource(
val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)

metadataLog.allFiles().foreach { entry =>
- seenFiles.add(entry)
+ seenFiles.add(entry.path, entry.timestamp)
}
seenFiles.purge()

@@ -73,14 +73,16 @@ class FileStreamSource(
*/
private def fetchMaxOffset(): LongOffset = synchronized {
// All the new files found - ignore aged files and files that we have seen.
- val newFiles = fetchAllFiles().filter(seenFiles.isNewFile)
+ val newFiles = fetchAllFiles().filter {
+   case (path, timestamp) => seenFiles.isNewFile(path, timestamp)
+ }

// Obey user's setting to limit the number of files in this batch trigger.
val batchFiles =
if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else newFiles

batchFiles.foreach { file =>
- seenFiles.add(file)
+ seenFiles.add(file._1, file._2)
logDebug(s"New file: $file")
}
val numPurged = seenFiles.purge()
@@ -95,7 +97,9 @@ class FileStreamSource(

if (batchFiles.nonEmpty) {
maxBatchId += 1
- metadataLog.add(maxBatchId, batchFiles.map(_.copy(batchId = maxBatchId)).toArray)
+ metadataLog.add(maxBatchId, batchFiles.map { case (path, timestamp) =>
+   FileEntry(path = path, timestamp = timestamp, batchId = maxBatchId)
+ }.toArray)
logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files")
}

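Taken together, the hunks above switch fetchMaxOffset over to plain (path, timestamp) pairs: files from fetchAllFiles() are filtered through SeenFilesMap, optionally truncated to maxFilesPerBatch, and only turned into FileEntry records (with an explicit batchId) when they are written to the metadata log. The following is a minimal, self-contained sketch of that flow (not part of the diff); FileEntry, newFiles, and maxFilesPerBatch here are simplified stand-ins for illustration, not Spark's actual implementation.

object FetchMaxOffsetSketch {
  // Simplified stand-in for the case class in the diff; batchId is now mandatory.
  final case class FileEntry(path: String, timestamp: Long, batchId: Long)

  def main(args: Array[String]): Unit = {
    val maxFilesPerBatch: Option[Int] = Some(2)
    var maxBatchId = 0L

    // Pretend these came from fetchAllFiles(), already sorted by modification time
    // and already filtered through seenFiles.isNewFile(path, timestamp).
    val newFiles: Seq[(String, Long)] = Seq(("a.txt", 100L), ("b.txt", 110L), ("c.txt", 120L))

    // Obey the user's limit on the number of files per batch trigger.
    val batchFiles =
      if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else newFiles

    // The batch id is attached only when the entries are persisted to the metadata log.
    maxBatchId += 1
    val entries = batchFiles.map { case (path, timestamp) =>
      FileEntry(path = path, timestamp = timestamp, batchId = maxBatchId)
    }.toArray

    entries.foreach(println)  // FileEntry(a.txt,100,1), FileEntry(b.txt,110,1)
  }
}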
@@ -140,12 +144,12 @@ class FileStreamSource(
/**
* Returns a list of files found, sorted by their timestamp.
*/
- private def fetchAllFiles(): Seq[FileEntry] = {
+ private def fetchAllFiles(): Seq[(String, Long)] = {
val startTime = System.nanoTime
val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType))
val files = catalog.allFiles().sortBy(_.getModificationTime).map { status =>
- FileEntry(status.getPath.toUri.toString, status.getModificationTime)
+ (status.getPath.toUri.toString, status.getModificationTime)
}
val endTime = System.nanoTime
val listingTimeMs = (endTime.toDouble - startTime) / 1000000
@@ -172,10 +176,7 @@ object FileStreamSource {
/** Timestamp for file modification time, in ms since January 1, 1970 UTC. */
type Timestamp = Long

- val NOT_SET = -1L
-
- case class FileEntry(path: String, timestamp: Timestamp, batchId: Long = NOT_SET)
-   extends Serializable
+ case class FileEntry(path: String, timestamp: Timestamp, batchId: Long) extends Serializable

/**
* A custom hash map used to track the list of files seen. This map is not thread-safe.
@@ -196,21 +197,21 @@ object FileStreamSource {
private var lastPurgeTimestamp: Timestamp = 0L

/** Add a new file to the map. */
- def add(file: FileEntry): Unit = {
-   map.put(file.path, file.timestamp)
-   if (file.timestamp > latestTimestamp) {
-     latestTimestamp = file.timestamp
+ def add(path: String, timestamp: Timestamp): Unit = {
+   map.put(path, timestamp)
+   if (timestamp > latestTimestamp) {
+     latestTimestamp = timestamp
}
}

/**
* Returns true if we should consider this file a new file. The file is only considered "new"
* if it is new enough that we are still tracking, and we have not seen it before.
*/
- def isNewFile(file: FileEntry): Boolean = {
+ def isNewFile(path: String, timestamp: Timestamp): Boolean = {
// Note that we are testing against lastPurgeTimestamp here so we'd never miss a file that
// is older than (latestTimestamp - maxAgeMs) but has not been purged yet.
- file.timestamp >= lastPurgeTimestamp && !map.containsKey(file.path)
+ timestamp >= lastPurgeTimestamp && !map.containsKey(path)
}

/** Removes aged entries and returns the number of files removed. */
@@ -230,8 +231,8 @@ object FileStreamSource {

def size: Int = map.size()

- def allEntries: Seq[FileEntry] = {
-   map.entrySet().asScala.map(entry => FileEntry(entry.getKey, entry.getValue)).toSeq
+ def allEntries: Seq[(String, Timestamp)] = {
+   map.asScala.toSeq
}
}
}
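The net effect of the FileStreamSource changes above on SeenFilesMap is that callers now pass a path and a timestamp directly instead of wrapping them in a FileEntry, and allEntries returns plain (path, timestamp) pairs. Below is a small stand-alone sketch (not part of the diff) that mirrors the documented contract — add, isNewFile, and purge against lastPurgeTimestamp — so the test expectations in the next file are easy to follow; it is an illustration under those assumptions, not the Spark class itself.

import scala.collection.mutable

class SeenFilesMapSketch(maxAgeMs: Long) {
  private val map = mutable.HashMap.empty[String, Long]
  private var latestTimestamp: Long = 0L
  private var lastPurgeTimestamp: Long = 0L

  def add(path: String, timestamp: Long): Unit = {
    map.put(path, timestamp)
    if (timestamp > latestTimestamp) latestTimestamp = timestamp
  }

  // A file is new only if it is not older than the last purge threshold
  // and has not been seen before.
  def isNewFile(path: String, timestamp: Long): Boolean =
    timestamp >= lastPurgeTimestamp && !map.contains(path)

  // Drops entries older than (latestTimestamp - maxAgeMs); returns how many were removed.
  def purge(): Int = {
    lastPurgeTimestamp = latestTimestamp - maxAgeMs
    val aged = map.collect { case (p, ts) if ts < lastPurgeTimestamp => p }.toList
    aged.foreach(map.remove)
    aged.size
  }

  def size: Int = map.size

  def allEntries: Seq[(String, Long)] = map.toSeq
}

object SeenFilesMapSketchDemo {
  def main(args: Array[String]): Unit = {
    val seen = new SeenFilesMapSketch(maxAgeMs = 10)
    seen.add("a", 5)
    seen.add("b", 16)
    println(seen.isNewFile("a", 5))   // false: already seen
    println(seen.purge())             // 1: "a" is older than 16 - 10
    println(seen.isNewFile("c", 3))   // false: older than the purge threshold
    println(seen.isNewFile("c", 12))  // true: unseen and recent enough
  }
}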
@@ -36,51 +36,51 @@ class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext {
test("SeenFilesMap") {
val map = new SeenFilesMap(maxAgeMs = 10)

- map.add(FileEntry("a", 5))
+ map.add("a", 5)
assert(map.size == 1)
map.purge()
assert(map.size == 1)

// Add a new entry; purge should be a no-op, since the gap is exactly 10 ms.
- map.add(FileEntry("b", 15))
+ map.add("b", 15)
assert(map.size == 2)
map.purge()
assert(map.size == 2)

// Add a new entry that is more than 10 ms newer than the first entry. We should be able to purge now.
- map.add(FileEntry("c", 16))
+ map.add("c", 16)
assert(map.size == 3)
map.purge()
assert(map.size == 2)

// Overriding an existing entry shouldn't change the size
- map.add(FileEntry("c", 25))
+ map.add("c", 25)
assert(map.size == 2)

// Not a new file because we have seen c before
- assert(!map.isNewFile(FileEntry("c", 20)))
+ assert(!map.isNewFile("c", 20))

// Not a new file because timestamp is too old
- assert(!map.isNewFile(FileEntry("d", 5)))
+ assert(!map.isNewFile("d", 5))

// Finally a new file: never seen and not too old
- assert(map.isNewFile(FileEntry("e", 20)))
+ assert(map.isNewFile("e", 20))
}

test("SeenFilesMap should only consider a file old if it is earlier than last purge time") {
val map = new SeenFilesMap(maxAgeMs = 10)

- map.add(FileEntry("a", 20))
+ map.add("a", 20)
assert(map.size == 1)

// Timestamps 9 and 10 should still be considered new files because the purge time should still be 0
- assert(map.isNewFile(FileEntry("b", 9)))
- assert(map.isNewFile(FileEntry("b", 10)))
+ assert(map.isNewFile("b", 9))
+ assert(map.isNewFile("b", 10))

// Once purged, the purge time should be 10, so b would be an old file if its timestamp is less than 10.
map.purge()
- assert(!map.isNewFile(FileEntry("b", 9)))
- assert(map.isNewFile(FileEntry("b", 10)))
+ assert(!map.isNewFile("b", 9))
+ assert(map.isNewFile("b", 10))
}

testWithUninterruptibleThread("do not recheck that files exist during getBatch") {
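The second test pins down the boundary: a file only becomes "old" once its timestamp drops below the last purge threshold, and a timestamp exactly equal to the threshold is still new. Here is a short walkthrough of those numbers, written against the SeenFilesMapSketch stand-in shown after the first file (an assumption for illustration, not Spark's own classes):

object PurgeBoundaryDemo {
  def main(args: Array[String]): Unit = {
    val map = new SeenFilesMapSketch(maxAgeMs = 10)

    map.add("a", 20)                 // latestTimestamp becomes 20
    println(map.isNewFile("b", 9))   // true: nothing purged yet, so the threshold is still 0
    println(map.isNewFile("b", 10))  // true

    map.purge()                      // threshold becomes 20 - 10 = 10
    println(map.isNewFile("b", 9))   // false: 9 < 10, older than the purge threshold
    println(map.isNewFile("b", 10))  // true: 10 >= 10, exactly at the threshold still counts as new
  }
}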