Skip to content

Commit a371f05

Browse files
committed
More conservative check against lastPurgeTimestamp
1 parent ce1dd9c commit a371f05

File tree

2 files changed

+29
-6
lines changed

2 files changed

+29
-6
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import org.apache.spark.sql.types.StructType
2929

3030
/**
3131
* A very simple source that reads files from the given directory as they appear.
32+
*
33+
* TODO: Clean up the metadata log files periodically.
3234
*/
3335
class FileStreamSource(
3436
sparkSession: SparkSession,
@@ -183,15 +185,17 @@ object FileStreamSource {
183185
/** Mapping from file to its timestamp. */
184186
private val map = new java.util.HashMap[String, Timestamp]
185187

186-
private var lastTimestamp: Timestamp = 0L
188+
/** Timestamp of the latest file. */
189+
private var latestTimestamp: Timestamp = 0L
187190

188-
private def ageThreshold: Timestamp = lastTimestamp - maxAgeMs
191+
/** Timestamp for the last purge operation. */
192+
private var lastPurgeTimestamp: Timestamp = 0L
189193

190194
/** Add a new file to the map. */
191195
def add(file: FileEntry): Unit = {
192196
map.put(file.path, file.timestamp)
193-
if (file.timestamp > lastTimestamp) {
194-
lastTimestamp = file.timestamp
197+
if (file.timestamp > latestTimestamp) {
198+
latestTimestamp = file.timestamp
195199
}
196200
}
197201

@@ -200,16 +204,19 @@ object FileStreamSource {
200204
* if it is new enough that we are still tracking, and we have not seen it before.
201205
*/
202206
def isNewFile(file: FileEntry): Boolean = {
203-
file.timestamp > ageThreshold && !map.containsKey(file.path)
207+
// Note that we are testing against lastPurgeTimestamp here so we'd never miss a file that
208+
// is older than (latestTimestamp - maxAgeMs) but has not been purged yet.
209+
file.timestamp >= lastPurgeTimestamp && !map.containsKey(file.path)
204210
}
205211

206212
/** Removes aged entries and returns the number of files removed. */
207213
def purge(): Int = {
214+
lastPurgeTimestamp = latestTimestamp - maxAgeMs
208215
val iter = map.entrySet().iterator()
209216
var count = 0
210217
while (iter.hasNext) {
211218
val entry = iter.next()
212-
if (entry.getValue < lastTimestamp - maxAgeMs) {
219+
if (entry.getValue < lastPurgeTimestamp) {
213220
count += 1
214221
iter.remove()
215222
}

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,20 @@ class FileStreamSourceSuite extends SparkFunSuite {
5757
assert(map.isNewFile(FileEntry("e", 20)))
5858
}
5959

60+
test("SeenFilesMap should only consider a file old if it is earlier than last purge time") {
61+
val map = new SeenFilesMap(maxAgeMs = 10)
62+
63+
map.add(FileEntry("a", 20))
64+
assert(map.size == 1)
65+
66+
// Timestamps 9 and 10 should still be considered new files because the purge time should be 0
67+
assert(map.isNewFile(FileEntry("b", 9)))
68+
assert(map.isNewFile(FileEntry("b", 10)))
69+
70+
// Once purged, the purge time should be 10, and then b would be an old file if its timestamp is less than 10.
71+
map.purge()
72+
assert(!map.isNewFile(FileEntry("b", 9)))
73+
assert(map.isNewFile(FileEntry("b", 10)))
74+
}
75+
6076
}

0 commit comments

Comments
 (0)