From 8550c2619aba22b40dc109171b395522ccfaaf08 Mon Sep 17 00:00:00 2001 From: freeman Date: Thu, 25 Dec 2014 04:31:31 -0500 Subject: [PATCH 01/29] Expose additional argument combination --- .../main/scala/org/apache/spark/SparkContext.scala | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 57bc3d4e4ae36..c0cabea7ecfa7 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -764,6 +764,19 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli vm.runtimeClass.asInstanceOf[Class[V]]) } + /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat, + * using class tags but also allowing for an arbitrary configuration. */ + def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]] + (path: String, conf: Configuration = hadoopConfiguration) + (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = { + newAPIHadoopFile( + path, + fm.runtimeClass.asInstanceOf[Class[F]], + km.runtimeClass.asInstanceOf[Class[K]], + vm.runtimeClass.asInstanceOf[Class[V]], + conf) + } + /** * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat * and extra configuration options to pass to the input format. From ecef0eb8d4bf30627e5b35c40c2f4204e1670390 Mon Sep 17 00:00:00 2001 From: freeman Date: Thu, 25 Dec 2014 04:34:49 -0500 Subject: [PATCH 02/29] Add binaryRecordsStream to python --- python/pyspark/streaming/context.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index d48f3598e33b2..5b11454f7af62 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -21,7 +21,7 @@ from py4j.java_gateway import java_import, JavaObject from pyspark import RDD, SparkConf -from pyspark.serializers import UTF8Deserializer, CloudPickleSerializer +from pyspark.serializers import NoOpSerializer, UTF8Deserializer, CloudPickleSerializer from pyspark.context import SparkContext from pyspark.storagelevel import StorageLevel from pyspark.streaming.dstream import DStream @@ -251,6 +251,19 @@ def textFileStream(self, directory): """ return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer()) + def binaryRecordsStream(self, directory, recordLength): + """ + Create an input stream that monitors a Hadoop-compatible file system + for new files and reads them as flat binary files with records of + fixed length. Files must be written to the monitored directory by "moving" + them from another location within the same file system. + File names starting with . are ignored. 
+ + @param directory: Directory to load data from + @param recordLength: Length of each record in bytes + """ + return DStream(self._jssc.binaryRecordsStream(directory, recordLength), self, NoOpSerializer()) + def _check_serializers(self, rdds): # make sure they have same serializer if len(set(rdd._jrdd_deserializer for rdd in rdds)) > 1: From fe4e803f8810c19aac02e7c8927af1d08b2f0a94 Mon Sep 17 00:00:00 2001 From: freeman Date: Thu, 25 Dec 2014 04:35:12 -0500 Subject: [PATCH 03/29] Add binaryRecordStream to Java API --- .../streaming/api/java/JavaStreamingContext.scala | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index d8695b8e05962..53814967f6d84 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -208,6 +208,18 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { ssc.textFileStream(directory) } + /** + * Create an input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them as flat binary files with fixed record lengths, + * yielding byte arrays + * @param directory HDFS directory to monitor for new files + * @param recordLength The length at which to split the records + */ + + def binaryRecordsStream(directory: String, recordLength: Int): JavaDStream[Array[Byte]] = { + ssc.binaryRecordsStream(directory, recordLength) + } + /** * Create an input stream from network source hostname:port, where data is received * as serialized blocks (serialized using the Spark's serializer) that can be directly From 36cb0fd576abb20b9c3210774ec9ff0471e2cf48 Mon Sep 17 00:00:00 2001 From: freeman Date: Thu, 25 Dec 2014 04:35:41 -0500 Subject: [PATCH 04/29] Add binaryRecordsStream to scala --- .../spark/streaming/StreamingContext.scala | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index ecab5510a8e7b..882e981c2d00c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -28,10 +28,11 @@ import scala.reflect.ClassTag import akka.actor.{Props, SupervisorStrategy} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.hadoop.io.{LongWritable, Text} +import org.apache.hadoop.io.{BytesWritable, LongWritable, Text} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.spark._ +import org.apache.spark.input.FixedLengthBinaryInputFormat import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream._ @@ -386,6 +387,24 @@ class StreamingContext private[streaming] ( queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1)) } + /** + * Create an input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them as flat binary files, assuming a fixed length per record, + * generating one byte array per record. 
Files must be written to the monitored directory + * by "moving" them from another location within the same file system. File names + * starting with . are ignored. + * @param directory HDFS directory to monitor for new file + */ + def binaryRecordsStream( + directory: String, + recordLength: Int): DStream[Array[Byte]] = { + val conf = sc_.hadoopConfiguration + conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength) + val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](directory) + val data = br.map{ case (k, v) => v.getBytes} + data + } + /** * Create an input stream from a queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. From 23dd69f318aedbf12cab10380a50d94ce8c3ca92 Mon Sep 17 00:00:00 2001 From: freeman Date: Thu, 25 Dec 2014 04:35:52 -0500 Subject: [PATCH 05/29] Tests for binaryRecordsStream --- .../spark/streaming/InputStreamsSuite.scala | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 307052a4a9cbb..0445204b1f667 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -96,6 +96,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } } + test("binary records stream") { + testBinaryRecordsStream() + } test("file input stream - newFilesOnly = true") { testFileStream(newFilesOnly = true) @@ -233,6 +236,46 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } } + def testBinaryRecordsStream() { + var ssc: StreamingContext = null + val testDir: File = null + try { + val testDir = Utils.createTempDir() + + Thread.sleep(1000) + // Set up the streaming context and input streams + val newConf = conf.clone.set( + "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") + ssc = new StreamingContext(newConf, batchDuration) + + val fileStream = ssc.binaryRecordsStream(testDir.toString, 1) + + val outputBuffer = new ArrayBuffer[Seq[Array[Byte]]] with SynchronizedBuffer[Seq[Array[Byte]]] + val outputStream = new TestOutputStream(fileStream, outputBuffer) + outputStream.register() + ssc.start() + + // Create files in the directory with binary data + val input = Seq(1, 2, 3, 4, 5) + input.foreach { i => + Thread.sleep(batchDuration.milliseconds) + val file = new File(testDir, i.toString) + Files.write(Array[Byte](i.toByte), file) + logInfo("Created file " + file) + } + + // Verify contents of output + eventually(timeout(maxWaitTimeMillis milliseconds), interval(100 milliseconds)) { + val expectedOutput = input.map(i => i.toByte) + val obtainedOutput = outputBuffer.flatten.toList.map(i => i(0).toByte) + assert(obtainedOutput === expectedOutput) + } + } finally { + if (ssc != null) ssc.stop() + if (testDir != null) Utils.deleteRecursively(testDir) + } + } + def testFileStream(newFilesOnly: Boolean) { var ssc: StreamingContext = null val testDir: File = null From 9398bcb615c6cbf033b796c0837c99aba83303b4 Mon Sep 17 00:00:00 2001 From: freeman Date: Thu, 25 Dec 2014 04:40:06 -0500 Subject: [PATCH 06/29] Expose optional hadoop configuration --- .../spark/streaming/StreamingContext.scala | 22 +++++++++++++++++-- .../streaming/dstream/FileInputDStream.scala | 7 ++++-- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 882e981c2d00c..17dbee7c2ca0f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -338,7 +338,7 @@ class StreamingContext private[streaming] ( V: ClassTag, F <: NewInputFormat[K, V]: ClassTag ] (directory: String): InputDStream[(K, V)] = { - new FileInputDStream[K, V, F](this, directory) + new FileInputDStream[K, V, F](this, directory, conf=None) } /** @@ -358,7 +358,25 @@ class StreamingContext private[streaming] ( V: ClassTag, F <: NewInputFormat[K, V]: ClassTag ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = { - new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly) + new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly, conf=None) + } + + /** + * Create a input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them using the given key-value types and input format. + * Files must be written to the monitored directory by "moving" them from another + * location within the same file system. File names starting with . are ignored. + * @param directory HDFS directory to monitor for new file + * @tparam K Key type for reading HDFS file + * @tparam V Value type for reading HDFS file + * @tparam F Input format for reading HDFS file + */ + def fileStream[ + K: ClassTag, + V: ClassTag, + F <: NewInputFormat[K, V]: ClassTag + ] (directory: String, conf: Configuration): InputDStream[(K, V)] = { + new FileInputDStream[K, V, F](this, directory=directory, conf=Option(conf)) } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 5f13fdc5579ed..ac4779f6e8b36 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -22,6 +22,7 @@ import java.io.{IOException, ObjectInputStream} import scala.collection.mutable import scala.reflect.ClassTag +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} @@ -71,7 +72,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas @transient ssc_ : StreamingContext, directory: String, filter: Path => Boolean = FileInputDStream.defaultFilter, - newFilesOnly: Boolean = true) + newFilesOnly: Boolean = true, + conf: Option[Configuration]) extends InputDStream[(K, V)](ssc_) { // Data to be saved as part of the streaming checkpoints @@ -232,7 +234,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas /** Generate one RDD from an array of files */ private def filesToRDD(files: Seq[String]): RDD[(K, V)] = { val fileRDDs = files.map(file =>{ - val rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file) + val hadoopConfiguration = conf.getOrElse(ssc.sparkContext.hadoopConfiguration) + val rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file, hadoopConfiguration) if (rdd.partitions.size == 0) { logError("File " + file + " has no data in it. Spark Streaming can only ingest " + "files that have been \"moved\" to the directory assigned to the file stream. 
" + From 28bff9bab7be7c2f614a011f6b68e2103234c1df Mon Sep 17 00:00:00 2001 From: freeman Date: Thu, 25 Dec 2014 05:02:42 -0500 Subject: [PATCH 07/29] Fix missing arg --- .../scala/org/apache/spark/streaming/StreamingContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 17dbee7c2ca0f..23f5aae3dee1c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -418,7 +418,7 @@ class StreamingContext private[streaming] ( recordLength: Int): DStream[Array[Byte]] = { val conf = sc_.hadoopConfiguration conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength) - val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](directory) + val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](directory, conf) val data = br.map{ case (k, v) => v.getBytes} data } From 8b70fbcf785074c7cde873cf10e8d5f0ea9e3979 Mon Sep 17 00:00:00 2001 From: freeman Date: Thu, 25 Dec 2014 05:03:01 -0500 Subject: [PATCH 08/29] Reorganization --- .../spark/streaming/StreamingContext.scala | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 23f5aae3dee1c..d743e7173faec 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -391,20 +391,6 @@ class StreamingContext private[streaming] ( fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString) } - /** - * Create an input stream from a queue of RDDs. In each batch, - * it will process either one or all of the RDDs returned by the queue. - * @param queue Queue of RDDs - * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval - * @tparam T Type of objects in the RDD - */ - def queueStream[T: ClassTag]( - queue: Queue[RDD[T]], - oneAtATime: Boolean = true - ): InputDStream[T] = { - queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1)) - } - /** * Create an input stream that monitors a Hadoop-compatible filesystem * for new files and reads them as flat binary files, assuming a fixed length per record, @@ -423,6 +409,20 @@ class StreamingContext private[streaming] ( data } + /** + * Create an input stream from a queue of RDDs. In each batch, + * it will process either one or all of the RDDs returned by the queue. + * @param queue Queue of RDDs + * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval + * @tparam T Type of objects in the RDD + */ + def queueStream[T: ClassTag]( + queue: Queue[RDD[T]], + oneAtATime: Boolean = true + ): InputDStream[T] = { + queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1)) + } + /** * Create an input stream from a queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. 
From 2843e9de60f23bbce3ac185c09b8575a7513fe0d Mon Sep 17 00:00:00 2001 From: freeman Date: Thu, 25 Dec 2014 12:43:20 -0500 Subject: [PATCH 09/29] Add params to docstring --- .../scala/org/apache/spark/streaming/StreamingContext.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index d743e7173faec..ff427ad5cb21c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -367,6 +367,7 @@ class StreamingContext private[streaming] ( * Files must be written to the monitored directory by "moving" them from another * location within the same file system. File names starting with . are ignored. * @param directory HDFS directory to monitor for new file + * @param conf Hadoop configuration * @tparam K Key type for reading HDFS file * @tparam V Value type for reading HDFS file * @tparam F Input format for reading HDFS file @@ -398,6 +399,7 @@ class StreamingContext private[streaming] ( * by "moving" them from another location within the same file system. File names * starting with . are ignored. * @param directory HDFS directory to monitor for new file + * @param recordLength length of each record in bytes */ def binaryRecordsStream( directory: String, From 94d90d0fbc576c4e475bb0a053e6c35d53152cf4 Mon Sep 17 00:00:00 2001 From: freeman Date: Thu, 25 Dec 2014 12:44:09 -0500 Subject: [PATCH 10/29] Spelling --- .../apache/spark/streaming/api/java/JavaStreamingContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 53814967f6d84..e1dbcbc910c29 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -176,7 +176,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { /** * Create an input stream from network source hostname:port. Data is received using - * a TCP socket and the receive bytes it interepreted as object using the given + * a TCP socket and the receive bytes it interpreted as object using the given * converter. 
* @param hostname Hostname to connect to for receiving data * @param port Port to connect to for receiving data From 1c739aa67a006a62a6ee8f294ff60568f9031476 Mon Sep 17 00:00:00 2001 From: freeman Date: Thu, 25 Dec 2014 12:48:04 -0500 Subject: [PATCH 11/29] Simpler default arg handling --- .../main/scala/org/apache/spark/SparkContext.scala | 13 ------------- .../apache/spark/streaming/StreamingContext.scala | 4 ++-- .../spark/streaming/dstream/FileInputDStream.scala | 14 ++++++++++---- 3 files changed, 12 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index c0cabea7ecfa7..57bc3d4e4ae36 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -764,19 +764,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli vm.runtimeClass.asInstanceOf[Class[V]]) } - /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat, - * using class tags but also allowing for an arbitrary configuration. */ - def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]] - (path: String, conf: Configuration = hadoopConfiguration) - (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = { - newAPIHadoopFile( - path, - fm.runtimeClass.asInstanceOf[Class[F]], - km.runtimeClass.asInstanceOf[Class[K]], - vm.runtimeClass.asInstanceOf[Class[V]], - conf) - } - /** * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat * and extra configuration options to pass to the input format. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index ff427ad5cb21c..e7f298d42b515 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -338,7 +338,7 @@ class StreamingContext private[streaming] ( V: ClassTag, F <: NewInputFormat[K, V]: ClassTag ] (directory: String): InputDStream[(K, V)] = { - new FileInputDStream[K, V, F](this, directory, conf=None) + new FileInputDStream[K, V, F](this, directory) } /** @@ -358,7 +358,7 @@ class StreamingContext private[streaming] ( V: ClassTag, F <: NewInputFormat[K, V]: ClassTag ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = { - new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly, conf=None) + new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly) } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index ac4779f6e8b36..d4d0a8c9439fb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -68,12 +68,12 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils} * processing semantics are undefined. 
*/ private[streaming] -class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag]( +class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( @transient ssc_ : StreamingContext, directory: String, filter: Path => Boolean = FileInputDStream.defaultFilter, newFilesOnly: Boolean = true, - conf: Option[Configuration]) + conf: Option[Configuration] = None)(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]) extends InputDStream[(K, V)](ssc_) { // Data to be saved as part of the streaming checkpoints @@ -234,8 +234,14 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas /** Generate one RDD from an array of files */ private def filesToRDD(files: Seq[String]): RDD[(K, V)] = { val fileRDDs = files.map(file =>{ - val hadoopConfiguration = conf.getOrElse(ssc.sparkContext.hadoopConfiguration) - val rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file, hadoopConfiguration) + val rdd = conf match { + case Some(config) => context.sparkContext.newAPIHadoopFile(file, + fm.runtimeClass.asInstanceOf[Class[F]], + km.runtimeClass.asInstanceOf[Class[K]], + vm.runtimeClass.asInstanceOf[Class[V]], + config) + case None => context.sparkContext.newAPIHadoopFile[K, V, F](file) + } if (rdd.partitions.size == 0) { logError("File " + file + " has no data in it. Spark Streaming can only ingest " + "files that have been \"moved\" to the directory assigned to the file stream. " + From 029d49c143c7bed603db3ca43b44d212de516df8 Mon Sep 17 00:00:00 2001 From: freeman Date: Thu, 25 Dec 2014 12:50:42 -0500 Subject: [PATCH 12/29] Formatting --- .../org/apache/spark/streaming/dstream/FileInputDStream.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index d4d0a8c9439fb..f48590be9cca8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -73,7 +73,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( directory: String, filter: Path => Boolean = FileInputDStream.defaultFilter, newFilesOnly: Boolean = true, - conf: Option[Configuration] = None)(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]) + conf: Option[Configuration] = None) + (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]) extends InputDStream[(K, V)](ssc_) { // Data to be saved as part of the streaming checkpoints From a4324a38f8155f6b3e776326925af61f16a2fdfb Mon Sep 17 00:00:00 2001 From: freeman Date: Thu, 25 Dec 2014 12:56:45 -0500 Subject: [PATCH 13/29] Line length --- python/pyspark/streaming/context.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index 5b11454f7af62..18aaae93b05f2 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -262,7 +262,8 @@ def binaryRecordsStream(self, directory, recordLength): @param directory: Directory to load data from @param recordLength: Length of each record in bytes """ - return DStream(self._jssc.binaryRecordsStream(directory, recordLength), self, NoOpSerializer()) + return DStream(self._jssc.binaryRecordsStream(directory, recordLength), self, + NoOpSerializer()) def _check_serializers(self, rdds): # make sure they have same serializer From d3e75b2bad2ba5048b36300cfd61b7cb5c39414b 
Mon Sep 17 00:00:00 2001 From: freeman Date: Thu, 25 Dec 2014 14:29:06 -0500 Subject: [PATCH 14/29] Add tests in python --- python/pyspark/streaming/tests.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index a8d876d0fa3b3..d509c9999e0db 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -21,6 +21,7 @@ import operator import unittest import tempfile +import struct from pyspark.context import SparkConf, SparkContext, RDD from pyspark.streaming.context import StreamingContext @@ -455,6 +456,20 @@ def test_text_file_stream(self): self.wait_for(result, 2) self.assertEqual([range(10), range(10)], result) + def test_binary_records_stream(self): + d = tempfile.mkdtemp() + self.ssc = StreamingContext(self.sc, self.duration) + dstream = self.ssc.binaryRecordsStream(d, 10).map( + lambda v: struct.unpack("10b", str(v))) + result = self._collect(dstream, 2, block=False) + self.ssc.start() + for name in ('a', 'b'): + time.sleep(1) + with open(os.path.join(d, name), "wb") as f: + f.write(bytearray(range(10))) + self.wait_for(result, 2) + self.assertEqual([range(10),range(10)], map(lambda v: list(v[0]), result)) + def test_union(self): input = [range(i + 1) for i in range(3)] dstream = self.ssc.queueStream(input) From becb34474fd165ee8aae9d207532869bce3ef743 Mon Sep 17 00:00:00 2001 From: freeman Date: Thu, 25 Dec 2014 14:31:07 -0500 Subject: [PATCH 15/29] Formatting --- .../scala/org/apache/spark/streaming/InputStreamsSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 0445204b1f667..96ec4baff618c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -250,7 +250,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val fileStream = ssc.binaryRecordsStream(testDir.toString, 1) - val outputBuffer = new ArrayBuffer[Seq[Array[Byte]]] with SynchronizedBuffer[Seq[Array[Byte]]] + val outputBuffer = new ArrayBuffer[Seq[Array[Byte]]] + with SynchronizedBuffer[Seq[Array[Byte]]] val outputStream = new TestOutputStream(fileStream, outputBuffer) outputStream.register() ssc.start() From fcb915c2fbba80b9d7b765425e203b0e3796c59d Mon Sep 17 00:00:00 2001 From: freeman Date: Thu, 25 Dec 2014 14:53:23 -0500 Subject: [PATCH 16/29] Formatting --- python/pyspark/streaming/tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index d509c9999e0db..608f8e26473a6 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -468,7 +468,7 @@ def test_binary_records_stream(self): with open(os.path.join(d, name), "wb") as f: f.write(bytearray(range(10))) self.wait_for(result, 2) - self.assertEqual([range(10),range(10)], map(lambda v: list(v[0]), result)) + self.assertEqual([range(10), range(10)], map(lambda v: list(v[0]), result)) def test_union(self): input = [range(i + 1) for i in range(3)] From 317b6d1dc45f0706987c3258beaa64be08df4b3c Mon Sep 17 00:00:00 2001 From: freeman Date: Mon, 5 Jan 2015 17:37:06 -0500 Subject: [PATCH 17/29] Make test inline --- .../spark/streaming/InputStreamsSuite.scala | 80 +++++++++---------- 1 file changed, 38 insertions(+), 42 
deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 96ec4baff618c..0f6c11287ea10 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -97,7 +97,44 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } test("binary records stream") { - testBinaryRecordsStream() + var ssc: StreamingContext = null + val testDir: File = null + try { + val testDir = Utils.createTempDir() + + Thread.sleep(1000) + // Set up the streaming context and input streams + val newConf = conf.clone.set( + "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") + ssc = new StreamingContext(newConf, batchDuration) + + val fileStream = ssc.binaryRecordsStream(testDir.toString, 1) + + val outputBuffer = new ArrayBuffer[Seq[Array[Byte]]] + with SynchronizedBuffer[Seq[Array[Byte]]] + val outputStream = new TestOutputStream(fileStream, outputBuffer) + outputStream.register() + ssc.start() + + // Create files in the directory with binary data + val input = Seq(1, 2, 3, 4, 5) + input.foreach { i => + Thread.sleep(batchDuration.milliseconds) + val file = new File(testDir, i.toString) + Files.write(Array[Byte](i.toByte), file) + logInfo("Created file " + file) + } + + // Verify contents of output + eventually(timeout(maxWaitTimeMillis milliseconds), interval(100 milliseconds)) { + val expectedOutput = input.map(i => i.toByte) + val obtainedOutput = outputBuffer.flatten.toList.map(i => i(0).toByte) + assert(obtainedOutput === expectedOutput) + } + } finally { + if (ssc != null) ssc.stop() + if (testDir != null) Utils.deleteRecursively(testDir) + } } test("file input stream - newFilesOnly = true") { @@ -236,47 +273,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } } - def testBinaryRecordsStream() { - var ssc: StreamingContext = null - val testDir: File = null - try { - val testDir = Utils.createTempDir() - - Thread.sleep(1000) - // Set up the streaming context and input streams - val newConf = conf.clone.set( - "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") - ssc = new StreamingContext(newConf, batchDuration) - - val fileStream = ssc.binaryRecordsStream(testDir.toString, 1) - - val outputBuffer = new ArrayBuffer[Seq[Array[Byte]]] - with SynchronizedBuffer[Seq[Array[Byte]]] - val outputStream = new TestOutputStream(fileStream, outputBuffer) - outputStream.register() - ssc.start() - - // Create files in the directory with binary data - val input = Seq(1, 2, 3, 4, 5) - input.foreach { i => - Thread.sleep(batchDuration.milliseconds) - val file = new File(testDir, i.toString) - Files.write(Array[Byte](i.toByte), file) - logInfo("Created file " + file) - } - - // Verify contents of output - eventually(timeout(maxWaitTimeMillis milliseconds), interval(100 milliseconds)) { - val expectedOutput = input.map(i => i.toByte) - val obtainedOutput = outputBuffer.flatten.toList.map(i => i(0).toByte) - assert(obtainedOutput === expectedOutput) - } - } finally { - if (ssc != null) ssc.stop() - if (testDir != null) Utils.deleteRecursively(testDir) - } - } - def testFileStream(newFilesOnly: Boolean) { var ssc: StreamingContext = null val testDir: File = null From 7373f739bfecef32b48e075b5afcdbcabfb4b06b Mon Sep 17 00:00:00 2001 From: freeman Date: Fri, 30 Jan 2015 14:54:56 -0800 Subject: [PATCH 18/29] Add note and 
defensive assertion for byte length --- .../src/main/scala/org/apache/spark/SparkContext.scala | 9 ++++++++- .../org/apache/spark/streaming/StreamingContext.scala | 10 +++++++++- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 3c61c10820ba9..112f820f2c16b 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -657,6 +657,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * * Load data from a flat binary file, assuming the length of each record is constant. * + * '''Note:''' Normally getBytes returns an array padded with extra values, + * but the FixedLengthBinaryInputFormat ensures that it will always be backed + * by a byte array of the correct length (the recordLength) + * * @param path Directory to the input data files * @param recordLength The length at which to split the records * @return An RDD of data with values, represented as byte arrays @@ -671,7 +675,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli classOf[LongWritable], classOf[BytesWritable], conf=conf) - val data = br.map{ case (k, v) => v.getBytes} + val data = br.map{ case (k, v) => + val bytes = v.getBytes + assert(bytes.length == recordLength, "Byte array does not have correct length") + bytes} data } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 4e6aaac33ddf3..2cc5f18e0301a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -397,6 +397,11 @@ class StreamingContext private[streaming] ( * generating one byte array per record. Files must be written to the monitored directory * by "moving" them from another location within the same file system. File names * starting with . are ignored. 
+ * + * '''Note:''' Normally getBytes returns an array padded with extra values, + * but the FixedLengthBinaryInputFormat ensures that it will always be backed + * by a byte array of the correct length (the recordLength) + * * @param directory HDFS directory to monitor for new file * @param recordLength length of each record in bytes */ @@ -406,7 +411,10 @@ class StreamingContext private[streaming] ( val conf = sc_.hadoopConfiguration conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength) val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](directory, conf) - val data = br.map{ case (k, v) => v.getBytes} + val data = br.map{ case (k, v) => + val bytes = v.getBytes + assert(bytes.length == recordLength, "Byte array does not have correct length") + bytes} data } From 9a3715a1e6a71040d234da52bf848b0bb109a591 Mon Sep 17 00:00:00 2001 From: freeman Date: Fri, 30 Jan 2015 14:55:57 -0800 Subject: [PATCH 19/29] Refactor to reflect changes to FileInputSuite --- .../spark/streaming/InputStreamsSuite.scala | 61 +++++++++++-------- 1 file changed, 36 insertions(+), 25 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 053530472d408..01084a457db4f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -96,42 +96,53 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } test("binary records stream") { - var ssc: StreamingContext = null val testDir: File = null try { + val batchDuration = Seconds(2) val testDir = Utils.createTempDir() + // Create a file that exists before the StreamingContext is created: + val existingFile = new File(testDir, "0") + Files.write("0\n", existingFile, Charset.forName("UTF-8")) + assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000) - Thread.sleep(1000) // Set up the streaming context and input streams - val newConf = conf.clone.set( - "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") - ssc = new StreamingContext(newConf, batchDuration) - - val fileStream = ssc.binaryRecordsStream(testDir.toString, 1) - - val outputBuffer = new ArrayBuffer[Seq[Array[Byte]]] - with SynchronizedBuffer[Seq[Array[Byte]]] - val outputStream = new TestOutputStream(fileStream, outputBuffer) - outputStream.register() - ssc.start() - - // Create files in the directory with binary data - val input = Seq(1, 2, 3, 4, 5) - input.foreach { i => - Thread.sleep(batchDuration.milliseconds) - val file = new File(testDir, i.toString) - Files.write(Array[Byte](i.toByte), file) - logInfo("Created file " + file) - } + withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc => + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + // This `setTime` call ensures that the clock is past the creation time of `existingFile` + clock.setTime(existingFile.lastModified + batchDuration.milliseconds) + val batchCounter = new BatchCounter(ssc) + val fileStream = ssc.binaryRecordsStream(testDir.toString, 1) + val outputBuffer = new ArrayBuffer[Seq[Array[Byte]]] + with SynchronizedBuffer[Seq[Array[Byte]]] + val outputStream = new TestOutputStream(fileStream, outputBuffer) + outputStream.register() + ssc.start() + + // Advance the clock so that the files are created after StreamingContext starts, but + // not enough to trigger a batch + 
clock.addToTime(batchDuration.milliseconds / 2) + + val input = Seq(1, 2, 3, 4, 5) + input.foreach { i => + Thread.sleep(batchDuration.milliseconds) + val file = new File(testDir, i.toString) + Files.write(Array[Byte](i.toByte), file) + assert(file.setLastModified(clock.currentTime())) + assert(file.lastModified === clock.currentTime) + logInfo("Created file " + file) + // Advance the clock after creating the file to avoid a race when + // setting its modification time + clock.addToTime(batchDuration.milliseconds) + eventually(eventuallyTimeout) { + assert(batchCounter.getNumCompletedBatches === i) + } + } - // Verify contents of output - eventually(timeout(maxWaitTimeMillis milliseconds), interval(100 milliseconds)) { val expectedOutput = input.map(i => i.toByte) val obtainedOutput = outputBuffer.flatten.toList.map(i => i(0).toByte) assert(obtainedOutput === expectedOutput) } } finally { - if (ssc != null) ssc.stop() if (testDir != null) Utils.deleteRecursively(testDir) } } From 47560f47547142543548e502358ecf2d3d218b67 Mon Sep 17 00:00:00 2001 From: freeman Date: Fri, 30 Jan 2015 18:30:33 -0800 Subject: [PATCH 20/29] Space formatting --- core/src/main/scala/org/apache/spark/SparkContext.scala | 5 +++-- .../scala/org/apache/spark/streaming/StreamingContext.scala | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 112f820f2c16b..4fae9fe8dd29b 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -675,10 +675,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli classOf[LongWritable], classOf[BytesWritable], conf=conf) - val data = br.map{ case (k, v) => + val data = br.map { case (k, v) => val bytes = v.getBytes assert(bytes.length == recordLength, "Byte array does not have correct length") - bytes} + bytes + } data } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 2cc5f18e0301a..1f67c50ba51bc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -411,10 +411,11 @@ class StreamingContext private[streaming] ( val conf = sc_.hadoopConfiguration conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength) val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](directory, conf) - val data = br.map{ case (k, v) => + val data = br.map { case (k, v) => val bytes = v.getBytes assert(bytes.length == recordLength, "Byte array does not have correct length") - bytes} + bytes + } data } From b85bffc7e12e3386842f4073a49c6160caa03dd5 Mon Sep 17 00:00:00 2001 From: freeman Date: Fri, 30 Jan 2015 18:30:46 -0800 Subject: [PATCH 21/29] Formatting --- .../org/apache/spark/streaming/dstream/FileInputDStream.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 383638430be57..6379b88527ec8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -241,7 +241,8 @@ class 
FileInputDStream[K, V, F <: NewInputFormat[K,V]]( private def filesToRDD(files: Seq[String]): RDD[(K, V)] = { val fileRDDs = files.map(file =>{ val rdd = conf match { - case Some(config) => context.sparkContext.newAPIHadoopFile(file, + case Some(config) => context.sparkContext.newAPIHadoopFile( + file, fm.runtimeClass.asInstanceOf[Class[F]], km.runtimeClass.asInstanceOf[Class[K]], vm.runtimeClass.asInstanceOf[Class[V]], From 14bca9aff82b7323a624c4e05a46b60dd140ec99 Mon Sep 17 00:00:00 2001 From: freeman Date: Fri, 30 Jan 2015 18:31:32 -0800 Subject: [PATCH 22/29] Add experimental tag --- .../scala/org/apache/spark/streaming/StreamingContext.scala | 2 ++ .../apache/spark/streaming/api/java/JavaStreamingContext.scala | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 1f67c50ba51bc..86c98a0624424 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -31,6 +31,7 @@ import org.apache.hadoop.io.{BytesWritable, LongWritable, Text} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.spark._ +import org.apache.spark.annotation.Experimental import org.apache.spark.input.FixedLengthBinaryInputFormat import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel @@ -405,6 +406,7 @@ class StreamingContext private[streaming] ( * @param directory HDFS directory to monitor for new file * @param recordLength length of each record in bytes */ + @Experimental def binaryRecordsStream( directory: String, recordLength: Int): DStream[Array[Byte]] = { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index b04a519fbc1d3..eeeddb9ac9df0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext} import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} import org.apache.spark.rdd.RDD @@ -216,7 +217,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { * @param directory HDFS directory to monitor for new files * @param recordLength The length at which to split the records */ - + @Experimental def binaryRecordsStream(directory: String, recordLength: Int): JavaDStream[Array[Byte]] = { ssc.binaryRecordsStream(directory, recordLength) } From 34d20ef07fadce23cbd9726c45ee07f6c7b12165 Mon Sep 17 00:00:00 2001 From: freeman Date: Mon, 2 Feb 2015 22:28:02 -0500 Subject: [PATCH 23/29] Add experimental tag --- .../apache/spark/streaming/api/java/JavaStreamingContext.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 
eeeddb9ac9df0..f0e3fafa43478 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -211,6 +211,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { } /** + * :: Experimental :: * Create an input stream that monitors a Hadoop-compatible filesystem * for new files and reads them as flat binary files with fixed record lengths, * yielding byte arrays From c2cfa6d7abb0ae4cdc77a5e48ef2cb47bfe82ff5 Mon Sep 17 00:00:00 2001 From: freeman Date: Tue, 3 Feb 2015 16:34:45 -0500 Subject: [PATCH 24/29] Expose new version of fileStream with conf in java --- .../api/java/JavaStreamingContext.scala | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index f0e3fafa43478..b6e869dbc1497 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -312,6 +312,37 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { ssc.fileStream[K, V, F](directory, fn, newFilesOnly) } + /** + * Create an input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them using the given key-value types and input format. + * Files must be written to the monitored directory by "moving" them from another + * location within the same file system. File names starting with . are ignored. + * @param directory HDFS directory to monitor for new file + * @param kClass class of key for reading HDFS file + * @param vClass class of value for reading HDFS file + * @param fClass class of input format for reading HDFS file + * @param filter Function to filter paths to process + * @param newFilesOnly Should process only new files and ignore existing files in the directory + * @param conf Hadoop configuration + * @tparam K Key type for reading HDFS file + * @tparam V Value type for reading HDFS file + * @tparam F Input format for reading HDFS file + */ + def fileStream[K, V, F <: NewInputFormat[K, V]]( + directory: String, + kClass: Class[K], + vClass: Class[V], + fClass: Class[F], + filter: JFunction[Path, JBoolean], + newFilesOnly: Boolean, + conf: Configuration): JavaPairInputDStream[K, V] = { + implicit val cmk: ClassTag[K] = ClassTag(kClass) + implicit val cmv: ClassTag[V] = ClassTag(vClass) + implicit val cmf: ClassTag[F] = ClassTag(fClass) + def fn = (x: Path) => filter.call(x).booleanValue() + ssc.fileStream[K, V, F](directory, fn, newFilesOnly, conf) + } + /** * Create an input stream with any arbitrary user implemented actor receiver. 
* @param props Props object defining creation of the actor From 30eba6788a7e687f53c8503ab6ee004b9f79f4b8 Mon Sep 17 00:00:00 2001 From: freeman Date: Tue, 3 Feb 2015 17:02:34 -0500 Subject: [PATCH 25/29] Add filter and newFilesOnly alongside conf - Ensures the argument including conf is a superset of existing arguments - Use default versions for binaryRecordStream --- .../apache/spark/streaming/StreamingContext.scala | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 86c98a0624424..2b58ef18a4913 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -367,6 +367,8 @@ class StreamingContext private[streaming] ( * Files must be written to the monitored directory by "moving" them from another * location within the same file system. File names starting with . are ignored. * @param directory HDFS directory to monitor for new file + * @param filter Function to filter paths to process + * @param newFilesOnly Should process only new files and ignore existing files in the directory * @param conf Hadoop configuration * @tparam K Key type for reading HDFS file * @tparam V Value type for reading HDFS file @@ -376,8 +378,11 @@ class StreamingContext private[streaming] ( K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag - ] (directory: String, conf: Configuration): InputDStream[(K, V)] = { - new FileInputDStream[K, V, F](this, directory=directory, conf=Option(conf)) + ] (directory: String, + filter: Path => Boolean, + newFilesOnly: Boolean, + conf: Configuration): InputDStream[(K, V)] = { + new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly, Option(conf)) } /** @@ -412,7 +417,8 @@ class StreamingContext private[streaming] ( recordLength: Int): DStream[Array[Byte]] = { val conf = sc_.hadoopConfiguration conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength) - val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](directory, conf) + val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat]( + directory, FileInputDStream.defaultFilter : Path => Boolean, newFilesOnly=true, conf) val data = br.map { case (k, v) => val bytes = v.getBytes assert(bytes.length == recordLength, "Byte array does not have correct length") From c4237b8e6d617fa0a307d8e6e173e421a1d2b458 Mon Sep 17 00:00:00 2001 From: freeman Date: Tue, 3 Feb 2015 17:26:51 -0500 Subject: [PATCH 26/29] Add experimental tag --- .../scala/org/apache/spark/streaming/StreamingContext.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 2b58ef18a4913..101ab2457694e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -398,6 +398,8 @@ class StreamingContext private[streaming] ( } /** + * :: Experimental :: + * * Create an input stream that monitors a Hadoop-compatible filesystem * for new files and reads them as flat binary files, assuming a fixed length per record, * generating one byte array per record. 
Files must be written to the monitored directory From eba925c125341a1ed694c5f3ac3ab3914488f880 Mon Sep 17 00:00:00 2001 From: freeman Date: Tue, 3 Feb 2015 17:27:13 -0500 Subject: [PATCH 27/29] Simplify notes --- core/src/main/scala/org/apache/spark/SparkContext.scala | 5 ++--- .../scala/org/apache/spark/streaming/StreamingContext.scala | 5 ++--- .../spark/streaming/api/java/JavaStreamingContext.scala | 1 + 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4fae9fe8dd29b..f4681ad642dcf 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -657,9 +657,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * * Load data from a flat binary file, assuming the length of each record is constant. * - * '''Note:''' Normally getBytes returns an array padded with extra values, - * but the FixedLengthBinaryInputFormat ensures that it will always be backed - * by a byte array of the correct length (the recordLength) + * '''Note:''' We ensure that each record in the resulting RDD + * has the provided record length. * * @param path Directory to the input data files * @param recordLength The length at which to split the records diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 101ab2457694e..b740cd47381e3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -406,9 +406,8 @@ class StreamingContext private[streaming] ( * by "moving" them from another location within the same file system. File names * starting with . are ignored. * - * '''Note:''' Normally getBytes returns an array padded with extra values, - * but the FixedLengthBinaryInputFormat ensures that it will always be backed - * by a byte array of the correct length (the recordLength) + * '''Note:''' We ensure that each record in the resulting RDDs of the DStream + * has the provided record length. 
* * @param directory HDFS directory to monitor for new file * @param recordLength length of each record in bytes diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index b6e869dbc1497..290143bca137b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -212,6 +212,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { /** * :: Experimental :: + * * Create an input stream that monitors a Hadoop-compatible filesystem * for new files and reads them as flat binary files with fixed record lengths, * yielding byte arrays From 5ff1b75a8d5cd6e8ff77ec555a795efd15161595 Mon Sep 17 00:00:00 2001 From: freeman Date: Wed, 4 Feb 2015 00:24:02 -0500 Subject: [PATCH 28/29] Add note to java streaming context --- .../spark/streaming/api/java/JavaStreamingContext.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 290143bca137b..0f7ae7a1c7de8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -216,6 +216,10 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { * Create an input stream that monitors a Hadoop-compatible filesystem * for new files and reads them as flat binary files with fixed record lengths, * yielding byte arrays + * + * '''Note:''' We ensure that the byte array for each record in the + * resulting RDDs of the DStream has the provided record length. + * * @param directory HDFS directory to monitor for new files * @param recordLength The length at which to split the records */ From b676534067a626260b6921ba17a04b6e03ff587a Mon Sep 17 00:00:00 2001 From: freeman Date: Wed, 4 Feb 2015 00:27:36 -0500 Subject: [PATCH 29/29] Clarify note --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- .../scala/org/apache/spark/streaming/StreamingContext.scala | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f4681ad642dcf..27ee7700fb3fa 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -657,7 +657,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * * Load data from a flat binary file, assuming the length of each record is constant. * - * '''Note:''' We ensure that each record in the resulting RDD + * '''Note:''' We ensure that the byte array for each record in the resulting RDD * has the provided record length. 
* * @param path Directory to the input data files diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index b740cd47381e3..ddc435cf1a2e6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -399,15 +399,15 @@ class StreamingContext private[streaming] ( /** * :: Experimental :: - * + * * Create an input stream that monitors a Hadoop-compatible filesystem * for new files and reads them as flat binary files, assuming a fixed length per record, * generating one byte array per record. Files must be written to the monitored directory * by "moving" them from another location within the same file system. File names * starting with . are ignored. * - * '''Note:''' We ensure that each record in the resulting RDDs of the DStream - * has the provided record length. + * '''Note:''' We ensure that the byte array for each record in the + * resulting RDDs of the DStream has the provided record length. * * @param directory HDFS directory to monitor for new file * @param recordLength length of each record in bytes
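

Beyond binaryRecordsStream itself, the later patches expose a fileStream variant that accepts a Hadoop Configuration alongside the path filter and newFilesOnly flag, which is the hook binaryRecordsStream uses to pass the record length to FixedLengthBinaryInputFormat. A minimal Scala sketch of that overload follows; it is illustrative only, the directory is a placeholder, and the textinputformat.record.delimiter key is assumed to be honored by the Hadoop version on the classpath:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object FileStreamWithConfExample {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("FileStreamWithConfExample").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(2))

        // Copy the context's Hadoop configuration and add an input-format option,
        // mirroring how binaryRecordsStream sets the fixed record length.
        val hadoopConf = new Configuration(ssc.sparkContext.hadoopConfiguration)
        hadoopConf.set("textinputformat.record.delimiter", "\n\n")

        // The overload added in this series: (directory, filter, newFilesOnly, conf).
        val paragraphs = ssc.fileStream[LongWritable, Text, TextInputFormat](
          "hdfs://namenode:8020/data/incoming",          // placeholder directory
          (path: Path) => !path.getName.startsWith("."), // same spirit as the default filter
          true,                                          // newFilesOnly
          hadoopConf
        ).map(_._2.toString)

        paragraphs.count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

Java callers get the same capability through the JavaStreamingContext.fileStream overload added in patch 24, which takes the key, value, and format classes plus the filter, newFilesOnly flag, and Configuration.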