@@ -331,18 +331,18 @@ class StreamingContext private[streaming] (
* @param directory HDFS directory to monitor for new files
* @param filter Function to filter paths to process
* @param newFilesOnly Should process only new files and ignore existing files in the directory
* @param recursive Should search through the directory recursively to find new files
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
* @tparam F Input format for reading HDFS file
*/
def fileStream[
K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
}

K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean, recursive: Boolean): DStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly, recursive)
}
Contributor

This looks like an API change - please add a default value to recursive.

Author

I have included a default value on the FileInputDStream but not on the API itself.

Wondering if we want to introduce default values to the more granular version of the API. Currently, it looks like the exposed API essentially has two versions for these methods -- one that assumes default values and one that exposes all the parameters of the DStream constructor.

Thoughts?
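
For illustration, a minimal, self-contained sketch (not the Spark API itself; the object and method names here are made up) of how a defaulted parameter would keep existing call sites source-compatible while still exposing the new flag:

object DefaultArgSketch {
  // Stand-in for the granular fileStream overload, with `recursive` defaulted.
  def fileStreamLike(directory: String,
                     newFilesOnly: Boolean = true,
                     recursive: Boolean = false): String =
    s"dir=$directory newFilesOnly=$newFilesOnly recursive=$recursive"

  def main(args: Array[String]): Unit = {
    // Existing callers that never pass `recursive` still compile unchanged:
    println(fileStreamLike("/data/in"))
    println(fileStreamLike("/data/in", newFilesOnly = false))
    // New callers can opt in explicitly:
    println(fileStreamLike("/data/in", newFilesOnly = true, recursive = true))
  }
}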


In which version of Spark can we get the API with support for nested directory streaming?

/**
* Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as text files (using key as LongWritable, value
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.dstream
import java.io.{ObjectInputStream, IOException}
import scala.collection.mutable.{HashSet, HashMap}
import scala.reflect.ClassTag
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter, FileStatus}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark.rdd.RDD
@@ -34,18 +34,19 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
@transient ssc_ : StreamingContext,
directory: String,
filter: Path => Boolean = FileInputDStream.defaultFilter,
newFilesOnly: Boolean = true)
newFilesOnly: Boolean = true,
recursive: Boolean = false)
extends InputDStream[(K, V)](ssc_) {

protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData

// files found in the last interval
private val lastFoundFiles = new HashSet[String]

// Files with mod time earlier than this is ignored. This is updated every interval
// such that in the current interval, files older than any file found in the
// previous interval will be ignored. Obviously this time keeps moving forward.
private var ignoreTime = if (newFilesOnly) 0L else System.currentTimeMillis()
private var ignoreTime = if (newFilesOnly) System.currentTimeMillis() else 0L
private var recursiveMinTime = 0L

// Latest file mod time seen till any point of time
@transient private var path_ : Path = null
@@ -96,6 +97,23 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
fileModTimes.clearOldValues(lastNewFileFindingTime - 1)
}

/**
* Find files recursively in a directory
*/
private def recursiveFileList(
fileStatuses: List[FileStatus],
paths: List[Path] = List[Path]()
): List[Path] = fileStatuses match {

case f :: tail if (fs.getContentSummary(f.getPath).getDirectoryCount > 1) =>
recursiveFileList(fs.listStatus(f.getPath).toList ::: tail, paths)
case f :: tail if f.isDir => recursiveFileList(tail, f.getPath :: paths)
case f :: tail => recursiveFileList(tail, paths)
case _ => paths

}


If the given directory contains both files and subdirectories, the files will be ignored and only the lowest level of subdirectories will be considered.
Example: if data files sit directly in "/input/directory/" next to a subdirectory, those files in "/input/directory/" will be ignored.


how about this?
private def recursiveListDirs(fileStatuses: List[FileStatus],
paths: Set[Path] = Set[Path]()
): Set[Path] = fileStatuses match {
case f :: tail if f.isDir => recursiveListDirs(fs.listStatus(f.getPath).toList ::: tail, paths)
case f :: tail => recursiveListDirs(tail, paths + f.getPath.getParent)
case _ => paths
}
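
A rough, self-contained sketch of the same traversal pattern against the local filesystem (java.io.File stands in for Hadoop's FileStatus/FileSystem here, so this is only an illustration of the idea, not the Spark code):

import java.io.File
import scala.annotation.tailrec

object RecursiveListDirsSketch {
  // Collect every directory that directly contains at least one plain file:
  // recurse into directories, and for files record their parent directory.
  @tailrec
  def listDirs(pending: List[File], dirs: Set[File] = Set.empty[File]): Set[File] =
    pending match {
      case f :: tail if f.isDirectory =>
        listDirs(Option(f.listFiles()).map(_.toList).getOrElse(Nil) ::: tail, dirs)
      case f :: tail =>
        listDirs(tail, dirs + f.getParentFile)
      case Nil =>
        dirs
    }

  def main(args: Array[String]): Unit = {
    val root = new File(args.headOption.getOrElse("."))
    listDirs(List(root)).foreach(d => println(d.getPath))
  }
}

Because the parent of every plain file is recorded, a directory that mixes data files with subdirectories is still picked up, which is the case the original recursiveFileList misses.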


/**
* Find files which have modification timestamp <= current time and return a 3-tuple of
* (new files found, latest modification time among them, files with latest modification time)
@@ -104,7 +122,14 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
logDebug("Trying to get new files for time " + currentTime)
lastNewFileFindingTime = System.currentTimeMillis
val filter = new CustomPathFilter(currentTime)
val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)

val filePaths: Array[Path] = if (recursive)
recursiveFileList(fs.listStatus(directoryPath).toList).toArray


If the input directory is already the lowest-level directory, then no files in it will be considered.
Example: consider the following directory contents:
/a/file1.txt
/a/file2.txt and so on.
If the input directory is given as "/a", there will be no output.


We can call this like:

val filePaths: Array[Path] = if (recursive)
  recursiveListDirs(List(fs.getFileStatus(directoryPath))).toArray
else
  Array(directoryPath)
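
Putting the two suggestions together, the discovery step could look roughly like the sketch below. This is only an illustration: it assumes a FileSystem obtained from the default configuration and a simple hidden-file filter standing in for the CustomPathFilter used in FileInputDStream.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import scala.annotation.tailrec

object FindNewFilesSketch {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())   // local filesystem unless configured otherwise
    val directoryPath = new Path(args.headOption.getOrElse("."))
    val recursive = true

    // Collect the directories that directly hold files, as in the suggestion above.
    @tailrec
    def recursiveListDirs(pending: List[FileStatus], dirs: Set[Path] = Set.empty[Path]): Set[Path] =
      pending match {
        case f :: tail if f.isDir =>
          recursiveListDirs(fs.listStatus(f.getPath).toList ::: tail, dirs)
        case f :: tail =>
          recursiveListDirs(tail, dirs + f.getPath.getParent)
        case Nil => dirs
      }

    // Simple stand-in for CustomPathFilter (an assumption for this sketch).
    val filter = new PathFilter {
      override def accept(path: Path): Boolean = !path.getName.startsWith(".")
    }

    val filePaths: Array[Path] =
      if (recursive) recursiveListDirs(List(fs.getFileStatus(directoryPath))).toArray
      else Array(directoryPath)

    // listStatus(Array[Path], PathFilter) scans all collected directories in one call.
    val newFiles = fs.listStatus(filePaths, filter).map(_.getPath.toString)
    newFiles.foreach(println)
  }
}

With recursive = false this degenerates to the original single-directory listStatus call, so the non-recursive behaviour is unchanged.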

val newFiles: Array[String] = fs.listStatus(filePaths, filter).map(_.getPath.toString)

val timeTaken = System.currentTimeMillis - lastNewFileFindingTime
logInfo("Finding new files took " + timeTaken + " ms")
logDebug("# cached file times = " + fileModTimes.size)
Expand All @@ -115,6 +140,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
"files in the monitored directory."
)
}
logInfo("minNewFileModTime: " ++ filter.minNewFileModTime.toString)
(newFiles, filter.minNewFileModTime)
}

@@ -218,6 +244,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
}
val modTime = getFileModTime(path)
logDebug("Mod time for " + path + " is " + modTime)

if (modTime < ignoreTime) {
// Reject file if it was created before the ignore time (or, before last interval)
logDebug("Mod time " + modTime + " less than ignore time " + ignoreTime)
@@ -24,6 +24,10 @@ import akka.actor.Props
import akka.util.ByteString

import java.io.{File, BufferedWriter, OutputStreamWriter}
import org.apache.hadoop.fs.{Path}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

import java.net.{InetSocketAddress, SocketException, ServerSocket}
import java.nio.charset.Charset
import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
@@ -43,12 +47,14 @@ import org.apache.spark.streaming.receiver.{ActorHelper, Receiver}
class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {

test("socket input stream") {
if (false) {
// Start the server
val testServer = new TestServer()
testServer.start()

// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
logInfo("port: " + testServer.port.toString)
val networkStream = ssc.socketTextStream(
"localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
@@ -89,9 +95,68 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
for (i <- 0 until output.size) {
assert(output(i) === expectedOutput(i))
}
}
}


test("recursive file input stream") {
// Disable manual clock as FileInputDStream does not work with manual clock
conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")

// Set up the streaming context and input streams
val testDir = Files.createTempDir()
val ssc = new StreamingContext(conf, batchDuration)
val defaultFilter = (path: Path) => { !path.getName().startsWith(".") }


val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
testDir.toString, defaultFilter, true, true
).map(_._2.toString)


val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
def output = outputBuffer.flatMap(x => x)
val outputStream = new TestOutputStream(fileStream, outputBuffer)
outputStream.register()
ssc.start()

// Create files in the temporary directory so that Spark Streaming can read data from it
val input = Seq(1, 2, 3, 4, 5)
val expectedOutput = input.map(_.toString)
Thread.sleep(1000)
for (i <- 0 until input.size) {
val dir = new File(testDir.toString + "/folder/adsf/").mkdirs()
val file = new File(testDir, "/folder/adsf/" + i.toString)
Files.write(input(i) + "\n", file, Charset.forName("UTF-8"))
logInfo("Created file " + file)
Thread.sleep(batchDuration.milliseconds)
Thread.sleep(1000)
}
val startTime = System.currentTimeMillis()
Thread.sleep(1000)
val timeTaken = System.currentTimeMillis() - startTime
assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
logInfo("Stopping context")
ssc.stop()

// Verify whether data received by Spark Streaming was as expected
logInfo("--------------------------------")
logInfo("output, size = " + outputBuffer.size)
outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("expected output, size = " + expectedOutput.size)
expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("--------------------------------")

// Verify whether all the elements received are as expected
// (whether the elements were received one in each interval is not verified)
assert(output.toList === expectedOutput.toList)

Utils.deleteRecursively(testDir)

// Enable manual clock back again for other tests
conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
}

test("file input stream") {
// Disable manual clock as FileInputDStream does not work with manual clock
conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
@@ -140,6 +205,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {

// Enable manual clock back again for other tests
conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")


}

// TODO: This test works in IntelliJ but not through SBT