
Commit 2d85159

Refactor PR and handle some corner cases

1 parent: 26dce26

4 files changed (+88, -48 lines)


docs/streaming-programming-guide.md

Lines changed: 1 addition & 1 deletion
@@ -653,7 +653,7 @@ methods for creating DStreams from files and Akka actors as input sources.
 </div>
 </div>

-Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory (files written in nested directories not supported). It can also monitor files in subdirectories by setting the optional `depth` parameter to a value greater than 1. Note that
+Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory. By default it does not search nested directories; set the optional `depth` parameter to a value greater than 1 to monitor files in subdirectories. Note that
 + The files must have the same data format.
 + The files must be created in the `dataDirectory` by atomically *moving* or *renaming* them into
   the data directory.
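
For reference, a minimal sketch of how a nested-directory stream might be created from Scala, using the four-argument `fileStream` signature exercised in the test suite below (the application name, master, and directory path are illustrative):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NestedDirStream {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("NestedDirStream").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))
    // depth = 2: monitor /data/logs plus one level of subdirectories.
    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
      "/data/logs", (x: Path) => true, newFilesOnly = true, 2).map(_._2.toString)
    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}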

python/pyspark/streaming/context.py

Lines changed: 4 additions & 6 deletions
@@ -253,18 +253,16 @@ def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_
         return DStream(self._jssc.socketTextStream(hostname, port, jlevel), self,
                        UTF8Deserializer())

-    def textFileStream(self, directory):
+    def textFileStream(self, directory, depth=1):
         """
         Create an input stream that monitors a Hadoop-compatible file system
         for new files and reads them as text files. Files must be written to the
         monitored directory by "moving" them from another location within the same
         file system. File names starting with . are ignored.
-        """
-        return textFileStream(self, directory, 1)

-    def textFileStream(self, directory, depth):
-        """
-        Create an input stream that monitor files in subdirectories.
+        @param directory: The directory to monitor
+        @param depth: The max depth to search in the directory. The default
+            value 1 means only searching files in the current directory.
         """
         return DStream(self._jssc.textFileStream(directory, depth), self, UTF8Deserializer())
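
As the diff shows, the Python method simply forwards to the JVM bridge via `self._jssc.textFileStream(directory, depth)`. Here is a hedged sketch of the equivalent call through `JavaStreamingContext` from Scala, assuming only the two-argument overload implied by that bridge call (the directory path is illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Durations
import org.apache.spark.streaming.api.java.JavaStreamingContext

object TextFileStreamExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TextFileStreamExample").setMaster("local[2]")
    val jssc = new JavaStreamingContext(conf, Durations.seconds(2))
    // depth = 1 (the Python default) searches only the directory itself;
    // a larger value also searches that many levels of subdirectories.
    val lines = jssc.textFileStream("/data/incoming", 3)
    lines.print()
    jssc.start()
    jssc.awaitTermination()
  }
}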

streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala

Lines changed: 52 additions & 29 deletions
@@ -129,6 +129,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
   @transient private var lastNewFileFindingTime = 0L

   @transient private var path_ : Path = null
+  @transient private var directoryDepth_ : Int = -1
   @transient private var fs_ : FileSystem = null

   override def start() { }
@@ -186,48 +187,55 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
       )
       logDebug(s"Getting new files for time $currentTime, " +
         s"ignoring files older than $modTimeIgnoreThreshold")
-      val filter = new PathFilter {
+      val newFileFilter = new PathFilter {
         def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
       }
-      val directoryDepth = fs.getFileStatus(directoryPath).getPath.depth()
+      val rootDirectoryDepth = directoryDepth

-      // Nested directories to find new files.
-      def dfs(status: FileStatus): List[FileStatus] = {
+      // Search nested directories to find new files.
+      def searchFilesRecursively(status: FileStatus, files: mutable.ArrayBuffer[String]): Unit = {
         val path = status.getPath
         if (status.isDir) {
-          val depthFilter = depth + directoryDepth - path.depth()
-          if (depthFilter - 1 >= 0) {
+          // Note: a user may set depth = Int.MaxValue to search all nested directories.
+          if (depth > path.depth() - rootDirectoryDepth) {
             if (lastFoundDirs.contains(path)) {
               if (status.getModificationTime > modTimeIgnoreThreshold) {
-                fs.listStatus(path).toList.flatMap(dfs(_))
-              } else Nil
+                fs.listStatus(path).foreach(searchFilesRecursively(_, files))
+              }
             } else {
               lastFoundDirs += path
-              fs.listStatus(path).toList.flatMap(dfs(_))
+              fs.listStatus(path).foreach(searchFilesRecursively(_, files))
             }
-          } else Nil
+          }
         } else {
-          if (filter.accept(path)) status :: Nil else Nil
+          if (newFileFilter.accept(path)) {
+            files += path.toString
+          }
         }
       }

-      val path = if (lastFoundDirs.isEmpty) Seq(fs.getFileStatus(directoryPath))
-      else {
-        lastFoundDirs.filter { path =>
-          // If the mod time of directory is more than ignore time, no new files in this directory.
-          try {
-            val status = fs.getFileStatus(path)
-            status != null && status.getModificationTime > modTimeIgnoreThreshold
-          } catch {
-            // If the directory don't find, remove the directory from `lastFoundDirs`
-            case e: FileNotFoundException =>
-              lastFoundDirs.remove(path)
-              false
+      val validDirs: Iterable[Path] =
+        if (lastFoundDirs.isEmpty) {
+          Seq(directoryPath)
+        } else {
+          lastFoundDirs.filter { path =>
+            // A directory whose mod time is not newer than the ignore threshold
+            // cannot contain new files.
+            try {
+              val status = fs.getFileStatus(path)
+              status != null && status.getModificationTime > modTimeIgnoreThreshold
+            } catch {
+              // If the directory is not found, remove it from `lastFoundDirs`.
+              case e: FileNotFoundException =>
+                lastFoundDirs.remove(path)
+                false
+            }
           }
         }
-      }.flatMap(fs.listStatus(_)).toSeq

-      val newFiles = path.flatMap(dfs(_)).map(_.getPath.toString).toArray
+      val newFiles = mutable.ArrayBuffer[String]()
+      // List the children of each valid directory and search them recursively.
+      validDirs.flatMap(fs.listStatus(_)).foreach(searchFilesRecursively(_, newFiles))
       val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
       logInfo("Finding new files took " + timeTaken + " ms")
       logDebug("# cached file times = " + fileToModTime.size)
@@ -238,7 +246,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
           "files in the monitored directory."
         )
       }
-      newFiles
+      newFiles.toArray
     } catch {
       case e: Exception =>
         logWarning("Error finding new files", e)
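
To make the `depth > path.depth() - rootDirectoryDepth` guard concrete, here is a small self-contained sketch (the directory names are made up) that evaluates the same predicate with Hadoop's `Path.depth()`:

import org.apache.hadoop.fs.Path

object DepthGuardCheck {
  def main(args: Array[String]): Unit = {
    val root = new Path("/data")
    val rootDirectoryDepth = root.depth() // 1: one path element under "/"

    // True when `dir` may still be descended into for the given search depth.
    def shouldDescend(dir: Path, depth: Int): Boolean =
      depth > dir.depth() - rootDirectoryDepth

    println(shouldDescend(new Path("/data/a"), 1))              // false: depth 1 stays in /data
    println(shouldDescend(new Path("/data/a"), 2))              // true: depth 2 reaches /data/a
    println(shouldDescend(new Path("/data/a/b"), 2))            // false: two levels down
    println(shouldDescend(new Path("/data/a/b"), Int.MaxValue)) // true: effectively unlimited
  }
}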
@@ -321,17 +329,32 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
   }

   private def directoryPath: Path = {
-    if (path_ == null) path_ = new Path(directory)
+    if (fs_ == null) init()
     path_
   }

+  private def directoryDepth: Int = {
+    if (fs_ == null) init()
+    directoryDepth_
+  }
+
   private def fs: FileSystem = {
-    if (fs_ == null) fs_ = directoryPath.getFileSystem(ssc.sparkContext.hadoopConfiguration)
+    if (fs_ == null) init()
     fs_
   }

-  private def reset() {
+  private def init(): Unit = {
+    val originPath = new Path(directory)
+    fs_ = originPath.getFileSystem(ssc.sparkContext.hadoopConfiguration)
+    // Get the absolute path
+    path_ = fs_.getFileStatus(originPath).getPath
+    directoryDepth_ = path_.depth()
+  }
+
+  private def reset() {
     fs_ = null
+    path_ = null
+    directoryDepth_ = -1
   }

   @throws(classOf[IOException])
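
These accessors follow a lazy-init-and-reset idiom: all three `@transient` fields are (re)computed by a single `init()`, and `reset()` (invoked elsewhere in this class) nulls them so a stale path, depth, or file-system handle is re-resolved on the next access; the same lazy path also covers fields left null after deserialization. A minimal sketch of the idiom in isolation (the `LazyHandle` type and its resolution step are hypothetical):

class LazyHandle(directory: String) {
  @transient private var handle_ : String = null

  private def handle: String = {
    if (handle_ == null) init() // resolve on first use, or after reset()
    handle_
  }

  private def init(): Unit = {
    // Hypothetical resolution step, standing in for getFileStatus/getFileSystem.
    handle_ = "resolved:" + directory
  }

  private def reset(): Unit = {
    handle_ = null // the next access to `handle` re-runs init()
  }
}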

streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala

Lines changed: 31 additions & 12 deletions
@@ -212,6 +212,18 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     testFileStream(newFilesOnly = false, 3)
   }

+  test("file input stream - newFilesOnly = false and depth is too small") {
+    testFileStream(newFilesOnly = false, 3, 2)
+  }
+
+  test("file input stream - newFilesOnly = true and depth = Int.MaxValue") {
+    testFileStream(newFilesOnly = true, 3, Int.MaxValue)
+  }
+
+  test("file input stream - newFilesOnly = false and depth = Int.MaxValue") {
+    testFileStream(newFilesOnly = false, 3, Int.MaxValue)
+  }
+
   test("multi-thread receiver") {
     // set up the test receiver
     val numThreads = 10
@@ -364,12 +376,16 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     assert(receiverInputStreams.map(_.id) === Array(0, 1))
   }

-  def testFileStream(newFilesOnly: Boolean, depth: Int = 1) {
-    val testDir: File = null
+  def testFileStream(newFilesOnly: Boolean, depth: Int = 1): Unit = {
+    testFileStream(newFilesOnly, depth, depth)
+  }
+
+  def testFileStream(newFilesOnly: Boolean, createDepth: Int, searchDepth: Int) {
+    val rootDir = Utils.createTempDir()
     try {
       val batchDuration = Seconds(2)
-      var testDir = Utils.createTempDir()
-      for (i <- 2 until depth) {
+      var testDir = rootDir
+      for (i <- 1 until createDepth) {
         testDir = Utils.createTempDir(testDir.toString)
       }
       // Create a file that exists before the StreamingContext is created:
// Create a file that exists before the StreamingContext is created:
@@ -384,8 +400,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
       clock.setTime(existingFile.lastModified + batchDuration.milliseconds)
       val batchCounter = new BatchCounter(ssc)
       val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
-        testDir.toString, (x: Path) => true,
-        newFilesOnly = newFilesOnly, depth).map(_._2.toString)
+        rootDir.toString, (x: Path) => true,
+        newFilesOnly = newFilesOnly, searchDepth).map(_._2.toString)
       val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
       val outputStream = new TestOutputStream(fileStream, outputBuffer)
       outputStream.register()
@@ -412,15 +428,18 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
       }

       // Verify that all the files have been read
-      val expectedOutput = if (newFilesOnly) {
-        input.map(_.toString).toSet
-      } else {
-        (Seq(0) ++ input).map(_.toString).toSet
-      }
+      val expectedOutput =
+        if (createDepth > searchDepth) {
+          Set()
+        } else if (newFilesOnly) {
+          input.map(_.toString).toSet
+        } else {
+          (Seq(0) ++ input).map(_.toString).toSet
+        }
       assert(outputBuffer.flatten.toSet === expectedOutput)
     }
   } finally {
-    if (testDir != null) Utils.deleteRecursively(testDir)
+    Utils.deleteRecursively(rootDir)
   }
 }
