From 739341f543517c15d92219a239ab156a1adcb469 Mon Sep 17 00:00:00 2001
From: surq
Date: Wed, 15 Oct 2014 15:16:40 +0800
Subject: [PATCH 1/4] improve the speed of converting files to RDDs

---
 .../apache/spark/streaming/dstream/FileInputDStream.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 9eecbfaef363f..189f5f7777d44 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -27,6 +27,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.UnionRDD
 import org.apache.spark.streaming.{StreamingContext, Time}
 import org.apache.spark.util.TimeStampedHashMap
+import scala.collection.mutable.ArrayBuffer
 
 
 private[streaming]
@@ -120,14 +121,14 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
 
   /** Generate one RDD from an array of files */
   private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
-    val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
-    files.zip(fileRDDs).foreach { case (file, rdd) => {
+    val fileRDDs = for (file <- files; rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)) yield {
       if (rdd.partitions.size == 0) {
         logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
           "files that have been \"moved\" to the directory assigned to the file stream. " +
           "Refer to the streaming programming guide for more details.")
       }
-    }}
+      rdd
+    }
     new UnionRDD(context.sparkContext, fileRDDs)
   }
 

From 626ef973058e882f96b6ae64c3d31840884d2638 Mon Sep 17 00:00:00 2001
From: surq
Date: Wed, 15 Oct 2014 17:06:32 +0800
Subject: [PATCH 2/4] remove redundant import (ArrayBuffer)

---
 .../org/apache/spark/streaming/dstream/FileInputDStream.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 189f5f7777d44..f09ebec471787 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -27,7 +27,6 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.UnionRDD
 import org.apache.spark.streaming.{StreamingContext, Time}
 import org.apache.spark.util.TimeStampedHashMap
-import scala.collection.mutable.ArrayBuffer
 
 
 private[streaming]
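Note on PATCH 1/4: the old code walked the file list twice (once to map every file to an RDD, then again over the zipped pairs to log empty files), while the new code creates each RDD and checks its partitions in a single pass. Below is a minimal standalone sketch of the same single-pass pattern; the local SparkContext, the LongWritable/Text/TextInputFormat types, and the input paths are illustrative assumptions, not part of the patch.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.{SparkConf, SparkContext}

object FilesToRDDSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("files-to-rdd-sketch").setMaster("local[*]"))
    // Hypothetical input paths; any existing local or HDFS files would do.
    val files = Seq("/tmp/input/a.txt", "/tmp/input/b.txt")

    // Single pass: build each RDD and inspect its partitions immediately,
    // instead of materializing all RDDs first and zipping back over the files.
    val fileRDDs: Seq[RDD[(LongWritable, Text)]] = files.map { file =>
      val rdd = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](file)
      if (rdd.partitions.size == 0) {
        // FileInputDStream uses logError here; println stands in for it.
        println(s"File $file has no data in it.")
      }
      rdd
    }

    // Same final step as filesToRDD: one UnionRDD over all per-file RDDs.
    val union = new UnionRDD(sc, fileRDDs)
    println(s"Union RDD has ${union.partitions.size} partitions")
    sc.stop()
  }
}

Computing rdd.partitions once per file inside the same traversal is what removes the second pass that the zip-based version needed, while still surfacing empty files at RDD-creation time.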
From 178066f77a1373f979e5369dbe1337a5bb650d32 Mon Sep 17 00:00:00 2001
From: surq
Date: Mon, 3 Nov 2014 12:00:56 +0800
Subject: [PATCH 3/4] fix code style: wrap a line that exceeds 100 columns

---
 .../org/apache/spark/streaming/dstream/FileInputDStream.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index f09ebec471787..454aba6de7d0f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -120,7 +120,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
 
   /** Generate one RDD from an array of files */
   private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
-    val fileRDDs = for (file <- files; rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)) yield {
+    val fileRDDs = for (file <- files; rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file))
+      yield {
       if (rdd.partitions.size == 0) {
         logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
           "files that have been \"moved\" to the directory assigned to the file stream. " +

From 321bbe84ee3126db50f15f15a509ad647431b95a Mon Sep 17 00:00:00 2001
From: surq
Date: Mon, 10 Nov 2014 10:28:10 +0800
Subject: [PATCH 4/4] update the code style: replace [for ... yield] with
 [files.map(file => {...})]

---
 .../apache/spark/streaming/dstream/FileInputDStream.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 789a3a45fa277..55d6cf6a783ea 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -120,15 +120,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
 
   /** Generate one RDD from an array of files */
   private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
-    val fileRDDs = for (file <- files; rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file))
-      yield {
+    val fileRDDs = files.map(file =>{
+      val rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)
       if (rdd.partitions.size == 0) {
         logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
           "files that have been \"moved\" to the directory assigned to the file stream. " +
           "Refer to the streaming programming guide for more details.")
       }
       rdd
-    }
+    })
     new UnionRDD(context.sparkContext, fileRDDs)
   }
 
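For context on where filesToRDD is exercised: applications obtain a FileInputDStream through StreamingContext.fileStream, and on every batch interval the newly appeared files in the monitored directory are turned into a single UnionRDD by filesToRDD. A hedged usage sketch follows; the local master, the 10-second batch interval, and the /tmp/input directory are assumptions for illustration, not taken from the patches.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("file-stream-sketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // fileStream is the public entry point to FileInputDStream; each batch it
    // finds new files under the directory and passes them to filesToRDD.
    val stream = ssc.fileStream[LongWritable, Text, TextInputFormat]("/tmp/input")
    stream.map(_._2.toString).print()

    ssc.start()
    ssc.awaitTermination()
  }
}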