
package org.apache.spark.sql.execution.streaming

- import scala.util.Try
+ import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
- import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, DataSource, ListingFileCatalog, LogicalRelation}
+ import org.apache.spark.sql.execution.datasources.{DataSource, ListingFileCatalog, LogicalRelation}
import org.apache.spark.sql.types.StructType
- import org.apache.spark.util.collection.OpenHashSet

/**
- * A very simple source that reads text files from the given directory as they appear.
- *
- * TODO Clean up the metadata files periodically
+ * A very simple source that reads files from the given directory as they appear.
 */
class FileStreamSource(
    sparkSession: SparkSession,
@@ -41,36 +38,59 @@ class FileStreamSource(
    metadataPath: String,
    options: Map[String, String]) extends Source with Logging {

+  import FileStreamSource._
+
+  private val sourceOptions = new FileStreamOptions(options)
+
  private val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
  private val qualifiedBasePath = fs.makeQualified(new Path(path)) // can contain glob patterns
-  private val metadataLog = new HDFSMetadataLog[Seq[String]](sparkSession, metadataPath)
+
+  private val metadataLog = new HDFSMetadataLog[Seq[FileEntry]](sparkSession, metadataPath)
+
  private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)

  /** Maximum number of new files to be considered in each batch */
-  private val maxFilesPerBatch = getMaxFilesPerBatch()
+  private val maxFilesPerBatch = sourceOptions.maxFilesPerTrigger
+
+  /** A mapping from a file that we have processed to the timestamp at which it was last modified. */
+  // Visible for testing.
+  val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)

-  private val seenFiles = new OpenHashSet[String]
-  metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, files) =>
-    files.foreach(seenFiles.add)
+  metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, entry) =>
+    entry.foreach(seenFiles.add)
+    seenFiles.purge()
  }

+  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = ${sourceOptions.maxFileAgeMs}")
+
  /**
   * Returns the maximum offset that can be retrieved from the source.
   *
   * `synchronized` on this method is for solving race conditions in tests. In normal usage,
   * there is no race here, so the cost of `synchronized` should be negligible.
   */
  private def fetchMaxOffset(): LongOffset = synchronized {
-    val newFiles = fetchAllFiles().filter(!seenFiles.contains(_))
+    // All the new files found - ignore aged files and files that we have already seen.
+    val newFiles = fetchAllFiles().filter(seenFiles.isNewFile)
+
+    // Obey the user's setting that limits the number of files in this batch trigger.
    val batchFiles =
      if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else newFiles
+
    batchFiles.foreach { file =>
      seenFiles.add(file)
      logDebug(s"New file: $file")
    }
-    logTrace(s"Number of new files = ${newFiles.size})")
-    logTrace(s"Number of files selected for batch = ${batchFiles.size}")
-    logTrace(s"Number of seen files = ${seenFiles.size}")
+    val numPurged = seenFiles.purge()
+
+    logTrace(
+      s"""
+         |Number of new files = ${newFiles.size}
+         |Number of files selected for batch = ${batchFiles.size}
+         |Number of seen files = ${seenFiles.size}
+         |Number of files purged from tracking map = $numPurged
+       """.stripMargin)
+
    if (batchFiles.nonEmpty) {
      maxBatchId += 1
      metadataLog.add(maxBatchId, batchFiles)
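
For context, the two limits logged above come from reader options rather than from anything hard-coded in this class. The following rough sketch (not part of this commit) shows how a caller might set them when starting a file stream; the path and values are invented, and it assumes `maxFilesPerTrigger` and `maxFileAge` are the user-facing keys that `FileStreamOptions` parses (the latter surfaced here as `maxFileAgeMs`):

// Sketch only: wiring the options that FileStreamOptions is expected to parse.
// "/data/incoming" and the option values below are placeholders, not from this commit.
val df = spark.readStream
  .format("text")
  .option("maxFilesPerTrigger", "100")  // upper bound on files taken per batch in fetchMaxOffset()
  .option("maxFileAge", "7d")           // files older than this stop being tracked by SeenFilesMap
  .load("/data/incoming")
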
@@ -104,22 +124,26 @@ class FileStreamSource(
    val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
    logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
    logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
-    val newOptions = new CaseInsensitiveMap(options).filterKeys(_ != "path")
    val newDataSource =
      DataSource(
        sparkSession,
-        paths = files,
+        paths = files.map(_.path),
        userSpecifiedSchema = Some(schema),
        className = fileFormatClassName,
-        options = newOptions)
+        options = sourceOptions.optionMapWithoutPath)
    Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation()))
  }

-  private def fetchAllFiles(): Seq[String] = {
+  /**
+   * Returns a list of files found, sorted by their timestamp.
+   */
+  private def fetchAllFiles(): Seq[FileEntry] = {
    val startTime = System.nanoTime
    val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
    val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType))
-    val files = catalog.allFiles().sortBy(_.getModificationTime).map(_.getPath.toUri.toString)
+    val files = catalog.allFiles().sortBy(_.getModificationTime).map { status =>
+      FileEntry(status.getPath.toUri.toString, status.getModificationTime)
+    }
    val endTime = System.nanoTime
    val listingTimeMs = (endTime.toDouble - startTime) / 1000000
    if (listingTimeMs > 2000) {
@@ -132,20 +156,71 @@ class FileStreamSource(
    files
  }

-  private def getMaxFilesPerBatch(): Option[Int] = {
-    new CaseInsensitiveMap(options)
-      .get("maxFilesPerTrigger")
-      .map { str =>
-        Try(str.toInt).toOption.filter(_ > 0).getOrElse {
-          throw new IllegalArgumentException(
-            s"Invalid value '$str' for option 'maxFilesPerTrigger', must be a positive integer")
-        }
-      }
-  }
-
  override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1)

  override def toString: String = s"FileStreamSource[$qualifiedBasePath]"

  override def stop() {}
}
+
+
+object FileStreamSource {
+
+  /** Timestamp for file modification time, in ms since January 1, 1970 UTC. */
+  type Timestamp = Long
+
+  case class FileEntry(path: String, timestamp: Timestamp) extends Serializable
+
+  /**
+   * A custom hash map used to track the list of files seen. This map is not thread-safe.
+   *
+   * To prevent the hash map from growing indefinitely, a purge function is available to
+   * remove entries that are more than "maxAgeMs" older than the latest file seen.
+   */
+  class SeenFilesMap(maxAgeMs: Long) {
+    require(maxAgeMs >= 0)
+
+    /** Mapping from file to its timestamp. */
+    private val map = new java.util.HashMap[String, Timestamp]
+
+    private var lastTimestamp: Timestamp = 0L
+
+    private def ageThreshold: Timestamp = lastTimestamp - maxAgeMs
+
+    /** Add a new file to the map. */
+    def add(file: FileEntry): Unit = {
+      map.put(file.path, file.timestamp)
+      if (file.timestamp > lastTimestamp) {
+        lastTimestamp = file.timestamp
+      }
+    }
+
+    /**
+     * Returns true if we should consider this file a new file. The file is only considered "new"
+     * if it is new enough that we are still tracking it, and we have not seen it before.
+     */
+    def isNewFile(file: FileEntry): Boolean = {
+      file.timestamp > ageThreshold && !map.containsKey(file.path)
+    }
+
+    /** Removes aged entries and returns the number of files removed. */
+    def purge(): Int = {
+      val iter = map.entrySet().iterator()
+      var count = 0
+      while (iter.hasNext) {
+        val entry = iter.next()
+        if (entry.getValue < lastTimestamp - maxAgeMs) {
+          count += 1
+          iter.remove()
+        }
+      }
+      count
+    }
+
+    def size: Int = map.size()
+
+    def allEntries: Seq[FileEntry] = {
+      map.entrySet().asScala.map(entry => FileEntry(entry.getKey, entry.getValue)).toSeq
+    }
+  }
+}
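
To make the tracking and purge semantics above concrete, here is a small, self-contained sketch (not part of the commit) that drives `SeenFilesMap` directly; the paths and timestamps are arbitrary, and it assumes the code runs in a project where the `FileStreamSource` companion object is importable:

import org.apache.spark.sql.execution.streaming.FileStreamSource.{FileEntry, SeenFilesMap}

// Track files with a 10 ms age window relative to the newest file seen.
val seen = new SeenFilesMap(maxAgeMs = 10L)
seen.add(FileEntry("a", timestamp = 5L))
seen.add(FileEntry("b", timestamp = 20L))      // newest file; age threshold becomes 20 - 10 = 10
assert(!seen.isNewFile(FileEntry("a", 5L)))    // at or below the threshold, no longer tracked
assert(!seen.isNewFile(FileEntry("b", 20L)))   // recent enough, but already seen
assert(seen.isNewFile(FileEntry("c", 15L)))    // recent enough and unseen
val purged = seen.purge()                      // removes "a" (timestamp 5 < threshold 10)
assert(purged == 1 && seen.size == 1)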