Skip to content

Commit 16d5403

Browse files
monthonkahmarsuhail
authored andcommitted
HADOOP-18175. fix test failures with prefetching s3a input stream (apache#4212)
Contributed by Monthon Klongklaew
1 parent b653cfa commit 16d5403

File tree

6 files changed

+31
-22
lines changed

6 files changed

+31
-22
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ public InputStream openForRead(long offset, int size) throws IOException {
164164
Validate.checkLessOrEqual(offset, "offset", size(), "size()");
165165
Validate.checkLessOrEqual(size, "size", size() - offset, "size() - offset");
166166

167+
streamStatistics.streamOpened();
167168
final GetObjectRequest request = client.newGetRequest(this.s3Attributes.getKey())
168169
.withRange(offset, offset + size - 1);
169170
this.changeTracker.maybeApplyConstraint(request);

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,10 @@ public void seek(long pos) throws IOException {
254254
public int read() throws IOException {
255255
this.throwIfClosed();
256256

257+
if (this.s3File.size() == 0 || this.seekTargetPos >= this.s3File.size()) {
258+
return -1;
259+
}
260+
257261
if (!ensureCurrentBuffer()) {
258262
return -1;
259263
}
@@ -296,6 +300,10 @@ public int read(byte[] buffer, int offset, int len) throws IOException {
296300
return 0;
297301
}
298302

303+
if (this.s3File.size() == 0 || this.seekTargetPos >= this.s3File.size()) {
304+
return -1;
305+
}
306+
299307
if (!ensureCurrentBuffer()) {
300308
return -1;
301309
}
@@ -427,18 +435,8 @@ protected void throwIfClosed() throws IOException {
427435
}
428436

429437
protected void throwIfInvalidSeek(long pos) throws EOFException {
430-
long fileSize = this.s3File.size();
431438
if (pos < 0) {
432439
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + pos);
433-
} else {
434-
if (fileSize == 0 && pos == 0) {
435-
// Do nothing. Valid combination.
436-
return;
437-
}
438-
439-
if (pos >= fileSize) {
440-
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos);
441-
}
442440
}
443441
}
444442

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,7 @@ public synchronized int available() throws IOException {
103103
*/
104104
@Override
105105
public synchronized long getPos() throws IOException {
106-
this.throwIfClosed();
107-
return this.inputStream.getPos();
106+
return this.isClosed() ? 0 : this.inputStream.getPos();
108107
}
109108

110109
/**

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
3232

3333
import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS;
34+
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
3435
import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE;
3536
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
3637

@@ -78,11 +79,16 @@ public void testRequesterPaysOptionSuccess() throws Throwable {
7879
inputStream.seek(0);
7980
inputStream.readByte();
8081

81-
// Verify > 1 call was made, so we're sure it is correctly configured for each request
82-
IOStatisticAssertions
83-
.assertThatStatisticCounter(inputStream.getIOStatistics(),
84-
StreamStatisticNames.STREAM_READ_OPENED)
85-
.isGreaterThan(1);
82+
if (conf.getBoolean(PREFETCH_ENABLED_KEY, true)) {
83+
// For S3PrefetchingInputStream, verify a call was made
84+
IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(),
85+
StreamStatisticNames.STREAM_READ_OPENED).isEqualTo(1);
86+
} else {
87+
// For S3AInputStream, verify > 1 call was made,
88+
// so we're sure it is correctly configured for each request
89+
IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(),
90+
StreamStatisticNames.STREAM_READ_OPENED).isGreaterThan(1);
91+
}
8692

8793
// Check list calls work without error
8894
fs.listFiles(requesterPaysPath.getParent(), false);

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.hadoop.fs.FSDataInputStream;
2222
import org.apache.hadoop.fs.Path;
23+
import org.apache.hadoop.fs.StreamCapabilities;
2324
import org.apache.hadoop.fs.contract.ContractTestUtils;
2425
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
2526
import org.apache.hadoop.fs.statistics.IOStatistics;
@@ -33,6 +34,7 @@
3334

3435
import java.io.IOException;
3536

37+
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
3638
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES;
3739
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
3840
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_TOTAL_BYTES;
@@ -72,6 +74,7 @@ public void testUnbuffer() throws IOException {
7274
IOStatisticsSnapshot iostats = new IOStatisticsSnapshot();
7375
// Open file, read half the data, and then call unbuffer
7476
try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
77+
skipIfCannotUnbuffer(inputStream);
7578
assertTrue(inputStream.getWrappedStream() instanceof S3AInputStream);
7679
int bytesToRead = 8;
7780
readAndAssertBytesRead(inputStream, bytesToRead);
@@ -138,6 +141,7 @@ public void testUnbufferStreamStatistics() throws IOException {
138141
Object streamStatsStr;
139142
try {
140143
inputStream = fs.open(dest);
144+
skipIfCannotUnbuffer(inputStream);
141145
streamStatsStr = demandStringifyIOStatisticsSource(inputStream);
142146

143147
LOG.info("initial stream statistics {}", streamStatsStr);
@@ -192,6 +196,12 @@ private boolean isObjectStreamOpen(FSDataInputStream inputStream) {
192196
return ((S3AInputStream) inputStream.getWrappedStream()).isObjectStreamOpen();
193197
}
194198

199+
private void skipIfCannotUnbuffer(FSDataInputStream inputStream) {
200+
if (!inputStream.hasCapability(StreamCapabilities.UNBUFFER)) {
201+
skip("input stream does not support unbuffer");
202+
}
203+
}
204+
195205
/**
196206
* Read the specified number of bytes from the given
197207
* {@link FSDataInputStream} and assert that

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -169,11 +169,6 @@ private void testSeekHelper(S3InputStream inputStream, int bufferSize, int fileS
169169
EOFException.class,
170170
FSExceptionMessages.NEGATIVE_SEEK,
171171
() -> inputStream.seek(-1));
172-
173-
ExceptionAsserts.assertThrows(
174-
EOFException.class,
175-
FSExceptionMessages.CANNOT_SEEK_PAST_EOF,
176-
() -> inputStream.seek(fileSize + 1));
177172
}
178173

179174
@Test

0 commit comments

Comments
 (0)