@@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
 import org.apache.hadoop.mapreduce.RecordReader
 import org.apache.hadoop.mapreduce.TaskAttemptContext
 import org.apache.hadoop.fs.{FSDataInputStream, Path}
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
 import org.apache.hadoop.mapreduce.JobContext
 import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
@@ -60,6 +61,7 @@ abstract class StreamFileInputFormat[T]
  * @note TaskAttemptContext is not serializable resulting in the confBytes construct
  * @note CombineFileSplit is not serializable resulting in the splitBytes construct
  */
+@DeveloperApi
 class PortableDataStream(@transient isplit: CombineFileSplit,
     @transient context: TaskAttemptContext, index: Integer)
   extends Serializable {
@@ -205,8 +207,7 @@ private[spark] class StreamRecordReader(
 }
 
 /**
- * A class for extracting the information from the file using the
- * BinaryRecordReader (as Byte array)
+ * The format for the PortableDataStream files
  */
 private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDataStream] {
   override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext) =
@@ -216,22 +217,3 @@ private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDat
   }
 }
 
-/**
- * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single binary file
- * out in a key-value pair, where the key is the file path and the value is the entire content of
- * the file as a byte array
- */
-abstract class BinaryRecordReader[T](
-    split: CombineFileSplit,
-    context: TaskAttemptContext,
-    index: Integer)
-  extends StreamBasedRecordReader[T](split, context, index) {
-
-  def parseStream(inpStream: PortableDataStream): T = {
-    val inStream = inpStream.open()
-    val innerBuffer = ByteStreams.toByteArray(inStream)
-    Closeables.close(inStream, false)
-    parseByteArray(innerBuffer)
-  }
-  def parseByteArray(inArray: Array[Byte]): T
-}
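
For context, a minimal sketch of how the now-annotated @DeveloperApi PortableDataStream is typically consumed through SparkContext.binaryFiles. The SparkContext name `sc` and the input path are illustrative assumptions, not part of this change; the read pattern (open(), Guava's ByteStreams/Closeables) mirrors the removed BinaryRecordReader.

    import com.google.common.io.{ByteStreams, Closeables}

    // Assumes a running SparkContext `sc`; the path below is a hypothetical placeholder.
    // binaryFiles yields an RDD[(String, PortableDataStream)] keyed by file path.
    val sizes = sc.binaryFiles("hdfs:///data/bin").map { case (path, pds) =>
      val in = pds.open()                      // open the underlying input stream lazily on the executor
      val bytes = ByteStreams.toByteArray(in)  // read the whole file into a byte array
      Closeables.close(in, false)
      (path, bytes.length)
    }
    sizes.collect().foreach { case (path, n) => println(s"$path: $n bytes") }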