Skip to content

Commit ed426f3

Browse files
author
Sital Kedia
committed
Address review comments
1 parent 8076e27 commit ed426f3

File tree

1 file changed

+34
-27
lines changed

1 file changed

+34
-27
lines changed

core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java

Lines changed: 34 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.EOFException;
2222
import java.io.IOException;
2323
import java.io.InputStream;
24+
import java.io.InterruptedIOException;
2425
import java.nio.ByteBuffer;
2526
import java.util.concurrent.ExecutorService;
2627
import java.util.concurrent.TimeUnit;
@@ -82,10 +83,11 @@ public class ReadAheadInputStream extends InputStream {
8283
*/
8384
public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) {
8485
Preconditions.checkArgument(bufferSizeInBytes > 0,
85-
"bufferSizeInBytes should be greater than 0");
86+
"bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
8687
Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
8788
readAheadThresholdInBytes < bufferSizeInBytes,
88-
"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" );
89+
"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes, but the" +
90+
"value is " + readAheadThresholdInBytes );
8991
activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
9092
readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
9193
this.readAheadThresholdInBytes = readAheadThresholdInBytes;
@@ -95,22 +97,22 @@ public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int
9597
}
9698

9799
private boolean isEndOfStream() {
98-
if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream) {
99-
return true;
100-
}
101-
return false;
100+
return (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream);
102101
}
102+
103103
private void readAsync(final ByteBuffer byteBuffer) throws IOException {
104104
stateChangeLock.lock();
105-
if (endOfStream || readInProgress) {
105+
final byte[] arr = byteBuffer.array();
106+
try {
107+
if (endOfStream || readInProgress) {
108+
return;
109+
}
110+
byteBuffer.position(0);
111+
byteBuffer.flip();
112+
readInProgress = true;
113+
} finally {
106114
stateChangeLock.unlock();
107-
return;
108115
}
109-
byteBuffer.position(0);
110-
byteBuffer.flip();
111-
readInProgress = true;
112-
final byte[] arr = byteBuffer.array();
113-
stateChangeLock.unlock();
114116
executorService.execute(new Runnable() {
115117
@Override
116118
public void run() {
@@ -122,7 +124,7 @@ public void run() {
122124
// So there is no race condition in both the situations.
123125
boolean handled = false;
124126
int read = 0;
125-
Exception exception = new Exception("Unknown exception in ReadAheadInputStream");
127+
Exception exception = null;
126128
try {
127129
while (true) {
128130
read = underlyingInputStream.read(arr);
@@ -159,12 +161,13 @@ private void signalAsyncReadComplete() {
159161
}
160162
}
161163

162-
private void waitForAsyncReadComplete() {
164+
private void waitForAsyncReadComplete() throws IOException {
163165
stateChangeLock.lock();
164166
try {
165-
if (readInProgress)
167+
while (readInProgress)
166168
asyncReadComplete.await();
167169
} catch (InterruptedException e) {
170+
throw new InterruptedIOException(e.getMessage());
168171
} finally {
169172
stateChangeLock.unlock();
170173
}
@@ -189,12 +192,10 @@ public int read(byte[] b, int offset, int len) throws IOException {
189192
}
190193
stateChangeLock.lock();
191194
try {
192-
len = readInternal(b, offset, len);
193-
}
194-
finally {
195+
return readInternal(b, offset, len);
196+
} finally {
195197
stateChangeLock.unlock();
196198
}
197-
return len;
198199
}
199200

200201
/**
@@ -234,10 +235,12 @@ private int readInternal(byte[] b, int offset, int len) throws IOException {
234235
public int available() throws IOException {
235236
stateChangeLock.lock();
236237
// Make sure we have no integer overflow.
237-
int val = (int) Math.min((long) Integer.MAX_VALUE,
238-
(long) activeBuffer.remaining() + readAheadBuffer.remaining());
239-
stateChangeLock.unlock();
240-
return val;
238+
try {
239+
return (int) Math.min((long) Integer.MAX_VALUE,
240+
(long) activeBuffer.remaining() + readAheadBuffer.remaining());
241+
} finally {
242+
stateChangeLock.unlock();
243+
}
241244
}
242245

243246
@Override
@@ -297,19 +300,23 @@ private long skipInternal(long n) throws IOException {
297300

298301
@Override
299302
public void close() throws IOException {
303+
InterruptedException interruptedException = null;
300304
executorService.shutdown();
301305
try {
302-
executorService.awaitTermination(10, TimeUnit.MILLISECONDS);
306+
executorService.awaitTermination(10, TimeUnit.SECONDS);
303307
} catch (InterruptedException e) {
308+
interruptedException = e;
304309
}
305310
underlyingInputStream.close();
306311
stateChangeLock.lock();
307312
try {
308313
StorageUtils.dispose(activeBuffer);
309314
StorageUtils.dispose(readAheadBuffer);
310-
}
311-
finally {
315+
} finally {
312316
stateChangeLock.unlock();
317+
if (interruptedException != null) {
318+
throw new InterruptedIOException(interruptedException.getMessage());
319+
}
313320
}
314321
}
315322
}

0 commit comments

Comments
 (0)