Skip to content

Commit f8d0191

Browse files
committed
avoid seeking beyond eof
1 parent e35a366 commit f8d0191

File tree

1 file changed

+15
-8
lines changed

1 file changed

+15
-8
lines changed

core/src/main/scala/org/apache/spark/input/EscapedTextInputFormat.scala

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import java.io.{BufferedReader, IOException, InputStreamReader}
2222
import scala.collection.mutable.ArrayBuffer
2323

2424
import org.apache.hadoop.conf.Configuration
25-
import org.apache.hadoop.fs.FSDataInputStream
25+
import org.apache.hadoop.fs.{FileSystem, Path}
2626
import org.apache.hadoop.io.compress.CompressionCodecFactory
2727
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
2828
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
@@ -105,11 +105,15 @@ private[input] class EscapedTextRecordReader extends RecordReader[Long, Array[St
105105
throw new IOException(s"Do not support compressed files but found $file.")
106106
}
107107
val fs = file.getFileSystem(conf)
108-
val in = fs.open(file)
109-
start = findNext(in, split.getStart)
110-
end = findNext(in, split.getStart + split.getLength)
108+
val size = fs.getFileStatus(file).getLen
109+
start = findNext(fs, file, size, split.getStart)
110+
end = findNext(fs, file, size, split.getStart + split.getLength)
111111
cur = start
112-
in.seek(cur)
112+
val in = fs.open(file)
113+
if (cur > 0L) {
114+
in.seek(cur - 1L)
115+
in.read()
116+
}
113117
reader = new BufferedReader(new InputStreamReader(in), defaultBufferSize)
114118
}
115119

@@ -147,9 +151,11 @@ private[input] class EscapedTextRecordReader extends RecordReader[Long, Array[St
147151
* position that is not escaped.
148152
* @return the start position of the next record
149153
*/
150-
private def findNext(in: FSDataInputStream, start: Long): Long = {
151-
if (start == 0L) return 0L
152-
var pos = start
154+
private def findNext(fs: FileSystem, file: Path, size: Long, offset: Long): Long = {
155+
if (offset == 0L) return 0L
156+
if (offset >= size) return size
157+
var pos = offset
158+
val in = fs.open(file)
153159
in.seek(pos)
154160
val br = new BufferedReader(new InputStreamReader(in), defaultBufferSize)
155161
var escaped = true
@@ -183,6 +189,7 @@ private[input] class EscapedTextRecordReader extends RecordReader[Long, Array[St
183189
}
184190
}
185191
}
192+
in.close()
186193
pos
187194
}
188195

0 commit comments

Comments
 (0)