From 48757558aafc9217f0b675e883b9c064e2af9386 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Thu, 10 Apr 2014 11:21:58 +0800 Subject: [PATCH 1/4] add minSplits for WholeTextFiles --- .../main/scala/org/apache/spark/SparkContext.scala | 10 ++++++---- .../spark/input/WholeTextFileInputFormat.scala | 14 ++++++++++++++ .../scala/org/apache/spark/rdd/NewHadoopRDD.scala | 11 ++++++++++- 3 files changed, 30 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 5a36e6f5c19a9..833342b832fd3 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -456,12 +456,13 @@ class SparkContext(config: SparkConf) extends Logging { * * @note Small files are preferred, as each file will be loaded fully in memory. */ - def wholeTextFiles(path: String): RDD[(String, String)] = { + def wholeTextFiles(path: String, minSplits: Int = defaultMinSplits): RDD[(String, String)] = { newAPIHadoopFile( path, classOf[WholeTextFileInputFormat], classOf[String], - classOf[String]) + classOf[String], + minSplits = minSplits) } /** @@ -584,11 +585,12 @@ class SparkContext(config: SparkConf) extends Logging { fClass: Class[F], kClass: Class[K], vClass: Class[V], - conf: Configuration = hadoopConfiguration): RDD[(K, V)] = { + conf: Configuration = hadoopConfiguration, + minSplits: Int = 1): RDD[(K, V)] = { val job = new NewHadoopJob(conf) NewFileInputFormat.addInputPath(job, new Path(path)) val updatedConf = job.getConfiguration - new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf) + new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf, minSplits) } /** diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala index 4887fb6b84eb2..23c2a136f47ac 100644 --- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala +++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala @@ -17,7 +17,10 @@ package org.apache.spark.input +import scala.collection.JavaConversions._ + import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.FileStatus import org.apache.hadoop.mapreduce.InputSplit import org.apache.hadoop.mapreduce.JobContext import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat @@ -44,4 +47,15 @@ private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[Str context, classOf[WholeTextFileRecordReader]) } + + /** + * Allow minSplits set by end-user in order to keep compatibility with old Hadoop API. 
+ */ + def setMaxSplitSize(context: JobContext, minSplits: Int) { + val files = listStatus(context) + val totalLen = files.map { file => + if (file.isDir) 0L else file.getLen + }.sum + super.setMaxSplitSize(totalLen / (if (minSplits == 0) 1 else minSplits)) + } } diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 2d8dfa5a1645a..03fa70d5b3c29 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce._ import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable, SparkContext, TaskContext} import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.input.WholeTextFileInputFormat private[spark] class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable) @@ -56,7 +57,8 @@ class NewHadoopRDD[K, V]( inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], - @transient conf: Configuration) + @transient conf: Configuration, + minSplits: Int = 1) extends RDD[(K, V)](sc, Nil) with SparkHadoopMapReduceUtil with Logging { @@ -74,10 +76,17 @@ class NewHadoopRDD[K, V]( override def getPartitions: Array[Partition] = { val inputFormat = inputFormatClass.newInstance + if (inputFormat.isInstanceOf[Configurable]) { inputFormat.asInstanceOf[Configurable].setConf(conf) } + val jobContext = newJobContext(conf, jobId) + + if (inputFormat.isInstanceOf[WholeTextFileInputFormat]) { + inputFormat.asInstanceOf[WholeTextFileInputFormat].setMaxSplitSize(jobContext, minSplits) + } + val rawSplits = inputFormat.getSplits(jobContext).toArray val result = new Array[Partition](rawSplits.size) for (i <- 0 until rawSplits.size) { From 766d05b47f3454d83799bf0ea04a34ecc5686eea Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Thu, 10 Apr 2014 12:41:22 +0800 Subject: [PATCH 2/4] refine Java API and comments --- .../main/scala/org/apache/spark/SparkContext.scala | 2 +- .../apache/spark/api/java/JavaSparkContext.scala | 13 ++++++++++++- .../test/java/org/apache/spark/JavaAPISuite.java | 2 +- .../input/WholeTextFileRecordReaderSuite.scala | 2 +- 4 files changed, 15 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 833342b832fd3..525c9e35afd59 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -444,7 +444,7 @@ class SparkContext(config: SparkConf) extends Logging { * hdfs://a-hdfs-path/part-nnnnn * }}} * - * Do `val rdd = sparkContext.wholeTextFile("hdfs://a-hdfs-path")`, + * Do `val rdd = sparkContext.wholeTextFile("hdfs://a-hdfs-path", minSplits)` * *
then `rdd` contains * {{{ diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 1e8242a2cbbce..fcdd680d70438 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -167,7 +167,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork * hdfs://a-hdfs-path/part-nnnnn * }}} * - * Do `JavaPairRDD rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")`, + * Do + * `JavaPairRDD rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path", minSplit)` * *
then `rdd` contains * {{{ @@ -179,6 +180,16 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork * * @note Small files are preferred, as each file will be loaded fully in memory. */ + def wholeTextFiles(path: String, minSplits: Int): JavaPairRDD[String, String] = + new JavaPairRDD(sc.wholeTextFiles(path, minSplits)) + + /** + * Read a directory of text files from HDFS, a local file system (available on all nodes), or any + * Hadoop-supported file system URI. Each file is read as a single record and returned in a + * key-value pair, where the key is the path of each file, the value is the content of each file. + * + * @see `wholeTextFiles(path: String, minSplits: Int)`. + */ def wholeTextFiles(path: String): JavaPairRDD[String, String] = new JavaPairRDD(sc.wholeTextFiles(path)) diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index ab2fdac553349..8d2e9f1846343 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -626,7 +626,7 @@ public void wholeTextFiles() throws IOException { container.put(tempDirName+"/part-00000", new Text(content1).toString()); container.put(tempDirName+"/part-00001", new Text(content2).toString()); - JavaPairRDD readRDD = sc.wholeTextFiles(tempDirName); + JavaPairRDD readRDD = sc.wholeTextFiles(tempDirName, 3); List> result = readRDD.collect(); for (Tuple2 res : result) { diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala index e89b296d41026..33d6de9a76405 100644 --- a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala @@ -73,7 +73,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll { createNativeFile(dir, filename, contents) } - val res = sc.wholeTextFiles(dir.toString).collect() + val res = sc.wholeTextFiles(dir.toString, 3).collect() assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size, "Number of files read out does not fit with the actual value.") From c10af601697de8369cfca4101f94b866e231de07 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Fri, 11 Apr 2014 10:48:15 +0800 Subject: [PATCH 3/4] refine comments and rewrite new class for wholeTextFile --- .../scala/org/apache/spark/SparkContext.scala | 21 ++++--- .../spark/api/java/JavaSparkContext.scala | 7 ++- .../input/WholeTextFileInputFormat.scala | 4 +- .../org/apache/spark/rdd/NewHadoopRDD.scala | 55 +++++++++++++++---- 4 files changed, 62 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 525c9e35afd59..25f9571956ff9 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -444,7 +444,7 @@ class SparkContext(config: SparkConf) extends Logging { * hdfs://a-hdfs-path/part-nnnnn * }}} * - * Do `val rdd = sparkContext.wholeTextFile("hdfs://a-hdfs-path", minSplits)` + * Do `val rdd = sparkContext.wholeTextFile("hdfs://a-hdfs-path")` * *
then `rdd` contains * {{{ @@ -454,15 +454,21 @@ class SparkContext(config: SparkConf) extends Logging { * (a-hdfs-path/part-nnnnn, its content) * }}} * - * @note Small files are preferred, as each file will be loaded fully in memory. + * @note Small files are preferred, large file is also allowable, but may cause bad performance. + * + * @param minSplits A suggestion value of the minimal splitting number for input data. */ def wholeTextFiles(path: String, minSplits: Int = defaultMinSplits): RDD[(String, String)] = { - newAPIHadoopFile( - path, + val job = new NewHadoopJob(hadoopConfiguration) + NewFileInputFormat.addInputPath(job, new Path(path)) + val updateConf = job.getConfiguration + new WholeTextFileRDD( + this, classOf[WholeTextFileInputFormat], classOf[String], classOf[String], - minSplits = minSplits) + updateConf, + minSplits) } /** @@ -585,12 +591,11 @@ class SparkContext(config: SparkConf) extends Logging { fClass: Class[F], kClass: Class[K], vClass: Class[V], - conf: Configuration = hadoopConfiguration, - minSplits: Int = 1): RDD[(K, V)] = { + conf: Configuration = hadoopConfiguration): RDD[(K, V)] = { val job = new NewHadoopJob(conf) NewFileInputFormat.addInputPath(job, new Path(path)) val updatedConf = job.getConfiguration - new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf, minSplits) + new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf) } /** diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index fcdd680d70438..7fbefe1cb0fb1 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -167,8 +167,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork * hdfs://a-hdfs-path/part-nnnnn * }}} * - * Do - * `JavaPairRDD rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path", minSplit)` + * Do `JavaPairRDD rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")`, * *
then `rdd` contains * {{{ @@ -178,7 +177,9 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork * (a-hdfs-path/part-nnnnn, its content) * }}} * - * @note Small files are preferred, as each file will be loaded fully in memory. + * @note Small files are preferred, large file is also allowable, but may cause bad performance. + * + * @param minSplits A suggestion value of the minimal splitting number for input data. */ def wholeTextFiles(path: String, minSplits: Int): JavaPairRDD[String, String] = new JavaPairRDD(sc.wholeTextFiles(path, minSplits)) diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala index 23c2a136f47ac..80d055a89573b 100644 --- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala +++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala @@ -20,7 +20,6 @@ package org.apache.spark.input import scala.collection.JavaConversions._ import org.apache.hadoop.fs.Path -import org.apache.hadoop.fs.FileStatus import org.apache.hadoop.mapreduce.InputSplit import org.apache.hadoop.mapreduce.JobContext import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat @@ -56,6 +55,7 @@ private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[Str val totalLen = files.map { file => if (file.isDir) 0L else file.getLen }.sum - super.setMaxSplitSize(totalLen / (if (minSplits == 0) 1 else minSplits)) + val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minSplits == 0) 1 else minSplits)).toLong + super.setMaxSplitSize(maxSplitSize) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 03fa70d5b3c29..e955889ed0ee7 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -28,8 +28,10 @@ import org.apache.spark.{InterruptibleIterator, Logging, Partition, Serializable import org.apache.spark.annotation.DeveloperApi import org.apache.spark.input.WholeTextFileInputFormat -private[spark] -class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable) +private[spark] class NewHadoopPartition( + rddId: Int, + val index: Int, + @transient rawSplit: InputSplit with Writable) extends Partition { val serializableHadoopSplit = new SerializableWritable(rawSplit) @@ -57,8 +59,7 @@ class NewHadoopRDD[K, V]( inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], - @transient conf: Configuration, - minSplits: Int = 1) + @transient conf: Configuration) extends RDD[(K, V)](sc, Nil) with SparkHadoopMapReduceUtil with Logging { @@ -67,18 +68,19 @@ class NewHadoopRDD[K, V]( private val confBroadcast = sc.broadcast(new SerializableWritable(conf)) // private val serializableConf = new SerializableWritable(conf) - private val jobtrackerId: String = { + private val jobTrackerId: String = { val formatter = new SimpleDateFormat("yyyyMMddHHmm") formatter.format(new Date()) } - @transient private val jobId = new JobID(jobtrackerId, id) + @transient protected val jobId = new JobID(jobTrackerId, id) override def getPartitions: Array[Partition] = { val inputFormat = inputFormatClass.newInstance - - if (inputFormat.isInstanceOf[Configurable]) { - inputFormat.asInstanceOf[Configurable].setConf(conf) + inputFormat match { + case configurable: Configurable => + configurable.setConf(conf) 
+ case _ => } val jobContext = newJobContext(conf, jobId) @@ -100,11 +102,13 @@ class NewHadoopRDD[K, V]( val split = theSplit.asInstanceOf[NewHadoopPartition] logInfo("Input split: " + split.serializableHadoopSplit) val conf = confBroadcast.value.value - val attemptId = newTaskAttemptID(jobtrackerId, id, isMap = true, split.index, 0) + val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0) val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId) val format = inputFormatClass.newInstance - if (format.isInstanceOf[Configurable]) { - format.asInstanceOf[Configurable].setConf(conf) + format match { + case configurable: Configurable => + configurable.setConf(conf) + case _ => } val reader = format.createRecordReader( split.serializableHadoopSplit.value, hadoopAttemptContext) @@ -150,3 +154,30 @@ class NewHadoopRDD[K, V]( def getConf: Configuration = confBroadcast.value.value } +private[spark] class WholeTextFileRDD( + sc : SparkContext, + inputFormatClass: Class[_ <: WholeTextFileInputFormat], + keyClass: Class[String], + valueClass: Class[String], + @transient conf: Configuration, + minSplits: Int) + extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) { + + override def getPartitions: Array[Partition] = { + val inputFormat = inputFormatClass.newInstance + inputFormat match { + case configurable: Configurable => + configurable.setConf(conf) + case _ => + } + val jobContext = newJobContext(conf, jobId) + inputFormat.setMaxSplitSize(jobContext, minSplits) + val rawSplits = inputFormat.getSplits(jobContext).toArray + val result = new Array[Partition](rawSplits.size) + for (i <- 0 until rawSplits.size) { + result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) + } + result + } +} + From 76417f61f4c1d20a7e44ff70f688c503ba87ec3f Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Sun, 13 Apr 2014 11:58:25 +0800 Subject: [PATCH 4/4] refine comments --- .../main/scala/org/apache/spark/SparkContext.scala | 2 +- .../scala/org/apache/spark/rdd/NewHadoopRDD.scala | 12 +++++------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 25f9571956ff9..456070fa7c5ef 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -444,7 +444,7 @@ class SparkContext(config: SparkConf) extends Logging { * hdfs://a-hdfs-path/part-nnnnn * }}} * - * Do `val rdd = sparkContext.wholeTextFile("hdfs://a-hdfs-path")` + * Do `val rdd = sparkContext.wholeTextFile("hdfs://a-hdfs-path")`, * *
then `rdd` contains * {{{ diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index e955889ed0ee7..8684b645bc361 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -24,9 +24,13 @@ import org.apache.hadoop.conf.{Configurable, Configuration} import org.apache.hadoop.io.Writable import org.apache.hadoop.mapreduce._ -import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable, SparkContext, TaskContext} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.input.WholeTextFileInputFormat +import org.apache.spark.InterruptibleIterator +import org.apache.spark.Logging +import org.apache.spark.Partition +import org.apache.spark.SerializableWritable +import org.apache.spark.{SparkContext, TaskContext} private[spark] class NewHadoopPartition( rddId: Int, @@ -82,13 +86,7 @@ class NewHadoopRDD[K, V]( configurable.setConf(conf) case _ => } - val jobContext = newJobContext(conf, jobId) - - if (inputFormat.isInstanceOf[WholeTextFileInputFormat]) { - inputFormat.asInstanceOf[WholeTextFileInputFormat].setMaxSplitSize(jobContext, minSplits) - } - val rawSplits = inputFormat.getSplits(jobContext).toArray val result = new Array[Partition](rawSplits.size) for (i <- 0 until rawSplits.size) {
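
A minimal usage sketch of the API these patches introduce (an illustration only, not part of the patches themselves; it assumes a Spark build with this series applied, and the object name, app name, local master, sample path, the value 8, and the helper maxSplitSizeFor are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

object WholeTextFilesMinSplitsExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("wholeTextFiles-minSplits").setMaster("local[2]"))

    // Suggest at least 8 splits for a directory of many small files.
    // wholeTextFiles yields (file path, full file content) pairs.
    val rdd = sc.wholeTextFiles("hdfs://a-hdfs-path", minSplits = 8)
    rdd.map { case (path, content) => (path, content.length) }
      .collect()
      .foreach(println)

    sc.stop()
  }

  // Mirrors how WholeTextFileInputFormat.setMaxSplitSize (PATCH 3/4) turns the
  // minSplits hint into CombineFileInputFormat's max split size:
  // ceil(total input bytes / minSplits), with minSplits == 0 clamped to 1.
  def maxSplitSizeFor(totalLen: Long, minSplits: Int): Long =
    math.ceil(totalLen * 1.0 / (if (minSplits == 0) 1 else minSplits)).toLong
}

In the Scala API the new parameter defaults to defaultMinSplits, so existing calls such as sc.wholeTextFiles("hdfs://a-hdfs-path") keep compiling unchanged; the Java API instead adds a separate wholeTextFiles(path, minSplits) overload alongside the original one-argument method.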