WholeTextFileInputFormat.scala

@@ -19,7 +19,6 @@ package org.apache.spark.input
 
 import scala.collection.JavaConversions._
 
-import org.apache.hadoop.conf.{Configuration, Configurable}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.hadoop.mapreduce.JobContext
@@ -38,18 +37,13 @@ private[spark] class WholeTextFileInputFormat
 
   override protected def isSplitable(context: JobContext, file: Path): Boolean = false
 
-  private var conf: Configuration = _
-  def setConf(c: Configuration) {
-    conf = c
-  }
-  def getConf: Configuration = conf
-
   override def createRecordReader(
       split: InputSplit,
       context: TaskAttemptContext): RecordReader[String, String] = {
 
-    val reader = new WholeCombineFileRecordReader(split, context)
-    reader.setConf(conf)
+    val reader =
+      new ConfigurableCombineFileRecordReader(split, context, classOf[WholeTextFileRecordReader])
+    reader.setConf(getConf)
     reader
   }
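The input format no longer declares its own conf field, setConf, and getConf; it now relies on a shared Configurable trait (defined in WholeTextFileRecordReader.scala, next file). A minimal, self-contained sketch of that pattern, assuming only hadoop-common on the classpath; DemoReader and Demo are hypothetical names for illustration, not part of the patch:

import org.apache.hadoop.conf.{Configuration, Configurable => HConfigurable}

// Implement Hadoop's Configurable interface once, in a trait, so every reader
// or input format can mix it in instead of repeating the same boilerplate.
trait Configurable extends HConfigurable {
  private var conf: Configuration = _
  def setConf(c: Configuration) { conf = c }
  def getConf: Configuration = conf
}

// Hypothetical class for illustration: mixing in the trait is all it takes.
class DemoReader extends Configurable

object Demo extends App {
  val reader = new DemoReader
  reader.setConf(new Configuration())
  assert(reader.getConf != null)  // the conf set above is visible via getConf
}

The rename to HConfigurable on import keeps Hadoop's interface name from clashing with the new Spark-internal trait of the same name.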
WholeTextFileRecordReader.scala

@@ -17,7 +17,7 @@
 
 package org.apache.spark.input
 
-import org.apache.hadoop.conf.{Configuration, Configurable}
+import org.apache.hadoop.conf.{Configuration, Configurable => HConfigurable}
 import com.google.common.io.{ByteStreams, Closeables}
 
 import org.apache.hadoop.io.Text
@@ -27,6 +27,18 @@ import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecordReader}
 import org.apache.hadoop.mapreduce.RecordReader
 import org.apache.hadoop.mapreduce.TaskAttemptContext
 
+
+/**
+ * A trait to implement [[org.apache.hadoop.conf.Configurable Configurable]] interface.
+ */
+private[spark] trait Configurable extends HConfigurable {
+  private var conf: Configuration = _
+  def setConf(c: Configuration) {
+    conf = c
+  }
+  def getConf: Configuration = conf
+}
+
 /**
  * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
  * out in a key-value pair, where the key is the file path and the value is the entire content of
@@ -38,12 +50,6 @@ private[spark] class WholeTextFileRecordReader(
     index: Integer)
   extends RecordReader[String, String] with Configurable {
 
-  private var conf: Configuration = _
-  def setConf(c: Configuration) {
-    conf = c
-  }
-  def getConf: Configuration = conf
-
   private[this] val path = split.getPath(index)
   private[this] val fs = path.getFileSystem(context.getConfiguration)
 
@@ -87,29 +93,24 @@ private[spark] class WholeTextFileRecordReader(
 
 
 /**
- * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
- * out in a key-value pair, where the key is the file path and the value is the entire content of
- * the file.
+ * A [[org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader CombineFileRecordReader]]
+ * that can pass Hadoop Configuration to [[org.apache.hadoop.conf.Configurable Configurable]]
+ * RecordReaders.
  */
-private[spark] class WholeCombineFileRecordReader(
+private[spark] class ConfigurableCombineFileRecordReader[K, V](
     split: InputSplit,
-    context: TaskAttemptContext)
-  extends CombineFileRecordReader[String, String](
+    context: TaskAttemptContext,
+    recordReaderClass: Class[_ <: RecordReader[K, V] with HConfigurable])
+  extends CombineFileRecordReader[K, V](
     split.asInstanceOf[CombineFileSplit],
    context,
-    classOf[WholeTextFileRecordReader]
+    recordReaderClass
   ) with Configurable {
 
-  private var conf: Configuration = _
-  def setConf(c: Configuration) {
-    conf = c
-  }
-  def getConf: Configuration = conf
-
   override def initNextRecordReader(): Boolean = {
     val r = super.initNextRecordReader()
     if (r) {
-      this.curReader.asInstanceOf[WholeTextFileRecordReader].setConf(conf)
+      this.curReader.asInstanceOf[HConfigurable].setConf(getConf)
     }
     r
   }
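Before this change, WholeCombineFileRecordReader was hard-wired to WholeTextFileRecordReader; the generic recordReaderClass parameter makes the same conf-forwarding in initNextRecordReader reusable for any Configurable RecordReader. For context beyond the diff, this code path backs SparkContext.wholeTextFiles, so a Configuration set on the input format now reaches every per-file reader. A hedged usage sketch; the input path and local master are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

object WholeTextFilesDemo extends App {
  val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[2]"))
  // wholeTextFiles uses WholeTextFileInputFormat: one (path, contents) record
  // per file, and files are never split (isSplitable returns false).
  val files = sc.wholeTextFiles("/tmp/whole-text-demo")  // placeholder path
  files.collect().foreach { case (path, contents) =>
    println(s"$path -> ${contents.length} chars")
  }
  sc.stop()
}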