From c9f4bdebb139c740ac28862f15d2ec2586271043 Mon Sep 17 00:00:00 2001
From: Bobby Wang
Date: Thu, 22 Jul 2021 08:34:36 +0800
Subject: [PATCH 1/3] Fix s3a NPE when reading

When an IOException happens during "wrappedStream.read", onReadFailure ->
reopen is called to re-open "wrappedStream". If an exception is thrown while
getting the S3Object, "wrappedStream" is left null; the "retry" may then
re-execute the read block and hit the NPE.
---
 .../org/apache/hadoop/fs/s3a/S3AInputStream.java    | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index b65dcc95293ce..c8dcc725d4de4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -419,6 +419,12 @@ public synchronized int read() throws IOException {
         () -> {
           int b;
           try {
+            // If an exception was thrown before "reopen" (called by onReadFailure)
+            // could re-set wrappedStream, it will be null. The **retry** would then
+            // re-execute this block and hit an NPE, so check wrappedStream first.
+            if (wrappedStream == null) {
+              throw new IOException("Null IO stream for reading " + uri);
+            }
             b = wrappedStream.read();
           } catch (EOFException e) {
             return -1;
@@ -507,6 +513,12 @@ public synchronized int read(byte[] buf, int off, int len)
         () -> {
           int bytes;
           try {
+            // If an exception was thrown before "reopen" (called by onReadFailure)
+            // could re-set wrappedStream, it will be null. The **retry** would then
+            // re-execute this block and hit an NPE, so check wrappedStream first.
+            if (wrappedStream == null) {
+              throw new IOException("Null IO stream for reading " + uri);
+            }
             bytes = wrappedStream.read(buf, off, len);
           } catch (EOFException e) {
             // the base implementation swallows EOFs.

From 68755aa092e84a2b8994c9d5e1f35a9c76d7e614 Mon Sep 17 00:00:00 2001
From: Bobby Wang
Date: Sun, 25 Jul 2021 07:46:21 +0800
Subject: [PATCH 2/3] add test

---
 .../hadoop/fs/s3a/TestS3AInputStreamRetry.java      | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
index 05a07ce444c92..90f6af26d4837 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
@@ -23,6 +23,7 @@
 import java.net.SocketException;
 import java.nio.charset.Charset;
 
+import com.amazonaws.SdkClientException;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.S3Object;
@@ -120,10 +121,23 @@ private S3AInputStream.InputStreamCallbacks getMockedInputStreamCallback() {
     return new S3AInputStream.InputStreamCallbacks() {
 
       private final S3Object mockedS3Object = getMockedS3Object();
+      private Integer mockedS3ObjectIndex = 0;
 
       @Override
       public S3Object getObject(GetObjectRequest request) {
         // Set s3 client to return mocked s3object with defined read behavior.
+        mockedS3ObjectIndex++;
+        // open() -> lazySeek() -> reopen()
+        // -> getObject (mockedS3ObjectIndex=1) -> getObjectContent(objectInputStreamBad1)
+        // read() -> objectInputStreamBad1 throws exception -> onReadFailure -> reopen
+        // -> getObject (mockedS3ObjectIndex=2) -> getObjectContent(objectInputStreamBad2)
+        // -> retry -> wrappedStream.read -> objectInputStreamBad2 throws exception
+        // -> onReadFailure -> reopen -> getObject (mockedS3ObjectIndex=3) throws exception
+        // -> reopen throws exception
+        // -> retry -> re-execute wrappedStream.read (we need to check if wrappedStream == null)
+        if (mockedS3ObjectIndex == 3) {
+          throw new SdkClientException("Failed to get S3Object");
+        }
         return mockedS3Object;
       }

From d7659fbc5e6883579e759c40a5faf6f000326c3a Mon Sep 17 00:00:00 2001
From: Bobby Wang
Date: Wed, 28 Jul 2021 07:02:49 +0800
Subject: [PATCH 3/3] resolve comments

---
 .../apache/hadoop/fs/s3a/S3AInputStream.java        | 41 +++++++++----------
 .../fs/s3a/TestS3AInputStreamRetry.java             | 17 +++++---
 2 files changed, 30 insertions(+), 28 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index c8dcc725d4de4..86317ab21c089 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -418,21 +418,21 @@ public synchronized int read() throws IOException {
     int byteRead = invoker.retry("read", pathStr, true,
         () -> {
           int b;
+          // If an exception was thrown before "reopen" (called by onReadFailure)
+          // could re-set wrappedStream, it will be null. The **retry** would then
+          // re-execute this block and hit an NPE, so reopen the stream first.
+          if (wrappedStream == null) {
+            reopen("failure recovery", getPos(), 1, false);
+          }
           try {
-            // If an exception was thrown before "reopen" (called by onReadFailure)
-            // could re-set wrappedStream, it will be null. The **retry** would then
-            // re-execute this block and hit an NPE, so check wrappedStream first.
-            if (wrappedStream == null) {
-              throw new IOException("Null IO stream for reading " + uri);
-            }
             b = wrappedStream.read();
           } catch (EOFException e) {
             return -1;
           } catch (SocketTimeoutException e) {
-            onReadFailure(e, 1, true);
+            onReadFailure(e, true);
             throw e;
           } catch (IOException e) {
-            onReadFailure(e, 1, false);
+            onReadFailure(e, false);
             throw e;
           }
           return b;
@@ -450,15 +450,12 @@ public synchronized int read() throws IOException {
   }
 
   /**
-   * Handle an IOE on a read by attempting to re-open the stream.
+   * Close the stream on read failure.
    * The filesystem's readException count will be incremented.
    * @param ioe exception caught.
-   * @param length length of data being attempted to read
-   * @throws IOException any exception thrown on the re-open attempt.
    */
   @Retries.OnceTranslated
-  private void onReadFailure(IOException ioe, int length, boolean forceAbort)
-      throws IOException {
+  private void onReadFailure(IOException ioe, boolean forceAbort) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Got exception while trying to read from stream {}, " +
           "client: {} object: {}, trying to recover: ",
@@ -469,7 +466,7 @@ private void onReadFailure(IOException ioe, int length, boolean forceAbort)
           uri, client, object);
     }
     streamStatistics.readException();
-    reopen("failure recovery", pos, length, forceAbort);
+    closeStream("failure recovery", contentRangeFinish, forceAbort);
   }
 
   /**
@@ -512,22 +509,22 @@ public synchronized int read(byte[] buf, int off, int len)
     int bytesRead = invoker.retry("read", pathStr, true,
         () -> {
           int bytes;
+          // If an exception was thrown before "reopen" (called by onReadFailure)
+          // could re-set wrappedStream, it will be null. The **retry** would then
+          // re-execute this block and hit an NPE, so reopen the stream first.
+          if (wrappedStream == null) {
+            reopen("failure recovery", getPos(), 1, false);
+          }
           try {
-            // If an exception was thrown before "reopen" (called by onReadFailure)
-            // could re-set wrappedStream, it will be null. The **retry** would then
-            // re-execute this block and hit an NPE, so check wrappedStream first.
-            if (wrappedStream == null) {
-              throw new IOException("Null IO stream for reading " + uri);
-            }
             bytes = wrappedStream.read(buf, off, len);
           } catch (EOFException e) {
             // the base implementation swallows EOFs.
             return -1;
           } catch (SocketTimeoutException e) {
-            onReadFailure(e, len, true);
+            onReadFailure(e, true);
             throw e;
           } catch (IOException e) {
-            onReadFailure(e, len, false);
+            onReadFailure(e, false);
             throw e;
           }
           return bytes;

diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
index 90f6af26d4837..beeb6676a7ba0 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
@@ -129,12 +129,17 @@ public S3Object getObject(GetObjectRequest request) {
         mockedS3ObjectIndex++;
         // open() -> lazySeek() -> reopen()
         // -> getObject (mockedS3ObjectIndex=1) -> getObjectContent(objectInputStreamBad1)
-        // read() -> objectInputStreamBad1 throws exception -> onReadFailure -> reopen
-        // -> getObject (mockedS3ObjectIndex=2) -> getObjectContent(objectInputStreamBad2)
-        // -> retry -> wrappedStream.read -> objectInputStreamBad2 throws exception
-        // -> onReadFailure -> reopen -> getObject (mockedS3ObjectIndex=3) throws exception
-        // -> reopen throws exception
-        // -> retry -> re-execute wrappedStream.read (we need to check if wrappedStream == null)
+        // read() -> objectInputStreamBad1 throws exception
+        // -> onReadFailure -> close wrappedStream
+        // -> retry(1) -> wrappedStream==null -> reopen -> getObject (mockedS3ObjectIndex=2)
+        // -> getObjectContent(objectInputStreamBad2) -> objectInputStreamBad2
+        // -> wrappedStream.read -> objectInputStreamBad2 throws exception
+        // -> onReadFailure -> close wrappedStream
+        // -> retry(2) -> wrappedStream==null -> reopen
+        // -> getObject (mockedS3ObjectIndex=3) throws exception
+        // -> retry(3) -> wrappedStream==null -> reopen -> getObject (mockedS3ObjectIndex=4)
+        // -> getObjectContent(objectInputStreamGood) -> objectInputStreamGood
+        // -> wrappedStream.read
         if (mockedS3ObjectIndex == 3) {
           throw new SdkClientException("Failed to get S3Object");
         }
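
---

The retry/reopen sequence traced in the test comment above can be hard to follow inside the diff, so here is a minimal, self-contained sketch of the failure mode and of the fix. Everything in it is illustrative: NpeSketch, Source, and the hand-rolled retry loop are stand-ins for S3AInputStream, the S3 object content stream, and invoker.retry(); they are not Hadoop APIs.

import java.io.IOException;

/**
 * Minimal sketch of the NPE scenario fixed above. All names here
 * (NpeSketch, Source) are illustrative stand-ins, not Hadoop classes.
 */
public final class NpeSketch {

  /** Stand-in for the S3 object content stream. */
  interface Source {
    int read() throws IOException;
  }

  private Source wrappedStream;
  private int getObjectCalls;

  /** Stand-in for reopen(): fetching the S3Object can itself fail. */
  private void reopen() throws IOException {
    getObjectCalls++;
    if (getObjectCalls == 2) {
      // getObject() fails here, so wrappedStream is never reassigned and
      // is still null when the retry loop re-enters the read block.
      throw new IOException("failed to get S3Object");
    }
    boolean bad = getObjectCalls == 1;    // the first stream is broken
    wrappedStream = () -> {
      if (bad) {
        throw new IOException("connection reset");
      }
      return 42;
    };
  }

  /** Stand-in for onReadFailure(): after the patch it only closes the stream. */
  private void onReadFailure() {
    wrappedStream = null;                 // stream is gone until the next reopen()
  }

  /** Hand-rolled stand-in for invoker.retry(...) around wrappedStream.read(). */
  int read() throws IOException {
    IOException last = null;
    for (int attempt = 1; attempt <= 4; attempt++) {
      try {
        // The fix: without this null check, the attempt after a failed
        // reopen() would call read() on a null wrappedStream and NPE.
        if (wrappedStream == null) {
          reopen();
        }
        return wrappedStream.read();
      } catch (IOException e) {
        onReadFailure();
        last = e;
      }
    }
    throw last;
  }

  public static void main(String[] args) throws IOException {
    NpeSketch stream = new NpeSketch();
    stream.reopen();                      // initial open returns the bad stream
    System.out.println(stream.read());    // fails twice, recovers, prints 42
  }
}

Running it walks exactly the path the test mocks: the first stream fails mid-read, the first reopen fails before a new stream is assigned (leaving wrappedStream null), and the next retry reopens successfully and reads.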