From bf800b928c4b2760c9bf39b5a7021138a838d5fe Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Sun, 16 Nov 2014 22:29:04 -0800 Subject: [PATCH 1/3] update doc for WholeCombineFileRecordReader --- .../org/apache/spark/input/WholeTextFileRecordReader.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala index 6d59b24eb059..40aaeac9d05b 100644 --- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala +++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala @@ -87,9 +87,8 @@ private[spark] class WholeTextFileRecordReader( /** - * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file - * out in a key-value pair, where the key is the file path and the value is the entire content of - * the file. + * A [[org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader CombineFileRecordReader]] + * that could pass Hadoop configuration to WholeTextFileRecordReader. */ private[spark] class WholeCombineFileRecordReader( split: InputSplit, From 95d13eb335f441ba34615bada691655702df63a1 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 15 Dec 2014 17:22:29 -0800 Subject: [PATCH 2/3] address comment --- .../input/WholeTextFileInputFormat.scala | 11 ++----- .../input/WholeTextFileRecordReader.scala | 30 +++++++++---------- 2 files changed, 17 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala index d3601cca832b..c21509d57847 100644 --- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala +++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala @@ -19,7 +19,6 @@ package org.apache.spark.input import scala.collection.JavaConversions._ -import org.apache.hadoop.conf.{Configuration, Configurable} import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.InputSplit import org.apache.hadoop.mapreduce.JobContext @@ -38,18 +37,12 @@ private[spark] class WholeTextFileInputFormat override protected def isSplitable(context: JobContext, file: Path): Boolean = false - private var conf: Configuration = _ - def setConf(c: Configuration) { - conf = c - } - def getConf: Configuration = conf - override def createRecordReader( split: InputSplit, context: TaskAttemptContext): RecordReader[String, String] = { - val reader = new WholeCombineFileRecordReader(split, context) - reader.setConf(conf) + val reader = new ConfigurableCombineFileRecordReader(split, context) + reader.setConf(getConf) reader } diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala index 40aaeac9d05b..89e85abb60d1 100644 --- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala +++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala @@ -17,7 +17,7 @@ package org.apache.spark.input -import org.apache.hadoop.conf.{Configuration, Configurable} +import org.apache.hadoop.conf.{Configuration, Configurable => HConfigurable} import com.google.common.io.{ByteStreams, Closeables} import org.apache.hadoop.io.Text @@ -27,6 +27,18 @@ import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecor import org.apache.hadoop.mapreduce.RecordReader import org.apache.hadoop.mapreduce.TaskAttemptContext + +/** + * A trait to implement [[org.apache.hadoop.conf.Configurable Configurable]] interface. + */ +private[spark] trait Configurable extends HConfigurable { + private var conf: Configuration = _ + def setConf(c: Configuration) { + conf = c + } + def getConf: Configuration = conf +} + /** * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file * out in a key-value pair, where the key is the file path and the value is the entire content of @@ -38,12 +50,6 @@ private[spark] class WholeTextFileRecordReader( index: Integer) extends RecordReader[String, String] with Configurable { - private var conf: Configuration = _ - def setConf(c: Configuration) { - conf = c - } - def getConf: Configuration = conf - private[this] val path = split.getPath(index) private[this] val fs = path.getFileSystem(context.getConfiguration) @@ -90,7 +96,7 @@ private[spark] class WholeTextFileRecordReader( * A [[org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader CombineFileRecordReader]] * that could pass Hadoop configuration to WholeTextFileRecordReader. */ -private[spark] class WholeCombineFileRecordReader( +private[spark] class ConfigurableCombineFileRecordReader( split: InputSplit, context: TaskAttemptContext) extends CombineFileRecordReader[String, String]( @@ -99,16 +105,10 @@ private[spark] class WholeCombineFileRecordReader( classOf[WholeTextFileRecordReader] ) with Configurable { - private var conf: Configuration = _ - def setConf(c: Configuration) { - conf = c - } - def getConf: Configuration = conf - override def initNextRecordReader(): Boolean = { val r = super.initNextRecordReader() if (r) { - this.curReader.asInstanceOf[WholeTextFileRecordReader].setConf(conf) + this.curReader.asInstanceOf[WholeTextFileRecordReader].setConf(getConf) } r } From dc3d21a5bedbb8d42a3584159216e2b6d5bf9330 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 15 Dec 2014 23:34:46 -0800 Subject: [PATCH 3/3] More genericization in ConfigurableCombineFileRecordReader. --- .../spark/input/WholeTextFileInputFormat.scala | 3 ++- .../spark/input/WholeTextFileRecordReader.scala | 14 ++++++++------ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala index c21509d57847..aaef7c74eea3 100644 --- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala +++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala @@ -41,7 +41,8 @@ private[spark] class WholeTextFileInputFormat split: InputSplit, context: TaskAttemptContext): RecordReader[String, String] = { - val reader = new ConfigurableCombineFileRecordReader(split, context) + val reader = + new ConfigurableCombineFileRecordReader(split, context, classOf[WholeTextFileRecordReader]) reader.setConf(getConf) reader } diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala index 89e85abb60d1..1b1131b9b883 100644 --- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala +++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala @@ -94,21 +94,23 @@ private[spark] class WholeTextFileRecordReader( /** * A [[org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader CombineFileRecordReader]] - * that could pass Hadoop configuration to WholeTextFileRecordReader. + * that can pass Hadoop Configuration to [[org.apache.hadoop.conf.Configurable Configurable]] + * RecordReaders. */ -private[spark] class ConfigurableCombineFileRecordReader( +private[spark] class ConfigurableCombineFileRecordReader[K, V]( split: InputSplit, - context: TaskAttemptContext) - extends CombineFileRecordReader[String, String]( + context: TaskAttemptContext, + recordReaderClass: Class[_ <: RecordReader[K, V] with HConfigurable]) + extends CombineFileRecordReader[K, V]( split.asInstanceOf[CombineFileSplit], context, - classOf[WholeTextFileRecordReader] + recordReaderClass ) with Configurable { override def initNextRecordReader(): Boolean = { val r = super.initNextRecordReader() if (r) { - this.curReader.asInstanceOf[WholeTextFileRecordReader].setConf(getConf) + this.curReader.asInstanceOf[HConfigurable].setConf(getConf) } r }