From bb1a5fe5b70ebb10d323e6ca1211a5e412934464 Mon Sep 17 00:00:00 2001
From: Alessandro Passaro
Date: Wed, 21 Sep 2022 16:26:42 +0100
Subject: [PATCH 1/8] HADOOP-18378: Implement lazy seek in S3A prefetching.

Make S3APrefetchingInputStream.seek() completely lazy. A call to seek()
no longer affects the current buffer or interferes with prefetching
until read() is called. This lets more usage patterns benefit from
prefetching: for example, when readFully(position, buffer) is called in
a loop over contiguous positions, the intermediate internal seek()
calls are no-ops and prefetching performs as it would for a plain
sequential read.
---
 .../hadoop/fs/impl/prefetch/FilePosition.java |   4 +-
 .../fs/impl/prefetch/TestFilePosition.java    |  24 ++++-
 .../s3a/prefetch/S3ACachingInputStream.java   | 100 ++++++------------
 .../s3a/prefetch/S3AInMemoryInputStream.java  |  12 ++-
 .../fs/s3a/prefetch/S3ARemoteInputStream.java |  54 ++++++----
 .../markdown/tools/hadoop-aws/prefetching.md  |  24 +++--
 .../s3a/ITestS3APrefetchingInputStream.java   |  58 ++++++++--
 .../prefetch/TestS3ARemoteInputStream.java    |  24 +++--
 8 files changed, 181 insertions(+), 119 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/FilePosition.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/FilePosition.java
index 7cd3bb3de2b58..286bdd7ae8996 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/FilePosition.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/FilePosition.java
@@ -116,7 +116,7 @@ public void setData(BufferData bufferData,
         readOffset,
         "readOffset",
         startOffset,
-        startOffset + bufferData.getBuffer().limit() - 1);
+        startOffset + bufferData.getBuffer().limit());
 
     data = bufferData;
     buffer = bufferData.getBuffer().duplicate();
@@ -182,7 +182,7 @@ public int relative() {
    */
   public boolean isWithinCurrentBuffer(long pos) {
     throwIfInvalidBuffer();
-    long bufferEndOffset = bufferStartOffset + buffer.limit() - 1;
+    long bufferEndOffset = bufferStartOffset + buffer.limit();
     return (pos >= bufferStartOffset) && (pos <= bufferEndOffset);
   }
 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestFilePosition.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestFilePosition.java
index e86c4be97b961..a462e51e94210 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestFilePosition.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestFilePosition.java
@@ -43,6 +43,7 @@ public void testArgChecks() throws Exception {
     new FilePosition(10, 5);
     new FilePosition(5, 10);
     new FilePosition(10, 5).setData(data, 3, 4);
+    new FilePosition(10, 10).setData(data, 3, 13);
 
     // Verify it throws correctly.
@@ -94,11 +95,11 @@ public void testArgChecks() throws Exception { "'readOffset' must not be negative", () -> pos.setData(data, 4, -4)); intercept(IllegalArgumentException.class, - "'readOffset' (15) must be within the range [4, 13]", + "'readOffset' (15) must be within the range [4, 14]", () -> pos.setData(data, 4, 15)); intercept(IllegalArgumentException.class, - "'readOffset' (3) must be within the range [4, 13]", + "'readOffset' (3) must be within the range [4, 14]", () -> pos.setData(data, 4, 3)); } @@ -192,4 +193,23 @@ public void testBufferStats() { } assertTrue(pos.bufferFullyRead()); } + + @Test + public void testBounds() { + int bufferSize = 8; + long fileSize = bufferSize; + + ByteBuffer buffer = ByteBuffer.allocate(bufferSize); + BufferData data = new BufferData(0, buffer); + FilePosition pos = new FilePosition(fileSize, bufferSize); + + long eofOffset = fileSize; + pos.setData(data, 0, eofOffset); + + assertTrue(pos.isWithinCurrentBuffer(eofOffset)); + assertEquals(eofOffset, pos.absolute()); + + assertTrue(pos.setAbsolute(eofOffset)); + assertEquals(eofOffset, pos.absolute()); + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java index 0afd071246415..5f22712f4a208 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.impl.prefetch.BlockManager; import org.apache.hadoop.fs.impl.prefetch.BufferData; import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool; +import org.apache.hadoop.fs.impl.prefetch.FilePosition; import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; @@ -84,46 +85,6 @@ public S3ACachingInputStream( fileSize); } - /** - * Moves the current read position so that the next read will occur at {@code pos}. - * - * @param pos the next read will take place at this position. - * - * @throws IllegalArgumentException if pos is outside of the range [0, file size]. - */ - @Override - public void seek(long pos) throws IOException { - throwIfClosed(); - throwIfInvalidSeek(pos); - - // The call to setAbsolute() returns true if the target position is valid and - // within the current block. Therefore, no additional work is needed when we get back true. - if (!getFilePosition().setAbsolute(pos)) { - LOG.info("seek({})", getOffsetStr(pos)); - // We could be here in two cases: - // -- the target position is invalid: - // We ignore this case here as the next read will return an error. - // -- it is valid but outside of the current block. - if (getFilePosition().isValid()) { - // There are two cases to consider: - // -- the seek was issued after this buffer was fully read. - // In this case, it is very unlikely that this buffer will be needed again; - // therefore we release the buffer without caching. - // -- if we are jumping out of the buffer before reading it completely then - // we will likely need this buffer again (as observed empirically for Parquet) - // therefore we issue an async request to cache this buffer. 
- if (!getFilePosition().bufferFullyRead()) { - blockManager.requestCaching(getFilePosition().data()); - } else { - blockManager.release(getFilePosition().data()); - } - getFilePosition().invalidate(); - blockManager.cancelPrefetches(); - } - setSeekTargetPos(pos); - } - } - @Override public void close() throws IOException { // Close the BlockManager first, cancelling active prefetches, @@ -139,36 +100,45 @@ protected boolean ensureCurrentBuffer() throws IOException { return false; } - if (getFilePosition().isValid() && getFilePosition() - .buffer() - .hasRemaining()) { - return true; + long readPos = getNextReadPos(); + if (!getBlockData().isValidOffset(readPos)) { + return false; } - long readPos; - int prefetchCount; - - if (getFilePosition().isValid()) { - // A sequential read results in a prefetch. - readPos = getFilePosition().absolute(); - prefetchCount = numBlocksToPrefetch; - } else { - // A seek invalidates the current position. - // We prefetch only 1 block immediately after a seek operation. - readPos = getSeekTargetPos(); - prefetchCount = 1; - } + // Determine whether this is an out of order read. + FilePosition filePosition = getFilePosition(); + boolean outOfOrderRead = !filePosition.setAbsolute(readPos); - if (!getBlockData().isValidOffset(readPos)) { - return false; + if (!outOfOrderRead && filePosition.buffer().hasRemaining()) { + // Use the current buffer. + return true; } - if (getFilePosition().isValid()) { - if (getFilePosition().bufferFullyRead()) { - blockManager.release(getFilePosition().data()); + if(filePosition.isValid()) { + // We are jumping out of the current buffer. There are two cases to consider: + if (filePosition.bufferFullyRead()) { + // This buffer was fully read: + // it is very unlikely that this buffer will be needed again; + // therefore we release the buffer without caching. + blockManager.release(filePosition.data()); } else { - blockManager.requestCaching(getFilePosition().data()); + // We will likely need this buffer again (as observed empirically for Parquet) + // therefore we issue an async request to cache this buffer. + blockManager.requestCaching(filePosition.data()); } + filePosition.invalidate(); + } + + int prefetchCount; + if (outOfOrderRead) { + LOG.debug("lazy-seek({})", getOffsetStr(readPos)); + blockManager.cancelPrefetches(); + + // We prefetch only 1 block immediately after a seek operation. + prefetchCount = 1; + } else { + // A sequential read results in a prefetch. 
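+      // The prefetch depth for sequential reads comes from fs.s3a.prefetch.block.count.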
+ prefetchCount = numBlocksToPrefetch; } int toBlockNumber = getBlockData().getBlockNumber(readPos); @@ -186,7 +156,7 @@ protected boolean ensureCurrentBuffer() throws IOException { .trackDuration(STREAM_READ_BLOCK_ACQUIRE_AND_READ), () -> blockManager.get(toBlockNumber)); - getFilePosition().setData(data, startOffset, readPos); + filePosition.setData(data, startOffset, readPos); return true; } @@ -197,7 +167,7 @@ public String toString() { } StringBuilder sb = new StringBuilder(); - sb.append(String.format("fpos = (%s)%n", getFilePosition())); + sb.append(String.format("%s%n", super.toString())); sb.append(blockManager.toString()); return sb.toString(); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3AInMemoryInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3AInMemoryInputStream.java index 322459a958912..e8bfe946f4abf 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3AInMemoryInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3AInMemoryInputStream.java @@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.impl.prefetch.BufferData; +import org.apache.hadoop.fs.impl.prefetch.FilePosition; import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; @@ -86,7 +87,12 @@ protected boolean ensureCurrentBuffer() throws IOException { return false; } - if (!getFilePosition().isValid()) { + FilePosition filePosition = getFilePosition(); + if (filePosition.isValid()) { + // Update current position (lazy seek). + filePosition.setAbsolute(getNextReadPos()); + } else { + // Read entire file into buffer. buffer.clear(); int numBytesRead = getReader().read(buffer, 0, buffer.capacity()); @@ -94,9 +100,9 @@ protected boolean ensureCurrentBuffer() throws IOException { return false; } BufferData data = new BufferData(0, buffer); - getFilePosition().setData(data, 0, getSeekTargetPos()); + filePosition.setData(data, 0, getNextReadPos()); } - return getFilePosition().buffer().hasRemaining(); + return filePosition.buffer().hasRemaining(); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java index 0f46a8ed5e534..38d740bd74f94 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java @@ -77,12 +77,16 @@ public abstract class S3ARemoteInputStream private volatile boolean closed; /** - * Current position within the file. + * Internal position within the file. Updated lazily + * after a seek before a read. */ private FilePosition fpos; - /** The target of the most recent seek operation. */ - private long seekTargetPos; + /** + * This is the actual position within the file, used by + * lazy seek to decide whether to seek on the next read or not. + */ + private long nextReadPos; /** Information about each block of the mapped S3 file. 
*/ private BlockData blockData; @@ -146,7 +150,7 @@ public S3ARemoteInputStream( this.remoteObject = getS3File(); this.reader = new S3ARemoteObjectReader(remoteObject); - this.seekTargetPos = 0; + this.nextReadPos = 0; } /** @@ -212,7 +216,8 @@ private void setInputPolicy(S3AInputPolicy inputPolicy) { public int available() throws IOException { throwIfClosed(); - if (!ensureCurrentBuffer()) { + // Update the current position in the current buffer, if possible. + if (!fpos.setAbsolute(nextReadPos)) { return 0; } @@ -228,11 +233,7 @@ public int available() throws IOException { public long getPos() throws IOException { throwIfClosed(); - if (fpos.isValid()) { - return fpos.absolute(); - } else { - return seekTargetPos; - } + return nextReadPos; } /** @@ -247,10 +248,7 @@ public void seek(long pos) throws IOException { throwIfClosed(); throwIfInvalidSeek(pos); - if (!fpos.setAbsolute(pos)) { - fpos.invalidate(); - seekTargetPos = pos; - } + nextReadPos = pos; } /** @@ -268,7 +266,7 @@ public int read() throws IOException { throwIfClosed(); if (remoteObject.size() == 0 - || seekTargetPos >= remoteObject.size()) { + || nextReadPos >= remoteObject.size()) { return -1; } @@ -276,6 +274,7 @@ public int read() throws IOException { return -1; } + nextReadPos++; incrementBytesRead(1); return fpos.buffer().get() & 0xff; @@ -315,7 +314,7 @@ public int read(byte[] buffer, int offset, int len) throws IOException { } if (remoteObject.size() == 0 - || seekTargetPos >= remoteObject.size()) { + || nextReadPos >= remoteObject.size()) { return -1; } @@ -334,6 +333,7 @@ public int read(byte[] buffer, int offset, int len) throws IOException { ByteBuffer buf = fpos.buffer(); int bytesToRead = Math.min(numBytesRemaining, buf.remaining()); buf.get(buffer, offset, bytesToRead); + nextReadPos += bytesToRead; incrementBytesRead(bytesToRead); offset += bytesToRead; numBytesRemaining -= bytesToRead; @@ -367,12 +367,8 @@ protected boolean isClosed() { return closed; } - protected long getSeekTargetPos() { - return seekTargetPos; - } - - protected void setSeekTargetPos(long pos) { - seekTargetPos = pos; + protected long getNextReadPos() { + return nextReadPos; } protected BlockData getBlockData() { @@ -443,6 +439,18 @@ public boolean markSupported() { return false; } + @Override + public String toString() { + if (isClosed()) { + return "closed"; + } + + StringBuilder sb = new StringBuilder(); + sb.append(String.format("nextReadPos = (%d)%n", nextReadPos)); + sb.append(String.format("fpos = (%s)", fpos)); + return sb.toString(); + } + protected void throwIfClosed() throws IOException { if (closed) { throw new IOException( @@ -453,6 +461,8 @@ protected void throwIfClosed() throws IOException { protected void throwIfInvalidSeek(long pos) throws EOFException { if (pos < 0) { throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + pos); + } else if (pos > this.getBlockData().getFileSize()) { + throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos); } } diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/prefetching.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/prefetching.md index 9b4e888d604ca..36621b881cab8 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/prefetching.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/prefetching.md @@ -158,7 +158,7 @@ The buffer for the block furthest from the current block is released. 
Once a buffer has been acquired by `CachingBlockManager`, if the buffer is in a *READY* state,
 it is returned.
 This means that data was already read into this buffer asynchronously by a prefetch.
-If it's state is *BLANK* then data is read into it using
+If its state is *BLANK* then data is read into it using
 `S3Reader.read(ByteBuffer buffer, long offset, int size).`
 
 For the second read call, `in.read(buffer, 0, 8MB)`, since the block sizes are of 8MB and only 5MB
@@ -170,7 +170,10 @@ The number of blocks to be prefetched is determined by `fs.s3a.prefetch.block.co
 
 ##### Random Reads
 
-If the caller makes the following calls:
+The `CachingInputStream` also caches prefetched blocks. This happens when `read()` is issued
+after a `seek()` outside the current block while the current block has not yet been fully read.
+
+For example, consider the following calls:
 
 ```
 in.read(buffer, 0, 5MB)
 in.seek(10MB)
 in.read(buffer, 0, 4MB)
 in.seek(2MB)
 in.read(buffer, 0, 4MB)
 ```
 
-The `CachingInputStream` also caches prefetched blocks.
-This happens when a `seek()` is issued for outside the current block and the current block still has
-not been fully read.
+For the above read sequence, after the `seek(10MB)` call is issued, block 0 has not been read
+completely so the subsequent call to `read()` will cache it, on the assumption that the caller
+will probably want to read from it again.
 
-For the above read sequence, when the `seek(10MB)` call is issued, block 0 has not been read
-completely so cache it as the caller will probably want to read from it again.
+After `seek(2MB)` is called, the position is back inside block 0. The next read can now be
+satisfied from the locally cached block file, which is typically orders of magnitude faster
+than a network-based read.
 
-When `seek(2MB)` is called, the position is back inside block 0.
-The next read can now be satisfied from the locally cached block file, which is typically orders of
-magnitude faster than a network based read.
\ No newline at end of file
+NB: `seek()` is implemented lazily, so it only keeps track of the new position but does not
+otherwise affect the internal state of the stream. Only when a `read()` is issued will the stream call
+the `ensureCurrentBuffer()` method and fetch a new block if required.
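+
+For illustration, here is a minimal, self-contained sketch of the contiguous positional-read
+pattern that lazy seek is designed to speed up. The bucket name, object path and buffer size
+below are examples only, not part of the S3A API:
+
+```java
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class ContiguousReadFullyExample {
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new Configuration();
+    // Enable the prefetching input stream.
+    conf.setBoolean("fs.s3a.prefetch.enabled", true);
+    Path file = new Path("s3a://example-bucket/data/large.bin");
+    try (FileSystem fs = FileSystem.newInstance(file.toUri(), conf);
+         FSDataInputStream in = fs.open(file)) {
+      long fileSize = fs.getFileStatus(file).getLen();
+      byte[] buffer = new byte[8 * 1024 * 1024];
+      long pos = 0;
+      while (pos < fileSize) {
+        int len = (int) Math.min(buffer.length, fileSize - pos);
+        // readFully(position, ...) seeks internally; with lazy seek the
+        // intermediate contiguous seeks are no-ops, so prefetching behaves
+        // as it would for a plain sequential read.
+        in.readFully(pos, buffer, 0, len);
+        pos += len;
+      }
+    }
+  }
+}
+```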
\ No newline at end of file diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java index 24f74b3a0212e..4d3891aab59aa 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java @@ -136,6 +136,39 @@ public void testReadLargeFileFully() throws Throwable { StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0); } + @Test + public void testReadLargeFileFullyLazySeek() throws Throwable { + describe("read a large file using readFully(position,buffer,offset,length), uses S3ACachingInputStream"); + IOStatistics ioStats; + openFS(); + + try (FSDataInputStream in = largeFileFS.open(largeFile)) { + ioStats = in.getIOStatistics(); + + byte[] buffer = new byte[S_1M * 10]; + long bytesRead = 0; + + while (bytesRead < largeFileSize) { + in.readFully(bytesRead, buffer, 0, (int) Math.min(buffer.length, + largeFileSize - bytesRead)); + bytesRead += buffer.length; + // Blocks are fully read, no blocks should be cached + verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, + 0); + } + + // Assert that first block is read synchronously, following blocks are prefetched + verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, + numBlocks - 1); + verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, numBlocks); + verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, numBlocks); + } + // Verify that once stream is closed, all memory is freed + verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); + assertThatStatisticMaximum(ioStats, + StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0); + } + @Test public void testRandomReadLargeFile() throws Throwable { describe("random read on a large file, uses S3ACachingInputStream"); @@ -147,19 +180,26 @@ public void testRandomReadLargeFile() throws Throwable { byte[] buffer = new byte[blockSize]; - // Don't read the block completely so it gets cached on seek + // Don't read block 0 completely so it gets cached on read after seek in.read(buffer, 0, blockSize - S_1K * 10); - in.seek(blockSize + S_1K * 10); - // Backwards seek, will use cached block + + // Seek to block 2 and read all of it + in.seek(blockSize * 2); + in.read(buffer, 0, blockSize); + + // Seek to block 4 but don't read: noop. 
+ in.seek(blockSize * 4); + + // Backwards seek, will use cached block 0 in.seek(S_1K * 5); in.read(); - verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 2); - verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 2); - verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, 1); - // block 0 is cached when we seek to block 1, block 1 is cached as it is being prefetched - // when we seek out of block 0, see cancelPrefetches() - verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, 2); + // Expected to get block 0 (partially read), 1 (prefetch), 2 (fully read), 3 (prefetch) + // Blocks 0, 1, 3 were not fully read, so remain in the file cache + verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 4); + verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 4); + verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, 2); + verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, 3); } verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, 0); verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteInputStream.java index 4ab33ef6cd074..7b9b991ba8d07 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteInputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteInputStream.java @@ -121,8 +121,8 @@ public void testRead() throws Exception { private void testReadHelper(S3ARemoteInputStream inputStream, int bufferSize) throws Exception { - assertEquals(bufferSize, inputStream.available()); assertEquals(0, inputStream.read()); + assertEquals(bufferSize - 1, inputStream.available()); assertEquals(1, inputStream.read()); byte[] buffer = new byte[2]; @@ -170,12 +170,14 @@ private void testSeekHelper(S3ARemoteInputStream inputStream, int bufferSize, int fileSize) throws Exception { + assertEquals(0, inputStream.available()); assertEquals(0, inputStream.getPos()); - inputStream.seek(7); - assertEquals(7, inputStream.getPos()); + inputStream.seek(bufferSize); + assertEquals(0, inputStream.available()); + assertEquals(bufferSize, inputStream.getPos()); inputStream.seek(0); + assertEquals(0, inputStream.available()); - assertEquals(bufferSize, inputStream.available()); for (int i = 0; i < fileSize; i++) { assertEquals(i, inputStream.read()); } @@ -187,11 +189,20 @@ private void testSeekHelper(S3ARemoteInputStream inputStream, } } + // Can seek to the EOF: read() will then return -1. + inputStream.seek(fileSize); + assertEquals(-1, inputStream.read()); + // Test invalid seeks. 
ExceptionAsserts.assertThrows(
         EOFException.class,
         FSExceptionMessages.NEGATIVE_SEEK,
         () -> inputStream.seek(-1));
+
+    ExceptionAsserts.assertThrows(
+        EOFException.class,
+        FSExceptionMessages.CANNOT_SEEK_PAST_EOF,
+        () -> inputStream.seek(fileSize + 1));
   }
 
   @Test
@@ -217,7 +228,7 @@ private void testRandomSeekHelper(S3ARemoteInputStream inputStream,
     assertEquals(7, inputStream.getPos());
 
     inputStream.seek(0);
-    assertEquals(bufferSize, inputStream.available());
+    assertEquals(0, inputStream.available());
     for (int i = 0; i < fileSize; i++) {
       assertEquals(i, inputStream.read());
     }
@@ -251,9 +262,10 @@ public void testClose() throws Exception {
 
   private void testCloseHelper(S3ARemoteInputStream inputStream, int bufferSize)
       throws Exception {
-    assertEquals(bufferSize, inputStream.available());
+    assertEquals(0, inputStream.available());
     assertEquals(0, inputStream.read());
     assertEquals(1, inputStream.read());
+    assertEquals(bufferSize - 2, inputStream.available());
 
     inputStream.close();

From 015b025683169ca1d3d5b46011811cb47710e092 Mon Sep 17 00:00:00 2001
From: Alessandro Passaro
Date: Fri, 30 Sep 2022 09:51:32 +0100
Subject: [PATCH 2/8] Fix blanks and checkstyle issues.

---
 .../src/site/markdown/tools/hadoop-aws/prefetching.md  | 10 +++++-----
 .../hadoop/fs/s3a/ITestS3APrefetchingInputStream.java  |  3 ++-
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/prefetching.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/prefetching.md
index 36621b881cab8..e966c2dce4cb3 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/prefetching.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/prefetching.md
@@ -184,13 +184,13 @@ in.read(buffer, 0, 4MB)
 ```
 
 For the above read sequence, after the `seek(10MB)` call is issued, block 0 has not been read
-completely so the subsequent call to `read()` will cache it, on the assumption that the caller 
+completely so the subsequent call to `read()` will cache it, on the assumption that the caller
 will probably want to read from it again.
 
-After `seek(2MB)` is called, the position is back inside block 0. The next read can now be 
-satisfied from the locally cached block file, which is typically orders of magnitude faster 
+After `seek(2MB)` is called, the position is back inside block 0. The next read can now be
+satisfied from the locally cached block file, which is typically orders of magnitude faster
 than a network-based read.
 
-NB: `seek()` is implemented lazily, so it only keeps track of the new position but does not 
-otherwise affect the internal state of the stream. Only when a `read()` is issued will the stream call 
+NB: `seek()` is implemented lazily, so it only keeps track of the new position but does not
+otherwise affect the internal state of the stream. Only when a `read()` is issued will the stream call
 the `ensureCurrentBuffer()` method and fetch a new block if required.
\ No newline at end of file diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java index 4d3891aab59aa..676a842d9408f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java @@ -138,7 +138,8 @@ public void testReadLargeFileFully() throws Throwable { @Test public void testReadLargeFileFullyLazySeek() throws Throwable { - describe("read a large file using readFully(position,buffer,offset,length), uses S3ACachingInputStream"); + describe("read a large file using readFully(position,buffer,offset,length)," + + " uses S3ACachingInputStream"); IOStatistics ioStats; openFS(); From 293ef662dc1c2d9227f7d149bcc5f478da40571d Mon Sep 17 00:00:00 2001 From: Alessandro Passaro Date: Mon, 3 Oct 2022 10:04:24 +0100 Subject: [PATCH 3/8] Fix missing space. --- .../apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java index 5f22712f4a208..f9ee4e412fc4f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java @@ -114,7 +114,7 @@ protected boolean ensureCurrentBuffer() throws IOException { return true; } - if(filePosition.isValid()) { + if (filePosition.isValid()) { // We are jumping out of the current buffer. There are two cases to consider: if (filePosition.bufferFullyRead()) { // This buffer was fully read: From d9c4262b674ca11e3edfc6b5f08fca2b652843cc Mon Sep 17 00:00:00 2001 From: Alessandro Passaro Date: Mon, 3 Oct 2022 10:04:34 +0100 Subject: [PATCH 4/8] Add static imports. 
--- .../s3a/ITestS3APrefetchingInputStream.java | 54 ++++++++++--------- 1 file changed, 30 insertions(+), 24 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java index 676a842d9408f..504a6d1ee7cf4 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java @@ -41,7 +41,13 @@ import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPENED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS; import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; /** @@ -120,20 +126,20 @@ public void testReadLargeFileFully() throws Throwable { in.readFully(buffer, 0, (int) Math.min(buffer.length, largeFileSize - bytesRead)); bytesRead += buffer.length; // Blocks are fully read, no blocks should be cached - verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, + verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 0); } // Assert that first block is read synchronously, following blocks are prefetched - verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, + verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS, numBlocks - 1); - verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, numBlocks); - verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, numBlocks); + verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, numBlocks); + verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, numBlocks); } // Verify that once stream is closed, all memory is freed - verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); + verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); assertThatStatisticMaximum(ioStats, - StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0); + ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0); } @Test @@ -154,20 +160,20 @@ public void testReadLargeFileFullyLazySeek() throws Throwable { largeFileSize - bytesRead)); bytesRead += buffer.length; // Blocks are fully read, no blocks should be cached - verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, + verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 0); } // Assert that first block is read synchronously, following blocks are prefetched - verifyStatisticCounterValue(ioStats, 
StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, + verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS, numBlocks - 1); - verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, numBlocks); - verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, numBlocks); + verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, numBlocks); + verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, numBlocks); } // Verify that once stream is closed, all memory is freed - verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); + verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); assertThatStatisticMaximum(ioStats, - StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0); + ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0); } @Test @@ -197,15 +203,15 @@ public void testRandomReadLargeFile() throws Throwable { // Expected to get block 0 (partially read), 1 (prefetch), 2 (fully read), 3 (prefetch) // Blocks 0, 1, 3 were not fully read, so remain in the file cache - verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 4); - verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 4); - verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, 2); - verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, 3); + verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 4); + verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, 4); + verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS, 2); + verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 3); } - verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, 0); - verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); + verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 0); + verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); assertThatStatisticMaximum(ioStats, - StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0); + ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0); } @Test @@ -225,14 +231,14 @@ public void testRandomReadSmallFile() throws Throwable { in.seek(S_1K * 12); in.read(buffer, 0, S_1K * 4); - verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 1); - verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 1); - verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, 0); + verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1); + verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, 1); + verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS, 0); // The buffer pool is not used - verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); + verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); // no prefetch ops, so no action_executor_acquired assertThatStatisticMaximum(ioStats, - StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isEqualTo(-1); + ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isEqualTo(-1); } } From e00ca034e456f368335141b20fc06e6df78accef Mon Sep 17 00:00:00 2001 From: Alessandro Passaro Date: Mon, 3 Oct 2022 10:31:51 +0100 Subject: [PATCH 5/8] Refactor asserts on available(). 
--- .../prefetch/TestS3ARemoteInputStream.java | 25 +++++++++++++------ 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteInputStream.java index 7b9b991ba8d07..2a1d03d2948de 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteInputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteInputStream.java @@ -21,6 +21,7 @@ import java.io.EOFException; import java.io.IOException; +import java.io.InputStream; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -35,6 +36,7 @@ import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.apache.hadoop.test.AbstractHadoopTestBase; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; /** @@ -97,7 +99,7 @@ public void testRead0SizedFile() throws Exception { private void testRead0SizedFileHelper(S3ARemoteInputStream inputStream, int bufferSize) throws Exception { - assertEquals(0, inputStream.available()); + assertAvailable(0, inputStream); assertEquals(-1, inputStream.read()); assertEquals(-1, inputStream.read()); @@ -122,7 +124,7 @@ public void testRead() throws Exception { private void testReadHelper(S3ARemoteInputStream inputStream, int bufferSize) throws Exception { assertEquals(0, inputStream.read()); - assertEquals(bufferSize - 1, inputStream.available()); + assertAvailable(bufferSize - 1, inputStream); assertEquals(1, inputStream.read()); byte[] buffer = new byte[2]; @@ -170,13 +172,13 @@ private void testSeekHelper(S3ARemoteInputStream inputStream, int bufferSize, int fileSize) throws Exception { - assertEquals(0, inputStream.available()); + assertAvailable(0, inputStream); assertEquals(0, inputStream.getPos()); inputStream.seek(bufferSize); - assertEquals(0, inputStream.available()); + assertAvailable(0, inputStream); assertEquals(bufferSize, inputStream.getPos()); inputStream.seek(0); - assertEquals(0, inputStream.available()); + assertAvailable(0, inputStream); for (int i = 0; i < fileSize; i++) { assertEquals(i, inputStream.read()); @@ -228,7 +230,7 @@ private void testRandomSeekHelper(S3ARemoteInputStream inputStream, assertEquals(7, inputStream.getPos()); inputStream.seek(0); - assertEquals(0, inputStream.available()); + assertAvailable(0, inputStream); for (int i = 0; i < fileSize; i++) { assertEquals(i, inputStream.read()); } @@ -262,10 +264,10 @@ public void testClose() throws Exception { private void testCloseHelper(S3ARemoteInputStream inputStream, int bufferSize) throws Exception { - assertEquals(0, inputStream.available()); + assertAvailable(0, inputStream); assertEquals(0, inputStream.read()); assertEquals(1, inputStream.read()); - assertEquals(bufferSize - 2, inputStream.available()); + assertAvailable(bufferSize - 2, inputStream); inputStream.close(); @@ -288,4 +290,11 @@ private void testCloseHelper(S3ARemoteInputStream inputStream, int bufferSize) // Verify a second close() does not throw. 
inputStream.close(); } + + private static void assertAvailable(int expected, InputStream inputStream) + throws IOException { + assertThat(inputStream.available()) + .describedAs("Check available bytes") + .isEqualTo(expected); + } } From 1f769cdb652b916f82433e0e1faa235572df57df Mon Sep 17 00:00:00 2001 From: Alessandro Passaro Date: Mon, 3 Oct 2022 11:50:18 +0100 Subject: [PATCH 6/8] Add test descriptions. --- .../fs/impl/prefetch/TestFilePosition.java | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestFilePosition.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestFilePosition.java index a462e51e94210..12ab62556a104 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestFilePosition.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestFilePosition.java @@ -26,6 +26,7 @@ import org.apache.hadoop.test.AbstractHadoopTestBase; import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -206,10 +207,18 @@ public void testBounds() { long eofOffset = fileSize; pos.setData(data, 0, eofOffset); - assertTrue(pos.isWithinCurrentBuffer(eofOffset)); - assertEquals(eofOffset, pos.absolute()); - - assertTrue(pos.setAbsolute(eofOffset)); - assertEquals(eofOffset, pos.absolute()); + assertThat(pos.isWithinCurrentBuffer(eofOffset)) + .describedAs("EOF offset %d should be within the current buffer", eofOffset) + .isTrue(); + assertThat(pos.absolute()) + .describedAs("absolute() should return the EOF offset") + .isEqualTo(eofOffset); + + assertThat(pos.setAbsolute(eofOffset)) + .describedAs("setAbsolute() should return true on the EOF offset %d", eofOffset) + .isTrue(); + assertThat(pos.absolute()) + .describedAs("absolute() should return the EOF offset") + .isEqualTo(eofOffset); } } From 7edb2f04b1e3431ae60f061085f6cae5f360eb0e Mon Sep 17 00:00:00 2001 From: Alessandro Passaro Date: Tue, 4 Oct 2022 08:28:11 +0100 Subject: [PATCH 7/8] Fix checkstyle issues. 
--- .../apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java index 504a6d1ee7cf4..d597c05e1dac3 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java @@ -32,8 +32,6 @@ import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; import org.apache.hadoop.fs.statistics.IOStatistics; -import org.apache.hadoop.fs.statistics.StoreStatisticNames; -import org.apache.hadoop.fs.statistics.StreamStatisticNames; import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE; import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY; From 42b51c47b2667afef859042ec607b851d1f124d7 Mon Sep 17 00:00:00 2001 From: Alessandro Passaro Date: Wed, 5 Oct 2022 10:17:23 +0100 Subject: [PATCH 8/8] Add inputStream description to test failure message. --- .../apache/hadoop/fs/s3a/prefetch/TestS3ARemoteInputStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteInputStream.java index 2a1d03d2948de..d449a79a5a851 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteInputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteInputStream.java @@ -294,7 +294,7 @@ private void testCloseHelper(S3ARemoteInputStream inputStream, int bufferSize) private static void assertAvailable(int expected, InputStream inputStream) throws IOException { assertThat(inputStream.available()) - .describedAs("Check available bytes") + .describedAs("Check available bytes on stream %s", inputStream) .isEqualTo(expected); } }
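The pattern the series implements can be summarised in a few lines. The following toy model is
not the production S3ARemoteInputStream; it is an in-memory sketch of the nextReadPos
bookkeeping introduced above, with a byte array standing in for the remote object and a single
block:

```java
/**
 * Toy model of lazy seek: seek() only records nextReadPos; the expensive
 * fetch is deferred until the first read() that needs data.
 */
public final class LazySeekModel {
  private final byte[] remote;   // stands in for the remote S3 object
  private long nextReadPos;      // the only state seek() touches
  private byte[] buffer;         // current "block" (the whole object here)
  private boolean fetched;

  public LazySeekModel(byte[] remote) {
    this.remote = remote;
  }

  /** Lazy seek: record the target position, do no buffer work. */
  public void seek(long pos) {
    if (pos < 0 || pos > remote.length) {
      throw new IllegalArgumentException("cannot seek to " + pos);
    }
    nextReadPos = pos;
  }

  /** getPos() reflects a pending seek immediately. */
  public long getPos() {
    return nextReadPos;
  }

  /** The "fetch" (ensureCurrentBuffer() stand-in) happens only here. */
  public int read() {
    if (nextReadPos >= remote.length) {
      return -1;                 // at or past EOF
    }
    if (!fetched) {
      buffer = remote.clone();   // deferred block fetch
      fetched = true;
    }
    return buffer[(int) nextReadPos++] & 0xff;
  }

  public static void main(String[] args) {
    LazySeekModel in = new LazySeekModel(new byte[] {10, 20, 30, 40});
    in.seek(3);                      // cheap: records the position only
    in.seek(1);                      // still cheap: just overwrites it
    System.out.println(in.getPos()); // 1 -- pending seek is visible
    System.out.println(in.read());   // 20 -- the fetch happened here
    System.out.println(in.getPos()); // 2 -- read advanced the position
  }
}
```

This is the property the tests in the series encode: getPos() reflects a pending seek()
immediately, while no data is fetched until the first read() that needs it.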