
package org.apache.spark.sql.execution.streaming

- import scala.util.Try
+ import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
- import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, DataSource, ListingFileCatalog, LogicalRelation}
+ import org.apache.spark.sql.execution.datasources.{DataSource, ListingFileCatalog, LogicalRelation}
import org.apache.spark.sql.types.StructType
- import org.apache.spark.util.collection.OpenHashSet

/**
-  * A very simple source that reads text files from the given directory as they appear.
+  * A very simple source that reads files from the given directory as they appear.
 *
-  * TODO Clean up the metadata files periodically
+  * TODO: Clean up the metadata log files periodically.
 */
class FileStreamSource(
    sparkSession: SparkSession,
@@ -41,36 +40,62 @@ class FileStreamSource(
    metadataPath: String,
    options: Map[String, String]) extends Source with Logging {

-   private val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
-   private val qualifiedBasePath = fs.makeQualified(new Path(path)) // can contain glob patterns
-   private val metadataLog = new HDFSMetadataLog[Seq[String]](sparkSession, metadataPath)
+   import FileStreamSource._
+
+   private val sourceOptions = new FileStreamOptions(options)
+
+   private val qualifiedBasePath: Path = {
+     val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
+     fs.makeQualified(new Path(path)) // can contain glob patterns
+   }
+
+   private val metadataLog = new HDFSMetadataLog[Seq[FileEntry]](sparkSession, metadataPath)
+
  private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)

  /** Maximum number of new files to be considered in each batch */
-   private val maxFilesPerBatch = getMaxFilesPerBatch()
+   private val maxFilesPerBatch = sourceOptions.maxFilesPerTrigger
+
+   /** A mapping from a file that we have processed to the timestamp at which it was last modified. */
+   // Visible for testing and debugging in production.
+   val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)

-   private val seenFiles = new OpenHashSet[String]
-   metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, files) =>
-     files.foreach(seenFiles.add)
+   metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, entry) =>
+     entry.foreach(seenFiles.add)
+     // TODO: move purge call out of the loop once we truncate logs.
+     seenFiles.purge()
  }

+   logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = ${sourceOptions.maxFileAgeMs}")
+
  /**
   * Returns the maximum offset that can be retrieved from the source.
   *
   * `synchronized` on this method is for solving race conditions in tests. In the normal usage,
   * there is no race here, so the cost of `synchronized` should be rare.
   */
  private def fetchMaxOffset(): LongOffset = synchronized {
-     val newFiles = fetchAllFiles().filter(!seenFiles.contains(_))
+     // All the new files found - ignore aged files and files that we have seen.
+     val newFiles = fetchAllFiles().filter(seenFiles.isNewFile)
+
+     // Obey user's setting to limit the number of files in this batch trigger.
    val batchFiles =
      if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else newFiles
+
    batchFiles.foreach { file =>
      seenFiles.add(file)
      logDebug(s"New file: $file")
    }
-     logTrace(s"Number of new files = ${newFiles.size})")
-     logTrace(s"Number of files selected for batch = ${batchFiles.size}")
-     logTrace(s"Number of seen files = ${seenFiles.size}")
+     val numPurged = seenFiles.purge()
+
+     logTrace(
+       s"""
+          |Number of new files = ${newFiles.size}
+          |Number of files selected for batch = ${batchFiles.size}
+          |Number of seen files = ${seenFiles.size}
+          |Number of files purged from tracking map = $numPurged
+        """.stripMargin)
+
    if (batchFiles.nonEmpty) {
      maxBatchId += 1
      metadataLog.add(maxBatchId, batchFiles)
@@ -104,22 +129,26 @@ class FileStreamSource(
    val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
    logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
    logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
-     val newOptions = new CaseInsensitiveMap(options).filterKeys(_ != "path")
    val newDataSource =
      DataSource(
        sparkSession,
-         paths = files,
+         paths = files.map(_.path),
        userSpecifiedSchema = Some(schema),
        className = fileFormatClassName,
-         options = newOptions)
+         options = sourceOptions.optionMapWithoutPath)
    Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation()))
  }

-   private def fetchAllFiles(): Seq[String] = {
+   /**
+    * Returns a list of files found, sorted by their timestamp.
+    */
+   private def fetchAllFiles(): Seq[FileEntry] = {
    val startTime = System.nanoTime
    val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
    val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType))
-     val files = catalog.allFiles().sortBy(_.getModificationTime).map(_.getPath.toUri.toString)
+     val files = catalog.allFiles().sortBy(_.getModificationTime).map { status =>
+       FileEntry(status.getPath.toUri.toString, status.getModificationTime)
+     }
    val endTime = System.nanoTime
    val listingTimeMs = (endTime.toDouble - startTime) / 1000000
    if (listingTimeMs > 2000) {
@@ -132,20 +161,76 @@ class FileStreamSource(
    files
  }

-   private def getMaxFilesPerBatch(): Option[Int] = {
-     new CaseInsensitiveMap(options)
-       .get("maxFilesPerTrigger")
-       .map { str =>
-         Try(str.toInt).toOption.filter(_ > 0).getOrElse {
-           throw new IllegalArgumentException(
-             s"Invalid value '$str' for option 'maxFilesPerTrigger', must be a positive integer")
-         }
-       }
-   }
-
  override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1)

  override def toString: String = s"FileStreamSource[$qualifiedBasePath]"

  override def stop() {}
}
+
+
+ object FileStreamSource {
+
+   /** Timestamp for file modification time, in ms since January 1, 1970 UTC. */
+   type Timestamp = Long
+
+   case class FileEntry(path: String, timestamp: Timestamp) extends Serializable
+
+   /**
+    * A custom hash map used to track the list of files seen. This map is not thread-safe.
+    *
+    * To prevent the hash map from growing indefinitely, a purge function is available to
+    * remove files that are "maxAgeMs" older than the latest file.
+    */
+   class SeenFilesMap(maxAgeMs: Long) {
+     require(maxAgeMs >= 0)
+
+     /** Mapping from file to its timestamp. */
+     private val map = new java.util.HashMap[String, Timestamp]
+
+     /** Timestamp of the latest file. */
+     private var latestTimestamp: Timestamp = 0L
+
+     /** Timestamp for the last purge operation. */
+     private var lastPurgeTimestamp: Timestamp = 0L
+
+     /** Add a new file to the map. */
+     def add(file: FileEntry): Unit = {
+       map.put(file.path, file.timestamp)
+       if (file.timestamp > latestTimestamp) {
+         latestTimestamp = file.timestamp
+       }
+     }
+
+     /**
+      * Returns true if we should consider this file a new file. The file is only considered "new"
+      * if it is new enough that we are still tracking it, and we have not seen it before.
+      */
+     def isNewFile(file: FileEntry): Boolean = {
+       // Note that we are testing against lastPurgeTimestamp here so we'd never miss a file that
+       // is older than (latestTimestamp - maxAgeMs) but has not been purged yet.
+       file.timestamp >= lastPurgeTimestamp && !map.containsKey(file.path)
+     }
+
+     /** Removes aged entries and returns the number of files removed. */
+     def purge(): Int = {
+       lastPurgeTimestamp = latestTimestamp - maxAgeMs
+       val iter = map.entrySet().iterator()
+       var count = 0
+       while (iter.hasNext) {
+         val entry = iter.next()
+         if (entry.getValue < lastPurgeTimestamp) {
+           count += 1
+           iter.remove()
+         }
+       }
+       count
+     }
+
+     def size: Int = map.size()
+
+     def allEntries: Seq[FileEntry] = {
+       map.entrySet().asScala.map(entry => FileEntry(entry.getKey, entry.getValue)).toSeq
+     }
+   }
+ }
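
Editor's note: a minimal sketch (not part of this commit) of how the SeenFilesMap contract above plays out, in particular why isNewFile compares against lastPurgeTimestamp rather than latestTimestamp - maxAgeMs. The paths and timestamps are invented for illustration, and the sketch assumes FileEntry and SeenFilesMap remain reachable on the FileStreamSource companion object as written.

import org.apache.spark.sql.execution.streaming.FileStreamSource.{FileEntry, SeenFilesMap}

// Track files up to 10 seconds older than the newest file seen so far.
val seen = new SeenFilesMap(maxAgeMs = 10000L)
seen.add(FileEntry("dir/a.txt", timestamp = 100000L))
seen.add(FileEntry("dir/b.txt", timestamp = 105000L))

// Already tracked, so not "new".
assert(!seen.isNewFile(FileEntry("dir/a.txt", 100000L)))

// Older than latestTimestamp - maxAgeMs (95000), but isNewFile tests against
// lastPurgeTimestamp (still 0), so the file is not rejected until purge() runs.
assert(seen.isNewFile(FileEntry("dir/old.txt", 94000L)))

assert(seen.purge() == 0)                                  // nothing tracked is older than 95000
assert(!seen.isNewFile(FileEntry("dir/old.txt", 94000L)))  // now rejected as too old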
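
Editor's note: a sketch of how these source options would be supplied from the user side. The maxFilesPerTrigger key is confirmed by the removed getMaxFilesPerBatch(); the maxFileAge key, its "7d" value format, and the example path are assumptions, since FileStreamOptions is not shown in this diff.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("file-stream-source-sketch")
  .getOrCreate()

// Each micro-batch picks up at most 10 new files; files much older than the newest
// one seen are eventually dropped from the tracking map (see SeenFilesMap above).
val lines = spark.readStream
  .format("text")
  .option("maxFilesPerTrigger", "10")  // backs sourceOptions.maxFilesPerTrigger
  .option("maxFileAge", "7d")          // assumed key; backs sourceOptions.maxFileAgeMs
  .load("/data/incoming")              // may contain glob patterns, per qualifiedBasePath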