
Commit 4163e38

fixing line lengths and changing the output type from FSDataInputStream to DataInputStream to minimize sensitivity to Hadoop API changes
1 parent 19812a8 commit 4163e38
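
The type change is source-compatible: Hadoop's FSDataInputStream is a subclass of java.io.DataInputStream, so declaring the weaker type costs nothing at the call site while insulating callers from Hadoop-specific API churn. A minimal Scala sketch of that upcast (illustrative, not part of this commit):

import java.io.DataInputStream
import org.apache.hadoop.fs.{ FileSystem, Path }

// fs.open(p) returns an FSDataInputStream, which upcasts to DataInputStream
// for free; only the declared type changes, not the runtime object.
def openAsPlainStream(fs: FileSystem, p: Path): DataInputStream =
  fs.open(p)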

File tree

1 file changed: +26 -33 lines

core/src/main/scala/org/apache/spark/input/RawFileInput.scala

Lines changed: 26 additions & 33 deletions
@@ -18,25 +18,24 @@
 package org.apache.spark.input
 
 import scala.collection.JavaConversions._
-import com.google.common.io.{ByteStreams, Closeables}
+import com.google.common.io.{ ByteStreams, Closeables }
 import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
 import org.apache.hadoop.mapreduce.RecordReader
 import org.apache.hadoop.mapreduce.TaskAttemptContext
-import org.apache.hadoop.fs.{FSDataInputStream, Path}
+import org.apache.hadoop.fs.{ FSDataInputStream, Path }
 import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
 import org.apache.hadoop.mapreduce.JobContext
 import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataOutputStream, DataInputStream}
-
+import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, DataOutputStream, DataInputStream }
 
 /**
  * A general format for reading whole files in as streams, byte arrays,
  * or other functions to be added
  */
 abstract class StreamFileInputFormat[T]
-  extends CombineFileInputFormat[String,T] {
+  extends CombineFileInputFormat[String, T] {
   override protected def isSplitable(context: JobContext, file: Path): Boolean = false
   /**
    * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API.
@@ -51,8 +50,7 @@ abstract class StreamFileInputFormat[T]
     super.setMaxSplitSize(maxSplitSize)
   }
 
-  def createRecordReader(split: InputSplit, taContext: TaskAttemptContext):
-  RecordReader[String,T]
+  def createRecordReader(split: InputSplit, taContext: TaskAttemptContext): RecordReader[String, T]
 
 }
 
@@ -62,13 +60,14 @@ abstract class StreamFileInputFormat[T]
  * @note TaskAttemptContext is not serializable resulting in the confBytes construct
  * @note CombineFileSplit is not serializable resulting in the splitBytes construct
  */
-class PortableDataStream(@transient isplit: CombineFileSplit, @transient context: TaskAttemptContext, index: Integer)
+class PortableDataStream(@transient isplit: CombineFileSplit,
+    @transient context: TaskAttemptContext, index: Integer)
   extends Serializable {
   // transient forces file to be reopened after being serialization
   // it is also used for non-serializable classes
 
   @transient
-  private var fileIn: FSDataInputStream = null.asInstanceOf[FSDataInputStream]
+  private var fileIn: DataInputStream = null.asInstanceOf[DataInputStream]
   @transient
   private var isOpen = false
 
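The @transient fields above are what make PortableDataStream serializable at all: the open stream is dropped when the object is serialized and lazily rebuilt on the executor that deserializes it. A generic sketch of the same pattern, with illustrative names that are not part of the patch:

import java.io.{ DataInputStream, FileInputStream }

// hypothetical stand-in for the transient-and-reopen idiom
class LazyHandle(path: String) extends Serializable {
  // dropped during serialization; rebuilt on first use after deserialization
  @transient private var in: DataInputStream = null

  def open(): DataInputStream = {
    if (in == null) {
      in = new DataInputStream(new FileInputStream(path))
    }
    in
  }
}
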
@@ -111,12 +110,12 @@ class PortableDataStream(@transient isplit: CombineFileSplit, @transient context
   /**
    * create a new DataInputStream from the split and context
    */
-  def open(): FSDataInputStream = {
+  def open(): DataInputStream = {
     if (!isOpen) {
       val pathp = split.getPath(index)
       val fs = pathp.getFileSystem(conf)
       fileIn = fs.open(pathp)
-      isOpen=true
+      isOpen = true
     }
     fileIn
   }
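
With the new signature a consumer of open() compiles against java.io alone. A hypothetical caller (the helper itself is not in the patch; close() is the PortableDataStream method shown in the next hunk):

import java.io.DataInputStream

def headerBytes(pds: PortableDataStream, n: Int): Array[Byte] = {
  val in: DataInputStream = pds.open()
  try {
    val buf = new Array[Byte](n)
    in.readFully(buf) // DataInput.readFully blocks until n bytes are read
    buf
  } finally {
    pds.close()
  }
}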
@@ -138,7 +137,7 @@ class PortableDataStream(@transient isplit: CombineFileSplit, @transient context
     if (isOpen) {
       try {
         fileIn.close()
-        isOpen=false
+        isOpen = false
       } catch {
         case ioe: java.io.IOException => // do nothing
       }
@@ -152,20 +151,17 @@ class PortableDataStream(@transient isplit: CombineFileSplit, @transient context
  * to reading files out as streams
  */
 abstract class StreamBasedRecordReader[T](
-          split: CombineFileSplit,
-          context: TaskAttemptContext,
-          index: Integer)
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
   extends RecordReader[String, T] {
 
-
-
   // True means the current file has been processed, then skip it.
   private var processed = false
 
   private var key = ""
   private var value: T = null.asInstanceOf[T]
 
-
   override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
   override def close() = {}
 
@@ -175,8 +171,6 @@ abstract class StreamBasedRecordReader[T](
 
   override def getCurrentValue = value
 
-
-
   override def nextKeyValue = {
     if (!processed) {
       val fileIn = new PortableDataStream(split, context, index)
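
The processed flag implements the usual one-record-per-split reader: nextKeyValue succeeds exactly once and then reports exhaustion. A self-contained skeleton of the same idiom (illustrative, not the class from this patch):

import org.apache.hadoop.mapreduce.{ InputSplit, RecordReader, TaskAttemptContext }

class OneShotReader(payload: String) extends RecordReader[String, String] {
  private var processed = false
  private var key = ""
  private var value: String = null

  override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
  override def nextKeyValue = {
    if (!processed) {
      key = "record-0" // a single key/value pair per split
      value = payload
      processed = true
      true
    } else false // exhausted after the first record
  }
  override def getCurrentKey = key
  override def getCurrentValue = value
  override def getProgress = if (processed) 1.0f else 0.0f
  override def close() = {}
}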
@@ -202,9 +196,9 @@ abstract class StreamBasedRecordReader[T](
  * Reads the record in directly as a stream for other objects to manipulate and handle
  */
 private[spark] class StreamRecordReader(
-          split: CombineFileSplit,
-          context: TaskAttemptContext,
-          index: Integer)
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
   extends StreamBasedRecordReader[PortableDataStream](split, context, index) {
 
   def parseStream(inStream: PortableDataStream): PortableDataStream = inStream
@@ -215,12 +209,11 @@ private[spark] class StreamRecordReader(
  * BinaryRecordReader (as Byte array)
  */
 private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDataStream] {
-  override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext)=
-  {
-    new CombineFileRecordReader[String,PortableDataStream](
-      split.asInstanceOf[CombineFileSplit], taContext, classOf[StreamRecordReader]
-    )
-  }
+  override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext) =
+  {
+    new CombineFileRecordReader[String, PortableDataStream](
+      split.asInstanceOf[CombineFileSplit], taContext, classOf[StreamRecordReader])
+  }
 }
 
 /**
@@ -229,10 +222,10 @@ private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDat
  * the file as a byte array
  */
 abstract class BinaryRecordReader[T](
-          split: CombineFileSplit,
-          context: TaskAttemptContext,
-          index: Integer)
-  extends StreamBasedRecordReader[T](split,context,index) {
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
+  extends StreamBasedRecordReader[T](split, context, index) {
 
   def parseStream(inpStream: PortableDataStream): T = {
     val inStream = inpStream.open()
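
Taken together, StreamInputFormat plugs into the standard new-Hadoop-API entry point. A hypothetical driver-side call, ignoring the private[spark] visibility that confines the real class to Spark's own wrappers:

import org.apache.spark.SparkContext
import org.apache.spark.input.{ PortableDataStream, StreamInputFormat }

// one record per file: the key is the path, the value a reopenable stream
def wholeStreams(sc: SparkContext, path: String) =
  sc.newAPIHadoopFile(
    path,
    classOf[StreamInputFormat],
    classOf[String],
    classOf[PortableDataStream])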
