1414package org .apache .spark .io ;
1515
1616import com .google .common .base .Preconditions ;
17+ import com .google .common .base .Throwables ;
1718import org .apache .spark .util .ThreadUtils ;
19+ import org .slf4j .Logger ;
20+ import org .slf4j .LoggerFactory ;
1821
1922import javax .annotation .concurrent .GuardedBy ;
2023import java .io .EOFException ;
3740 */
3841public class ReadAheadInputStream extends InputStream {
3942
43+ private static final Logger logger = LoggerFactory .getLogger (ReadAheadInputStream .class );
44+
4045 private ReentrantLock stateChangeLock = new ReentrantLock ();
4146
4247 @ GuardedBy ("stateChangeLock" )
@@ -57,7 +62,20 @@ public class ReadAheadInputStream extends InputStream {
5762 private boolean readAborted ;
5863
5964 @ GuardedBy ("stateChangeLock" )
60- private Exception readException ;
65+ private Throwable readException ;
66+
67+ @ GuardedBy ("stateChangeLock" )
68+ // whether the close method is called.
69+ private boolean isClosed ;
70+
71+ @ GuardedBy ("stateChangeLock" )
72+ // true when the close method will close the underlying input stream. This is valid only if
73+ // `isClosed` is true.
74+ private boolean isUnderlyingInputStreamBeingClosed ;
75+
76+ @ GuardedBy ("stateChangeLock" )
77+ // Whether a read-ahead task is currently running.
78+ private boolean isReading ;
6179
6280 // If the remaining data size in the current buffer is below this threshold,
6381 // we issue an async read from the underlying input stream.
@@ -75,18 +93,18 @@ public class ReadAheadInputStream extends InputStream {
7593 * Creates a <code>ReadAheadInputStream</code> with the specified buffer size and read-ahead
7694 * threshold
7795 *
78- * @param inputStream The underlying input stream.
79- * @param bufferSizeInBytes The buffer size.
80- * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead
81- * threshold, an async read is triggered.
96+ * @param inputStream The underlying input stream.
97+ * @param bufferSizeInBytes The buffer size.
98+ * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead
99+ * threshold, an async read is triggered.
82100 */
83101 public ReadAheadInputStream (InputStream inputStream , int bufferSizeInBytes , int readAheadThresholdInBytes ) {
84102 Preconditions .checkArgument (bufferSizeInBytes > 0 ,
85- "bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes );
103+ "bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes );
86104 Preconditions .checkArgument (readAheadThresholdInBytes > 0 &&
87- readAheadThresholdInBytes < bufferSizeInBytes ,
88- "readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes, but the" +
89- "value is " + readAheadThresholdInBytes );
105+ readAheadThresholdInBytes < bufferSizeInBytes ,
106+ "readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes, but the" +
107+ "value is " + readAheadThresholdInBytes );
90108 activeBuffer = ByteBuffer .allocate (bufferSizeInBytes );
91109 readAheadBuffer = ByteBuffer .allocate (bufferSizeInBytes );
92110 this .readAheadThresholdInBytes = readAheadThresholdInBytes ;
@@ -99,57 +117,104 @@ private boolean isEndOfStream() {
99117 return (!activeBuffer .hasRemaining () && !readAheadBuffer .hasRemaining () && endOfStream );
100118 }
101119
102- private void readAsync (final ByteBuffer byteBuffer ) throws IOException {
120+ private void checkReadException () throws IOException {
121+ if (readAborted ) {
122+ Throwables .propagateIfPossible (readException , IOException .class );
123+ throw new IOException (readException );
124+ }
125+ }
126+
127+ /** Read data from underlyingInputStream to readAheadBuffer asynchronously. */
128+ private void readAsync () throws IOException {
103129 stateChangeLock .lock ();
104- final byte [] arr = byteBuffer .array ();
130+ final byte [] arr = readAheadBuffer .array ();
105131 try {
106132 if (endOfStream || readInProgress ) {
107133 return ;
108134 }
109- byteBuffer .position (0 );
110- byteBuffer .flip ();
135+ checkReadException ();
136+ readAheadBuffer .position (0 );
137+ readAheadBuffer .flip ();
111138 readInProgress = true ;
112139 } finally {
113140 stateChangeLock .unlock ();
114141 }
115142 executorService .execute (new Runnable () {
143+
116144 @ Override
117145 public void run () {
146+ stateChangeLock .lock ();
147+ try {
148+ if (isClosed ) {
149+ readInProgress = false ;
150+ return ;
151+ }
152+ // Flip this so that the close method will not close the underlying input stream when we
153+ // are reading.
154+ isReading = true ;
155+ } finally {
156+ stateChangeLock .unlock ();
157+ }
158+
118159 // Please note that it is safe to release the lock and read into the read ahead buffer
119160 // because either of following two conditions will hold - 1. The active buffer has
120161 // data available to read so the reader will not read from the read ahead buffer.
121162 // 2. This is the first time read is called or the active buffer is exhausted,
122163 // in that case the reader waits for this async read to complete.
123164 // So there is no race condition in both the situations.
124- boolean handled = false ;
125165 int read = 0 ;
126- Exception exception = null ;
166+ Throwable exception = null ;
127167 try {
128168 while (true ) {
129169 read = underlyingInputStream .read (arr );
130170 if (0 != read ) break ;
131171 }
132- handled = true ;
133- } catch (Exception ex ) {
172+ } catch (Throwable ex ) {
134173 exception = ex ;
174+ if (ex instanceof Error ) {
175+ // `readException` may not be reported to the user. Rethrow Error to make sure at least
176+ // The user can see Error in UncaughtExceptionHandler.
177+ throw (Error ) ex ;
178+ }
135179 } finally {
136180 stateChangeLock .lock ();
137- if (read < 0 || (exception instanceof EOFException ) ) {
181+ if (read < 0 || (exception instanceof EOFException )) {
138182 endOfStream = true ;
139- } else if (! handled ) {
183+ } else if (exception != null ) {
140184 readAborted = true ;
141- readException = exception != null ? exception : new Exception ( "Unknown exception in ReadAheadInputStream" ) ;
185+ readException = exception ;
142186 } else {
143- byteBuffer .limit (read );
187+ readAheadBuffer .limit (read );
144188 }
145189 readInProgress = false ;
146190 signalAsyncReadComplete ();
147191 stateChangeLock .unlock ();
192+ closeUnderlyingInputStreamIfNecessary ();
148193 }
149194 }
150195 });
151196 }
152197
198+ private void closeUnderlyingInputStreamIfNecessary () {
199+ boolean needToCloseUnderlyingInputStream = false ;
200+ stateChangeLock .lock ();
201+ try {
202+ isReading = false ;
203+ if (isClosed && !isUnderlyingInputStreamBeingClosed ) {
204+ // close method cannot close underlyingInputStream because we were reading.
205+ needToCloseUnderlyingInputStream = true ;
206+ }
207+ } finally {
208+ stateChangeLock .unlock ();
209+ }
210+ if (needToCloseUnderlyingInputStream ) {
211+ try {
212+ underlyingInputStream .close ();
213+ } catch (IOException e ) {
214+ logger .warn (e .getMessage (), e );
215+ }
216+ }
217+ }
153218
154219 private void signalAsyncReadComplete () {
155220 stateChangeLock .lock ();
@@ -163,27 +228,28 @@ private void signalAsyncReadComplete() {
163228 private void waitForAsyncReadComplete () throws IOException {
164229 stateChangeLock .lock ();
165230 try {
166- while (readInProgress )
167- asyncReadComplete .await ();
231+ while (readInProgress ) {
232+ asyncReadComplete .await ();
233+ }
168234 } catch (InterruptedException e ) {
169- throw new InterruptedIOException (e .getMessage ());
235+ InterruptedIOException iio = new InterruptedIOException (e .getMessage ());
236+ iio .initCause (e );
237+ throw iio ;
170238 } finally {
171239 stateChangeLock .unlock ();
172240 }
241+ checkReadException ();
173242 }
174243
175244 @ Override
176245 public int read () throws IOException {
177- int val = read (oneByte .get (), 0 , 1 );
178- if (val == -1 ) {
179- return -1 ;
180- }
181- return oneByte .get ()[0 ] & 0xFF ;
246+ byte [] oneByteArray = oneByte .get ();
247+ return read (oneByteArray , 0 , 1 ) == -1 ? -1 : oneByteArray [0 ] & 0xFF ;
182248 }
183249
184250 @ Override
185251 public int read (byte [] b , int offset , int len ) throws IOException {
186- if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b .length ) {
252+ if (offset < 0 || len < 0 || len > b .length - offset ) {
187253 throw new IndexOutOfBoundsException ();
188254 }
189255 if (len == 0 ) {
@@ -197,35 +263,42 @@ public int read(byte[] b, int offset, int len) throws IOException {
197263 }
198264 }
199265
266+ /**
267+ * flip the active and read ahead buffer
268+ */
269+ private void swapBuffers () {
270+ ByteBuffer temp = activeBuffer ;
271+ activeBuffer = readAheadBuffer ;
272+ readAheadBuffer = temp ;
273+ }
274+
200275 /**
201276 * Internal read function which should be called only from read() api. The assumption is that
202277 * the stateChangeLock is already acquired in the caller before calling this function.
203278 */
204279 private int readInternal (byte [] b , int offset , int len ) throws IOException {
205280 assert (stateChangeLock .isLocked ());
206281 if (!activeBuffer .hasRemaining ()) {
207- if (!readInProgress ) {
208- // This condition will only be triggered for the first time read is called.
209- readAsync (activeBuffer );
210- }
211282 waitForAsyncReadComplete ();
212- }
213- if (readAborted ) {
214- throw new IOException (readException );
215- }
216- if (isEndOfStream ()) {
217- return -1 ;
283+ if (readAheadBuffer .hasRemaining ()) {
284+ swapBuffers ();
285+ } else {
286+ // The first read or activeBuffer is skipped.
287+ readAsync ();
288+ waitForAsyncReadComplete ();
289+ if (isEndOfStream ()) {
290+ return -1 ;
291+ }
292+ swapBuffers ();
293+ }
294+ } else {
295+ checkReadException ();
218296 }
219297 len = Math .min (len , activeBuffer .remaining ());
220298 activeBuffer .get (b , offset , len );
221299
222300 if (activeBuffer .remaining () <= readAheadThresholdInBytes && !readAheadBuffer .hasRemaining ()) {
223- readAsync (readAheadBuffer );
224- }
225- if (!activeBuffer .hasRemaining ()) {
226- ByteBuffer temp = activeBuffer ;
227- activeBuffer = readAheadBuffer ;
228- readAheadBuffer = temp ;
301+ readAsync ();
229302 }
230303 return len ;
231304 }
@@ -236,7 +309,7 @@ public int available() throws IOException {
236309 // Make sure we have no integer overflow.
237310 try {
238311 return (int ) Math .min ((long ) Integer .MAX_VALUE ,
239- (long ) activeBuffer .remaining () + readAheadBuffer .remaining ());
312+ (long ) activeBuffer .remaining () + readAheadBuffer .remaining ());
240313 } finally {
241314 stateChangeLock .unlock ();
242315 }
@@ -263,51 +336,73 @@ public long skip(long n) throws IOException {
263336 */
264337 private long skipInternal (long n ) throws IOException {
265338 assert (stateChangeLock .isLocked ());
266- if (readInProgress ) {
267- waitForAsyncReadComplete ();
339+ waitForAsyncReadComplete ();
340+ if (isEndOfStream ()) {
341+ return 0 ;
268342 }
269343 if (available () >= n ) {
270344 // we can skip from the internal buffers
271- int toSkip = (int )n ;
345+ int toSkip = (int ) n ;
272346 if (toSkip <= activeBuffer .remaining ()) {
273347 // Only skipping from active buffer is sufficient
274348 activeBuffer .position (toSkip + activeBuffer .position ());
349+ if (activeBuffer .remaining () <= readAheadThresholdInBytes
350+ && !readAheadBuffer .hasRemaining ()) {
351+ readAsync ();
352+ }
275353 return n ;
276354 }
277355 // We need to skip from both active buffer and read ahead buffer
278356 toSkip -= activeBuffer .remaining ();
279357 activeBuffer .position (0 );
280358 activeBuffer .flip ();
281359 readAheadBuffer .position (toSkip + readAheadBuffer .position ());
282- // flip the active and read ahead buffer
283- ByteBuffer temp = activeBuffer ;
284- activeBuffer = readAheadBuffer ;
285- readAheadBuffer = temp ;
286- readAsync (readAheadBuffer );
360+ swapBuffers ();
361+ readAsync ();
287362 return n ;
363+ } else {
364+ int skippedBytes = available ();
365+ long toSkip = n - skippedBytes ;
366+ activeBuffer .position (0 );
367+ activeBuffer .flip ();
368+ readAheadBuffer .position (0 );
369+ readAheadBuffer .flip ();
370+ long skippedFromInputStream = underlyingInputStream .skip (toSkip );
371+ readAsync ();
372+ return skippedBytes + skippedFromInputStream ;
288373 }
289- int skippedBytes = available ();
290- long toSkip = n - skippedBytes ;
291- activeBuffer .position (0 );
292- activeBuffer .flip ();
293- readAheadBuffer .position (0 );
294- readAheadBuffer .flip ();
295- long skippedFromInputStream = underlyingInputStream .skip (toSkip );
296- readAsync (activeBuffer );
297- return skippedBytes + skippedFromInputStream ;
298374 }
299375
300376 @ Override
301377 public void close () throws IOException {
302- executorService .shutdown ();
378+ boolean isSafeToCloseUnderlyingInputStream = false ;
379+ stateChangeLock .lock ();
303380 try {
304- executorService .awaitTermination (10 , TimeUnit .SECONDS );
305- stateChangeLock .lock ();
306- underlyingInputStream .close ();
307- } catch (InterruptedException e ) {
308- throw new InterruptedIOException (e .getMessage ());
381+ if (isClosed ) {
382+ return ;
383+ }
384+ isClosed = true ;
385+ if (!isReading ) {
386+ // Nobody is reading, so we can close the underlying input stream in this method.
387+ isSafeToCloseUnderlyingInputStream = true ;
388+ // Flip this to make sure the read ahead task will not close the underlying input stream.
389+ isUnderlyingInputStreamBeingClosed = true ;
390+ }
309391 } finally {
310392 stateChangeLock .unlock ();
311393 }
394+
395+ try {
396+ executorService .shutdownNow ();
397+ executorService .awaitTermination (Long .MAX_VALUE , TimeUnit .SECONDS );
398+ } catch (InterruptedException e ) {
399+ InterruptedIOException iio = new InterruptedIOException (e .getMessage ());
400+ iio .initCause (e );
401+ throw iio ;
402+ } finally {
403+ if (isSafeToCloseUnderlyingInputStream ) {
404+ underlyingInputStream .close ();
405+ }
406+ }
312407 }
313408}
0 commit comments