Closed
Commits
29 commits
4a7eef4
Support nested directories in Spark Streaming
wangxiaojing Oct 11, 2014
50ad7d4
change Nit
wangxiaojing Oct 12, 2014
c14def1
support depth
wangxiaojing Oct 17, 2014
bfbec51
Change space before brace
wangxiaojing Oct 17, 2014
0a8ecf8
change process any files created in nested directories
wangxiaojing Oct 17, 2014
1ce623d
reformat code
wangxiaojing Oct 24, 2014
fe6e5ca
add a require(depth >= 0)
wangxiaojing Oct 24, 2014
7031940
reformat code
wangxiaojing Oct 24, 2014
7bd4811
change performance
wangxiaojing Oct 28, 2014
05b5fba
change filter name
wangxiaojing Oct 28, 2014
e66b166
change line exceeds 100 columns
wangxiaojing Nov 3, 2014
a63c5a3
line over 100
wangxiaojing Nov 10, 2014
0b4812e
remove line
wangxiaojing Dec 4, 2014
8990c35
style
wangxiaojing Dec 4, 2014
a20743f
change get depth
wangxiaojing Dec 17, 2014
d7f4880
Use 'isDir' to modify the compatibility
wangxiaojing Dec 24, 2014
99b05d6
rebase
wangxiaojing Jan 7, 2015
b6788a3
support java Api
wangxiaojing Jan 12, 2015
d2f606c
Add support python api
wangxiaojing Jan 12, 2015
8e3a054
Additional excludes for checking of Spark's binary compatibility
wangxiaojing Jan 13, 2015
571730a
rebase
wangxiaojing Jan 30, 2015
a4bfac2
rebase
wangxiaojing Feb 27, 2015
5e3fd3d
change API
wangxiaojing Feb 28, 2015
e4b9c22
change MiMa failures
wangxiaojing Mar 2, 2015
1a2aae9
rebase
wangxiaojing May 20, 2015
0a916cc
Merge branch 'master' into pr2765
zsxwing Jun 2, 2015
26dce26
Address comments
zsxwing Jun 2, 2015
2d85159
Refactor PR and handle some corner cases
zsxwing Jun 2, 2015
d7f42c2
Merge branch 'master' into pr2765
zsxwing Jul 1, 2015
3 changes: 1 addition & 2 deletions docs/streaming-programming-guide.md
@@ -653,8 +653,7 @@ methods for creating DStreams from files and Akka actors as input sources.
</div>
</div>

Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory (files written in nested directories not supported). Note that

Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory. Nested directories are not searched by default; set the optional `depth` parameter to a value greater than 1 to also monitor files in subdirectories. Note that
+ The files must have the same data format.
+ The files must be created in the `dataDirectory` by atomically *moving* or *renaming* them into
the data directory.
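For illustration, a minimal sketch of what the documented behaviour looks like from the Scala API; the application name, batch interval, and the `/data/logs` layout are made-up placeholders:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NestedDirExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical local setup; any master URL and batch interval would do.
    val conf = new SparkConf().setAppName("NestedDirExample").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // With depth = 3, files that land in /data/logs or up to two levels of
    // subdirectories below it (e.g. /data/logs/2015/01/events.log) are picked up.
    val lines = ssc.textFileStream("/data/logs", 3)
    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```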
8 changes: 6 additions & 2 deletions python/pyspark/streaming/context.py
@@ -253,14 +253,18 @@ def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_
return DStream(self._jssc.socketTextStream(hostname, port, jlevel), self,
UTF8Deserializer())

def textFileStream(self, directory):
def textFileStream(self, directory, depth=1):
"""
Create an input stream that monitors a Hadoop-compatible file system
for new files and reads them as text files. Files must be written to the
monitored directory by "moving" them from another location within the same
file system. File names starting with . are ignored.

@param directory: The directory to monitor
@param depth: The maximum depth to search in the directory. The default
value 1 means that only files directly in the given directory are searched.
"""
return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())
return DStream(self._jssc.textFileStream(directory, depth), self, UTF8Deserializer())

def binaryRecordsStream(self, directory, recordLength):
"""
@@ -386,6 +386,27 @@ class StreamingContext private[streaming] (
new FileInputDStream[K, V, F](this, directory)
}

/**
* Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* Files must be written to the monitored directory by "moving" them from another
* location within the same file system. File names starting with . are ignored.
* It can also monitor files in subdirectories by setting the optional `depth`
* parameter to a value greater than 1.
* @param directory HDFS directory to monitor for new files
* @param depth Search depth within the directory; 1 means only the given directory is searched
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
* @tparam F Input format for reading HDFS file
*/
def fileStream[
K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
] (directory: String, depth: Int): InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory, depth)
}
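A rough usage sketch for this overload; the path is a placeholder and an existing `ssc: StreamingContext` is assumed:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Search "/data/in" and its immediate subdirectories (depth = 2) for new files,
// reading them with the usual text key-value types.
val stream = ssc.fileStream[LongWritable, Text, TextInputFormat]("/data/in", depth = 2)
stream.map { case (_, line) => line.toString }.print()
```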

/**
* Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
@@ -403,7 +424,33 @@ class StreamingContext private[streaming] (
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
new FileInputDStream[K, V, F](this, directory, 1, filter, newFilesOnly)
}

/**
* Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* Files must be written to the monitored directory by "moving" them from another
* location within the same file system.
* It can also monitor files in subdirectories by setting the optional `depth`
* parameter to a value greater than 1.
* @param directory HDFS directory to monitor for new files
* @param filter Function to filter paths to process
* @param newFilesOnly Should process only new files and ignore existing files in the directory
* @param depth Search depth within the directory; 1 means only the given directory is searched
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
* @tparam F Input format for reading HDFS file
*/
def fileStream[
K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
] (directory: String,
filter: Path => Boolean,
newFilesOnly: Boolean,
depth: Int): InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory, depth, filter, newFilesOnly)
}
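A sketch of calling the filtered variant; the `.tmp` exclusion and the path are illustrative only, and an existing `ssc: StreamingContext` is assumed:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Ignore in-progress files (anything ending in ".tmp") while scanning
// "/data/in" and one level of subdirectories.
val filtered = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "/data/in",
  (path: Path) => !path.getName.endsWith(".tmp"),
  newFilesOnly = true,
  depth = 2)
filtered.map(_._2.toString).print()
```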

/**
@@ -427,7 +474,35 @@ class StreamingContext private[streaming] (
filter: Path => Boolean,
newFilesOnly: Boolean,
conf: Configuration): InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly, Option(conf))
new FileInputDStream[K, V, F](this, directory, 1, filter, newFilesOnly, Option(conf))
}

/**
* Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* Files must be written to the monitored directory by "moving" them from another
* location within the same file system. File names starting with . are ignored.
* It can also monitor files in subdirectories by setting the optional `depth`
* parameter to a value greater than 1.
* @param directory HDFS directory to monitor for new files
* @param filter Function to filter paths to process
* @param newFilesOnly Should process only new files and ignore existing files in the directory
* @param conf Hadoop configuration
* @param depth Search depth within the directory; 1 means only the given directory is searched
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
* @tparam F Input format for reading HDFS file
*/
def fileStream[
K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
] (directory: String,
filter: Path => Boolean,
newFilesOnly: Boolean,
conf: Configuration,
depth: Int): InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory, depth, filter, newFilesOnly, Option(conf))
}

/**
@@ -442,6 +517,23 @@ class StreamingContext private[streaming] (
fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}

/**
* Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as text files (using key as LongWritable, value
* as Text and input format as TextInputFormat). Files must be written to the
* monitored directory by "moving" them from another location within the same
* file system. File names starting with . are ignored.
* It can also monitor files in subdirectories by setting the optional `depth`
* parameter to a value greater than 1.
* @param directory HDFS directory to monitor for new files
* @param depth Search depth within the directory; 1 means only the given directory is searched
*/
def textFileStream(
directory: String,
depth: Int): DStream[String] = withNamedScope("text file stream") {
fileStream[LongWritable, Text, TextInputFormat](directory, depth).map(_._2.toString)
}

/**
* :: Experimental ::
*
@@ -450,21 +542,25 @@ class StreamingContext private[streaming] (
* generating one byte array per record. Files must be written to the monitored directory
* by "moving" them from another location within the same file system. File names
* starting with . are ignored.
* It can also monitor files in subdirectories by setting the optional `depth`
* parameter to a value greater than 1.
*
* '''Note:''' We ensure that the byte array for each record in the
* resulting RDDs of the DStream has the provided record length.
*
* @param directory HDFS directory to monitor for new file
* @param recordLength length of each record in bytes
* @param depth Search depth within the directory; 1 means only the given directory is searched
*/
@Experimental
def binaryRecordsStream(
directory: String,
recordLength: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") {
recordLength: Int,
depth: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") {
val conf = sc_.hadoopConfiguration
conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
directory, FileInputDStream.defaultFilter: Path => Boolean, newFilesOnly = true, conf)
directory, FileInputDStream.defaultFilter: Path => Boolean, newFilesOnly = true, conf, depth)
val data = br.map { case (k, v) =>
val bytes = v.getBytes
require(bytes.length == recordLength, "Byte array does not have correct length. " +
@@ -474,6 +570,28 @@ class StreamingContext private[streaming] (
data
}

/**
* :: Experimental ::
*
* Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as flat binary files, assuming a fixed length per record,
* generating one byte array per record. Files must be written to the monitored directory
* by "moving" them from another location within the same file system. File names
* starting with . are ignored.
*
* '''Note:''' We ensure that the byte array for each record in the
* resulting RDDs of the DStream has the provided record length.
*
* @param directory HDFS directory to monitor for new file
* @param recordLength length of each record in bytes
*/
@Experimental
def binaryRecordsStream(
directory: String,
recordLength: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") {
binaryRecordsStream(directory, recordLength, 1)
}
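For completeness, a sketch of the extended `binaryRecordsStream`; the 512-byte record length and the directory are only examples, with an existing `ssc: StreamingContext` assumed:

```scala
// Every new file under "/data/records" and its immediate subdirectories
// (depth = 2) is split into fixed 512-byte records, one byte array per record.
val records = ssc.binaryRecordsStream("/data/records", recordLength = 512, depth = 2)
records.map(_.length).print()
```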

/**
* Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
@@ -214,6 +214,15 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
ssc.textFileStream(directory)
}

/**
* Create an input stream that monitors a Hadoop-compatible filesystem for new files,
* including files in nested subdirectories, and reads them as text files.
* @param directory HDFS directory to monitor for new files
* @param depth Search depth within the directory; 1 means only the given directory is searched
*/
def textFileStream(directory: String, depth: Int): JavaDStream[String] = {
ssc.textFileStream(directory, depth)
}
/**
* :: Experimental ::
*
@@ -292,6 +301,34 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
ssc.fileStream[K, V, F](directory)
}

/**
* Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* Files must be written to the monitored directory by "moving" them from another
* location within the same file system. File names starting with . are ignored.
* It can also monitor files in subdirectories by setting the optional `depth`
* parameter to a value greater than 1.
* @param directory HDFS directory to monitor for new files
* @param depth Search depth within the directory; 1 means only the given directory is searched
* @param kClass class of key for reading HDFS file
* @param vClass class of value for reading HDFS file
* @param fClass class of input format for reading HDFS file
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
* @tparam F Input format for reading HDFS file
*/
def fileStream[K, V, F <: NewInputFormat[K, V]](
directory: String,
depth: Int,
kClass: Class[K],
vClass: Class[V],
fClass: Class[F]): JavaPairInputDStream[K, V] = {
implicit val cmk: ClassTag[K] = ClassTag(kClass)
implicit val cmv: ClassTag[V] = ClassTag(vClass)
implicit val cmf: ClassTag[F] = ClassTag(fClass)
ssc.fileStream[K, V, F](directory, depth)
}

/**
* Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
@@ -321,6 +358,39 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
ssc.fileStream[K, V, F](directory, fn, newFilesOnly)
}

/**
* Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* Files must be written to the monitored directory by "moving" them from another
* location within the same file system. File names starting with . are ignored.
* It can also monitor files in subdirectories by setting the optional `depth`
* parameter to a value greater than 1.
* @param directory HDFS directory to monitor for new files
* @param kClass class of key for reading HDFS file
* @param vClass class of value for reading HDFS file
* @param fClass class of input format for reading HDFS file
* @param filter Function to filter paths to process
* @param newFilesOnly Should process only new files and ignore existing files in the directory
* @param depth Search depth within the directory; 1 means only the given directory is searched
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
* @tparam F Input format for reading HDFS file
*/
def fileStream[K, V, F <: NewInputFormat[K, V]](
directory: String,
kClass: Class[K],
vClass: Class[V],
fClass: Class[F],
filter: JFunction[Path, JBoolean],
newFilesOnly: Boolean,
depth: Int): JavaPairInputDStream[K, V] = {
implicit val cmk: ClassTag[K] = ClassTag(kClass)
implicit val cmv: ClassTag[V] = ClassTag(vClass)
implicit val cmf: ClassTag[F] = ClassTag(fClass)
def fn: (Path) => Boolean = (x: Path) => filter.call(x).booleanValue()
ssc.fileStream[K, V, F](directory, fn, newFilesOnly, depth)
}

/**
* Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
@@ -352,6 +422,41 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
ssc.fileStream[K, V, F](directory, fn, newFilesOnly, conf)
}

/**
* Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* Files must be written to the monitored directory by "moving" them from another
* location within the same file system. File names starting with . are ignored.
* It can also monitor files in subdirectories by setting the optional `depth`
* parameter to a value greater than 1.
* @param directory HDFS directory to monitor for new files
* @param kClass class of key for reading HDFS file
* @param vClass class of value for reading HDFS file
* @param fClass class of input format for reading HDFS file
* @param filter Function to filter paths to process
* @param newFilesOnly Should process only new files and ignore existing files in the directory
* @param conf Hadoop configuration
* @param depth Search depth within the directory; 1 means only the given directory is searched
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
* @tparam F Input format for reading HDFS file
*/
def fileStream[K, V, F <: NewInputFormat[K, V]](
directory: String,
kClass: Class[K],
vClass: Class[V],
fClass: Class[F],
filter: JFunction[Path, JBoolean],
newFilesOnly: Boolean,
conf: Configuration,
depth: Int): JavaPairInputDStream[K, V] = {
implicit val cmk: ClassTag[K] = ClassTag(kClass)
implicit val cmv: ClassTag[V] = ClassTag(vClass)
implicit val cmf: ClassTag[F] = ClassTag(fClass)
def fn: (Path) => Boolean = (x: Path) => filter.call(x).booleanValue()
ssc.fileStream[K, V, F](directory, fn, newFilesOnly, conf, depth)
}

/**
* Create an input stream with any arbitrary user implemented actor receiver.
* @param props Props object defining creation of the actor