diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java index 3c1377a5a4981..7a55c22b405ad 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java @@ -30,6 +30,7 @@ import java.io.EOFException; import java.io.IOException; +import java.io.InputStream; import java.util.Random; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; @@ -99,14 +100,18 @@ public void testSeekZeroByteFile() throws Throwable { describe("seek and read a 0 byte file"); instream = getFileSystem().open(zeroByteFile); assertEquals(0, instream.getPos()); + assertNotAvailable(instream); //expect initial read to fai; int result = instream.read(); assertMinusOne("initial byte read", result); + assertNotAvailable(instream); byte[] buffer = new byte[1]; //expect that seek to 0 works instream.seek(0); + assertNotAvailable(instream); //reread, expect same exception result = instream.read(); + assertNotAvailable(instream); assertMinusOne("post-seek byte read", result); result = instream.read(buffer, 0, 1); assertMinusOne("post-seek buffer read", result); @@ -172,6 +177,11 @@ public void testSeekReadClosedFile() throws Throwable { // sure there's no other exception like an NPE. } + try{ + instream.available(); + }catch (IOException | IllegalStateException e) { + // expected a closed file + } //and close again instream.close(); } @@ -194,6 +204,7 @@ public void testNegativeSeek() throws Throwable { //bad seek -expected, but not as preferred as an EOFException handleRelaxedException("a negative seek", "EOFException", e); } + assertTrue("The available should be zero",instream.available() >= 0); assertEquals(0, instream.getPos()); } @@ -205,6 +216,7 @@ public void testSeekFile() throws Throwable { //expect that seek to 0 works instream.seek(0); int result = instream.read(); + assertAvailable(instream); assertEquals(0, result); assertEquals(1, instream.read()); assertEquals(2, instream.getPos()); @@ -226,7 +238,9 @@ public void testSeekAndReadPastEndOfFile() throws Throwable { //go just before the end instream.seek(TEST_FILE_LEN - 2); assertTrue("Premature EOF", instream.read() != -1); + assertAvailable(instream); assertTrue("Premature EOF", instream.read() != -1); + assertNotAvailable(instream); assertMinusOne("read past end of file", instream.read()); } @@ -261,6 +275,7 @@ public void testSeekPastEndOfFileThenReseekAndRead() throws Throwable { //now go back and try to read from a valid point in the file instream.seek(1); assertTrue("Premature EOF", instream.read() != -1); + assertAvailable(instream); } /** @@ -278,6 +293,7 @@ public void testSeekBigFile() throws Throwable { //expect that seek to 0 works instream.seek(0); int result = instream.read(); + assertAvailable(instream); assertEquals(0, result); assertEquals(1, instream.read()); assertEquals(2, instream.read()); @@ -296,6 +312,7 @@ public void testSeekBigFile() throws Throwable { instream.seek(0); assertEquals(0, instream.getPos()); instream.read(); + assertAvailable(instream); assertEquals(1, instream.getPos()); byte[] buf = new byte[80 * 1024]; instream.readFully(1, buf, 0, buf.length); @@ -314,7 +331,7 @@ public void testPositionedBulkReadDoesntChangePosition() throws Throwable { instream.seek(39999); assertTrue(-1 != instream.read()); assertEquals(40000, instream.getPos()); - + assertAvailable(instream); int v = 256; byte[] readBuffer = new byte[v]; assertEquals(v, instream.read(128, readBuffer, 0, v)); @@ -322,6 +339,7 @@ public void testPositionedBulkReadDoesntChangePosition() throws Throwable { assertEquals(40000, instream.getPos()); //content is the same too assertEquals("@40000", block[40000], (byte) instream.read()); + assertAvailable(instream); //now verify the picked up data for (int i = 0; i < 256; i++) { assertEquals("@" + i, block[i + 128], readBuffer[i]); @@ -376,6 +394,7 @@ public void testReadFullyZeroByteFile() throws Throwable { assertEquals(0, instream.getPos()); byte[] buffer = new byte[1]; instream.readFully(0, buffer, 0, 0); + assertNotAvailable(instream); assertEquals(0, instream.getPos()); // seek to 0 read 0 bytes from it instream.seek(0); @@ -587,6 +606,16 @@ public void testReadAtExactEOF() throws Throwable { instream = getFileSystem().open(smallSeekFile); instream.seek(TEST_FILE_LEN -1); assertTrue("read at last byte", instream.read() > 0); + assertNotAvailable(instream); assertEquals("read just past EOF", -1, instream.read()); } + + private void assertAvailable(InputStream inputStream) throws IOException { + assertTrue("Data available in " + instream, inputStream.available() >0 ); + } + + private void assertNotAvailable(InputStream inputStream) throws IOException { + assertTrue("The steam is not available", inputStream.available() == 0); + } + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 68f98e4abead2..ab3433dade3f5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -38,6 +38,7 @@ import java.io.IOException; import java.net.SocketTimeoutException; +import static java.lang.Math.min; import static org.apache.commons.lang3.StringUtils.isNotEmpty; /** @@ -192,7 +193,7 @@ private synchronized void reopen(String reason, long targetPos, long length, } @Override - public synchronized long getPos() throws IOException { + public synchronized long getPos() { return (nextReadPos < 0) ? 0 : nextReadPos; } @@ -253,7 +254,7 @@ private void seekInStream(long targetPos, long length) throws IOException { // then choose whichever comes first: the range or the EOF long remainingInCurrentRequest = remainingInCurrentRequest(); - long forwardSeekLimit = Math.min(remainingInCurrentRequest, + long forwardSeekLimit = min(remainingInCurrentRequest, forwardSeekRange); boolean skipForward = remainingInCurrentRequest > 0 && diff <= forwardSeekLimit; @@ -593,12 +594,13 @@ public synchronized boolean resetConnection() throws IOException { @Override public synchronized int available() throws IOException { checkNotClosed(); - + int availableSize = this.wrappedStream == null ? 0 : this.wrappedStream.available(); long remaining = remainingInFile(); if (remaining > Integer.MAX_VALUE) { return Integer.MAX_VALUE; } - return (int)remaining; + long validatedSize = min(availableSize, remaining); + return (int)validatedSize; } /** @@ -607,8 +609,8 @@ public synchronized int available() throws IOException { */ @InterfaceAudience.Private @InterfaceStability.Unstable - public synchronized long remainingInFile() { - return this.contentLength - this.pos; + public synchronized long remainingInFile() throws IOException { + return this.contentLength - this.getPos(); } /** @@ -619,7 +621,7 @@ public synchronized long remainingInFile() { @InterfaceAudience.Private @InterfaceStability.Unstable public synchronized long remainingInCurrentRequest() { - return this.contentRangeFinish - this.pos; + return this.contentRangeFinish - this.getPos(); } @InterfaceAudience.Private @@ -776,7 +778,7 @@ static long calculateRequestLimit( } // cannot read past the end of the object - rangeLimit = Math.min(contentLength, rangeLimit); + rangeLimit = min(contentLength, rangeLimit); return rangeLimit; }