Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void setData(BufferData bufferData,
readOffset,
"readOffset",
startOffset,
startOffset + bufferData.getBuffer().limit() - 1);
startOffset + bufferData.getBuffer().limit());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going to trust you here. We have shipped the S3A input stream with an off-by-one error in the past; it surfaced in Parquet but not in ORC, and only in some cases...


data = bufferData;
buffer = bufferData.getBuffer().duplicate();
Expand Down Expand Up @@ -182,7 +182,7 @@ public int relative() {
*/
public boolean isWithinCurrentBuffer(long pos) {
throwIfInvalidBuffer();
long bufferEndOffset = bufferStartOffset + buffer.limit() - 1;
long bufferEndOffset = bufferStartOffset + buffer.limit();
return (pos >= bufferStartOffset) && (pos <= bufferEndOffset);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.test.AbstractHadoopTestBase;

import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
Expand All @@ -43,6 +44,7 @@ public void testArgChecks() throws Exception {
new FilePosition(10, 5);
new FilePosition(5, 10);
new FilePosition(10, 5).setData(data, 3, 4);
new FilePosition(10, 10).setData(data, 3, 13);

// Verify it throws correctly.

Expand Down Expand Up @@ -94,11 +96,11 @@ public void testArgChecks() throws Exception {
"'readOffset' must not be negative", () -> pos.setData(data, 4, -4));

intercept(IllegalArgumentException.class,
"'readOffset' (15) must be within the range [4, 13]",
"'readOffset' (15) must be within the range [4, 14]",
() -> pos.setData(data, 4, 15));

intercept(IllegalArgumentException.class,
"'readOffset' (3) must be within the range [4, 13]",
"'readOffset' (3) must be within the range [4, 14]",
() -> pos.setData(data, 4, 3));

}
Expand Down Expand Up @@ -192,4 +194,31 @@ public void testBufferStats() {
}
assertTrue(pos.bufferFullyRead());
}

/**
 * Checks that the EOF offset (equal to the file size) is accepted as a
 * position inside the current buffer when the buffer spans the whole file,
 * both when supplied to {@code setData()} and to {@code setAbsolute()}.
 */
@Test
public void testBounds() {
  final int blockSize = 8;
  // The file is exactly one buffer long, so EOF sits at the buffer limit.
  final long length = blockSize;
  final long eofOffset = length;

  BufferData data = new BufferData(0, ByteBuffer.allocate(blockSize));
  FilePosition pos = new FilePosition(length, blockSize);

  // Position the read offset directly on EOF.
  pos.setData(data, 0, eofOffset);

  boolean eofWithinBuffer = pos.isWithinCurrentBuffer(eofOffset);
  assertThat(eofWithinBuffer)
      .describedAs("EOF offset %d should be within the current buffer", eofOffset)
      .isTrue();

  long positionAfterSetData = pos.absolute();
  assertThat(positionAfterSetData)
      .describedAs("absolute() should return the EOF offset")
      .isEqualTo(eofOffset);

  // Re-seeking to EOF within the same buffer must also succeed.
  boolean movedToEof = pos.setAbsolute(eofOffset);
  assertThat(movedToEof)
      .describedAs("setAbsolute() should return true on the EOF offset %d", eofOffset)
      .isTrue();

  long positionAfterSetAbsolute = pos.absolute();
  assertThat(positionAfterSetAbsolute)
      .describedAs("absolute() should return the EOF offset")
      .isEqualTo(eofOffset);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hadoop.fs.impl.prefetch.BlockManager;
import org.apache.hadoop.fs.impl.prefetch.BufferData;
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
import org.apache.hadoop.fs.impl.prefetch.FilePosition;
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
Expand Down Expand Up @@ -84,46 +85,6 @@ public S3ACachingInputStream(
fileSize);
}

/**
 * Repositions the stream so that the next read happens at {@code pos}.
 *
 * <p>If the target lies inside the current buffer, only the in-buffer
 * position is updated. Otherwise the current buffer (if any) is handed back
 * to the block manager, outstanding prefetches are cancelled, and the target
 * is recorded for the next read.
 *
 * @param pos the position at which the next read will take place.
 *
 * @throws IOException if the stream is closed, or if {@code pos} is rejected
 *     by {@code throwIfInvalidSeek()}.
 */
@Override
public void seek(long pos) throws IOException {
  throwIfClosed();
  throwIfInvalidSeek(pos);

  // setAbsolute() succeeds only when the target is valid and inside the
  // current block; in that case there is nothing more to do.
  if (getFilePosition().setAbsolute(pos)) {
    return;
  }

  LOG.info("seek({})", getOffsetStr(pos));

  // Reaching here means one of:
  //   -- the target position is invalid; ignored, the next read reports it.
  //   -- the target is valid but lies outside the current block.
  if (getFilePosition().isValid()) {
    if (getFilePosition().bufferFullyRead()) {
      // A fully consumed buffer is very unlikely to be needed again,
      // so it is released without caching.
      blockManager.release(getFilePosition().data());
    } else {
      // Leaving a partially read buffer: it will likely be wanted again
      // (observed empirically for Parquet), so request async caching.
      blockManager.requestCaching(getFilePosition().data());
    }
    getFilePosition().invalidate();
    blockManager.cancelPrefetches();
  }

  setSeekTargetPos(pos);
}

@Override
public void close() throws IOException {
// Close the BlockManager first, cancelling active prefetches,
Expand All @@ -139,36 +100,45 @@ protected boolean ensureCurrentBuffer() throws IOException {
return false;
}

if (getFilePosition().isValid() && getFilePosition()
.buffer()
.hasRemaining()) {
return true;
long readPos = getNextReadPos();
if (!getBlockData().isValidOffset(readPos)) {
return false;
}

long readPos;
int prefetchCount;

if (getFilePosition().isValid()) {
// A sequential read results in a prefetch.
readPos = getFilePosition().absolute();
prefetchCount = numBlocksToPrefetch;
} else {
// A seek invalidates the current position.
// We prefetch only 1 block immediately after a seek operation.
readPos = getSeekTargetPos();
prefetchCount = 1;
}
// Determine whether this is an out of order read.
FilePosition filePosition = getFilePosition();
boolean outOfOrderRead = !filePosition.setAbsolute(readPos);

if (!getBlockData().isValidOffset(readPos)) {
return false;
if (!outOfOrderRead && filePosition.buffer().hasRemaining()) {
// Use the current buffer.
return true;
}

if (getFilePosition().isValid()) {
if (getFilePosition().bufferFullyRead()) {
blockManager.release(getFilePosition().data());
if (filePosition.isValid()) {
// We are jumping out of the current buffer. There are two cases to consider:
if (filePosition.bufferFullyRead()) {
// This buffer was fully read:
// it is very unlikely that this buffer will be needed again;
// therefore we release the buffer without caching.
blockManager.release(filePosition.data());
} else {
blockManager.requestCaching(getFilePosition().data());
// We will likely need this buffer again (as observed empirically for Parquet)
// therefore we issue an async request to cache this buffer.
blockManager.requestCaching(filePosition.data());
}
filePosition.invalidate();
}

int prefetchCount;
if (outOfOrderRead) {
LOG.debug("lazy-seek({})", getOffsetStr(readPos));
blockManager.cancelPrefetches();

// We prefetch only 1 block immediately after a seek operation.
prefetchCount = 1;
} else {
// A sequential read results in a prefetch.
prefetchCount = numBlocksToPrefetch;
}

int toBlockNumber = getBlockData().getBlockNumber(readPos);
Expand All @@ -186,7 +156,7 @@ protected boolean ensureCurrentBuffer() throws IOException {
.trackDuration(STREAM_READ_BLOCK_ACQUIRE_AND_READ),
() -> blockManager.get(toBlockNumber));

getFilePosition().setData(data, startOffset, readPos);
filePosition.setData(data, startOffset, readPos);
return true;
}

Expand All @@ -197,7 +167,7 @@ public String toString() {
}

StringBuilder sb = new StringBuilder();
sb.append(String.format("fpos = (%s)%n", getFilePosition()));
sb.append(String.format("%s%n", super.toString()));
sb.append(blockManager.toString());
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.impl.prefetch.BufferData;
import org.apache.hadoop.fs.impl.prefetch.FilePosition;
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
Expand Down Expand Up @@ -86,17 +87,22 @@ protected boolean ensureCurrentBuffer() throws IOException {
return false;
}

if (!getFilePosition().isValid()) {
FilePosition filePosition = getFilePosition();
if (filePosition.isValid()) {
// Update current position (lazy seek).
filePosition.setAbsolute(getNextReadPos());
} else {
// Read entire file into buffer.
buffer.clear();
int numBytesRead =
getReader().read(buffer, 0, buffer.capacity());
if (numBytesRead <= 0) {
return false;
}
BufferData data = new BufferData(0, buffer);
getFilePosition().setData(data, 0, getSeekTargetPos());
filePosition.setData(data, 0, getNextReadPos());
}

return getFilePosition().buffer().hasRemaining();
return filePosition.buffer().hasRemaining();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,16 @@ public abstract class S3ARemoteInputStream
private volatile boolean closed;

/**
* Current position within the file.
* Internal position within the file. Updated lazily
* after a seek before a read.
*/
private FilePosition fpos;

/** The target of the most recent seek operation. */
private long seekTargetPos;
/**
* This is the actual position within the file, used by
* lazy seek to decide whether to seek on the next read or not.
*/
private long nextReadPos;

/** Information about each block of the mapped S3 file. */
private BlockData blockData;
Expand Down Expand Up @@ -146,7 +150,7 @@ public S3ARemoteInputStream(
this.remoteObject = getS3File();
this.reader = new S3ARemoteObjectReader(remoteObject);

this.seekTargetPos = 0;
this.nextReadPos = 0;
}

/**
Expand Down Expand Up @@ -212,7 +216,8 @@ private void setInputPolicy(S3AInputPolicy inputPolicy) {
public int available() throws IOException {
throwIfClosed();

if (!ensureCurrentBuffer()) {
// Update the current position in the current buffer, if possible.
if (!fpos.setAbsolute(nextReadPos)) {
return 0;
}

Expand All @@ -228,11 +233,7 @@ public int available() throws IOException {
public long getPos() throws IOException {
throwIfClosed();

if (fpos.isValid()) {
return fpos.absolute();
} else {
return seekTargetPos;
}
return nextReadPos;
}

/**
Expand All @@ -247,10 +248,7 @@ public void seek(long pos) throws IOException {
throwIfClosed();
throwIfInvalidSeek(pos);

if (!fpos.setAbsolute(pos)) {
fpos.invalidate();
seekTargetPos = pos;
}
nextReadPos = pos;
}

/**
Expand All @@ -268,14 +266,15 @@ public int read() throws IOException {
throwIfClosed();

if (remoteObject.size() == 0
|| seekTargetPos >= remoteObject.size()) {
|| nextReadPos >= remoteObject.size()) {
return -1;
}

if (!ensureCurrentBuffer()) {
return -1;
}

nextReadPos++;
incrementBytesRead(1);

return fpos.buffer().get() & 0xff;
Expand Down Expand Up @@ -315,7 +314,7 @@ public int read(byte[] buffer, int offset, int len) throws IOException {
}

if (remoteObject.size() == 0
|| seekTargetPos >= remoteObject.size()) {
|| nextReadPos >= remoteObject.size()) {
return -1;
}

Expand All @@ -334,6 +333,7 @@ public int read(byte[] buffer, int offset, int len) throws IOException {
ByteBuffer buf = fpos.buffer();
int bytesToRead = Math.min(numBytesRemaining, buf.remaining());
buf.get(buffer, offset, bytesToRead);
nextReadPos += bytesToRead;
incrementBytesRead(bytesToRead);
offset += bytesToRead;
numBytesRemaining -= bytesToRead;
Expand Down Expand Up @@ -367,12 +367,8 @@ protected boolean isClosed() {
return closed;
}

protected long getSeekTargetPos() {
return seekTargetPos;
}

protected void setSeekTargetPos(long pos) {
seekTargetPos = pos;
protected long getNextReadPos() {
return nextReadPos;
}

protected BlockData getBlockData() {
Expand Down Expand Up @@ -443,6 +439,18 @@ public boolean markSupported() {
return false;
}

/**
 * Describes the stream state for diagnostics.
 *
 * @return {@code "closed"} if the stream has been closed; otherwise the
 *     next read position on one line followed by the file position.
 */
@Override
public String toString() {
  if (isClosed()) {
    return "closed";
  }

  String nextReadLine = String.format("nextReadPos = (%d)%n", nextReadPos);
  String filePositionLine = String.format("fpos = (%s)", fpos);
  return nextReadLine + filePositionLine;
}

protected void throwIfClosed() throws IOException {
if (closed) {
throw new IOException(
Expand All @@ -453,6 +461,8 @@ protected void throwIfClosed() throws IOException {
/**
 * Validates a seek target against the bounds of the file.
 * A position equal to the file size is accepted.
 *
 * @param pos the requested seek position.
 *
 * @throws EOFException if {@code pos} is negative or greater than the file size.
 */
protected void throwIfInvalidSeek(long pos) throws EOFException {
  if (pos < 0) {
    throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + pos);
  }

  long fileSize = this.getBlockData().getFileSize();
  if (pos > fileSize) {
    throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos);
  }
}

Expand Down
Loading