Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ public InputStream openForRead(long offset, int size) throws IOException {
Validate.checkLessOrEqual(offset, "offset", size(), "size()");
Validate.checkLessOrEqual(size, "size", size() - offset, "size() - offset");

streamStatistics.streamOpened();
final GetObjectRequest request = client.newGetRequest(this.s3Attributes.getKey())
.withRange(offset, offset + size - 1);
this.changeTracker.maybeApplyConstraint(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ public void seek(long pos) throws IOException {
public int read() throws IOException {
this.throwIfClosed();

if (this.s3File.size() == 0 || this.seekTargetPos >= this.s3File.size()) {
return -1;
}

if (!ensureCurrentBuffer()) {
return -1;
}
Expand Down Expand Up @@ -296,6 +300,10 @@ public int read(byte[] buffer, int offset, int len) throws IOException {
return 0;
}

if (this.s3File.size() == 0 || this.seekTargetPos >= this.s3File.size()) {
return -1;
}

if (!ensureCurrentBuffer()) {
return -1;
}
Expand Down Expand Up @@ -427,18 +435,8 @@ protected void throwIfClosed() throws IOException {
}

protected void throwIfInvalidSeek(long pos) throws EOFException {
long fileSize = this.s3File.size();
if (pos < 0) {
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + pos);
} else {
if (fileSize == 0 && pos == 0) {
// Do nothing. Valid combination.
return;
}

if (pos >= fileSize) {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ public synchronized int available() throws IOException {
*/
@Override
public synchronized long getPos() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method no longer throws an IOException

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.inputStream.getPos() below could throw it, so I have to pass it up here

this.throwIfClosed();
return this.inputStream.getPos();
return this.isClosed() ? 0 : this.inputStream.getPos();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.fs.statistics.StreamStatisticNames;

import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;

Expand Down Expand Up @@ -71,11 +72,16 @@ public void testRequesterPaysOptionSuccess() throws Throwable {
inputStream.seek(0);
inputStream.readByte();

// Verify > 1 call was made, so we're sure it is correctly configured for each request
IOStatisticAssertions
.assertThatStatisticCounter(inputStream.getIOStatistics(),
StreamStatisticNames.STREAM_READ_OPENED)
.isGreaterThan(1);
if (conf.getBoolean(PREFETCH_ENABLED_KEY, true)) {
// For S3PrefetchingInputStream, verify a call was made
IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(),
StreamStatisticNames.STREAM_READ_OPENED).isEqualTo(1);
} else {
// For S3AInputStream, verify > 1 call was made,
// so we're sure it is correctly configured for each request
IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(),
StreamStatisticNames.STREAM_READ_OPENED).isGreaterThan(1);
}

// Check list calls work without error
fs.listFiles(requesterPaysPath.getParent(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.IOStatistics;
Expand All @@ -33,6 +34,7 @@

import java.io.IOException;

import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_TOTAL_BYTES;
Expand Down Expand Up @@ -72,6 +74,7 @@ public void testUnbuffer() throws IOException {
IOStatisticsSnapshot iostats = new IOStatisticsSnapshot();
// Open file, read half the data, and then call unbuffer
try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
skipIfCannotUnbuffer(inputStream);
assertTrue(inputStream.getWrappedStream() instanceof S3AInputStream);
int bytesToRead = 8;
readAndAssertBytesRead(inputStream, bytesToRead);
Expand Down Expand Up @@ -138,6 +141,7 @@ public void testUnbufferStreamStatistics() throws IOException {
Object streamStatsStr;
try {
inputStream = fs.open(dest);
skipIfCannotUnbuffer(inputStream);
streamStatsStr = demandStringifyIOStatisticsSource(inputStream);

LOG.info("initial stream statistics {}", streamStatsStr);
Expand Down Expand Up @@ -192,6 +196,12 @@ private boolean isObjectStreamOpen(FSDataInputStream inputStream) {
return ((S3AInputStream) inputStream.getWrappedStream()).isObjectStreamOpen();
}

private void skipIfCannotUnbuffer(FSDataInputStream inputStream) {
if (!inputStream.hasCapability(StreamCapabilities.UNBUFFER)) {
skip("input stream does not support unbuffer");
}
}

/**
* Read the specified number of bytes from the given
* {@link FSDataInputStream} and assert that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,6 @@ private void testSeekHelper(S3InputStream inputStream, int bufferSize, int fileS
EOFException.class,
FSExceptionMessages.NEGATIVE_SEEK,
() -> inputStream.seek(-1));

ExceptionAsserts.assertThrows(
EOFException.class,
FSExceptionMessages.CANNOT_SEEK_PAST_EOF,
() -> inputStream.seek(fileSize + 1));
}

@Test
Expand Down