From 4a7eef4ceb32c627644d367dcfbf52cb73899e9d Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Sat, 11 Oct 2014 01:22:31 -0700 Subject: [PATCH 01/25] Support nested directories in Spark Streaming --- .../streaming/dstream/FileInputDStream.scala | 77 +++++++++++++++++-- 1 file changed, 72 insertions(+), 5 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index eca69f00188e4..683e0b2abc9fc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -17,13 +17,13 @@ package org.apache.spark.streaming.dstream -import java.io.{IOException, ObjectInputStream} +import java.io.{FileNotFoundException, IOException, ObjectInputStream} import scala.collection.mutable import scala.reflect.ClassTag import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.spark.{SparkConf, SerializableWritable} @@ -72,6 +72,7 @@ private[streaming] class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( @transient ssc_ : StreamingContext, directory: String, + depth: Int = 1, filter: Path => Boolean = FileInputDStream.defaultFilter, newFilesOnly: Boolean = true, conf: Option[Configuration] = None) @@ -92,6 +93,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( // This is a def so that it works during checkpoint recovery: private def clock = ssc.scheduler.clock + require(depth >= 1, "nested directories depth must >= 1") // Data to be saved as part of the streaming checkpoints protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData @@ -116,6 +118,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( // Set of files that were selected in the remembered batches @transient private var recentlySelectedFiles = new mutable.HashSet[String]() + @transient private val lastFoundDirs = new mutable.HashSet[Path]() // Read-through cache of file mod times, used to speed up mod time lookups @transient private var fileToModTime = new TimeStampedHashMap[String, Long](true) @@ -183,8 +186,68 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( val filter = new PathFilter { def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold) } - val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString) - val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime + val directoryDepth = directoryPath.depth() + + //nested directories + def dfs(status: FileStatus, currentDepth: Int): List[FileStatus] = { + val modTime = status.getModificationTime + status match { + case _ if currentDepth < 0 => Nil + case _ if !status.isDirectory => { + if (filter.accept(status.getPath)) { + status :: Nil + } else { + Nil + } + } + case _ if status.isDirectory => { + val path = status.getPath + val depthFilter = depth + directoryDepth - path.depth() + if (lastFoundDirs.contains(path) + && (status.getModificationTime > modTimeIgnoreThreshold)) { + fs.listStatus(path).toList.flatMap(dfs(_, depthFilter - 1)) + } else if (!lastFoundDirs.contains(path) && depthFilter >= 0 ) { + lastFoundDirs += path + fs.listStatus(path).toList.flatMap(dfs(_, depthFilter - 1)) + } else { + Nil + } + } + } + } + + var newFiles 
= List[String]() + if (lastFoundDirs.isEmpty) { + newFiles = dfs(fs.getFileStatus(directoryPath), depth).map(_.getPath.toString) + } else { + lastFoundDirs.filter { + path => + try { + /* If the modidication time of directory more than ignore time ,the directory + * is no change. + */ + val status = fs.getFileStatus(path) + if (status != null && status.getModificationTime > modTimeIgnoreThreshold) { + true + } else { + false + } + } + catch { + // If the directory do not found ,remove the drir from lastFoundDirs + case e: FileNotFoundException => { + lastFoundDirs.remove(path) + false + } + } + }.map { + path => + newFiles = fs.listStatus(path).toList.flatMap(dfs(_, + depth + directoryDepth - path.depth())).map(_.getPath.toString) + } + } + + val timeTaken = System.currentTimeMillis - lastNewFileFindingTime logInfo("Finding new files took " + timeTaken + " ms") logDebug("# cached file times = " + fileToModTime.size) if (timeTaken > slideDuration.milliseconds) { @@ -194,7 +257,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( "files in the monitored directory." ) } - newFiles + newFiles.toArray } catch { case e: Exception => logWarning("Error finding new files", e) @@ -223,6 +286,10 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( */ private def isNewFile(path: Path, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = { val pathStr = path.toString + if (path.getName().startsWith("_")) { + logDebug(s"startsWith: ${path.getName()}") + return false + } // Reject file if it does not satisfy filter if (!filter(path)) { logDebug(s"$pathStr rejected by filter") From 50ad7d40a1317da9264c7c67e52328fdec63e52c Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Sat, 11 Oct 2014 22:27:22 -0700 Subject: [PATCH 02/25] change Nit --- .../org/apache/spark/streaming/dstream/FileInputDStream.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 683e0b2abc9fc..52ae474b714dd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -137,7 +137,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( * Finds the files that were modified since the last time this method was called and makes * a union RDD out of them. Note that this maintains the list of files that were processed * in the latest modification time in the previous call to this method. This is because the - * modification time returned by the FileStatus API seems to return times only at the + * modification time ed by the FileStatus API seems to times only at the * granularity of seconds. And new files may have the same modification time as the * latest modification time in the previous call to this method yet was not reported in * the previous call. 
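The scan added in PATCH 01 is essentially a depth-limited recursive listing that prunes directories whose modification time is at or below the ignore threshold. Below is a minimal standalone sketch of that idea against the Hadoop FileSystem API, not the patched class itself; the root path, depth, and cutoff are illustrative placeholders, and it uses `FileStatus.isDirectory` rather than the deprecated `isDir` that a later patch in this series adopts for Hadoop 1.x compatibility.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

object NestedScanSketch {
  /** Return files under `root`, descending at most `depth` directory levels. */
  def scan(fs: FileSystem, root: Path, depth: Int, modTimeIgnoreThreshold: Long): Seq[String] = {
    def visit(status: FileStatus, remaining: Int): Seq[FileStatus] = {
      if (status.isDirectory) {
        // Stop once the depth budget is used up, and skip directories whose modification
        // time has not moved past the threshold (directory mtime is bumped when direct
        // children are added, which is what the patch's pruning relies on).
        if (remaining <= 0 || status.getModificationTime <= modTimeIgnoreThreshold) {
          Seq.empty
        } else {
          fs.listStatus(status.getPath).toSeq.flatMap(visit(_, remaining - 1))
        }
      } else if (status.getModificationTime > modTimeIgnoreThreshold) {
        Seq(status)
      } else {
        Seq.empty
      }
    }
    visit(fs.getFileStatus(root), depth).map(_.getPath.toString)
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    // Files modified within the last hour, up to two directory levels below the root.
    val cutoff = System.currentTimeMillis() - 60 * 60 * 1000
    scan(fs, new Path("/data/logs"), depth = 2, modTimeIgnoreThreshold = cutoff).foreach(println)
  }
}
```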
From c14def13058d96ad2fcffd8e3039dd1c248a7612 Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Thu, 16 Oct 2014 20:46:01 -0700 Subject: [PATCH 03/25] support depth --- .../spark/streaming/StreamingContext.scala | 14 +- .../streaming/dstream/FileInputDStream.scala | 65 ++++++-- .../spark/streaming/InputStreamsSuite.scala | 150 ++++++++++++++++++ 3 files changed, 210 insertions(+), 19 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 7f181bcecd4bf..4a76e82775483 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -372,8 +372,8 @@ class StreamingContext private[streaming] ( K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag - ] (directory: String): InputDStream[(K, V)] = { - new FileInputDStream[K, V, F](this, directory) + ] (directory: String,depth :Int =0): InputDStream[(K, V)] = { + new FileInputDStream[K, V, F](this, directory,depth) } /** @@ -392,8 +392,8 @@ class StreamingContext private[streaming] ( K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag - ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = { - new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly) + ] (directory: String,filter: Path => Boolean, newFilesOnly: Boolean, depth : Int =0): InputDStream[(K, V)] = { + new FileInputDStream[K, V, F](this, directory,depth, filter, newFilesOnly) } /** @@ -428,8 +428,10 @@ class StreamingContext private[streaming] ( * file system. File names starting with . are ignored. * @param directory HDFS directory to monitor for new file */ - def textFileStream(directory: String): DStream[String] = withNamedScope("text file stream") { - fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString) + def textFileStream( + directory: String, + depth: Int =0): DStream[String] = withNamedScope("text file stream") { + fileStream[LongWritable, Text, TextInputFormat](directory,depth).map(_._2.toString) } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 52ae474b714dd..7f9dfd3719beb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -72,7 +72,7 @@ private[streaming] class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( @transient ssc_ : StreamingContext, directory: String, - depth: Int = 1, + depth: Int = 0, filter: Path => Boolean = FileInputDStream.defaultFilter, newFilesOnly: Boolean = true, conf: Option[Configuration] = None) @@ -137,7 +137,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( * Finds the files that were modified since the last time this method was called and makes * a union RDD out of them. Note that this maintains the list of files that were processed * in the latest modification time in the previous call to this method. This is because the - * modification time ed by the FileStatus API seems to times only at the + * modification time returned by the FileStatus API seems to return times only at the * granularity of seconds. 
And new files may have the same modification time as the * latest modification time in the previous call to this method yet was not reported in * the previous call. @@ -166,11 +166,26 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( } /** +<<<<<<< HEAD +======= +<<<<<<< HEAD +<<<<<<< HEAD +>>>>>>> support depth * Find new files for the batch of `currentTime`. This is done by first calculating the * ignore threshold for file mod times, and then getting a list of files filtered based on * the current batch time and the ignore threshold. The ignore threshold is the max of * initial ignore threshold and the trailing end of the remember window (that is, which ever * is later in time). +<<<<<<< HEAD +======= +======= + * Find files which have modification timestamp <= current time and a 3-tuple of +======= + * Find files which have modification timestamp <= current time and return a 3-tuple of +>>>>>>> support depth + * (new files found, latest modification time among them, files with latest modification time) +>>>>>>> change Nit +>>>>>>> support depth */ private def findNewFiles(currentTime: Long): Array[String] = { try { @@ -266,6 +281,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( } } +<<<<<<< HEAD /** * Identify whether the given `path` is a new file for the batch of `currentTime`. For it to be * accepted, it has to pass the following criteria. @@ -314,6 +330,29 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( } logDebug(s"$pathStr accepted with mod time $modTime") return true +======= + def getPathList(path:Path, fs:FileSystem):List[Path]={ + var pathList = List[Path]() + pathList = path:: pathList + var tmp =List[Path]() + tmp=path::tmp + for(i <- 0 until depth){ + tmp =getSubPathList(tmp,fs) + pathList=tmp:::pathList + } + pathList + } + + def getSubPathList(path:List[Path],fs:FileSystem):List[Path]={ + val filter = new SubPathFilter() + var pathList = List[Path]() + path.map(subPath=>{ + fs.listStatus(subPath,filter).map(x=>{ + pathList = x.getPath()::pathList + }) + }) + pathList +>>>>>>> support depth } /** Generate one RDD from an array of files */ @@ -404,17 +443,17 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( } } -private[streaming] -object FileInputDStream { + private[streaming] + object FileInputDStream { - def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".") + def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".") - /** - * Calculate the number of last batches to remember, such that all the files selected in - * at least last minRememberDurationS duration can be remembered. - */ - def calculateNumBatchesToRemember(batchDuration: Duration, - minRememberDurationS: Duration): Int = { - math.ceil(minRememberDurationS.milliseconds.toDouble / batchDuration.milliseconds).toInt + /** + * Calculate the number of last batches to remember, such that all the files selected in + * at least last minRememberDurationS duration can be remembered. 
+ */ + def calculateNumBatchesToRemember(batchDuration: Duration, + minRememberDurationS: Duration): Int = { + math.ceil(minRememberDurationS.milliseconds.toDouble / batchDuration.milliseconds).toInt + } } -} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 93e6b0cd7c661..034d910b3b250 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -157,14 +157,164 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } } +<<<<<<< HEAD test("file input stream - newFilesOnly = true") { testFileStream(newFilesOnly = true) } +======= + test("file input stream -depth = 0 ") { + // Disable manual clock as FileInputDStream does not work with manual clock + conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") + + // Set up the streaming context and input streams + val testDir = Utils.createTempDir() + val ssc = new StreamingContext(conf, batchDuration) + val fileStream = ssc.textFileStream(testDir.toString) + val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] + def output = outputBuffer.flatMap(x => x) + val outputStream = new TestOutputStream(fileStream, outputBuffer) + outputStream.register() + ssc.start() + + // Create files in the temporary directory so that Spark Streaming can read data from it + val input = Seq(1, 2, 3, 4, 5) + val expectedOutput = input.map(_.toString) + Thread.sleep(1000) + for (i <- 0 until input.size) { + val file = new File(testDir, i.toString) + Files.write(input(i) + "\n", file, Charset.forName("UTF-8")) + logInfo("Created file " + file) + Thread.sleep(batchDuration.milliseconds) + Thread.sleep(1000) + } + val startTime = System.currentTimeMillis() + Thread.sleep(1000) + val timeTaken = System.currentTimeMillis() - startTime + assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") + logInfo("Stopping context") + ssc.stop() + + // Verify whether data received by Spark Streaming was as expected + logInfo("--------------------------------") + logInfo("output, size = " + outputBuffer.size) + outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("expected output, size = " + expectedOutput.size) + expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("--------------------------------") + + // Verify whether all the elements received are as expected + // (whether the elements were received one in each interval is not verified) + assert(output.toList === expectedOutput.toList) + + Utils.deleteRecursively(testDir) +>>>>>>> support depth test("file input stream - newFilesOnly = false") { testFileStream(newFilesOnly = false) } + test("file input stream -depth = 1") { + // Disable manual clock as FileInputDStream does not work with manual clock + conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") + + // Set up the streaming context and input streams + val testDir = Utils.createTempDir() + val subDir = Utils.createTempDir(testDir.toString) + val ssc = new StreamingContext(conf, batchDuration) + val fileStream = ssc.textFileStream(testDir.toString,1) + val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] + def output = outputBuffer.flatMap(x => x) + val outputStream = new TestOutputStream(fileStream, outputBuffer) + outputStream.register() 
+ ssc.start() + + // Create files in the temporary directory so that Spark Streaming can read data from it + val input = Seq(1, 2, 3, 4, 5) + val expectedOutput = input.map(_.toString) + Thread.sleep(1000) + for (i <- 0 until input.size) { + val file = new File(subDir, i.toString) + Files.write(input(i) + "\n", file, Charset.forName("UTF-8")) + logInfo("Created file " + file) + Thread.sleep(batchDuration.milliseconds) + Thread.sleep(1000) + } + val startTime = System.currentTimeMillis() + Thread.sleep(1000) + val timeTaken = System.currentTimeMillis() - startTime + assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") + logInfo("Stopping context") + ssc.stop() + + // Verify whether data received by Spark Streaming was as expected + logInfo("--------------------------------") + logInfo("output, size = " + outputBuffer.size) + outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("expected output, size = " + expectedOutput.size) + expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("--------------------------------") + + // Verify whether all the elements received are as expected + // (whether the elements were received one in each interval is not verified) + assert(output.toList === expectedOutput.toList) + + Utils.deleteRecursively(testDir) + + // Enable manual clock back again for other tests + conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") + } + + test("file input stream -depth = 2") { + // Disable manual clock as FileInputDStream does not work with manual clock + conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") + + // Set up the streaming context and input streams + val testDir = Utils.createTempDir() + val subDir = Utils.createTempDir(testDir.toString) + val triDir = Utils.createTempDir(subDir.toString) + val ssc = new StreamingContext(conf, batchDuration) + val fileStream = ssc.textFileStream(testDir.toString,2) + val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] + def output = outputBuffer.flatMap(x => x) + val outputStream = new TestOutputStream(fileStream, outputBuffer) + outputStream.register() + ssc.start() + + // Create files in the temporary directory so that Spark Streaming can read data from it + val input = Seq(1, 2, 3, 4, 5) + val expectedOutput = input.map(_.toString) + Thread.sleep(1000) + for (i <- 0 until input.size) { + val file = new File(triDir, i.toString) + Files.write(input(i) + "\n", file, Charset.forName("UTF-8")) + Thread.sleep(batchDuration.milliseconds) + Thread.sleep(1000) + } + val startTime = System.currentTimeMillis() + Thread.sleep(1000) + val timeTaken = System.currentTimeMillis() - startTime + assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") + logInfo("Stopping context") + ssc.stop() + + // Verify whether data received by Spark Streaming was as expected + logInfo("--------------------------------") + logInfo("output, size = " + outputBuffer.size) + outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("expected output, size = " + expectedOutput.size) + expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("--------------------------------") + + // Verify whether all the elements received are as expected + // (whether the elements were received one in each interval is not verified) + assert(output.toList === expectedOutput.toList) + + Utils.deleteRecursively(testDir) + + // Enable manual clock back 
again for other tests + conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") + } + test("multi-thread receiver") { // set up the test receiver val numThreads = 10 From bfbec510d8963a2382ec29abc20b2b6208edce12 Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Thu, 16 Oct 2014 23:22:12 -0700 Subject: [PATCH 04/25] Change space before brace --- .../streaming/dstream/FileInputDStream.scala | 40 ------------------- 1 file changed, 40 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 7f9dfd3719beb..0befbe989acfd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -166,26 +166,11 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( } /** -<<<<<<< HEAD -======= -<<<<<<< HEAD -<<<<<<< HEAD ->>>>>>> support depth * Find new files for the batch of `currentTime`. This is done by first calculating the * ignore threshold for file mod times, and then getting a list of files filtered based on * the current batch time and the ignore threshold. The ignore threshold is the max of * initial ignore threshold and the trailing end of the remember window (that is, which ever * is later in time). -<<<<<<< HEAD -======= -======= - * Find files which have modification timestamp <= current time and a 3-tuple of -======= - * Find files which have modification timestamp <= current time and return a 3-tuple of ->>>>>>> support depth - * (new files found, latest modification time among them, files with latest modification time) ->>>>>>> change Nit ->>>>>>> support depth */ private def findNewFiles(currentTime: Long): Array[String] = { try { @@ -280,8 +265,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( Array.empty } } - -<<<<<<< HEAD /** * Identify whether the given `path` is a new file for the batch of `currentTime`. For it to be * accepted, it has to pass the following criteria. 
@@ -330,29 +313,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( } logDebug(s"$pathStr accepted with mod time $modTime") return true -======= - def getPathList(path:Path, fs:FileSystem):List[Path]={ - var pathList = List[Path]() - pathList = path:: pathList - var tmp =List[Path]() - tmp=path::tmp - for(i <- 0 until depth){ - tmp =getSubPathList(tmp,fs) - pathList=tmp:::pathList - } - pathList - } - - def getSubPathList(path:List[Path],fs:FileSystem):List[Path]={ - val filter = new SubPathFilter() - var pathList = List[Path]() - path.map(subPath=>{ - fs.listStatus(subPath,filter).map(x=>{ - pathList = x.getPath()::pathList - }) - }) - pathList ->>>>>>> support depth } /** Generate one RDD from an array of files */ From 0a8ecf8079560b5f581dda983e35a186db483f3a Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Fri, 17 Oct 2014 00:24:38 -0700 Subject: [PATCH 05/25] change process any files created in nested directories --- .../org/apache/spark/streaming/dstream/FileInputDStream.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 0befbe989acfd..a6bbf6353be72 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -417,3 +417,5 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( math.ceil(minRememberDurationS.milliseconds.toDouble / batchDuration.milliseconds).toInt } } +} + From 1ce623d37a7c23149d630687a9c015aa15152ff9 Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Fri, 24 Oct 2014 00:12:17 -0700 Subject: [PATCH 06/25] reformat code --- .../spark/streaming/StreamingContext.scala | 16 ++++++++-------- .../streaming/dstream/FileInputDStream.scala | 3 +-- .../spark/streaming/InputStreamsSuite.scala | 8 ++++++-- 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 4a76e82775483..12492aaf5e7d3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -369,11 +369,11 @@ class StreamingContext private[streaming] ( * @tparam F Input format for reading HDFS file */ def fileStream[ - K: ClassTag, - V: ClassTag, - F <: NewInputFormat[K, V]: ClassTag - ] (directory: String,depth :Int =0): InputDStream[(K, V)] = { - new FileInputDStream[K, V, F](this, directory,depth) + K: ClassTag, + V: ClassTag, + F <: NewInputFormat[K, V] : ClassTag + ](directory: String, depth: Int = 0): InputDStream[(K, V)] = { + new FileInputDStream[K, V, F](this, directory, depth) } /** @@ -392,8 +392,8 @@ class StreamingContext private[streaming] ( K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag - ] (directory: String,filter: Path => Boolean, newFilesOnly: Boolean, depth : Int =0): InputDStream[(K, V)] = { - new FileInputDStream[K, V, F](this, directory,depth, filter, newFilesOnly) + ](directory: String, filter: Path => Boolean, newFilesOnly: Boolean, depth: Int = 0): InputDStream[(K, V)] = { + new FileInputDStream[K, V, F](this, directory, depth, filter, newFilesOnly) } /** @@ -431,7 +431,7 @@ class StreamingContext private[streaming] ( def textFileStream( directory: String, depth: Int =0): DStream[String] = 
withNamedScope("text file stream") { - fileStream[LongWritable, Text, TextInputFormat](directory,depth).map(_._2.toString) + fileStream[LongWritable, Text, TextInputFormat](directory, depth).map(_._2.toString) } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index a6bbf6353be72..cdb72b30cca40 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -72,7 +72,7 @@ private[streaming] class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( @transient ssc_ : StreamingContext, directory: String, - depth: Int = 0, + depth: Int = 1, filter: Path => Boolean = FileInputDStream.defaultFilter, newFilesOnly: Boolean = true, conf: Option[Configuration] = None) @@ -417,5 +417,4 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( math.ceil(minRememberDurationS.milliseconds.toDouble / batchDuration.milliseconds).toInt } } -} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 034d910b3b250..15eec8bd9b6f1 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -157,12 +157,16 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } } +<<<<<<< HEAD <<<<<<< HEAD test("file input stream - newFilesOnly = true") { testFileStream(newFilesOnly = true) } ======= test("file input stream -depth = 0 ") { +======= + test("file input stream - depth = 0") { +>>>>>>> reformat code // Disable manual clock as FileInputDStream does not work with manual clock conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") @@ -213,7 +217,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { testFileStream(newFilesOnly = false) } - test("file input stream -depth = 1") { + test("file input stream - depth = 1") { // Disable manual clock as FileInputDStream does not work with manual clock conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") @@ -264,7 +268,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") } - test("file input stream -depth = 2") { + test("file input stream - depth = 2") { // Disable manual clock as FileInputDStream does not work with manual clock conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") From fe6e5ca2a61bc473cedcf1e0e982c05dce441d20 Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Fri, 24 Oct 2014 00:54:09 -0700 Subject: [PATCH 07/25] add a require(depth >= 0) --- .../scala/org/apache/spark/streaming/StreamingContext.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 12492aaf5e7d3..33ed6f2913876 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -369,9 +369,9 @@ class StreamingContext private[streaming] ( * @tparam F Input format for reading HDFS file */ def fileStream[ - K: ClassTag, - V: 
ClassTag, - F <: NewInputFormat[K, V] : ClassTag + K: ClassTag, + V: ClassTag, + F <: NewInputFormat[K, V]: ClassTag ](directory: String, depth: Int = 0): InputDStream[(K, V)] = { new FileInputDStream[K, V, F](this, directory, depth) } From 70319401d068b9b4c58cfca957e4612941be7300 Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Fri, 24 Oct 2014 01:54:03 -0700 Subject: [PATCH 08/25] reformat code --- .../scala/org/apache/spark/streaming/InputStreamsSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 15eec8bd9b6f1..d5cdeb5cacef0 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -225,7 +225,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val testDir = Utils.createTempDir() val subDir = Utils.createTempDir(testDir.toString) val ssc = new StreamingContext(conf, batchDuration) - val fileStream = ssc.textFileStream(testDir.toString,1) + val fileStream = ssc.textFileStream(testDir.toString, 1) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] def output = outputBuffer.flatMap(x => x) val outputStream = new TestOutputStream(fileStream, outputBuffer) @@ -277,7 +277,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val subDir = Utils.createTempDir(testDir.toString) val triDir = Utils.createTempDir(subDir.toString) val ssc = new StreamingContext(conf, batchDuration) - val fileStream = ssc.textFileStream(testDir.toString,2) + val fileStream = ssc.textFileStream(testDir.toString, 2) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] def output = outputBuffer.flatMap(x => x) val outputStream = new TestOutputStream(fileStream, outputBuffer) From 7bd4811358de035b75c61ae50f5cffbde8fa31a0 Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Mon, 27 Oct 2014 19:52:01 -0700 Subject: [PATCH 09/25] change performance --- .../spark/streaming/StreamingContext.scala | 4 +- .../spark/streaming/InputStreamsSuite.scala | 177 +++--------------- 2 files changed, 25 insertions(+), 156 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 33ed6f2913876..7ae3af5963215 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -372,7 +372,7 @@ class StreamingContext private[streaming] ( K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag - ](directory: String, depth: Int = 0): InputDStream[(K, V)] = { + ](directory: String, depth: Int = 1): InputDStream[(K, V)] = { new FileInputDStream[K, V, F](this, directory, depth) } @@ -392,7 +392,7 @@ class StreamingContext private[streaming] ( K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag - ](directory: String, filter: Path => Boolean, newFilesOnly: Boolean, depth: Int = 0): InputDStream[(K, V)] = { + ](directory: String, filter: Path => Boolean, newFilesOnly: Boolean, depth: Int = 1): InputDStream[(K, V)] = { new FileInputDStream[K, V, F](this, directory, depth, filter, newFilesOnly) } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index d5cdeb5cacef0..cbd2a3aaf1b6a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -157,168 +157,31 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } } -<<<<<<< HEAD -<<<<<<< HEAD - test("file input stream - newFilesOnly = true") { + test("file input stream - newFilesOnly = true and depth = 1") { testFileStream(newFilesOnly = true) } -======= - test("file input stream -depth = 0 ") { -======= - test("file input stream - depth = 0") { ->>>>>>> reformat code - // Disable manual clock as FileInputDStream does not work with manual clock - conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") - // Set up the streaming context and input streams - val testDir = Utils.createTempDir() - val ssc = new StreamingContext(conf, batchDuration) - val fileStream = ssc.textFileStream(testDir.toString) - val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] - def output = outputBuffer.flatMap(x => x) - val outputStream = new TestOutputStream(fileStream, outputBuffer) - outputStream.register() - ssc.start() - - // Create files in the temporary directory so that Spark Streaming can read data from it - val input = Seq(1, 2, 3, 4, 5) - val expectedOutput = input.map(_.toString) - Thread.sleep(1000) - for (i <- 0 until input.size) { - val file = new File(testDir, i.toString) - Files.write(input(i) + "\n", file, Charset.forName("UTF-8")) - logInfo("Created file " + file) - Thread.sleep(batchDuration.milliseconds) - Thread.sleep(1000) - } - val startTime = System.currentTimeMillis() - Thread.sleep(1000) - val timeTaken = System.currentTimeMillis() - startTime - assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") - logInfo("Stopping context") - ssc.stop() - - // Verify whether data received by Spark Streaming was as expected - logInfo("--------------------------------") - logInfo("output, size = " + outputBuffer.size) - outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) - logInfo("expected output, size = " + expectedOutput.size) - expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) - logInfo("--------------------------------") - - // Verify whether all the elements received are as expected - // (whether the elements were received one in each interval is not verified) - assert(output.toList === expectedOutput.toList) - - Utils.deleteRecursively(testDir) ->>>>>>> support depth - - test("file input stream - newFilesOnly = false") { + test("file input stream - newFilesOnly = false and depth = 1") { testFileStream(newFilesOnly = false) } - test("file input stream - depth = 1") { - // Disable manual clock as FileInputDStream does not work with manual clock - conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") - - // Set up the streaming context and input streams - val testDir = Utils.createTempDir() - val subDir = Utils.createTempDir(testDir.toString) - val ssc = new StreamingContext(conf, batchDuration) - val fileStream = ssc.textFileStream(testDir.toString, 1) - val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] - def output = outputBuffer.flatMap(x => x) - val outputStream = new TestOutputStream(fileStream, outputBuffer) - outputStream.register() - ssc.start() - - // Create files in the temporary directory so that 
Spark Streaming can read data from it - val input = Seq(1, 2, 3, 4, 5) - val expectedOutput = input.map(_.toString) - Thread.sleep(1000) - for (i <- 0 until input.size) { - val file = new File(subDir, i.toString) - Files.write(input(i) + "\n", file, Charset.forName("UTF-8")) - logInfo("Created file " + file) - Thread.sleep(batchDuration.milliseconds) - Thread.sleep(1000) - } - val startTime = System.currentTimeMillis() - Thread.sleep(1000) - val timeTaken = System.currentTimeMillis() - startTime - assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") - logInfo("Stopping context") - ssc.stop() - - // Verify whether data received by Spark Streaming was as expected - logInfo("--------------------------------") - logInfo("output, size = " + outputBuffer.size) - outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) - logInfo("expected output, size = " + expectedOutput.size) - expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) - logInfo("--------------------------------") - - // Verify whether all the elements received are as expected - // (whether the elements were received one in each interval is not verified) - assert(output.toList === expectedOutput.toList) - - Utils.deleteRecursively(testDir) - - // Enable manual clock back again for other tests - conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") + test("file input stream - newFilesOnly = true and depth = 2") { + testFileStream(newFilesOnly = true, 2) } - test("file input stream - depth = 2") { - // Disable manual clock as FileInputDStream does not work with manual clock - conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") - - // Set up the streaming context and input streams - val testDir = Utils.createTempDir() - val subDir = Utils.createTempDir(testDir.toString) - val triDir = Utils.createTempDir(subDir.toString) - val ssc = new StreamingContext(conf, batchDuration) - val fileStream = ssc.textFileStream(testDir.toString, 2) - val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] - def output = outputBuffer.flatMap(x => x) - val outputStream = new TestOutputStream(fileStream, outputBuffer) - outputStream.register() - ssc.start() - - // Create files in the temporary directory so that Spark Streaming can read data from it - val input = Seq(1, 2, 3, 4, 5) - val expectedOutput = input.map(_.toString) - Thread.sleep(1000) - for (i <- 0 until input.size) { - val file = new File(triDir, i.toString) - Files.write(input(i) + "\n", file, Charset.forName("UTF-8")) - Thread.sleep(batchDuration.milliseconds) - Thread.sleep(1000) - } - val startTime = System.currentTimeMillis() - Thread.sleep(1000) - val timeTaken = System.currentTimeMillis() - startTime - assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") - logInfo("Stopping context") - ssc.stop() - - // Verify whether data received by Spark Streaming was as expected - logInfo("--------------------------------") - logInfo("output, size = " + outputBuffer.size) - outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) - logInfo("expected output, size = " + expectedOutput.size) - expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) - logInfo("--------------------------------") - - // Verify whether all the elements received are as expected - // (whether the elements were received one in each interval is not verified) - assert(output.toList === expectedOutput.toList) + test("file input stream - 
newFilesOnly = false and depth = 2") { + testFileStream(newFilesOnly = false, 2) + } - Utils.deleteRecursively(testDir) + test("file input stream - newFilesOnly = true and depth = 3") { + testFileStream(newFilesOnly = true, 3) + } - // Enable manual clock back again for other tests - conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") + test("file input stream - newFilesOnly = false and depth = 3") { + testFileStream(newFilesOnly = false, 3) } + test("multi-thread receiver") { // set up the test receiver val numThreads = 10 @@ -447,6 +310,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } } + test("test track the number of input stream") { val ssc = new StreamingContext(conf, batchDuration) @@ -471,12 +335,16 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { assert(receiverInputStreams.map(_.id) === Array(0, 1)) } - def testFileStream(newFilesOnly: Boolean) { + + def testFileStream(newFilesOnly: Boolean, depth :Int = 1) { val testDir: File = null try { val batchDuration = Seconds(2) - val testDir = Utils.createTempDir() - // Create a file that exists before the StreamingContext is created: + var testDir = Utils.createTempDir() + for (i <- 2 until depth) { + testDir = Utils.createTempDir(testDir.toString) + } + val existingFile = new File(testDir, "0") Files.write("0\n", existingFile, Charset.forName("UTF-8")) assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000) @@ -488,7 +356,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { clock.setTime(existingFile.lastModified + batchDuration.milliseconds) val batchCounter = new BatchCounter(ssc) val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat]( - testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString) + testDir.toString, (x: Path) => true, + newFilesOnly = newFilesOnly, depth).map(_._2.toString) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] val outputStream = new TestOutputStream(fileStream, outputBuffer) outputStream.register() From 05b5fba4a8944c6942f01f36f3381c46d2d0a7f2 Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Tue, 28 Oct 2014 01:48:37 -0700 Subject: [PATCH 10/25] change filter name --- .../spark/streaming/InputStreamsSuite.scala | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index cbd2a3aaf1b6a..1418d11200361 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -383,20 +383,22 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } } - // Verify that all the files have been read - val expectedOutput = if (newFilesOnly) { - input.map(_.toString).toSet - } else { - (Seq(0) ++ input).map(_.toString).toSet - } - assert(outputBuffer.flatten.toSet === expectedOutput) + // Verify that all the files have been read + val expectedOutput = if (newFilesOnly) { + input.map(_.toString).toSet + } else { + (Seq(0) ++ input).map(_.toString).toSet } - } finally { - if (testDir != null) Utils.deleteRecursively(testDir) + assert(outputBuffer.flatten.toSet === expectedOutput) } + }finally + { + if (testDir != null) Utils.deleteRecursively(testDir) } } +} + /** This is a server to test the network input stream */ class 
TestServer(portToBind: Int = 0) extends Logging { From e66b16650d4f7fedea0a96b881b988f5564fc7bb Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Mon, 3 Nov 2014 01:55:46 -0800 Subject: [PATCH 11/25] change line exceeds 100 columns --- .../org/apache/spark/streaming/StreamingContext.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 7ae3af5963215..55f5b39a5628a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -392,7 +392,12 @@ class StreamingContext private[streaming] ( K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag - ](directory: String, filter: Path => Boolean, newFilesOnly: Boolean, depth: Int = 1): InputDStream[(K, V)] = { + ]( + directory: String, + filter: Path => Boolean, + newFilesOnly: Boolean, + depth: Int = 1 + ): InputDStream[(K, V)] = { new FileInputDStream[K, V, F](this, directory, depth, filter, newFilesOnly) } From a63c5a3fa63f68a8179c9f61ad6e75886597c8c8 Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Sun, 9 Nov 2014 21:47:09 -0800 Subject: [PATCH 12/25] line over 100 --- .../spark/streaming/dstream/FileInputDStream.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index cdb72b30cca40..a173405242121 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -240,11 +240,11 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( false } } - }.map { - path => - newFiles = fs.listStatus(path).toList.flatMap(dfs(_, - depth + directoryDepth - path.depth())).map(_.getPath.toString) - } + } + }.map { + path => + newFiles = fs.listStatus(path).toList.flatMap(dfs(_, + depth + directoryDepth - path.depth())).map(_.getPath.toString) } val timeTaken = System.currentTimeMillis - lastNewFileFindingTime From 0b4812eef0ab065b7ef5a6b2e2a269baa6bfe542 Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Thu, 4 Dec 2014 01:27:12 -0800 Subject: [PATCH 13/25] remove line --- .../org/apache/spark/streaming/dstream/FileInputDStream.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index a173405242121..caf8c2a75b5b5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -407,7 +407,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( object FileInputDStream { def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".") - /** * Calculate the number of last batches to remember, such that all the files selected in * at least last minRememberDurationS duration can be remembered. 
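The remember-window formula kept by the hunk above, ceil(minRememberDuration / batchDuration), can be checked with a small sketch; the 2 second batch interval and 60 second minimum remember duration used here are illustrative values, not taken from these patches.

```scala
import org.apache.spark.streaming.{Duration, Minutes, Seconds}

object RememberWindowSketch {
  // Same formula as FileInputDStream.calculateNumBatchesToRemember above.
  def numBatchesToRemember(batchDuration: Duration, minRememberDuration: Duration): Int =
    math.ceil(minRememberDuration.milliseconds.toDouble / batchDuration.milliseconds).toInt

  def main(args: Array[String]): Unit = {
    // With a 2 second batch interval and a 60 second minimum remember duration:
    // ceil(60000 / 2000) = 30, so the last 30 batches of selected files are remembered.
    println(numBatchesToRemember(Seconds(2), Minutes(1))) // prints 30
  }
}
```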
From 8990c3576ae91bc822dece2d1ef8a63a0465d13e Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Thu, 4 Dec 2014 02:21:19 -0800 Subject: [PATCH 14/25] style --- .../org/apache/spark/streaming/dstream/FileInputDStream.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index caf8c2a75b5b5..43cf6a1915eb8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -188,7 +188,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( } val directoryDepth = directoryPath.depth() - //nested directories + // nested directories def dfs(status: FileStatus, currentDepth: Int): List[FileStatus] = { val modTime = status.getModificationTime status match { From a20743f5946310f1babcd0815be0363ae4bd470a Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Tue, 16 Dec 2014 20:49:36 -0800 Subject: [PATCH 15/25] change get depth --- .../apache/spark/streaming/dstream/FileInputDStream.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 43cf6a1915eb8..258d6f680ef0c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -118,7 +118,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( // Set of files that were selected in the remembered batches @transient private var recentlySelectedFiles = new mutable.HashSet[String]() - @transient private val lastFoundDirs = new mutable.HashSet[Path]() + @transient private var lastFoundDirs = new mutable.HashSet[Path]() // Read-through cache of file mod times, used to speed up mod time lookups @transient private var fileToModTime = new TimeStampedHashMap[String, Long](true) @@ -186,11 +186,10 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( val filter = new PathFilter { def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold) } - val directoryDepth = directoryPath.depth() + val directoryDepth = fs.getFileStatus(directoryPath).getPath.depth() // nested directories def dfs(status: FileStatus, currentDepth: Int): List[FileStatus] = { - val modTime = status.getModificationTime status match { case _ if currentDepth < 0 => Nil case _ if !status.isDirectory => { @@ -365,6 +364,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]] recentlySelectedFiles = new mutable.HashSet[String]() fileToModTime = new TimeStampedHashMap[String, Long](true) + lastFoundDirs = new mutable.HashSet[Path]() } /** From d7f4880903f4b270bd1be2a5261fdc0b2403234b Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Tue, 23 Dec 2014 22:07:43 -0800 Subject: [PATCH 16/25] Use 'isDir' to modify the compatibility --- .../org/apache/spark/streaming/dstream/FileInputDStream.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 
258d6f680ef0c..41a3b5dca79d0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -192,14 +192,14 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( def dfs(status: FileStatus, currentDepth: Int): List[FileStatus] = { status match { case _ if currentDepth < 0 => Nil - case _ if !status.isDirectory => { + case _ if !status.isDir => { if (filter.accept(status.getPath)) { status :: Nil } else { Nil } } - case _ if status.isDirectory => { + case _ if status.isDir => { val path = status.getPath val depthFilter = depth + directoryDepth - path.depth() if (lastFoundDirs.contains(path) From 99b05d6c6851b9f7184139d5d8ed40b27c861971 Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Wed, 7 Jan 2015 01:36:30 -0800 Subject: [PATCH 17/25] rebase --- docs/streaming-programming-guide.md | 9 +- .../spark/streaming/StreamingContext.scala | 7 +- .../streaming/dstream/FileInputDStream.scala | 97 ++++++++----------- .../spark/streaming/InputStreamsSuite.scala | 23 ++--- 4 files changed, 62 insertions(+), 74 deletions(-) diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index bd863d48d53e3..fd5f925f12713 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -643,17 +643,17 @@ methods for creating DStreams from files and Akka actors as input sources.
- streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory) + streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory, depth)
- streamingContext.fileStream(dataDirectory); + streamingContext.fileStream(dataDirectory, depth);
- streamingContext.textFileStream(dataDirectory) + streamingContext.textFileStream(dataDirectory, depth)
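A minimal, self-contained Scala sketch of the two-argument `textFileStream` proposed by this patch series; the application name, paths, and batch interval are illustrative placeholders.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NestedTextFileStreamExample {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("NestedTextFileStreamExample")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // depth = 1 keeps the existing behaviour (only the monitored directory itself is scanned);
    // depth = 2 also picks up files written one level below it, e.g. events/2015-01-07/.
    val lines = ssc.textFileStream("hdfs:///data/events", 2)
    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```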
- Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory (files written in nested directories not supported). Note that + Spark Streaming will monitor the directory `dataDirectory`, the `depth` is default 1 and process any files created in that directory. If supported files written in nested directories, set the `depth` is greater than 1. Note that + The files must have the same data format. + The files must be created in the `dataDirectory` by atomically *moving* or *renaming* them into @@ -661,6 +661,7 @@ methods for creating DStreams from files and Akka actors as input sources. + Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read. For simple text files, there is an easier method `streamingContext.textFileStream(dataDirectory)`. And file streams do not require running a receiver, hence does not require allocating cores. + If Spark Streaming monitor the directory in nested directories, there is an easier method `streamingContext.textFileStream(dataDirectory, depth)`. Python API `fileStream` is not available in the Python API, only `textFileStream` is available. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 55f5b39a5628a..3d33125bb49b7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -364,6 +364,7 @@ class StreamingContext private[streaming] ( * Files must be written to the monitored directory by "moving" them from another * location within the same file system. File names starting with . are ignored. * @param directory HDFS directory to monitor for new file + * @param depth Searching depth of directory * @tparam K Key type for reading HDFS file * @tparam V Value type for reading HDFS file * @tparam F Input format for reading HDFS file @@ -372,7 +373,7 @@ class StreamingContext private[streaming] ( K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag - ](directory: String, depth: Int = 1): InputDStream[(K, V)] = { + ] (directory: String, depth: Int = 1): InputDStream[(K, V)] = { new FileInputDStream[K, V, F](this, directory, depth) } @@ -384,6 +385,7 @@ class StreamingContext private[streaming] ( * @param directory HDFS directory to monitor for new file * @param filter Function to filter paths to process * @param newFilesOnly Should process only new files and ignore existing files in the directory + * @param depth Searching depth of directory * @tparam K Key type for reading HDFS file * @tparam V Value type for reading HDFS file * @tparam F Input format for reading HDFS file @@ -392,7 +394,7 @@ class StreamingContext private[streaming] ( K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag - ]( + ] ( directory: String, filter: Path => Boolean, newFilesOnly: Boolean, @@ -432,6 +434,7 @@ class StreamingContext private[streaming] ( * monitored directory by "moving" them from another location within the same * file system. File names starting with . are ignored. 
* @param directory HDFS directory to monitor for new file + * @param depth Searching depth of directory */ def textFileStream( directory: String, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 41a3b5dca79d0..56148a6571a09 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -35,8 +35,10 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils} * This class represents an input stream that monitors a Hadoop-compatible filesystem for new * files and creates a stream out of them. The way it works as follows. * - * At each batch interval, the file system is queried for files in the given directory and - * detected new files are selected for that batch. In this case "new" means files that + * At each batch interval, Use `depth` to find files in the directory recursively, + * the file system is queried for files in the given directory and detected new + * files are selected for that batch. If the `depth` is greater than 1, + * it is queried for files in the depth of the recursion, In this case "new" means files that * became visible to readers during that time period. Some extra care is needed to deal * with the fact that files may become visible after they are created. For this purpose, this * class remembers the information about the files selected in past batches for @@ -118,6 +120,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( // Set of files that were selected in the remembered batches @transient private var recentlySelectedFiles = new mutable.HashSet[String]() + + // Set of directories that were found from the beginning to the present @transient private var lastFoundDirs = new mutable.HashSet[Path]() // Read-through cache of file mod times, used to speed up mod time lookups @@ -166,7 +170,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( } /** - * Find new files for the batch of `currentTime`. This is done by first calculating the + * Find new files for the batch of `currentTime` in nested directories. + * This is done by first calculating the * ignore threshold for file mod times, and then getting a list of files filtered based on * the current batch time and the ignore threshold. The ignore threshold is the max of * initial ignore threshold and the trailing end of the remember window (that is, which ever @@ -188,64 +193,45 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( } val directoryDepth = fs.getFileStatus(directoryPath).getPath.depth() - // nested directories - def dfs(status: FileStatus, currentDepth: Int): List[FileStatus] = { - status match { - case _ if currentDepth < 0 => Nil - case _ if !status.isDir => { - if (filter.accept(status.getPath)) { - status :: Nil + // Nested directories to find new files. 
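+ // `depth` counts directory levels below the monitored directory: with `directoryDepth`
+ // being the depth of the monitored path itself, a directory at `path` is descended
+ // into only while depth + directoryDepth - path.depth() - 1 >= 0.
+ // Directories already recorded in `lastFoundDirs` are re-listed only when their
+ // modification time is newer than `modTimeIgnoreThreshold`; directories seen for the
+ // first time within the depth budget are added to `lastFoundDirs` and listed once.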
+ def dfs(status: FileStatus): List[FileStatus] = { + val path = status.getPath + val depthFilter = depth + directoryDepth - path.depth() + if (status.isDir) { + if (depthFilter - 1 >= 0) { + if (lastFoundDirs.contains(path)) { + if (status.getModificationTime > modTimeIgnoreThreshold) { + fs.listStatus(path).toList.flatMap(dfs(_)) + } else Nil } else { - Nil - } - } - case _ if status.isDir => { - val path = status.getPath - val depthFilter = depth + directoryDepth - path.depth() - if (lastFoundDirs.contains(path) - && (status.getModificationTime > modTimeIgnoreThreshold)) { - fs.listStatus(path).toList.flatMap(dfs(_, depthFilter - 1)) - } else if (!lastFoundDirs.contains(path) && depthFilter >= 0 ) { lastFoundDirs += path - fs.listStatus(path).toList.flatMap(dfs(_, depthFilter - 1)) - } else { - Nil + fs.listStatus(path).toList.flatMap(dfs(_)) } - } + } else Nil + } else { + if (filter.accept(path)) status :: Nil else Nil } } - var newFiles = List[String]() - if (lastFoundDirs.isEmpty) { - newFiles = dfs(fs.getFileStatus(directoryPath), depth).map(_.getPath.toString) - } else { - lastFoundDirs.filter { - path => - try { - /* If the modidication time of directory more than ignore time ,the directory - * is no change. - */ - val status = fs.getFileStatus(path) - if (status != null && status.getModificationTime > modTimeIgnoreThreshold) { - true - } else { - false - } - } - catch { - // If the directory do not found ,remove the drir from lastFoundDirs - case e: FileNotFoundException => { - lastFoundDirs.remove(path) - false - } - } + val path = if (lastFoundDirs.isEmpty) Seq(fs.getFileStatus(directoryPath)) + else { + lastFoundDirs.filter { path => + // If the mod time of directory is more than ignore time, no new files in this directory. + try { + val status = fs.getFileStatus(path) + if (status != null && status.getModificationTime > modTimeIgnoreThreshold) true + else false } - }.map { - path => - newFiles = fs.listStatus(path).toList.flatMap(dfs(_, - depth + directoryDepth - path.depth())).map(_.getPath.toString) - } + catch { + // If the directory don't find, remove the directory from `lastFoundDirs` + case e: FileNotFoundException => + lastFoundDirs.remove(path) + false + } + } + }.flatMap(fs.listStatus(_)).toSeq + val newFiles = path.flatMap(dfs(_)).map(_.getPath.toString).toArray val timeTaken = System.currentTimeMillis - lastNewFileFindingTime logInfo("Finding new files took " + timeTaken + " ms") logDebug("# cached file times = " + fileToModTime.size) @@ -256,7 +242,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( "files in the monitored directory." 
) } - newFiles.toArray + newFiles } catch { case e: Exception => logWarning("Error finding new files", e) @@ -284,6 +270,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( */ private def isNewFile(path: Path, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = { val pathStr = path.toString + // Reject file if it start with _ if (path.getName().startsWith("_")) { logDebug(s"startsWith: ${path.getName()}") return false @@ -364,7 +351,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]] recentlySelectedFiles = new mutable.HashSet[String]() fileToModTime = new TimeStampedHashMap[String, Long](true) - lastFoundDirs = new mutable.HashSet[Path]() + lastFoundDirs = new mutable.HashSet[Path]() } /** diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 1418d11200361..d69b568db16f7 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -181,7 +181,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { testFileStream(newFilesOnly = false, 3) } - test("multi-thread receiver") { // set up the test receiver val numThreads = 10 @@ -344,7 +343,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { for (i <- 2 until depth) { testDir = Utils.createTempDir(testDir.toString) } - + // Create a file that exists before the StreamingContext is created: val existingFile = new File(testDir, "0") Files.write("0\n", existingFile, Charset.forName("UTF-8")) assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000) @@ -383,22 +382,20 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } } - // Verify that all the files have been read - val expectedOutput = if (newFilesOnly) { - input.map(_.toString).toSet - } else { - (Seq(0) ++ input).map(_.toString).toSet + // Verify that all the files have been read + val expectedOutput = if (newFilesOnly) { + input.map(_.toString).toSet + } else { + (Seq(0) ++ input).map(_.toString).toSet + } + assert(outputBuffer.flatten.toSet === expectedOutput) } - assert(outputBuffer.flatten.toSet === expectedOutput) + } finally { + if (testDir != null) Utils.deleteRecursively(testDir) } - }finally - { - if (testDir != null) Utils.deleteRecursively(testDir) } } -} - /** This is a server to test the network input stream */ class TestServer(portToBind: Int = 0) extends Logging { From b6788a366e54575ce2fef7055fb3ffa093357fb9 Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Sun, 11 Jan 2015 21:26:59 -0800 Subject: [PATCH 18/25] support java Api --- .../org/apache/spark/streaming/StreamingContext.scala | 10 +++++----- .../streaming/api/java/JavaStreamingContext.scala | 9 ++++++--- .../java/org/apache/spark/streaming/JavaAPISuite.java | 2 +- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 3d33125bb49b7..b423fc6618573 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -395,11 +395,11 @@ class StreamingContext private[streaming] ( V: ClassTag, F <: NewInputFormat[K, V]: ClassTag ] ( - 
directory: String, - filter: Path => Boolean, - newFilesOnly: Boolean, - depth: Int = 1 - ): InputDStream[(K, V)] = { + directory: String, + filter: Path => Boolean, + newFilesOnly: Boolean, + depth: Int = 1 + ): InputDStream[(K, V)] = { new FileInputDStream[K, V, F](this, directory, depth, filter, newFilesOnly) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index b639b94d5ca47..e235ccd004563 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -206,9 +206,10 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { * monitored directory by "moving" them from another location within the same * file system. File names starting with . are ignored. * @param directory HDFS directory to monitor for new file + * @param depth Searching depth of directory */ - def textFileStream(directory: String): JavaDStream[String] = { - ssc.textFileStream(directory) + def textFileStream(directory: String, depth: Int = 1): JavaDStream[String] = { + ssc.textFileStream(directory,depth) } /** @@ -271,6 +272,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { * Files must be written to the monitored directory by "moving" them from another * location within the same file system. File names starting with . are ignored. * @param directory HDFS directory to monitor for new file + * @param depth Searching depth of directory * @param kClass class of key for reading HDFS file * @param vClass class of value for reading HDFS file * @param fClass class of input format for reading HDFS file @@ -280,13 +282,14 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { */ def fileStream[K, V, F <: NewInputFormat[K, V]]( directory: String, + depth: Int = 1, kClass: Class[K], vClass: Class[V], fClass: Class[F]): JavaPairInputDStream[K, V] = { implicit val cmk: ClassTag[K] = ClassTag(kClass) implicit val cmv: ClassTag[V] = ClassTag(vClass) implicit val cmf: ClassTag[F] = ClassTag(fClass) - ssc.fileStream[K, V, F](directory) + ssc.fileStream[K, V, F](directory, depth) } /** diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 1077b1b2cb7e3..fff9d37ffb40b 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -1841,7 +1841,7 @@ public void testTextFileStream() throws IOException { File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark"); List> expected = fileTestPrepare(testDir); - JavaDStream input = ssc.textFileStream(testDir.toString()); + JavaDStream input = ssc.textFileStream(testDir.toString(), 1); JavaTestUtils.attachTestOutputStream(input); List> result = JavaTestUtils.runStreams(ssc, 1, 1); From d2f606c80ee77aaf29ad57ec765312f5700a990f Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Mon, 12 Jan 2015 03:18:02 -0800 Subject: [PATCH 19/25] Add support python api --- examples/src/main/python/streaming/hdfs_wordcount.py | 2 +- python/pyspark/streaming/context.py | 4 ++-- python/pyspark/streaming/tests.py | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/examples/src/main/python/streaming/hdfs_wordcount.py 
b/examples/src/main/python/streaming/hdfs_wordcount.py index f815dd26823d1..4106309a6ddec 100644 --- a/examples/src/main/python/streaming/hdfs_wordcount.py +++ b/examples/src/main/python/streaming/hdfs_wordcount.py @@ -40,7 +40,7 @@ sc = SparkContext(appName="PythonStreamingHDFSWordCount") ssc = StreamingContext(sc, 1) - lines = ssc.textFileStream(sys.argv[1]) + lines = ssc.textFileStream(sys.argv[1], 1) counts = lines.flatMap(lambda line: line.split(" "))\ .map(lambda x: (x, 1))\ .reduceByKey(lambda a, b: a+b) diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index ac5ba69e8dbbb..204184cd391b2 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -253,14 +253,14 @@ def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_ return DStream(self._jssc.socketTextStream(hostname, port, jlevel), self, UTF8Deserializer()) - def textFileStream(self, directory): + def textFileStream(self, directory, depth): """ Create an input stream that monitors a Hadoop-compatible file system for new files and reads them as text files. Files must be wrriten to the monitored directory by "moving" them from another location within the same file system. File names starting with . are ignored. """ - return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer()) + return DStream(self._jssc.textFileStream(directory, depth), self, UTF8Deserializer()) def binaryRecordsStream(self, directory, recordLength): """ diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 33ea8c9293d74..b245b6da0d0e7 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -464,7 +464,7 @@ def test_queue_stream(self): def test_text_file_stream(self): d = tempfile.mkdtemp() self.ssc = StreamingContext(self.sc, self.duration) - dstream2 = self.ssc.textFileStream(d).map(int) + dstream2 = self.ssc.textFileStream(d, 1).map(int) result = self._collect(dstream2, 2, block=False) self.ssc.start() for name in ('a', 'b'): @@ -524,7 +524,7 @@ def setup(): conf = SparkConf().set("spark.default.parallelism", 1) sc = SparkContext(conf=conf) ssc = StreamingContext(sc, 0.5) - dstream = ssc.textFileStream(inputd).map(lambda x: (x, 1)) + dstream = ssc.textFileStream(inputd, 1).map(lambda x: (x, 1)) wc = dstream.updateStateByKey(updater) wc.map(lambda x: "%s,%d" % x).saveAsTextFiles(outputd + "test") wc.checkpoint(.5) From 8e3a0544d93abe4c518ba1b05954688e5a41dfae Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Mon, 12 Jan 2015 19:20:07 -0800 Subject: [PATCH 20/25] Additional excludes for checking of Spark's binary compatibility --- project/MimaExcludes.scala | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 513bbaf98d804..5ed0230bb8500 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -197,7 +197,16 @@ object MimaExcludes { // SPARK-2757 ProblemFilters.exclude[IncompatibleResultTypeProblem]( "org.apache.spark.streaming.flume.sink.SparkAvroCallbackHandler." 
+ - "removeAndGetProcessor") + "removeAndGetProcessor"), + // SPARK-3586 + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.streaming.StreamingContext.fileStream"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.streaming.StreamingContext.textFileStream"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.streaming.api.java.JavaStreamingContext.fileStream"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.streaming.api.java.JavaStreamingContext.textFileStream") ) ++ Seq( // SPARK-5123 (SparkSQL data type change) - alpha component only ProblemFilters.exclude[IncompatibleResultTypeProblem]( From 571730ade679f07c1cc1e04ad8b6778d611e565f Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Thu, 29 Jan 2015 21:58:48 -0800 Subject: [PATCH 21/25] rebase --- .../spark/streaming/api/java/JavaStreamingContext.scala | 6 ++++-- .../java/org/apache/spark/streaming/JavaAPISuite.java | 8 ++++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index e235ccd004563..9400d48181d92 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -303,6 +303,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { * @param fClass class of input format for reading HDFS file * @param filter Function to filter paths to process * @param newFilesOnly Should process only new files and ignore existing files in the directory + * @param depth Searching depth of directory * @tparam K Key type for reading HDFS file * @tparam V Value type for reading HDFS file * @tparam F Input format for reading HDFS file @@ -313,12 +314,13 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { vClass: Class[V], fClass: Class[F], filter: JFunction[Path, JBoolean], - newFilesOnly: Boolean): JavaPairInputDStream[K, V] = { + newFilesOnly: Boolean, + depth: Int = 1): JavaPairInputDStream[K, V] = { implicit val cmk: ClassTag[K] = ClassTag(kClass) implicit val cmv: ClassTag[V] = ClassTag(vClass) implicit val cmf: ClassTag[F] = ClassTag(fClass) def fn: (Path) => Boolean = (x: Path) => filter.call(x).booleanValue() - ssc.fileStream[K, V, F](directory, fn, newFilesOnly) + ssc.fileStream[K, V, F](directory, fn, newFilesOnly, depth) } /** diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index fff9d37ffb40b..d6c22a9ee9f1e 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -1808,7 +1808,11 @@ public Integer call(String s) throws Exception { // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the // InputStream functionality is deferred to the existing Scala tests. 
@Test - public void testSocketTextStream() { + public void testSocketTextStream( + + + + ) { JavaReceiverInputDStream test = ssc.socketTextStream("localhost", 12345); } @@ -1865,7 +1869,7 @@ public Boolean call(Path v1) throws Exception { return Boolean.TRUE; } }, - true); + true, 1); JavaDStream test = inputStream.map( new Function, String>() { From a4bfac244d637cf65a582c2ffc54b6daaa744dda Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Fri, 27 Feb 2015 01:02:01 -0800 Subject: [PATCH 22/25] rebase --- docs/streaming-programming-guide.md | 10 ++++----- .../main/python/streaming/hdfs_wordcount.py | 2 +- python/pyspark/streaming/context.py | 8 ++++++- .../spark/streaming/StreamingContext.scala | 14 ++++++++----- .../api/java/JavaStreamingContext.scala | 21 +++++++++++++------ .../streaming/dstream/FileInputDStream.scala | 9 ++++---- 6 files changed, 40 insertions(+), 24 deletions(-) diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index fd5f925f12713..2df1ed77b86ae 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -643,25 +643,23 @@ methods for creating DStreams from files and Akka actors as input sources.
- streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory, depth) + streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
- streamingContext.fileStream(dataDirectory, depth); + streamingContext.fileStream(dataDirectory);
- streamingContext.textFileStream(dataDirectory, depth) + streamingContext.textFileStream(dataDirectory)
- Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory. The `depth` parameter defaults to 1; to also process files written in nested directories, set `depth` to a value greater than 1. Note that
-
+ Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory (by default, files written in nested directories are not processed). It can also monitor files in subdirectories by setting the optional `depth` parameter to a value greater than 1. Note that
+ The files must have the same data format.
+ The files must be created in the `dataDirectory` by atomically *moving* or *renaming* them into the data directory.
+ Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.

	For simple text files, there is an easier method `streamingContext.textFileStream(dataDirectory)`. And file streams do not require running a receiver, hence does not require allocating cores.

- To monitor nested directories of simple text files, there is an easier method `streamingContext.textFileStream(dataDirectory, depth)`.

	Python API `fileStream` is not available in the Python API, only `textFileStream` is available.

diff --git a/examples/src/main/python/streaming/hdfs_wordcount.py b/examples/src/main/python/streaming/hdfs_wordcount.py
index 4106309a6ddec..f815dd26823d1 100644
--- a/examples/src/main/python/streaming/hdfs_wordcount.py
+++ b/examples/src/main/python/streaming/hdfs_wordcount.py
@@ -40,7 +40,7 @@
     sc = SparkContext(appName="PythonStreamingHDFSWordCount")
     ssc = StreamingContext(sc, 1)

-    lines = ssc.textFileStream(sys.argv[1], 1)
+    lines = ssc.textFileStream(sys.argv[1])
     counts = lines.flatMap(lambda line: line.split(" "))\
         .map(lambda x: (x, 1))\
         .reduceByKey(lambda a, b: a+b)
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 204184cd391b2..3c5bf24dd84f7 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -253,13 +253,19 @@ def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_
         return DStream(self._jssc.socketTextStream(hostname, port, jlevel), self,
                        UTF8Deserializer())

-    def textFileStream(self, directory, depth):
+    def textFileStream(self, directory):
         """
         Create an input stream that monitors a Hadoop-compatible file system
         for new files and reads them as text files. Files must be wrriten to the
         monitored directory by "moving" them from another location within the same
         file system. File names starting with . are ignored.
         """
+        return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())
+
+    def textFileStream(self, directory, depth):
+        """
+        Create an input stream that monitors a directory and its subdirectories for new text files.
+ """ return DStream(self._jssc.textFileStream(directory, depth), self, UTF8Deserializer()) def binaryRecordsStream(self, directory, recordLength): diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index b423fc6618573..7e8595470b8cc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -412,6 +412,7 @@ class StreamingContext private[streaming] ( * @param filter Function to filter paths to process * @param newFilesOnly Should process only new files and ignore existing files in the directory * @param conf Hadoop configuration + * @param depth Searching depth of HDFS directory * @tparam K Key type for reading HDFS file * @tparam V Value type for reading HDFS file * @tparam F Input format for reading HDFS file @@ -423,8 +424,9 @@ class StreamingContext private[streaming] ( ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean, - conf: Configuration): InputDStream[(K, V)] = { - new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly, Option(conf)) + conf: Configuration, + depth: Int = 1): InputDStream[(K, V)] = { + new FileInputDStream[K, V, F](this, directory, depth, filter, newFilesOnly, Option(conf)) } /** @@ -434,7 +436,7 @@ class StreamingContext private[streaming] ( * monitored directory by "moving" them from another location within the same * file system. File names starting with . are ignored. * @param directory HDFS directory to monitor for new file - * @param depth Searching depth of directory + * @param depth Searching depth of HDFS directory */ def textFileStream( directory: String, @@ -456,15 +458,17 @@ class StreamingContext private[streaming] ( * * @param directory HDFS directory to monitor for new file * @param recordLength length of each record in bytes + * @param depth Searching depth of HDFS directory */ @Experimental def binaryRecordsStream( directory: String, - recordLength: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") { + recordLength: Int, + depth: Int = 1): DStream[Array[Byte]] = withNamedScope("binary records stream") { val conf = sc_.hadoopConfiguration conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength) val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat]( - directory, FileInputDStream.defaultFilter : Path => Boolean, newFilesOnly=true, conf) + directory, FileInputDStream.defaultFilter : Path => Boolean, newFilesOnly=true, conf, depth) val data = br.map { case (k, v) => val bytes = v.getBytes assert(bytes.length == recordLength, "Byte array does not have correct length") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 9400d48181d92..90fba842185fa 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -206,12 +206,20 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { * monitored directory by "moving" them from another location within the same * file system. File names starting with . are ignored. 
   * @param directory HDFS directory to monitor for new file
-   * @param depth Searching depth of directory
    */
-  def textFileStream(directory: String, depth: Int = 1): JavaDStream[String] = {
-    ssc.textFileStream(directory,depth)
+  def textFileStream(directory: String): JavaDStream[String] = {
+    ssc.textFileStream(directory, 1)
   }

+  /**
+   * Create an input stream that monitors a directory and its subdirectories for new files
+   * and reads them as text files.
+   * @param directory HDFS directory to monitor for new file
+   * @param depth Searching depth of HDFS directory
+   */
+  def textFileStream(directory: String, depth: Int): JavaDStream[String] = {
+    ssc.textFileStream(directory, depth)
+  }
   /**
    * :: Experimental ::
    *
@@ -272,7 +280,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
    * Files must be written to the monitored directory by "moving" them from another
    * location within the same file system. File names starting with . are ignored.
    * @param directory HDFS directory to monitor for new file
-   * @param depth Searching depth of directory
+   * @param depth Searching depth of HDFS directory
    * @param kClass class of key for reading HDFS file
    * @param vClass class of value for reading HDFS file
    * @param fClass class of input format for reading HDFS file
@@ -346,12 +354,13 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
       fClass: Class[F],
       filter: JFunction[Path, JBoolean],
       newFilesOnly: Boolean,
-      conf: Configuration): JavaPairInputDStream[K, V] = {
+      conf: Configuration,
+      depth: Int = 1): JavaPairInputDStream[K, V] = {
     implicit val cmk: ClassTag[K] = ClassTag(kClass)
     implicit val cmv: ClassTag[V] = ClassTag(vClass)
     implicit val cmf: ClassTag[F] = ClassTag(fClass)
     def fn: (Path) => Boolean = (x: Path) => filter.call(x).booleanValue()
-    ssc.fileStream[K, V, F](directory, fn, newFilesOnly, conf)
+    ssc.fileStream[K, V, F](directory, fn, newFilesOnly, conf, depth)
   }

 /**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 56148a6571a09..d013116d6a48a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -35,8 +35,10 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils}
 * This class represents an input stream that monitors a Hadoop-compatible filesystem for new
 * files and creates a stream out of them. The way it works as follows.
 *
- * At each batch interval, Use `depth` to find files in the directory recursively,
- * the file system is queried for files in the given directory and detected new
- * files are selected for that batch. If the `depth` is greater than 1,
- * it is queried for files in the depth of the recursion, In this case "new" means files that
+ * At each batch interval, the file system is queried for files in the given directory and
+ * detected new files are selected for that batch. It can also monitor files in subdirectories by
+ * setting the optional `depth` parameter to a value greater than 1. In this case "new" means
+ * files that became visible to readers during that time period. Some extra care is needed to deal
 * with the fact that files may become visible after they are created.
For this purpose, this * class remembers the information about the files selected in past batches for * a certain duration (say, "remember window") as shown in the figure below. From 5e3fd3d79d85085ca3ac71a81b497391ee9a0e3e Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Fri, 27 Feb 2015 20:03:49 -0800 Subject: [PATCH 23/25] change API --- python/pyspark/streaming/context.py | 2 +- python/pyspark/streaming/tests.py | 4 +- .../spark/streaming/StreamingContext.scala | 124 ++++++++++++++++-- .../api/java/JavaStreamingContext.scala | 101 +++++++++++++- .../streaming/dstream/FileInputDStream.scala | 6 +- .../apache/spark/streaming/JavaAPISuite.java | 10 +- 6 files changed, 218 insertions(+), 29 deletions(-) diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index 3c5bf24dd84f7..867fdab00d447 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -260,7 +260,7 @@ def textFileStream(self, directory): monitored directory by "moving" them from another location within the same file system. File names starting with . are ignored. """ - return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer()) + return textFileStream(self, directory, 1) def textFileStream(self, directory, depth): """ diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index b245b6da0d0e7..33ea8c9293d74 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -464,7 +464,7 @@ def test_queue_stream(self): def test_text_file_stream(self): d = tempfile.mkdtemp() self.ssc = StreamingContext(self.sc, self.duration) - dstream2 = self.ssc.textFileStream(d, 1).map(int) + dstream2 = self.ssc.textFileStream(d).map(int) result = self._collect(dstream2, 2, block=False) self.ssc.start() for name in ('a', 'b'): @@ -524,7 +524,7 @@ def setup(): conf = SparkConf().set("spark.default.parallelism", 1) sc = SparkContext(conf=conf) ssc = StreamingContext(sc, 0.5) - dstream = ssc.textFileStream(inputd, 1).map(lambda x: (x, 1)) + dstream = ssc.textFileStream(inputd).map(lambda x: (x, 1)) wc = dstream.updateStateByKey(updater) wc.map(lambda x: "%s,%d" % x).saveAsTextFiles(outputd + "test") wc.checkpoint(.5) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 7e8595470b8cc..71cb40b85703f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -364,7 +364,6 @@ class StreamingContext private[streaming] ( * Files must be written to the monitored directory by "moving" them from another * location within the same file system. File names starting with . are ignored. * @param directory HDFS directory to monitor for new file - * @param depth Searching depth of directory * @tparam K Key type for reading HDFS file * @tparam V Value type for reading HDFS file * @tparam F Input format for reading HDFS file @@ -373,7 +372,28 @@ class StreamingContext private[streaming] ( K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag - ] (directory: String, depth: Int = 1): InputDStream[(K, V)] = { + ] (directory: String): InputDStream[(K, V)] = { + new FileInputDStream[K, V, F](this, directory) + } + + /** + * Create a input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them using the given key-value types and input format. 
+ * Files must be written to the monitored directory by "moving" them from another + * location within the same file system. File names starting with . are ignored. + * It can also monitor files in subdirectories by setting the optional `depth` + * parameter to a value greater than 1. + * @param directory HDFS directory to monitor for new file + * @param depth Searching depth of HDFS directory + * @tparam K Key type for reading HDFS file + * @tparam V Value type for reading HDFS file + * @tparam F Input format for reading HDFS file + */ + def fileStream[ + K: ClassTag, + V: ClassTag, + F <: NewInputFormat[K, V]: ClassTag + ] (directory: String, depth: Int): InputDStream[(K, V)] = { new FileInputDStream[K, V, F](this, directory, depth) } @@ -385,7 +405,6 @@ class StreamingContext private[streaming] ( * @param directory HDFS directory to monitor for new file * @param filter Function to filter paths to process * @param newFilesOnly Should process only new files and ignore existing files in the directory - * @param depth Searching depth of directory * @tparam K Key type for reading HDFS file * @tparam V Value type for reading HDFS file * @tparam F Input format for reading HDFS file @@ -394,12 +413,33 @@ class StreamingContext private[streaming] ( K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag - ] ( - directory: String, - filter: Path => Boolean, - newFilesOnly: Boolean, - depth: Int = 1 - ): InputDStream[(K, V)] = { + ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = { + new FileInputDStream[K, V, F](this, directory, 1, filter, newFilesOnly) + } + + /** + * Create a input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them using the given key-value types and input format. + * Files must be written to the monitored directory by "moving" them from another + * location within the same file system. + * It can also monitor files in subdirectories by setting the optional `depth` + * parameter to a value greater than 1. 
+ * @param directory HDFS directory to monitor for new file + * @param filter Function to filter paths to process + * @param newFilesOnly Should process only new files and ignore existing files in the directory + * @param depth Searching depth of HDFS directory + * @tparam K Key type for reading HDFS file + * @tparam V Value type for reading HDFS file + * @tparam F Input format for reading HDFS file + */ + def fileStream[ + K: ClassTag, + V: ClassTag, + F <: NewInputFormat[K, V]: ClassTag + ] (directory: String, + filter: Path => Boolean, + newFilesOnly: Boolean, + depth: Int): InputDStream[(K, V)] = { new FileInputDStream[K, V, F](this, directory, depth, filter, newFilesOnly) } @@ -412,6 +452,32 @@ class StreamingContext private[streaming] ( * @param filter Function to filter paths to process * @param newFilesOnly Should process only new files and ignore existing files in the directory * @param conf Hadoop configuration + * @tparam K Key type for reading HDFS file + * @tparam V Value type for reading HDFS file + * @tparam F Input format for reading HDFS file + */ + def fileStream[ + K: ClassTag, + V: ClassTag, + F <: NewInputFormat[K, V]: ClassTag + ] (directory: String, + filter: Path => Boolean, + newFilesOnly: Boolean, + conf: Configuration): InputDStream[(K, V)] = { + new FileInputDStream[K, V, F](this, directory, 1, filter, newFilesOnly, Option(conf)) + } + + /** + * Create a input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them using the given key-value types and input format. + * Files must be written to the monitored directory by "moving" them from another + * location within the same file system. File names starting with . are ignored. + * It can also monitor files in subdirectories by setting the optional `depth` + * parameter to a value greater than 1. + * @param directory HDFS directory to monitor for new file + * @param filter Function to filter paths to process + * @param newFilesOnly Should process only new files and ignore existing files in the directory + * @param conf Hadoop configuration * @param depth Searching depth of HDFS directory * @tparam K Key type for reading HDFS file * @tparam V Value type for reading HDFS file @@ -425,7 +491,7 @@ class StreamingContext private[streaming] ( filter: Path => Boolean, newFilesOnly: Boolean, conf: Configuration, - depth: Int = 1): InputDStream[(K, V)] = { + depth: Int): InputDStream[(K, V)] = { new FileInputDStream[K, V, F](this, directory, depth, filter, newFilesOnly, Option(conf)) } @@ -436,6 +502,20 @@ class StreamingContext private[streaming] ( * monitored directory by "moving" them from another location within the same * file system. File names starting with . are ignored. * @param directory HDFS directory to monitor for new file + */ + def textFileStream(directory: String): DStream[String] = { + fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString) + } + + /** + * Create a input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them as text files (using key as LongWritable, value + * as Text and input format as TextInputFormat). Files must be written to the + * monitored directory by "moving" them from another location within the same + * file system. File names starting with . are ignored. + * It can also monitor files in subdirectories by setting the optional `depth` + * parameter to a value greater than 1. 
+ * @param directory HDFS directory to monitor for new file * @param depth Searching depth of HDFS directory */ def textFileStream( @@ -452,6 +532,8 @@ class StreamingContext private[streaming] ( * generating one byte array per record. Files must be written to the monitored directory * by "moving" them from another location within the same file system. File names * starting with . are ignored. + * It can also monitor files in subdirectories by setting the optional `depth` + * parameter to a value greater than 1. * * '''Note:''' We ensure that the byte array for each record in the * resulting RDDs of the DStream has the provided record length. @@ -477,6 +559,28 @@ class StreamingContext private[streaming] ( data } + /** + * :: Experimental :: + * + * Create an input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them as flat binary files, assuming a fixed length per record, + * generating one byte array per record. Files must be written to the monitored directory + * by "moving" them from another location within the same file system. File names + * starting with . are ignored. + * + * '''Note:''' We ensure that the byte array for each record in the + * resulting RDDs of the DStream has the provided record length. + * + * @param directory HDFS directory to monitor for new file + * @param recordLength length of each record in bytes + */ + @Experimental + def binaryRecordsStream( + directory: String, + recordLength: Int): DStream[Array[Byte]] = { + binaryRecordsStream(directory, recordLength, 1) + } + /** * Create an input stream from a queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 90fba842185fa..0041a15bb7f91 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -208,7 +208,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { * @param directory HDFS directory to monitor for new file */ def textFileStream(directory: String): JavaDStream[String] = { - ssc.textFileStream(directory, 1) + ssc.textFileStream(directory) } /** @@ -280,6 +280,32 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { * Files must be written to the monitored directory by "moving" them from another * location within the same file system. File names starting with . are ignored. 
* @param directory HDFS directory to monitor for new file + * @param kClass class of key for reading HDFS file + * @param vClass class of value for reading HDFS file + * @param fClass class of input format for reading HDFS file + * @tparam K Key type for reading HDFS file + * @tparam V Value type for reading HDFS file + * @tparam F Input format for reading HDFS file + */ + def fileStream[K, V, F <: NewInputFormat[K, V]]( + directory: String, + kClass: Class[K], + vClass: Class[V], + fClass: Class[F]): JavaPairInputDStream[K, V] = { + implicit val cmk: ClassTag[K] = ClassTag(kClass) + implicit val cmv: ClassTag[V] = ClassTag(vClass) + implicit val cmf: ClassTag[F] = ClassTag(fClass) + ssc.fileStream[K, V, F](directory) + } + + /** + * Create an input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them using the given key-value types and input format. + * Files must be written to the monitored directory by "moving" them from another + * location within the same file system. File names starting with . are ignored. + * It can also monitor files in subdirectories by setting the optional `depth` + * parameter to a value greater than 1. + * @param directory HDFS directory to monitor for new file * @param depth Searching depth of HDFS directory * @param kClass class of key for reading HDFS file * @param vClass class of value for reading HDFS file @@ -290,7 +316,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { */ def fileStream[K, V, F <: NewInputFormat[K, V]]( directory: String, - depth: Int = 1, + depth: Int, kClass: Class[K], vClass: Class[V], fClass: Class[F]): JavaPairInputDStream[K, V] = { @@ -311,7 +337,38 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { * @param fClass class of input format for reading HDFS file * @param filter Function to filter paths to process * @param newFilesOnly Should process only new files and ignore existing files in the directory - * @param depth Searching depth of directory + * @tparam K Key type for reading HDFS file + * @tparam V Value type for reading HDFS file + * @tparam F Input format for reading HDFS file + */ + def fileStream[K, V, F <: NewInputFormat[K, V]]( + directory: String, + kClass: Class[K], + vClass: Class[V], + fClass: Class[F], + filter: JFunction[Path, JBoolean], + newFilesOnly: Boolean): JavaPairInputDStream[K, V] = { + implicit val cmk: ClassTag[K] = ClassTag(kClass) + implicit val cmv: ClassTag[V] = ClassTag(vClass) + implicit val cmf: ClassTag[F] = ClassTag(fClass) + def fn = (x: Path) => filter.call(x).booleanValue() + ssc.fileStream[K, V, F](directory, fn, newFilesOnly) + } + + /** + * Create an input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them using the given key-value types and input format. + * Files must be written to the monitored directory by "moving" them from another + * location within the same file system. File names starting with . are ignored. + * It can also monitor files in subdirectories by setting the optional `depth` + * parameter to a value greater than 1. 
+ * @param directory HDFS directory to monitor for new file + * @param kClass class of key for reading HDFS file + * @param vClass class of value for reading HDFS file + * @param fClass class of input format for reading HDFS file + * @param filter Function to filter paths to process + * @param newFilesOnly Should process only new files and ignore existing files in the directory + * @param depth Searching depth of HDFS directory * @tparam K Key type for reading HDFS file * @tparam V Value type for reading HDFS file * @tparam F Input format for reading HDFS file @@ -323,7 +380,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { fClass: Class[F], filter: JFunction[Path, JBoolean], newFilesOnly: Boolean, - depth: Int = 1): JavaPairInputDStream[K, V] = { + depth: Int): JavaPairInputDStream[K, V] = { implicit val cmk: ClassTag[K] = ClassTag(kClass) implicit val cmv: ClassTag[V] = ClassTag(vClass) implicit val cmf: ClassTag[F] = ClassTag(fClass) @@ -347,6 +404,40 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { * @tparam V Value type for reading HDFS file * @tparam F Input format for reading HDFS file */ + def fileStream[K, V, F <: NewInputFormat[K, V]]( + directory: String, + kClass: Class[K], + vClass: Class[V], + fClass: Class[F], + filter: JFunction[Path, JBoolean], + newFilesOnly: Boolean, + conf: Configuration): JavaPairInputDStream[K, V] = { + implicit val cmk: ClassTag[K] = ClassTag(kClass) + implicit val cmv: ClassTag[V] = ClassTag(vClass) + implicit val cmf: ClassTag[F] = ClassTag(fClass) + def fn = (x: Path) => filter.call(x).booleanValue() + ssc.fileStream[K, V, F](directory, fn, newFilesOnly, conf) + } + + /** + * Create an input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them using the given key-value types and input format. + * Files must be written to the monitored directory by "moving" them from another + * location within the same file system. File names starting with . are ignored. + * It can also monitor files in subdirectories by setting the optional `depth` + * parameter to a value greater than 1. 
+ * @param directory HDFS directory to monitor for new file + * @param kClass class of key for reading HDFS file + * @param vClass class of value for reading HDFS file + * @param fClass class of input format for reading HDFS file + * @param filter Function to filter paths to process + * @param newFilesOnly Should process only new files and ignore existing files in the directory + * @param conf Hadoop configuration + * @param depth Searching depth of HDFS directory + * @tparam K Key type for reading HDFS file + * @tparam V Value type for reading HDFS file + * @tparam F Input format for reading HDFS file + */ def fileStream[K, V, F <: NewInputFormat[K, V]]( directory: String, kClass: Class[K], @@ -355,7 +446,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { filter: JFunction[Path, JBoolean], newFilesOnly: Boolean, conf: Configuration, - depth: Int = 1): JavaPairInputDStream[K, V] = { + depth: Int): JavaPairInputDStream[K, V] = { implicit val cmk: ClassTag[K] = ClassTag(kClass) implicit val cmv: ClassTag[V] = ClassTag(vClass) implicit val cmf: ClassTag[F] = ClassTag(fClass) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index d013116d6a48a..f4b887744813f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -218,10 +218,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( // If the mod time of directory is more than ignore time, no new files in this directory. try { val status = fs.getFileStatus(path) - if (status != null && status.getModificationTime > modTimeIgnoreThreshold) true - else false - } - catch { + status != null && status.getModificationTime > modTimeIgnoreThreshold + } catch { // If the directory don't find, remove the directory from `lastFoundDirs` case e: FileNotFoundException => lastFoundDirs.remove(path) diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index d6c22a9ee9f1e..1077b1b2cb7e3 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -1808,11 +1808,7 @@ public Integer call(String s) throws Exception { // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the // InputStream functionality is deferred to the existing Scala tests. 
@Test - public void testSocketTextStream( - - - - ) { + public void testSocketTextStream() { JavaReceiverInputDStream test = ssc.socketTextStream("localhost", 12345); } @@ -1845,7 +1841,7 @@ public void testTextFileStream() throws IOException { File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark"); List> expected = fileTestPrepare(testDir); - JavaDStream input = ssc.textFileStream(testDir.toString(), 1); + JavaDStream input = ssc.textFileStream(testDir.toString()); JavaTestUtils.attachTestOutputStream(input); List> result = JavaTestUtils.runStreams(ssc, 1, 1); @@ -1869,7 +1865,7 @@ public Boolean call(Path v1) throws Exception { return Boolean.TRUE; } }, - true, 1); + true); JavaDStream test = inputStream.map( new Function, String>() { From e4b9c22f85aca7cc81b71646ffec77706cbcbe6e Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Sun, 1 Mar 2015 23:54:42 -0800 Subject: [PATCH 24/25] change MiMa failures --- project/MimaExcludes.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 5ed0230bb8500..b93f4351cf7fe 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -202,11 +202,7 @@ object MimaExcludes { ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.streaming.StreamingContext.fileStream"), ProblemFilters.exclude[MissingMethodProblem]( - "org.apache.spark.streaming.StreamingContext.textFileStream"), - ProblemFilters.exclude[MissingMethodProblem]( - "org.apache.spark.streaming.api.java.JavaStreamingContext.fileStream"), - ProblemFilters.exclude[MissingMethodProblem]( - "org.apache.spark.streaming.api.java.JavaStreamingContext.textFileStream") + "org.apache.spark.streaming.StreamingContext.textFileStream") ) ++ Seq( // SPARK-5123 (SparkSQL data type change) - alpha component only ProblemFilters.exclude[IncompatibleResultTypeProblem]( From 1a2aae91bbf39e0d1546e2e135f6428946410420 Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Wed, 20 May 2015 00:25:10 -0700 Subject: [PATCH 25/25] rebase --- project/MimaExcludes.scala | 7 +--- .../spark/streaming/StreamingContext.scala | 11 +++--- .../api/java/JavaStreamingContext.scala | 4 +-- .../streaming/dstream/FileInputDStream.scala | 35 +++++++++---------- 4 files changed, 26 insertions(+), 31 deletions(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index b93f4351cf7fe..513bbaf98d804 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -197,12 +197,7 @@ object MimaExcludes { // SPARK-2757 ProblemFilters.exclude[IncompatibleResultTypeProblem]( "org.apache.spark.streaming.flume.sink.SparkAvroCallbackHandler." 
+ - "removeAndGetProcessor"), - // SPARK-3586 - ProblemFilters.exclude[MissingMethodProblem]( - "org.apache.spark.streaming.StreamingContext.fileStream"), - ProblemFilters.exclude[MissingMethodProblem]( - "org.apache.spark.streaming.StreamingContext.textFileStream") + "removeAndGetProcessor") ) ++ Seq( // SPARK-5123 (SparkSQL data type change) - alpha component only ProblemFilters.exclude[IncompatibleResultTypeProblem]( diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 71cb40b85703f..894ae676565c1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -503,7 +503,7 @@ class StreamingContext private[streaming] ( * file system. File names starting with . are ignored. * @param directory HDFS directory to monitor for new file */ - def textFileStream(directory: String): DStream[String] = { + def textFileStream(directory: String): DStream[String] = withNamedScope("text file stream") { fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString) } @@ -520,7 +520,7 @@ class StreamingContext private[streaming] ( */ def textFileStream( directory: String, - depth: Int =0): DStream[String] = withNamedScope("text file stream") { + depth: Int): DStream[String] = withNamedScope("text file stream") { fileStream[LongWritable, Text, TextInputFormat](directory, depth).map(_._2.toString) } @@ -546,11 +546,12 @@ class StreamingContext private[streaming] ( def binaryRecordsStream( directory: String, recordLength: Int, - depth: Int = 1): DStream[Array[Byte]] = withNamedScope("binary records stream") { + depth: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") { val conf = sc_.hadoopConfiguration conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength) val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat]( - directory, FileInputDStream.defaultFilter : Path => Boolean, newFilesOnly=true, conf, depth) + directory, FileInputDStream.defaultFilter : Path => Boolean, + newFilesOnly=true, conf, depth) val data = br.map { case (k, v) => val bytes = v.getBytes assert(bytes.length == recordLength, "Byte array does not have correct length") @@ -577,7 +578,7 @@ class StreamingContext private[streaming] ( @Experimental def binaryRecordsStream( directory: String, - recordLength: Int): DStream[Array[Byte]] = { + recordLength: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") { binaryRecordsStream(directory, recordLength, 1) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 0041a15bb7f91..1a89b6bbc7e66 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -351,7 +351,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { implicit val cmk: ClassTag[K] = ClassTag(kClass) implicit val cmv: ClassTag[V] = ClassTag(vClass) implicit val cmf: ClassTag[F] = ClassTag(fClass) - def fn = (x: Path) => filter.call(x).booleanValue() + def fn: (Path) => Boolean = (x: Path) => filter.call(x).booleanValue() ssc.fileStream[K, V, F](directory, fn, newFilesOnly) } @@ -415,7 +415,7 @@ class 
JavaStreamingContext(val ssc: StreamingContext) extends Closeable { implicit val cmk: ClassTag[K] = ClassTag(kClass) implicit val cmv: ClassTag[V] = ClassTag(vClass) implicit val cmf: ClassTag[F] = ClassTag(fClass) - def fn = (x: Path) => filter.call(x).booleanValue() + def fn: (Path) => Boolean = (x: Path) => filter.call(x).booleanValue() ssc.fileStream[K, V, F](directory, fn, newFilesOnly, conf) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index f4b887744813f..c4e8ccb4d794f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -19,18 +19,17 @@ package org.apache.spark.streaming.dstream import java.io.{FileNotFoundException, IOException, ObjectInputStream} -import scala.collection.mutable -import scala.reflect.ClassTag - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} - -import org.apache.spark.{SparkConf, SerializableWritable} +import org.apache.spark.SerializableWritable import org.apache.spark.rdd.{RDD, UnionRDD} import org.apache.spark.streaming._ import org.apache.spark.util.{TimeStampedHashMap, Utils} +import scala.collection.mutable +import scala.reflect.ClassTag + /** * This class represents an input stream that monitors a Hadoop-compatible filesystem for new * files and creates a stream out of them. The way it works as follows. @@ -387,17 +386,17 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( } } - private[streaming] - object FileInputDStream { - - def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".") - /** - * Calculate the number of last batches to remember, such that all the files selected in - * at least last minRememberDurationS duration can be remembered. - */ - def calculateNumBatchesToRemember(batchDuration: Duration, - minRememberDurationS: Duration): Int = { - math.ceil(minRememberDurationS.milliseconds.toDouble / batchDuration.milliseconds).toInt - } - } +private[streaming] +object FileInputDStream { + def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".") + + /** + * Calculate the number of last batches to remember, such that all the files selected in + * at least last minRememberDurationS duration can be remembered. + */ + def calculateNumBatchesToRemember(batchDuration: Duration, + minRememberDurationS: Duration): Int = { + math.ceil(minRememberDurationS.milliseconds.toDouble / batchDuration.milliseconds).toInt + } +}
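
The end state of the series is a set of depth-aware overloads on `StreamingContext` and `JavaStreamingContext` (`textFileStream(directory, depth)`, `fileStream(directory, depth)`, and the filter/conf variants) backed by the recursive scan in `FileInputDStream`. Below is a minimal usage sketch of the two-argument `textFileStream` overload, assuming the patches above are applied; the application name and the monitored path are placeholders.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object NestedDirectoryWordCount {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("NestedDirectoryWordCount")
        val ssc = new StreamingContext(conf, Seconds(10))

        // depth = 2: watch the placeholder path /data/incoming plus subdirectories
        // one level below it; depth = 1 keeps the old single-directory behaviour.
        val lines = ssc.textFileStream("/data/incoming", 2)

        // Standard word count over whatever text files land in the watched tree.
        val counts = lines
          .flatMap(_.split(" "))
          .map(word => (word, 1))
          .reduceByKey(_ + _)
        counts.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

As implemented in the `dfs` helper, `depth` counts the monitored directory itself as level 1, so directories more than `depth - 1` levels below it are never listed.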