1717
1818package org .apache .spark .input
1919
20- import org .apache .hadoop .conf .{Configuration , Configurable }
20+ import org .apache .hadoop .conf .{Configuration , Configurable => HConfigurable }
2121import com .google .common .io .{ByteStreams , Closeables }
2222
2323import org .apache .hadoop .io .Text
@@ -27,6 +27,18 @@ import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecor
2727import org .apache .hadoop .mapreduce .RecordReader
2828import org .apache .hadoop .mapreduce .TaskAttemptContext
2929
30+
31+ /**
32+ * A trait to implement [[org.apache.hadoop.conf.Configurable Configurable ]] interface.
33+ */
34+ private [spark] trait Configurable extends HConfigurable {
35+ private var conf : Configuration = _
36+ def setConf (c : Configuration ) {
37+ conf = c
38+ }
39+ def getConf : Configuration = conf
40+ }
41+
3042/**
3143 * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader ]] for reading a single whole text file
3244 * out in a key-value pair, where the key is the file path and the value is the entire content of
@@ -38,12 +50,6 @@ private[spark] class WholeTextFileRecordReader(
3850 index : Integer )
3951 extends RecordReader [String , String ] with Configurable {
4052
41- private var conf : Configuration = _
42- def setConf (c : Configuration ) {
43- conf = c
44- }
45- def getConf : Configuration = conf
46-
4753 private [this ] val path = split.getPath(index)
4854 private [this ] val fs = path.getFileSystem(context.getConfiguration)
4955
@@ -90,7 +96,7 @@ private[spark] class WholeTextFileRecordReader(
9096 * A [[org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader CombineFileRecordReader ]]
9197 * that could pass Hadoop configuration to WholeTextFileRecordReader.
9298 */
93- private [spark] class WholeCombineFileRecordReader (
99+ private [spark] class ConfigurableCombineFileRecordReader (
94100 split : InputSplit ,
95101 context : TaskAttemptContext )
96102 extends CombineFileRecordReader [String , String ](
@@ -99,16 +105,10 @@ private[spark] class WholeCombineFileRecordReader(
99105 classOf [WholeTextFileRecordReader ]
100106 ) with Configurable {
101107
102- private var conf : Configuration = _
103- def setConf (c : Configuration ) {
104- conf = c
105- }
106- def getConf : Configuration = conf
107-
108108 override def initNextRecordReader (): Boolean = {
109109 val r = super .initNextRecordReader()
110110 if (r) {
111- this .curReader.asInstanceOf [WholeTextFileRecordReader ].setConf(conf )
111+ this .curReader.asInstanceOf [WholeTextFileRecordReader ].setConf(getConf )
112112 }
113113 r
114114 }
0 commit comments