From 3cf944c37f1224fadd37b08bba7638b2893fae8c Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 12 Sep 2017 16:02:55 -0700 Subject: [PATCH] Close the underlying input stream safely This commit includes the following changes: - Close the underlying input stream only if it's not being read. - Always read to `readAheadBuffer`. --- .../apache/spark/io/ReadAheadInputStream.java | 235 ++++++++++++------ 1 file changed, 165 insertions(+), 70 deletions(-) diff --git a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java index cce305839fa5..618bd42d0e65 100644 --- a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java +++ b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java @@ -14,7 +14,10 @@ package org.apache.spark.io; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import org.apache.spark.util.ThreadUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.concurrent.GuardedBy; import java.io.EOFException; @@ -37,6 +40,8 @@ */ public class ReadAheadInputStream extends InputStream { + private static final Logger logger = LoggerFactory.getLogger(ReadAheadInputStream.class); + private ReentrantLock stateChangeLock = new ReentrantLock(); @GuardedBy("stateChangeLock") @@ -57,7 +62,20 @@ public class ReadAheadInputStream extends InputStream { private boolean readAborted; @GuardedBy("stateChangeLock") - private Exception readException; + private Throwable readException; + + @GuardedBy("stateChangeLock") + // whether the close method is called. + private boolean isClosed; + + @GuardedBy("stateChangeLock") + // true when the close method will close the underlying input stream. This is valid only if + // `isClosed` is true. + private boolean isUnderlyingInputStreamBeingClosed; + + @GuardedBy("stateChangeLock") + // whether there is a read ahead task running, + private boolean isReading; // If the remaining data size in the current buffer is below this threshold, // we issue an async read from the underlying input stream. @@ -75,18 +93,18 @@ public class ReadAheadInputStream extends InputStream { * Creates a ReadAheadInputStream with the specified buffer size and read-ahead * threshold * - * @param inputStream The underlying input stream. - * @param bufferSizeInBytes The buffer size. - * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead - * threshold, an async read is triggered. + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. */ public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { Preconditions.checkArgument(bufferSizeInBytes > 0, - "bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes); + "bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes); Preconditions.checkArgument(readAheadThresholdInBytes > 0 && - readAheadThresholdInBytes < bufferSizeInBytes, - "readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes, but the" + - "value is " + readAheadThresholdInBytes ); + readAheadThresholdInBytes < bufferSizeInBytes, + "readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes, but the" + + "value is " + readAheadThresholdInBytes); activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); this.readAheadThresholdInBytes = readAheadThresholdInBytes; @@ -99,57 +117,104 @@ private boolean isEndOfStream() { return (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream); } - private void readAsync(final ByteBuffer byteBuffer) throws IOException { + private void checkReadException() throws IOException { + if (readAborted) { + Throwables.propagateIfPossible(readException, IOException.class); + throw new IOException(readException); + } + } + + /** Read data from underlyingInputStream to readAheadBuffer asynchronously. */ + private void readAsync() throws IOException { stateChangeLock.lock(); - final byte[] arr = byteBuffer.array(); + final byte[] arr = readAheadBuffer.array(); try { if (endOfStream || readInProgress) { return; } - byteBuffer.position(0); - byteBuffer.flip(); + checkReadException(); + readAheadBuffer.position(0); + readAheadBuffer.flip(); readInProgress = true; } finally { stateChangeLock.unlock(); } executorService.execute(new Runnable() { + @Override public void run() { + stateChangeLock.lock(); + try { + if (isClosed) { + readInProgress = false; + return; + } + // Flip this so that the close method will not close the underlying input stream when we + // are reading. + isReading = true; + } finally { + stateChangeLock.unlock(); + } + // Please note that it is safe to release the lock and read into the read ahead buffer // because either of following two conditions will hold - 1. The active buffer has // data available to read so the reader will not read from the read ahead buffer. // 2. This is the first time read is called or the active buffer is exhausted, // in that case the reader waits for this async read to complete. // So there is no race condition in both the situations. - boolean handled = false; int read = 0; - Exception exception = null; + Throwable exception = null; try { while (true) { read = underlyingInputStream.read(arr); if (0 != read) break; } - handled = true; - } catch (Exception ex) { + } catch (Throwable ex) { exception = ex; + if (ex instanceof Error) { + // `readException` may not be reported to the user. Rethrow Error to make sure at least + // The user can see Error in UncaughtExceptionHandler. + throw (Error) ex; + } } finally { stateChangeLock.lock(); - if (read < 0 || (exception instanceof EOFException) ) { + if (read < 0 || (exception instanceof EOFException)) { endOfStream = true; - } else if (!handled) { + } else if (exception != null) { readAborted = true; - readException = exception != null ? exception: new Exception("Unknown exception in ReadAheadInputStream"); + readException = exception; } else { - byteBuffer.limit(read); + readAheadBuffer.limit(read); } readInProgress = false; signalAsyncReadComplete(); stateChangeLock.unlock(); + closeUnderlyingInputStreamIfNecessary(); } } }); } + private void closeUnderlyingInputStreamIfNecessary() { + boolean needToCloseUnderlyingInputStream = false; + stateChangeLock.lock(); + try { + isReading = false; + if (isClosed && !isUnderlyingInputStreamBeingClosed) { + // close method cannot close underlyingInputStream because we were reading. + needToCloseUnderlyingInputStream = true; + } + } finally { + stateChangeLock.unlock(); + } + if (needToCloseUnderlyingInputStream) { + try { + underlyingInputStream.close(); + } catch (IOException e) { + logger.warn(e.getMessage(), e); + } + } + } private void signalAsyncReadComplete() { stateChangeLock.lock(); @@ -163,27 +228,28 @@ private void signalAsyncReadComplete() { private void waitForAsyncReadComplete() throws IOException { stateChangeLock.lock(); try { - while (readInProgress) - asyncReadComplete.await(); + while (readInProgress) { + asyncReadComplete.await(); + } } catch (InterruptedException e) { - throw new InterruptedIOException(e.getMessage()); + InterruptedIOException iio = new InterruptedIOException(e.getMessage()); + iio.initCause(e); + throw iio; } finally { stateChangeLock.unlock(); } + checkReadException(); } @Override public int read() throws IOException { - int val = read(oneByte.get(), 0, 1); - if (val == -1) { - return -1; - } - return oneByte.get()[0] & 0xFF; + byte[] oneByteArray = oneByte.get(); + return read(oneByteArray, 0, 1) == -1 ? -1 : oneByteArray[0] & 0xFF; } @Override public int read(byte[] b, int offset, int len) throws IOException { - if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) { + if (offset < 0 || len < 0 || len > b.length - offset) { throw new IndexOutOfBoundsException(); } if (len == 0) { @@ -197,6 +263,15 @@ public int read(byte[] b, int offset, int len) throws IOException { } } + /** + * flip the active and read ahead buffer + */ + private void swapBuffers() { + ByteBuffer temp = activeBuffer; + activeBuffer = readAheadBuffer; + readAheadBuffer = temp; + } + /** * Internal read function which should be called only from read() api. The assumption is that * the stateChangeLock is already acquired in the caller before calling this function. @@ -204,28 +279,26 @@ public int read(byte[] b, int offset, int len) throws IOException { private int readInternal(byte[] b, int offset, int len) throws IOException { assert (stateChangeLock.isLocked()); if (!activeBuffer.hasRemaining()) { - if (!readInProgress) { - // This condition will only be triggered for the first time read is called. - readAsync(activeBuffer); - } waitForAsyncReadComplete(); - } - if (readAborted) { - throw new IOException(readException); - } - if (isEndOfStream()) { - return -1; + if (readAheadBuffer.hasRemaining()) { + swapBuffers(); + } else { + // The first read or activeBuffer is skipped. + readAsync(); + waitForAsyncReadComplete(); + if (isEndOfStream()) { + return -1; + } + swapBuffers(); + } + } else { + checkReadException(); } len = Math.min(len, activeBuffer.remaining()); activeBuffer.get(b, offset, len); if (activeBuffer.remaining() <= readAheadThresholdInBytes && !readAheadBuffer.hasRemaining()) { - readAsync(readAheadBuffer); - } - if (!activeBuffer.hasRemaining()) { - ByteBuffer temp = activeBuffer; - activeBuffer = readAheadBuffer; - readAheadBuffer = temp; + readAsync(); } return len; } @@ -236,7 +309,7 @@ public int available() throws IOException { // Make sure we have no integer overflow. try { return (int) Math.min((long) Integer.MAX_VALUE, - (long) activeBuffer.remaining() + readAheadBuffer.remaining()); + (long) activeBuffer.remaining() + readAheadBuffer.remaining()); } finally { stateChangeLock.unlock(); } @@ -263,15 +336,20 @@ public long skip(long n) throws IOException { */ private long skipInternal(long n) throws IOException { assert (stateChangeLock.isLocked()); - if (readInProgress) { - waitForAsyncReadComplete(); + waitForAsyncReadComplete(); + if (isEndOfStream()) { + return 0; } if (available() >= n) { // we can skip from the internal buffers - int toSkip = (int)n; + int toSkip = (int) n; if (toSkip <= activeBuffer.remaining()) { // Only skipping from active buffer is sufficient activeBuffer.position(toSkip + activeBuffer.position()); + if (activeBuffer.remaining() <= readAheadThresholdInBytes + && !readAheadBuffer.hasRemaining()) { + readAsync(); + } return n; } // We need to skip from both active buffer and read ahead buffer @@ -279,35 +357,52 @@ private long skipInternal(long n) throws IOException { activeBuffer.position(0); activeBuffer.flip(); readAheadBuffer.position(toSkip + readAheadBuffer.position()); - // flip the active and read ahead buffer - ByteBuffer temp = activeBuffer; - activeBuffer = readAheadBuffer; - readAheadBuffer = temp; - readAsync(readAheadBuffer); + swapBuffers(); + readAsync(); return n; + } else { + int skippedBytes = available(); + long toSkip = n - skippedBytes; + activeBuffer.position(0); + activeBuffer.flip(); + readAheadBuffer.position(0); + readAheadBuffer.flip(); + long skippedFromInputStream = underlyingInputStream.skip(toSkip); + readAsync(); + return skippedBytes + skippedFromInputStream; } - int skippedBytes = available(); - long toSkip = n - skippedBytes; - activeBuffer.position(0); - activeBuffer.flip(); - readAheadBuffer.position(0); - readAheadBuffer.flip(); - long skippedFromInputStream = underlyingInputStream.skip(toSkip); - readAsync(activeBuffer); - return skippedBytes + skippedFromInputStream; } @Override public void close() throws IOException { - executorService.shutdown(); + boolean isSafeToCloseUnderlyingInputStream = false; + stateChangeLock.lock(); try { - executorService.awaitTermination(10, TimeUnit.SECONDS); - stateChangeLock.lock(); - underlyingInputStream.close(); - } catch (InterruptedException e) { - throw new InterruptedIOException(e.getMessage()); + if (isClosed) { + return; + } + isClosed = true; + if (!isReading) { + // Nobody is reading, so we can close the underlying input stream in this method. + isSafeToCloseUnderlyingInputStream = true; + // Flip this to make sure the read ahead task will not close the underlying input stream. + isUnderlyingInputStreamBeingClosed = true; + } } finally { stateChangeLock.unlock(); } + + try { + executorService.shutdownNow(); + executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); + } catch (InterruptedException e) { + InterruptedIOException iio = new InterruptedIOException(e.getMessage()); + iio.initCause(e); + throw iio; + } finally { + if (isSafeToCloseUnderlyingInputStream) { + underlyingInputStream.close(); + } + } } }