Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 165 additions & 70 deletions core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
package org.apache.spark.io;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import org.apache.spark.util.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.concurrent.GuardedBy;
import java.io.EOFException;
Expand All @@ -37,6 +40,8 @@
*/
public class ReadAheadInputStream extends InputStream {

private static final Logger logger = LoggerFactory.getLogger(ReadAheadInputStream.class);

private ReentrantLock stateChangeLock = new ReentrantLock();

@GuardedBy("stateChangeLock")
Expand All @@ -57,7 +62,20 @@ public class ReadAheadInputStream extends InputStream {
private boolean readAborted;

@GuardedBy("stateChangeLock")
private Exception readException;
private Throwable readException;

@GuardedBy("stateChangeLock")
// whether the close method is called.
private boolean isClosed;

@GuardedBy("stateChangeLock")
// true when the close method will close the underlying input stream. This is valid only if
// `isClosed` is true.
private boolean isUnderlyingInputStreamBeingClosed;

@GuardedBy("stateChangeLock")
// whether there is a read ahead task running,
private boolean isReading;

// If the remaining data size in the current buffer is below this threshold,
// we issue an async read from the underlying input stream.
Expand All @@ -75,18 +93,18 @@ public class ReadAheadInputStream extends InputStream {
* Creates a <code>ReadAheadInputStream</code> with the specified buffer size and read-ahead
* threshold
*
* @param inputStream The underlying input stream.
* @param bufferSizeInBytes The buffer size.
* @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead
* threshold, an async read is triggered.
* @param inputStream The underlying input stream.
* @param bufferSizeInBytes The buffer size.
* @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead
* threshold, an async read is triggered.
*/
public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) {
Preconditions.checkArgument(bufferSizeInBytes > 0,
"bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
"bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
readAheadThresholdInBytes < bufferSizeInBytes,
"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes, but the" +
"value is " + readAheadThresholdInBytes );
readAheadThresholdInBytes < bufferSizeInBytes,
"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes, but the" +
"value is " + readAheadThresholdInBytes);
activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
this.readAheadThresholdInBytes = readAheadThresholdInBytes;
Expand All @@ -99,57 +117,104 @@ private boolean isEndOfStream() {
return (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream);
}

private void readAsync(final ByteBuffer byteBuffer) throws IOException {
private void checkReadException() throws IOException {
if (readAborted) {
Throwables.propagateIfPossible(readException, IOException.class);
throw new IOException(readException);
}
}

/** Read data from underlyingInputStream to readAheadBuffer asynchronously. */
private void readAsync() throws IOException {
stateChangeLock.lock();
final byte[] arr = byteBuffer.array();
final byte[] arr = readAheadBuffer.array();
try {
if (endOfStream || readInProgress) {
return;
}
byteBuffer.position(0);
byteBuffer.flip();
checkReadException();
readAheadBuffer.position(0);
readAheadBuffer.flip();
readInProgress = true;
} finally {
stateChangeLock.unlock();
}
executorService.execute(new Runnable() {

@Override
public void run() {
stateChangeLock.lock();
try {
if (isClosed) {
readInProgress = false;
return;
}
// Flip this so that the close method will not close the underlying input stream when we
// are reading.
isReading = true;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this variable? Can we use readInProgress instead?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

readInProgress == true means the task will be submitted. But it may not run if the executor service is shut down. I changed shutdown to shutdownNow in order to interrupt the blocking read.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Instead of doing that should we just call waitForAsyncReadComplete in close() function call so that we can safely close the input stream? It would clean up the code a lot and make it less complicated.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

waitForAsyncReadComplete can be interrupted as well.

} finally {
stateChangeLock.unlock();
}

// Please note that it is safe to release the lock and read into the read ahead buffer
// because either of following two conditions will hold - 1. The active buffer has
// data available to read so the reader will not read from the read ahead buffer.
// 2. This is the first time read is called or the active buffer is exhausted,
// in that case the reader waits for this async read to complete.
// So there is no race condition in both the situations.
boolean handled = false;
int read = 0;
Exception exception = null;
Throwable exception = null;
try {
while (true) {
read = underlyingInputStream.read(arr);
if (0 != read) break;
}
handled = true;
} catch (Exception ex) {
} catch (Throwable ex) {
exception = ex;
if (ex instanceof Error) {
// `readException` may not be reported to the user. Rethrow Error to make sure at least
// The user can see Error in UncaughtExceptionHandler.
throw (Error) ex;
}
} finally {
stateChangeLock.lock();
if (read < 0 || (exception instanceof EOFException) ) {
if (read < 0 || (exception instanceof EOFException)) {
endOfStream = true;
} else if (!handled) {
} else if (exception != null) {
readAborted = true;
readException = exception != null ? exception: new Exception("Unknown exception in ReadAheadInputStream");
readException = exception;
} else {
byteBuffer.limit(read);
readAheadBuffer.limit(read);
}
readInProgress = false;
signalAsyncReadComplete();
stateChangeLock.unlock();
closeUnderlyingInputStreamIfNecessary();
}
}
});
}

private void closeUnderlyingInputStreamIfNecessary() {
boolean needToCloseUnderlyingInputStream = false;
stateChangeLock.lock();
try {
isReading = false;
if (isClosed && !isUnderlyingInputStreamBeingClosed) {
// close method cannot close underlyingInputStream because we were reading.
needToCloseUnderlyingInputStream = true;
}
} finally {
stateChangeLock.unlock();
}
if (needToCloseUnderlyingInputStream) {
try {
underlyingInputStream.close();
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}

private void signalAsyncReadComplete() {
stateChangeLock.lock();
Expand All @@ -163,27 +228,28 @@ private void signalAsyncReadComplete() {
private void waitForAsyncReadComplete() throws IOException {
stateChangeLock.lock();
try {
while (readInProgress)
asyncReadComplete.await();
while (readInProgress) {
asyncReadComplete.await();
}
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
InterruptedIOException iio = new InterruptedIOException(e.getMessage());
iio.initCause(e);
throw iio;
} finally {
stateChangeLock.unlock();
}
checkReadException();
}

@Override
public int read() throws IOException {
int val = read(oneByte.get(), 0, 1);
if (val == -1) {
return -1;
}
return oneByte.get()[0] & 0xFF;
byte[] oneByteArray = oneByte.get();
return read(oneByteArray, 0, 1) == -1 ? -1 : oneByteArray[0] & 0xFF;
}

@Override
public int read(byte[] b, int offset, int len) throws IOException {
if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) {
if (offset < 0 || len < 0 || len > b.length - offset) {
throw new IndexOutOfBoundsException();
}
if (len == 0) {
Expand All @@ -197,35 +263,42 @@ public int read(byte[] b, int offset, int len) throws IOException {
}
}

/**
* flip the active and read ahead buffer
*/
private void swapBuffers() {
ByteBuffer temp = activeBuffer;
activeBuffer = readAheadBuffer;
readAheadBuffer = temp;
}

/**
* Internal read function which should be called only from read() api. The assumption is that
* the stateChangeLock is already acquired in the caller before calling this function.
*/
private int readInternal(byte[] b, int offset, int len) throws IOException {
assert (stateChangeLock.isLocked());
if (!activeBuffer.hasRemaining()) {
if (!readInProgress) {
// This condition will only be triggered for the first time read is called.
readAsync(activeBuffer);
}
waitForAsyncReadComplete();
}
if (readAborted) {
throw new IOException(readException);
}
if (isEndOfStream()) {
return -1;
if (readAheadBuffer.hasRemaining()) {
swapBuffers();
} else {
// The first read or activeBuffer is skipped.
readAsync();
waitForAsyncReadComplete();
if (isEndOfStream()) {
return -1;
}
swapBuffers();
}
} else {
checkReadException();
}
len = Math.min(len, activeBuffer.remaining());
activeBuffer.get(b, offset, len);

if (activeBuffer.remaining() <= readAheadThresholdInBytes && !readAheadBuffer.hasRemaining()) {
readAsync(readAheadBuffer);
}
if (!activeBuffer.hasRemaining()) {
ByteBuffer temp = activeBuffer;
activeBuffer = readAheadBuffer;
readAheadBuffer = temp;
readAsync();
}
return len;
}
Expand All @@ -236,7 +309,7 @@ public int available() throws IOException {
// Make sure we have no integer overflow.
try {
return (int) Math.min((long) Integer.MAX_VALUE,
(long) activeBuffer.remaining() + readAheadBuffer.remaining());
(long) activeBuffer.remaining() + readAheadBuffer.remaining());
} finally {
stateChangeLock.unlock();
}
Expand All @@ -263,51 +336,73 @@ public long skip(long n) throws IOException {
*/
private long skipInternal(long n) throws IOException {
assert (stateChangeLock.isLocked());
if (readInProgress) {
waitForAsyncReadComplete();
waitForAsyncReadComplete();
if (isEndOfStream()) {
return 0;
}
if (available() >= n) {
// we can skip from the internal buffers
int toSkip = (int)n;
int toSkip = (int) n;
if (toSkip <= activeBuffer.remaining()) {
// Only skipping from active buffer is sufficient
activeBuffer.position(toSkip + activeBuffer.position());
if (activeBuffer.remaining() <= readAheadThresholdInBytes
&& !readAheadBuffer.hasRemaining()) {
readAsync();
}
return n;
}
// We need to skip from both active buffer and read ahead buffer
toSkip -= activeBuffer.remaining();
activeBuffer.position(0);
activeBuffer.flip();
readAheadBuffer.position(toSkip + readAheadBuffer.position());
// flip the active and read ahead buffer
ByteBuffer temp = activeBuffer;
activeBuffer = readAheadBuffer;
readAheadBuffer = temp;
readAsync(readAheadBuffer);
swapBuffers();
readAsync();
return n;
} else {
int skippedBytes = available();
long toSkip = n - skippedBytes;
activeBuffer.position(0);
activeBuffer.flip();
readAheadBuffer.position(0);
readAheadBuffer.flip();
long skippedFromInputStream = underlyingInputStream.skip(toSkip);
readAsync();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not look right, we should be reading into the activeBuffer instead?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed your codes to always write to readAheadBuffer so that it's easy to know which one we are writing to. readInternal will handle the case that activeBuffer is empty.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, that makes sense.

return skippedBytes + skippedFromInputStream;
}
int skippedBytes = available();
long toSkip = n - skippedBytes;
activeBuffer.position(0);
activeBuffer.flip();
readAheadBuffer.position(0);
readAheadBuffer.flip();
long skippedFromInputStream = underlyingInputStream.skip(toSkip);
readAsync(activeBuffer);
return skippedBytes + skippedFromInputStream;
}

@Override
public void close() throws IOException {
executorService.shutdown();
boolean isSafeToCloseUnderlyingInputStream = false;
stateChangeLock.lock();
try {
executorService.awaitTermination(10, TimeUnit.SECONDS);
stateChangeLock.lock();
underlyingInputStream.close();
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
if (isClosed) {
return;
}
isClosed = true;
if (!isReading) {
// Nobody is reading, so we can close the underlying input stream in this method.
isSafeToCloseUnderlyingInputStream = true;
// Flip this to make sure the read ahead task will not close the underlying input stream.
isUnderlyingInputStreamBeingClosed = true;
}
} finally {
stateChangeLock.unlock();
}

try {
executorService.shutdownNow();
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
} catch (InterruptedException e) {
InterruptedIOException iio = new InterruptedIOException(e.getMessage());
iio.initCause(e);
throw iio;
} finally {
if (isSafeToCloseUnderlyingInputStream) {
underlyingInputStream.close();
}
}
}
}