Closed
30 commits
8550c26
Expose additional argument combination
freeman-lab Dec 25, 2014
ecef0eb
Add binaryRecordsStream to python
freeman-lab Dec 25, 2014
fe4e803
Add binaryRecordStream to Java API
freeman-lab Dec 25, 2014
36cb0fd
Add binaryRecordsStream to scala
freeman-lab Dec 25, 2014
23dd69f
Tests for binaryRecordsStream
freeman-lab Dec 25, 2014
9398bcb
Expose optional hadoop configuration
freeman-lab Dec 25, 2014
28bff9b
Fix missing arg
freeman-lab Dec 25, 2014
8b70fbc
Reorganization
freeman-lab Dec 25, 2014
2843e9d
Add params to docstring
freeman-lab Dec 25, 2014
94d90d0
Spelling
freeman-lab Dec 25, 2014
1c739aa
Simpler default arg handling
freeman-lab Dec 25, 2014
029d49c
Formatting
freeman-lab Dec 25, 2014
a4324a3
Line length
freeman-lab Dec 25, 2014
d3e75b2
Add tests in python
freeman-lab Dec 25, 2014
becb344
Formatting
freeman-lab Dec 25, 2014
fcb915c
Formatting
freeman-lab Dec 25, 2014
317b6d1
Make test inline
freeman-lab Jan 5, 2015
3ceb684
Merge remote-tracking branch 'upstream/master' into streaming-binary-…
freeman-lab Jan 30, 2015
7373f73
Add note and defensive assertion for byte length
freeman-lab Jan 30, 2015
9a3715a
Refactor to reflect changes to FileInputSuite
freeman-lab Jan 30, 2015
47560f4
Space formatting
freeman-lab Jan 31, 2015
b85bffc
Formatting
freeman-lab Jan 31, 2015
14bca9a
Add experimental tag
freeman-lab Jan 31, 2015
34d20ef
Add experimental tag
freeman-lab Feb 3, 2015
c2cfa6d
Expose new version of fileStream with conf in java
freeman-lab Feb 3, 2015
30eba67
Add filter and newFilesOnly alongside conf
freeman-lab Feb 3, 2015
c4237b8
Add experimental tag
freeman-lab Feb 3, 2015
eba925c
Simplify notes
freeman-lab Feb 3, 2015
5ff1b75
Add note to java streaming context
freeman-lab Feb 4, 2015
b676534
Clarify note
freeman-lab Feb 4, 2015
9 changes: 8 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -657,6 +657,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*
* Load data from a flat binary file, assuming the length of each record is constant.
*
* '''Note:''' We ensure that the byte array for each record in the resulting RDD
* has the provided record length.
*
* @param path Directory to the input data files
* @param recordLength The length at which to split the records
* @return An RDD of data with values, represented as byte arrays
@@ -671,7 +674,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
classOf[LongWritable],
classOf[BytesWritable],
conf=conf)
val data = br.map{ case (k, v) => v.getBytes}
val data = br.map { case (k, v) =>
val bytes = v.getBytes
assert(bytes.length == recordLength, "Byte array does not have correct length")
bytes
Contributor:
nit: Is this something that the user should be made aware of in the docs?

Contributor (Author):
Do you mean something more than these notes we're adding? I just clarified the notes a bit to make it obvious the check is on the byte array.

Contributor:
I meant: should the user be told that the system can throw an error when the records are not of the expected size? I don't have a strong feeling on this, just wondering.

Contributor (Author):
Gotcha, I think it's ok as is then. Given what FixedLengthInputFormat is doing, this is more a defensive assertion; it's not something the user should hit due to an inappropriate input.

}
data
}

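For context, here is a minimal sketch of how SparkContext.binaryRecords might be called once the record-length check above is in place. The path, the 8-byte record size, and the Long decoding are illustrative assumptions, not part of this PR.

import java.nio.ByteBuffer

import org.apache.spark.{SparkConf, SparkContext}

object BinaryRecordsExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("binary-records").setMaster("local[2]"))
    // Assume each record is exactly 8 bytes (one big-endian Long per record).
    val recordLength = 8
    // Every element of `records` is an Array[Byte] of length `recordLength`;
    // the assertion added in this PR guards that invariant.
    val records = sc.binaryRecords("/tmp/fixed-length-data", recordLength)
    val longs = records.map(bytes => ByteBuffer.wrap(bytes).getLong)
    println(longs.take(5).mkString(", "))
    sc.stop()
  }
}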
16 changes: 15 additions & 1 deletion python/pyspark/streaming/context.py
@@ -21,7 +21,7 @@
from py4j.java_gateway import java_import, JavaObject

from pyspark import RDD, SparkConf
from pyspark.serializers import UTF8Deserializer, CloudPickleSerializer
from pyspark.serializers import NoOpSerializer, UTF8Deserializer, CloudPickleSerializer
from pyspark.context import SparkContext
from pyspark.storagelevel import StorageLevel
from pyspark.streaming.dstream import DStream
@@ -251,6 +251,20 @@ def textFileStream(self, directory):
"""
return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())

def binaryRecordsStream(self, directory, recordLength):
"""
Create an input stream that monitors a Hadoop-compatible file system
for new files and reads them as flat binary files with records of
fixed length. Files must be written to the monitored directory by "moving"
them from another location within the same file system.
File names starting with . are ignored.

@param directory: Directory to load data from
@param recordLength: Length of each record in bytes
"""
return DStream(self._jssc.binaryRecordsStream(directory, recordLength), self,
NoOpSerializer())

def _check_serializers(self, rdds):
# make sure they have same serializer
if len(set(rdd._jrdd_deserializer for rdd in rdds)) > 1:
15 changes: 15 additions & 0 deletions python/pyspark/streaming/tests.py
@@ -21,6 +21,7 @@
import operator
import unittest
import tempfile
import struct

from pyspark.context import SparkConf, SparkContext, RDD
from pyspark.streaming.context import StreamingContext
@@ -455,6 +456,20 @@ def test_text_file_stream(self):
self.wait_for(result, 2)
self.assertEqual([range(10), range(10)], result)

def test_binary_records_stream(self):
d = tempfile.mkdtemp()
self.ssc = StreamingContext(self.sc, self.duration)
dstream = self.ssc.binaryRecordsStream(d, 10).map(
lambda v: struct.unpack("10b", str(v)))
result = self._collect(dstream, 2, block=False)
self.ssc.start()
for name in ('a', 'b'):
time.sleep(1)
with open(os.path.join(d, name), "wb") as f:
f.write(bytearray(range(10)))
self.wait_for(result, 2)
self.assertEqual([range(10), range(10)], map(lambda v: list(v[0]), result))

def test_union(self):
input = [range(i + 1) for i in range(3)]
dstream = self.ssc.queueStream(input)
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -27,10 +27,12 @@ import scala.reflect.ClassTag
import akka.actor.{Props, SupervisorStrategy}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark._
import org.apache.spark.annotation.Experimental
import org.apache.spark.input.FixedLengthBinaryInputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream._
@@ -359,6 +361,30 @@ class StreamingContext private[streaming] (
new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
}

/**
* Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* Files must be written to the monitored directory by "moving" them from another
* location within the same file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
* @param filter Function to filter paths to process
* @param newFilesOnly Should process only new files and ignore existing files in the directory
* @param conf Hadoop configuration
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
* @tparam F Input format for reading HDFS file
*/
def fileStream[
Contributor:
Can you make these parameters a superset of the parameters in the other fileStream? Otherwise it seems like people can either use a conf or use a filter.

Contributor (Author):
Fine with me, though it would require wiring the extra arguments (filter and newFilesOnly) through to all the binary record methods (because they'll also need the conf). Should we expose those arguments in binaryRecords? Or just use their defaults?

Contributor:
Why the wiring to binaryRecords? Those two parameters are only relevant to the DStream, so there is nothing to pass on down to the RDDs.

Contributor (Author):
Sorry, meant exposing them to binaryRecordsStream. That's calling fileStream, and to include conf as an arg we'll also need to either specify filter and newFilesOnly or pass defaults, right?

K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
] (directory: String,
filter: Path => Boolean,
newFilesOnly: Boolean,
conf: Configuration): InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly, Option(conf))
}

/**
* Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as text files (using key as LongWritable, value
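A minimal sketch of calling the new fileStream overload that takes a Hadoop Configuration alongside filter and newFilesOnly. The directory, the filter (which mirrors the default "ignore dot files" behavior), and the configuration tweak are assumptions for illustration only.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileStreamWithConfExample {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("file-stream-conf").setMaster("local[2]"), Seconds(2))

    // A hypothetical Hadoop setting we want the underlying input format to see.
    val hadoopConf = new Configuration()
    hadoopConf.set("textinputformat.record.delimiter", "\n")

    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
      "/tmp/watched-dir",
      (path: Path) => !path.getName.startsWith("."),  // mirrors the default filter
      newFilesOnly = true,
      conf = hadoopConf)

    lines.map(_._2.toString).print()
    ssc.start()
    ssc.awaitTermination()
  }
}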
@@ -371,6 +397,37 @@ class StreamingContext private[streaming] (
fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}

/**
* :: Experimental ::
*
* Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as flat binary files, assuming a fixed length per record,
* generating one byte array per record. Files must be written to the monitored directory
* by "moving" them from another location within the same file system. File names
* starting with . are ignored.
*
* '''Note:''' We ensure that the byte array for each record in the
* resulting RDDs of the DStream has the provided record length.
*
* @param directory HDFS directory to monitor for new file
* @param recordLength length of each record in bytes
*/
@Experimental
def binaryRecordsStream(
directory: String,
recordLength: Int): DStream[Array[Byte]] = {
val conf = sc_.hadoopConfiguration
conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
directory, FileInputDStream.defaultFilter : Path => Boolean, newFilesOnly=true, conf)
val data = br.map { case (k, v) =>
val bytes = v.getBytes
assert(bytes.length == recordLength, "Byte array does not have correct length")
bytes
}
data
}

/**
* Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
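And a minimal end-to-end sketch of the new binaryRecordsStream API itself; the monitored directory and the 16-byte record layout (two big-endian Longs per record) are illustrative assumptions.

import java.nio.ByteBuffer

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BinaryRecordsStreamExample {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("binary-records-stream").setMaster("local[2]"), Seconds(2))

    // Each file "moved" into this directory is split into 16-byte records.
    val records = ssc.binaryRecordsStream("/tmp/binary-inbox", 16)

    // Decode each record as two big-endian Longs (an assumed layout).
    val pairs = records.map { bytes =>
      val buf = ByteBuffer.wrap(bytes)
      (buf.getLong, buf.getLong)
    }
    pairs.print()

    ssc.start()
    ssc.awaitTermination()
  }
}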
streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.spark.rdd.RDD
@@ -177,7 +178,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {

/**
* Create an input stream from network source hostname:port. Data is received using
* a TCP socket and the receive bytes it interepreted as object using the given
* a TCP socket and the receive bytes it interpreted as object using the given
* converter.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
@@ -209,6 +210,24 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
ssc.textFileStream(directory)
}

/**
Contributor:
Please expose the new version of fileStream in JavaStreamingContext.

* :: Experimental ::
*
* Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as flat binary files with fixed record lengths,
* yielding byte arrays
*
* '''Note:''' We ensure that the byte array for each record in the
* resulting RDDs of the DStream has the provided record length.
*
* @param directory HDFS directory to monitor for new files
* @param recordLength The length at which to split the records
*/
Contributor:
Shouldn't this have the note as well?

Contributor (Author):
Thanks, added!

@Experimental
def binaryRecordsStream(directory: String, recordLength: Int): JavaDStream[Array[Byte]] = {
ssc.binaryRecordsStream(directory, recordLength)
}

/**
* Create an input stream from network source hostname:port, where data is received
* as serialized blocks (serialized using the Spark's serializer) that can be directly
@@ -298,6 +317,37 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
ssc.fileStream[K, V, F](directory, fn, newFilesOnly)
}

/**
* Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* Files must be written to the monitored directory by "moving" them from another
* location within the same file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
* @param kClass class of key for reading HDFS file
* @param vClass class of value for reading HDFS file
* @param fClass class of input format for reading HDFS file
* @param filter Function to filter paths to process
* @param newFilesOnly Should process only new files and ignore existing files in the directory
* @param conf Hadoop configuration
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
* @tparam F Input format for reading HDFS file
*/
def fileStream[K, V, F <: NewInputFormat[K, V]](
directory: String,
kClass: Class[K],
vClass: Class[V],
fClass: Class[F],
filter: JFunction[Path, JBoolean],
newFilesOnly: Boolean,
conf: Configuration): JavaPairInputDStream[K, V] = {
implicit val cmk: ClassTag[K] = ClassTag(kClass)
implicit val cmv: ClassTag[V] = ClassTag(vClass)
implicit val cmf: ClassTag[F] = ClassTag(fClass)
def fn = (x: Path) => filter.call(x).booleanValue()
ssc.fileStream[K, V, F](directory, fn, newFilesOnly, conf)
}

/**
* Create an input stream with any arbitrary user implemented actor receiver.
* @param props Props object defining creation of the actor
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable
import scala.reflect.ClassTag

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

@@ -68,11 +69,13 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils}
* processing semantics are undefined.
*/
private[streaming]
class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
@transient ssc_ : StreamingContext,
directory: String,
filter: Path => Boolean = FileInputDStream.defaultFilter,
newFilesOnly: Boolean = true)
newFilesOnly: Boolean = true,
conf: Option[Configuration] = None)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
extends InputDStream[(K, V)](ssc_) {

// This is a def so that it works during checkpoint recovery:
@@ -237,7 +240,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
/** Generate one RDD from an array of files */
private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
val fileRDDs = files.map(file =>{
val rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)
val rdd = conf match {
case Some(config) => context.sparkContext.newAPIHadoopFile(
file,
fm.runtimeClass.asInstanceOf[Class[F]],
km.runtimeClass.asInstanceOf[Class[K]],
vm.runtimeClass.asInstanceOf[Class[V]],
config)
case None => context.sparkContext.newAPIHadoopFile[K, V, F](file)
}
if (rdd.partitions.size == 0) {
logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
"files that have been \"moved\" to the directory assigned to the file stream. " +
streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -95,6 +95,57 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
}

test("binary records stream") {
val testDir: File = null
try {
val batchDuration = Seconds(2)
val testDir = Utils.createTempDir()
// Create a file that exists before the StreamingContext is created:
val existingFile = new File(testDir, "0")
Files.write("0\n", existingFile, Charset.forName("UTF-8"))
assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000)

// Set up the streaming context and input streams
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
// This `setTime` call ensures that the clock is past the creation time of `existingFile`
clock.setTime(existingFile.lastModified + batchDuration.milliseconds)
val batchCounter = new BatchCounter(ssc)
val fileStream = ssc.binaryRecordsStream(testDir.toString, 1)
val outputBuffer = new ArrayBuffer[Seq[Array[Byte]]]
with SynchronizedBuffer[Seq[Array[Byte]]]
val outputStream = new TestOutputStream(fileStream, outputBuffer)
outputStream.register()
ssc.start()

// Advance the clock so that the files are created after StreamingContext starts, but
// not enough to trigger a batch
clock.addToTime(batchDuration.milliseconds / 2)

val input = Seq(1, 2, 3, 4, 5)
input.foreach { i =>
Thread.sleep(batchDuration.milliseconds)
val file = new File(testDir, i.toString)
Files.write(Array[Byte](i.toByte), file)
assert(file.setLastModified(clock.currentTime()))
assert(file.lastModified === clock.currentTime)
logInfo("Created file " + file)
// Advance the clock after creating the file to avoid a race when
// setting its modification time
clock.addToTime(batchDuration.milliseconds)
eventually(eventuallyTimeout) {
assert(batchCounter.getNumCompletedBatches === i)
}
}

val expectedOutput = input.map(i => i.toByte)
val obtainedOutput = outputBuffer.flatten.toList.map(i => i(0).toByte)
assert(obtainedOutput === expectedOutput)
}
} finally {
if (testDir != null) Utils.deleteRecursively(testDir)
}
}

test("file input stream - newFilesOnly = true") {
testFileStream(newFilesOnly = true)
Expand Down