From 2097657b0506bd829d18558008a24a6287feabff Mon Sep 17 00:00:00 2001 From: Monthon Klongklaew Date: Tue, 19 Apr 2022 13:56:46 +0100 Subject: [PATCH 01/10] fix contract unbuffer test --- .../apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java index c874a8c37b829..0f5834da4cc8c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java @@ -103,8 +103,7 @@ public synchronized int available() throws IOException { */ @Override public synchronized long getPos() throws IOException { - this.throwIfClosed(); - return this.inputStream.getPos(); + return this.isClosed() ? 0 : this.inputStream.getPos(); } /** From c0c6bd88a4cebedf7582f5082c5fb1870a4d3d92 Mon Sep 17 00:00:00 2001 From: Monthon Klongklaew Date: Tue, 19 Apr 2022 15:28:15 +0100 Subject: [PATCH 02/10] fix requester pays test --- .../org/apache/hadoop/fs/s3a/read/S3File.java | 1 + .../hadoop/fs/s3a/ITestS3ARequesterPays.java | 17 +++++++++++------ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java index 501186a25492c..88854b87c81a0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java @@ -164,6 +164,7 @@ public InputStream openForRead(long offset, int size) throws IOException { Validate.checkLessOrEqual(offset, "offset", size(), "size()"); Validate.checkLessOrEqual(size, "size", size() - offset, "size() - offset"); + streamStatistics.streamOpened(); final GetObjectRequest request = client.newGetRequest(this.s3Attributes.getKey()) .withRange(offset, offset + size - 1); this.changeTracker.maybeApplyConstraint(request); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java index c2e7684cad6da..0e1657f5eb031 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.statistics.StreamStatisticNames; import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS; +import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY; import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE; import static org.apache.hadoop.test.LambdaTestUtils.intercept; @@ -70,12 +71,16 @@ public void testRequesterPaysOptionSuccess() throws Throwable { // Jump back to the start, triggering a new GetObject request. 
inputStream.seek(0); inputStream.readByte(); - - // Verify > 1 call was made, so we're sure it is correctly configured for each request - IOStatisticAssertions - .assertThatStatisticCounter(inputStream.getIOStatistics(), - StreamStatisticNames.STREAM_READ_OPENED) - .isGreaterThan(1); + + if (conf.getBoolean(PREFETCH_ENABLED_KEY, true)) { + // For S3PrefetchingInputStream, verify a call was made + IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(), + StreamStatisticNames.STREAM_READ_OPENED).isEqualTo(1); + } else { + // For S3InputStream, verify > 1 call was made, so we're sure it is correctly configured for each request + IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(), + StreamStatisticNames.STREAM_READ_OPENED).isGreaterThan(1); + } // Check list calls work without error fs.listFiles(requesterPaysPath.getParent(), false); From 59ea8ac506e8f6b5764c573e0281f9e9162955ff Mon Sep 17 00:00:00 2001 From: Monthon Klongklaew Date: Tue, 19 Apr 2022 16:15:57 +0100 Subject: [PATCH 03/10] moving pos validation into read() --- .../apache/hadoop/fs/s3a/read/S3InputStream.java | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java index 0fa6e33200b7b..31fab072cdf1e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java @@ -254,6 +254,10 @@ public void seek(long pos) throws IOException { public int read() throws IOException { this.throwIfClosed(); + if (this.s3File.size() == 0 || this.seekTargetPos >= this.s3File.size()) { + return -1; + } + if (!ensureCurrentBuffer()) { return -1; } @@ -427,18 +431,8 @@ protected void throwIfClosed() throws IOException { } protected void throwIfInvalidSeek(long pos) throws EOFException { - long fileSize = this.s3File.size(); if (pos < 0) { throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + pos); - } else { - if (fileSize == 0 && pos == 0) { - // Do nothing. Valid combination. 
- return; - } - - if (pos >= fileSize) { - throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos); - } } } From 0fb11920da019766d994fb2394be3738d20f0325 Mon Sep 17 00:00:00 2001 From: Monthon Klongklaew Date: Tue, 19 Apr 2022 16:46:28 +0100 Subject: [PATCH 04/10] skipping unbuffer tests for unsupported input stream --- .../org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java index 3d7ee0882efa4..664e1ea307274 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.s3a; +import org.apache.hadoop.fs.CanUnbuffer; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.ContractTestUtils; @@ -32,7 +33,9 @@ import org.junit.Test; import java.io.IOException; +import java.io.InputStream; +import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_TOTAL_BYTES; @@ -72,6 +75,7 @@ public void testUnbuffer() throws IOException { IOStatisticsSnapshot iostats = new IOStatisticsSnapshot(); // Open file, read half the data, and then call unbuffer try (FSDataInputStream inputStream = getFileSystem().open(dest)) { + skipIfCannotUnbuffer(inputStream.getWrappedStream()); assertTrue(inputStream.getWrappedStream() instanceof S3AInputStream); int bytesToRead = 8; readAndAssertBytesRead(inputStream, bytesToRead); @@ -138,6 +142,7 @@ public void testUnbufferStreamStatistics() throws IOException { Object streamStatsStr; try { inputStream = fs.open(dest); + skipIfCannotUnbuffer(inputStream.getWrappedStream()); streamStatsStr = demandStringifyIOStatisticsSource(inputStream); LOG.info("initial stream statistics {}", streamStatsStr); @@ -192,6 +197,12 @@ private boolean isObjectStreamOpen(FSDataInputStream inputStream) { return ((S3AInputStream) inputStream.getWrappedStream()).isObjectStreamOpen(); } + private void skipIfCannotUnbuffer(InputStream inputStream) { + if (!(inputStream instanceof CanUnbuffer)) { + skip("input stream does not support unbuffer"); + } + } + /** * Read the specified number of bytes from the given * {@link FSDataInputStream} and assert that From 9516d27dacf391f59014e850c4da48e1ba2ec801 Mon Sep 17 00:00:00 2001 From: Monthon Klongklaew Date: Thu, 21 Apr 2022 10:19:55 +0100 Subject: [PATCH 05/10] adding pos validation in read() --- .../java/org/apache/hadoop/fs/s3a/read/S3InputStream.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java index 31fab072cdf1e..00d5fbc367d1b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java @@ -300,6 +300,10 @@ public int read(byte[] buffer, int offset, int len) throws IOException { return 0; } + if (this.s3File.size() == 0 || this.seekTargetPos >= 
this.s3File.size()) { + return -1; + } + if (!ensureCurrentBuffer()) { return -1; } From 4d3b4cc36ef94cfb7a8334db2f990f68e855f397 Mon Sep 17 00:00:00 2001 From: Monthon Klongklaew Date: Thu, 21 Apr 2022 10:33:21 +0100 Subject: [PATCH 06/10] update unit test --- .../org/apache/hadoop/fs/s3a/read/TestS3InputStream.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java index 010bc1c30b634..e3c6c002bff67 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java @@ -169,11 +169,6 @@ private void testSeekHelper(S3InputStream inputStream, int bufferSize, int fileS EOFException.class, FSExceptionMessages.NEGATIVE_SEEK, () -> inputStream.seek(-1)); - - ExceptionAsserts.assertThrows( - EOFException.class, - FSExceptionMessages.CANNOT_SEEK_PAST_EOF, - () -> inputStream.seek(fileSize + 1)); } @Test From 3314e5e5e883ce93190e16c77461992ef12e1058 Mon Sep 17 00:00:00 2001 From: Monthon Klongklaew Date: Thu, 21 Apr 2022 13:37:34 +0100 Subject: [PATCH 07/10] fix checkstyle --- .../java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java index 0e1657f5eb031..9e83fa466c5f8 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java @@ -71,13 +71,14 @@ public void testRequesterPaysOptionSuccess() throws Throwable { // Jump back to the start, triggering a new GetObject request. 
inputStream.seek(0); inputStream.readByte(); - + if (conf.getBoolean(PREFETCH_ENABLED_KEY, true)) { // For S3PrefetchingInputStream, verify a call was made IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(), StreamStatisticNames.STREAM_READ_OPENED).isEqualTo(1); } else { - // For S3InputStream, verify > 1 call was made, so we're sure it is correctly configured for each request + // For S3InputStream, verify > 1 call was made, + // so we're sure it is correctly configured for each request IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(), StreamStatisticNames.STREAM_READ_OPENED).isGreaterThan(1); } From ce15ebc251e87aa88b900283aaec4633181511fa Mon Sep 17 00:00:00 2001 From: Monthon Klongklaew Date: Tue, 26 Apr 2022 10:24:00 +0100 Subject: [PATCH 08/10] correct comment message --- .../java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java index 9e83fa466c5f8..9b9461c420a96 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java @@ -77,7 +77,7 @@ public void testRequesterPaysOptionSuccess() throws Throwable { IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(), StreamStatisticNames.STREAM_READ_OPENED).isEqualTo(1); } else { - // For S3InputStream, verify > 1 call was made, + // For S3AInputStream, verify > 1 call was made, // so we're sure it is correctly configured for each request IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(), StreamStatisticNames.STREAM_READ_OPENED).isGreaterThan(1); From f0d25f761238670646262c0866094324a02e8e23 Mon Sep 17 00:00:00 2001 From: Monthon Klongklaew Date: Tue, 26 Apr 2022 10:50:16 +0100 Subject: [PATCH 09/10] change unbuffer capability checking --- .../apache/hadoop/fs/s3a/ITestS3AUnbuffer.java | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java index 664e1ea307274..a92abb746bd97 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java @@ -18,9 +18,9 @@ package org.apache.hadoop.fs.s3a; -import org.apache.hadoop.fs.CanUnbuffer; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.apache.hadoop.fs.statistics.IOStatistics; @@ -33,7 +33,6 @@ import org.junit.Test; import java.io.IOException; -import java.io.InputStream; import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES; @@ -75,7 +74,9 @@ public void testUnbuffer() throws IOException { IOStatisticsSnapshot iostats = new IOStatisticsSnapshot(); // Open file, read half the data, and then call unbuffer try (FSDataInputStream inputStream = getFileSystem().open(dest)) { - 
skipIfCannotUnbuffer(inputStream.getWrappedStream()); + if (!inputStream.hasCapability(StreamCapabilities.UNBUFFER)) { + skip("input stream does not support unbuffer"); + } assertTrue(inputStream.getWrappedStream() instanceof S3AInputStream); int bytesToRead = 8; readAndAssertBytesRead(inputStream, bytesToRead); @@ -142,7 +143,9 @@ public void testUnbufferStreamStatistics() throws IOException { Object streamStatsStr; try { inputStream = fs.open(dest); - skipIfCannotUnbuffer(inputStream.getWrappedStream()); + if (!inputStream.hasCapability(StreamCapabilities.UNBUFFER)) { + skip("input stream does not support unbuffer"); + } streamStatsStr = demandStringifyIOStatisticsSource(inputStream); LOG.info("initial stream statistics {}", streamStatsStr); @@ -197,12 +200,6 @@ private boolean isObjectStreamOpen(FSDataInputStream inputStream) { return ((S3AInputStream) inputStream.getWrappedStream()).isObjectStreamOpen(); } - private void skipIfCannotUnbuffer(InputStream inputStream) { - if (!(inputStream instanceof CanUnbuffer)) { - skip("input stream does not support unbuffer"); - } - } - /** * Read the specified number of bytes from the given * {@link FSDataInputStream} and assert that From e1eb23ed6dd6b7ff0661411e82d4b0d92ddd3051 Mon Sep 17 00:00:00 2001 From: Monthon Klongklaew Date: Tue, 26 Apr 2022 11:06:05 +0100 Subject: [PATCH 10/10] move unbuffer capability checking to a method --- .../org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java index a92abb746bd97..3a2d1b1b09a49 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java @@ -74,9 +74,7 @@ public void testUnbuffer() throws IOException { IOStatisticsSnapshot iostats = new IOStatisticsSnapshot(); // Open file, read half the data, and then call unbuffer try (FSDataInputStream inputStream = getFileSystem().open(dest)) { - if (!inputStream.hasCapability(StreamCapabilities.UNBUFFER)) { - skip("input stream does not support unbuffer"); - } + skipIfCannotUnbuffer(inputStream); assertTrue(inputStream.getWrappedStream() instanceof S3AInputStream); int bytesToRead = 8; readAndAssertBytesRead(inputStream, bytesToRead); @@ -143,9 +141,7 @@ public void testUnbufferStreamStatistics() throws IOException { Object streamStatsStr; try { inputStream = fs.open(dest); - if (!inputStream.hasCapability(StreamCapabilities.UNBUFFER)) { - skip("input stream does not support unbuffer"); - } + skipIfCannotUnbuffer(inputStream); streamStatsStr = demandStringifyIOStatisticsSource(inputStream); LOG.info("initial stream statistics {}", streamStatsStr); @@ -200,6 +196,12 @@ private boolean isObjectStreamOpen(FSDataInputStream inputStream) { return ((S3AInputStream) inputStream.getWrappedStream()).isObjectStreamOpen(); } + private void skipIfCannotUnbuffer(FSDataInputStream inputStream) { + if (!inputStream.hasCapability(StreamCapabilities.UNBUFFER)) { + skip("input stream does not support unbuffer"); + } + } + /** * Read the specified number of bytes from the given * {@link FSDataInputStream} and assert that
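
The capability check that the last two patches settle on (replacing the earlier instanceof CanUnbuffer test on the wrapped stream with hasCapability(StreamCapabilities.UNBUFFER) on the FSDataInputStream) is also the pattern callers outside the test suite can use to guard unbuffer() calls, since it reflects whether the wrapped stream -- S3AInputStream or the prefetching stream -- actually supports unbuffering. A minimal sketch under that assumption follows; it is not part of the patches, and the bucket and object names are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;

public class UnbufferProbeExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical bucket/object, used only for illustration.
    Path path = new Path("s3a://example-bucket/example-object");
    FileSystem fs = path.getFileSystem(conf);
    try (FSDataInputStream in = fs.open(path)) {
      in.read();
      // hasCapability() is forwarded to the wrapped stream, so the probe
      // answers for whichever stream implementation the filesystem returned.
      if (in.hasCapability(StreamCapabilities.UNBUFFER)) {
        // Safe to release buffered data and the underlying object stream.
        in.unbuffer();
      }
    }
  }
}

Probing the capability string rather than the CanUnbuffer interface matters because FSDataInputStream itself implements CanUnbuffer unconditionally; only the capability query tells you whether the wrapped stream really honours unbuffer(), which is why the tests skip when it is absent.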