From ab1b1efb1aa5213c07db18ad29146908ac7c206d Mon Sep 17 00:00:00 2001 From: lqjaclee Date: Tue, 23 Oct 2018 22:38:02 +0800 Subject: [PATCH 1/5] HADOOP-15870 Fix the nextPos --- .../main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 68f98e4abead2..48cbeef2c6c10 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -607,8 +607,8 @@ public synchronized int available() throws IOException { */ @InterfaceAudience.Private @InterfaceStability.Unstable - public synchronized long remainingInFile() { - return this.contentLength - this.pos; + public synchronized long remainingInFile() throws IOException { + return this.contentLength - this.getPos(); } /** From 881ee4bbf25a28a9f58485ab3ab06aa6105ec784 Mon Sep 17 00:00:00 2001 From: lqjaclee Date: Wed, 24 Oct 2018 22:19:33 +0800 Subject: [PATCH 2/5] HADOOP-15870 1, Fix remainingInCurrentRequest 2, append assert in the Junit Test --- .../fs/contract/AbstractContractSeekTest.java | 21 ++++++++++++++++++- .../apache/hadoop/fs/s3a/S3AInputStream.java | 4 ++-- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java index 3c1377a5a4981..69be0f452f1f2 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java @@ -99,14 +99,18 @@ public void testSeekZeroByteFile() throws Throwable { describe("seek and read a 0 byte file"); instream = getFileSystem().open(zeroByteFile); assertEquals(0, instream.getPos()); + assertEquals(0,instream.available()); //expect initial read to fai; int result = instream.read(); assertMinusOne("initial byte read", result); + assertEquals(0,instream.available()); byte[] buffer = new byte[1]; //expect that seek to 0 works instream.seek(0); + assertEquals(0,instream.available()); //reread, expect same exception result = instream.read(); + assertEquals(0,instream.available()); assertMinusOne("post-seek byte read", result); result = instream.read(buffer, 0, 1); assertMinusOne("post-seek buffer read", result); @@ -172,6 +176,11 @@ public void testSeekReadClosedFile() throws Throwable { // sure there's no other exception like an NPE. } + try{ + instream.available(); + }catch (IOException | IllegalStateException e) { + // expected a closed file + } //and close again instream.close(); } @@ -194,6 +203,7 @@ public void testNegativeSeek() throws Throwable { //bad seek -expected, but not as preferred as an EOFException handleRelaxedException("a negative seek", "EOFException", e); } + assertTrue("The available should be positive integer",instream.available() > 0); assertEquals(0, instream.getPos()); } @@ -205,6 +215,7 @@ public void testSeekFile() throws Throwable { //expect that seek to 0 works instream.seek(0); int result = instream.read(); + assertTrue("The available should be positive integer",instream.available() > 0); assertEquals(0, result); assertEquals(1, instream.read()); assertEquals(2, instream.getPos()); @@ -226,7 +237,9 @@ public void testSeekAndReadPastEndOfFile() throws Throwable { //go just before the end instream.seek(TEST_FILE_LEN - 2); assertTrue("Premature EOF", instream.read() != -1); + assertTrue("The available should be positive integer",instream.available() > 0); assertTrue("Premature EOF", instream.read() != -1); + assertTrue("The available should be positive integer",instream.available() > 0); assertMinusOne("read past end of file", instream.read()); } @@ -261,6 +274,7 @@ public void testSeekPastEndOfFileThenReseekAndRead() throws Throwable { //now go back and try to read from a valid point in the file instream.seek(1); assertTrue("Premature EOF", instream.read() != -1); + assertTrue("The available should be positive integer",instream.available() > 0); } /** @@ -278,6 +292,7 @@ public void testSeekBigFile() throws Throwable { //expect that seek to 0 works instream.seek(0); int result = instream.read(); + assertTrue("The available should be positive integer",instream.available() > 0); assertEquals(0, result); assertEquals(1, instream.read()); assertEquals(2, instream.read()); @@ -296,6 +311,7 @@ public void testSeekBigFile() throws Throwable { instream.seek(0); assertEquals(0, instream.getPos()); instream.read(); + assertTrue("The available should be positive integer",instream.available() > 0); assertEquals(1, instream.getPos()); byte[] buf = new byte[80 * 1024]; instream.readFully(1, buf, 0, buf.length); @@ -314,7 +330,7 @@ public void testPositionedBulkReadDoesntChangePosition() throws Throwable { instream.seek(39999); assertTrue(-1 != instream.read()); assertEquals(40000, instream.getPos()); - + assertTrue("The available should be positive integer",instream.available() > 0); int v = 256; byte[] readBuffer = new byte[v]; assertEquals(v, instream.read(128, readBuffer, 0, v)); @@ -322,6 +338,7 @@ public void testPositionedBulkReadDoesntChangePosition() throws Throwable { assertEquals(40000, instream.getPos()); //content is the same too assertEquals("@40000", block[40000], (byte) instream.read()); + assertTrue("The available should be positive integer",instream.available() > 0); //now verify the picked up data for (int i = 0; i < 256; i++) { assertEquals("@" + i, block[i + 128], readBuffer[i]); @@ -376,6 +393,7 @@ public void testReadFullyZeroByteFile() throws Throwable { assertEquals(0, instream.getPos()); byte[] buffer = new byte[1]; instream.readFully(0, buffer, 0, 0); + assertEquals(0,instream.available()); assertEquals(0, instream.getPos()); // seek to 0 read 0 bytes from it instream.seek(0); @@ -587,6 +605,7 @@ public void testReadAtExactEOF() throws Throwable { instream = getFileSystem().open(smallSeekFile); instream.seek(TEST_FILE_LEN -1); assertTrue("read at last byte", instream.read() > 0); + assertTrue("The available should be positive integer",instream.available() > 0); assertEquals("read just past EOF", -1, instream.read()); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 48cbeef2c6c10..1892d18142ad1 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -192,7 +192,7 @@ private synchronized void reopen(String reason, long targetPos, long length, } @Override - public synchronized long getPos() throws IOException { + public synchronized long getPos() { return (nextReadPos < 0) ? 0 : nextReadPos; } @@ -619,7 +619,7 @@ public synchronized long remainingInFile() throws IOException { @InterfaceAudience.Private @InterfaceStability.Unstable public synchronized long remainingInCurrentRequest() { - return this.contentRangeFinish - this.pos; + return this.contentRangeFinish - this.getPos(); } @InterfaceAudience.Private From 755a6425ec67afbe37532dd8e44349a9e153197f Mon Sep 17 00:00:00 2001 From: lqjaclee Date: Sun, 18 Nov 2018 14:16:47 +0800 Subject: [PATCH 3/5] HADOOP-15870 fix junit --- .../apache/hadoop/fs/contract/AbstractContractSeekTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java index 69be0f452f1f2..4405ab02b9fce 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java @@ -239,7 +239,7 @@ public void testSeekAndReadPastEndOfFile() throws Throwable { assertTrue("Premature EOF", instream.read() != -1); assertTrue("The available should be positive integer",instream.available() > 0); assertTrue("Premature EOF", instream.read() != -1); - assertTrue("The available should be positive integer",instream.available() > 0); + assertEquals("The available should be zero",0,instream.available()); assertMinusOne("read past end of file", instream.read()); } @@ -605,7 +605,7 @@ public void testReadAtExactEOF() throws Throwable { instream = getFileSystem().open(smallSeekFile); instream.seek(TEST_FILE_LEN -1); assertTrue("read at last byte", instream.read() > 0); - assertTrue("The available should be positive integer",instream.available() > 0); + assertEquals("The available should be positive integer",0,instream.available()); assertEquals("read just past EOF", -1, instream.read()); } } From 9cec90d202f1db35c986f1601535f2708b6f5b17 Mon Sep 17 00:00:00 2001 From: lqjaclee Date: Wed, 21 Nov 2018 23:24:10 +0800 Subject: [PATCH 4/5] HADOOP-15870 update the available logic fix junit --- .../hadoop/fs/contract/AbstractContractSeekTest.java | 2 +- .../java/org/apache/hadoop/fs/s3a/S3AInputStream.java | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java index 4405ab02b9fce..a1ffa4aded04b 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java @@ -203,7 +203,7 @@ public void testNegativeSeek() throws Throwable { //bad seek -expected, but not as preferred as an EOFException handleRelaxedException("a negative seek", "EOFException", e); } - assertTrue("The available should be positive integer",instream.available() > 0); + assertEquals("The available should be zero",0,instream.available()); assertEquals(0, instream.getPos()); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 1892d18142ad1..ab3433dade3f5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -38,6 +38,7 @@ import java.io.IOException; import java.net.SocketTimeoutException; +import static java.lang.Math.min; import static org.apache.commons.lang3.StringUtils.isNotEmpty; /** @@ -253,7 +254,7 @@ private void seekInStream(long targetPos, long length) throws IOException { // then choose whichever comes first: the range or the EOF long remainingInCurrentRequest = remainingInCurrentRequest(); - long forwardSeekLimit = Math.min(remainingInCurrentRequest, + long forwardSeekLimit = min(remainingInCurrentRequest, forwardSeekRange); boolean skipForward = remainingInCurrentRequest > 0 && diff <= forwardSeekLimit; @@ -593,12 +594,13 @@ public synchronized boolean resetConnection() throws IOException { @Override public synchronized int available() throws IOException { checkNotClosed(); - + int availableSize = this.wrappedStream == null ? 0 : this.wrappedStream.available(); long remaining = remainingInFile(); if (remaining > Integer.MAX_VALUE) { return Integer.MAX_VALUE; } - return (int)remaining; + long validatedSize = min(availableSize, remaining); + return (int)validatedSize; } /** @@ -776,7 +778,7 @@ static long calculateRequestLimit( } // cannot read past the end of the object - rangeLimit = Math.min(contentLength, rangeLimit); + rangeLimit = min(contentLength, rangeLimit); return rangeLimit; } From 1d3e21b0224bd7f58824a2a40a04dbcceaea5a9e Mon Sep 17 00:00:00 2001 From: lqjaclee Date: Thu, 22 Nov 2018 23:03:35 +0800 Subject: [PATCH 5/5] HADOOP-15870 Fix the nextPos --- .../fs/contract/AbstractContractSeekTest.java | 40 ++++++++++++------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java index a1ffa4aded04b..7a55c22b405ad 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java @@ -30,6 +30,7 @@ import java.io.EOFException; import java.io.IOException; +import java.io.InputStream; import java.util.Random; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; @@ -99,18 +100,18 @@ public void testSeekZeroByteFile() throws Throwable { describe("seek and read a 0 byte file"); instream = getFileSystem().open(zeroByteFile); assertEquals(0, instream.getPos()); - assertEquals(0,instream.available()); + assertNotAvailable(instream); //expect initial read to fai; int result = instream.read(); assertMinusOne("initial byte read", result); - assertEquals(0,instream.available()); + assertNotAvailable(instream); byte[] buffer = new byte[1]; //expect that seek to 0 works instream.seek(0); - assertEquals(0,instream.available()); + assertNotAvailable(instream); //reread, expect same exception result = instream.read(); - assertEquals(0,instream.available()); + assertNotAvailable(instream); assertMinusOne("post-seek byte read", result); result = instream.read(buffer, 0, 1); assertMinusOne("post-seek buffer read", result); @@ -203,7 +204,7 @@ public void testNegativeSeek() throws Throwable { //bad seek -expected, but not as preferred as an EOFException handleRelaxedException("a negative seek", "EOFException", e); } - assertEquals("The available should be zero",0,instream.available()); + assertTrue("The available should be zero",instream.available() >= 0); assertEquals(0, instream.getPos()); } @@ -215,7 +216,7 @@ public void testSeekFile() throws Throwable { //expect that seek to 0 works instream.seek(0); int result = instream.read(); - assertTrue("The available should be positive integer",instream.available() > 0); + assertAvailable(instream); assertEquals(0, result); assertEquals(1, instream.read()); assertEquals(2, instream.getPos()); @@ -237,9 +238,9 @@ public void testSeekAndReadPastEndOfFile() throws Throwable { //go just before the end instream.seek(TEST_FILE_LEN - 2); assertTrue("Premature EOF", instream.read() != -1); - assertTrue("The available should be positive integer",instream.available() > 0); + assertAvailable(instream); assertTrue("Premature EOF", instream.read() != -1); - assertEquals("The available should be zero",0,instream.available()); + assertNotAvailable(instream); assertMinusOne("read past end of file", instream.read()); } @@ -274,7 +275,7 @@ public void testSeekPastEndOfFileThenReseekAndRead() throws Throwable { //now go back and try to read from a valid point in the file instream.seek(1); assertTrue("Premature EOF", instream.read() != -1); - assertTrue("The available should be positive integer",instream.available() > 0); + assertAvailable(instream); } /** @@ -292,7 +293,7 @@ public void testSeekBigFile() throws Throwable { //expect that seek to 0 works instream.seek(0); int result = instream.read(); - assertTrue("The available should be positive integer",instream.available() > 0); + assertAvailable(instream); assertEquals(0, result); assertEquals(1, instream.read()); assertEquals(2, instream.read()); @@ -311,7 +312,7 @@ public void testSeekBigFile() throws Throwable { instream.seek(0); assertEquals(0, instream.getPos()); instream.read(); - assertTrue("The available should be positive integer",instream.available() > 0); + assertAvailable(instream); assertEquals(1, instream.getPos()); byte[] buf = new byte[80 * 1024]; instream.readFully(1, buf, 0, buf.length); @@ -330,7 +331,7 @@ public void testPositionedBulkReadDoesntChangePosition() throws Throwable { instream.seek(39999); assertTrue(-1 != instream.read()); assertEquals(40000, instream.getPos()); - assertTrue("The available should be positive integer",instream.available() > 0); + assertAvailable(instream); int v = 256; byte[] readBuffer = new byte[v]; assertEquals(v, instream.read(128, readBuffer, 0, v)); @@ -338,7 +339,7 @@ public void testPositionedBulkReadDoesntChangePosition() throws Throwable { assertEquals(40000, instream.getPos()); //content is the same too assertEquals("@40000", block[40000], (byte) instream.read()); - assertTrue("The available should be positive integer",instream.available() > 0); + assertAvailable(instream); //now verify the picked up data for (int i = 0; i < 256; i++) { assertEquals("@" + i, block[i + 128], readBuffer[i]); @@ -393,7 +394,7 @@ public void testReadFullyZeroByteFile() throws Throwable { assertEquals(0, instream.getPos()); byte[] buffer = new byte[1]; instream.readFully(0, buffer, 0, 0); - assertEquals(0,instream.available()); + assertNotAvailable(instream); assertEquals(0, instream.getPos()); // seek to 0 read 0 bytes from it instream.seek(0); @@ -605,7 +606,16 @@ public void testReadAtExactEOF() throws Throwable { instream = getFileSystem().open(smallSeekFile); instream.seek(TEST_FILE_LEN -1); assertTrue("read at last byte", instream.read() > 0); - assertEquals("The available should be positive integer",0,instream.available()); + assertNotAvailable(instream); assertEquals("read just past EOF", -1, instream.read()); } + + private void assertAvailable(InputStream inputStream) throws IOException { + assertTrue("Data available in " + instream, inputStream.available() >0 ); + } + + private void assertNotAvailable(InputStream inputStream) throws IOException { + assertTrue("The steam is not available", inputStream.available() == 0); + } + }