Skip to content

Commit c7b60a5

Browse files
author
Sital Kedia
committed
Review comments
1 parent 28e767d commit c7b60a5

File tree

1 file changed

+5
-7
lines changed

1 file changed

+5
-7
lines changed

core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,8 @@ public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int
9494
readAheadBuffer.flip();
9595
}
9696

97-
private boolean hasRemaining() {
98-
if(activeBuffer.remaining() == 0 && readAheadBuffer.remaining() == 0 && endOfStream) {
97+
private boolean isEndOfStream() {
98+
if(!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream) {
9999
return true;
100100
}
101101
return false;
@@ -109,13 +109,11 @@ private void readAsync(final ByteBuffer byteBuffer) throws IOException {
109109
byteBuffer.position(0);
110110
byteBuffer.flip();
111111
readInProgress = true;
112+
final byte[] arr = byteBuffer.array();
112113
stateChangeLock.unlock();
113114
executorService.execute(new Runnable() {
114115
@Override
115116
public void run() {
116-
stateChangeLock.lock();
117-
byte[] arr = byteBuffer.array();
118-
stateChangeLock.unlock();
119117
// Please note that it is safe to release the lock and read into the read ahead buffer
120118
// because either of following two conditions will hold - 1. The active buffer has
121119
// data available to read so the reader will not read from the read ahead buffer.
@@ -139,7 +137,7 @@ public void run() {
139137
endOfStream = true;
140138
} else if (!handled) {
141139
readAborted = true;
142-
readException = exception;
140+
readException = exception != null ? exception: new Exception("Unknown exception in ReadAheadInputStream");
143141
} else {
144142
byteBuffer.limit(read);
145143
}
@@ -215,7 +213,7 @@ private int readInternal(byte[] b, int offset, int len) throws IOException {
215213
if (readAborted) {
216214
throw new IOException(readException);
217215
}
218-
if (hasRemaining()) {
216+
if (isEndOfStream()) {
219217
return -1;
220218
}
221219
len = Math.min(len, activeBuffer.remaining());

0 commit comments

Comments
 (0)