Skip to content

Commit 266b1bd

Browse files
authored
HADOOP-17812. NPE in S3AInputStream read() after failure to reconnect to store (#3222)
This improves error handling after multiple failures reading data — when the read fails and attempts to reconnect() also fail. Contributed by Bobby Wang.
1 parent a218038 commit 266b1bd

File tree

2 files changed

+38
-10
lines changed

2 files changed

+38
-10
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -418,15 +418,21 @@ public synchronized int read() throws IOException {
418418
int byteRead = invoker.retry("read", pathStr, true,
419419
() -> {
420420
int b;
421+
// When exception happens before re-setting wrappedStream in "reopen" called
422+
// by onReadFailure, then wrappedStream will be null. But the **retry** may
423+
// re-execute this block and cause NPE if we don't check wrappedStream
424+
if (wrappedStream == null) {
425+
reopen("failure recovery", getPos(), 1, false);
426+
}
421427
try {
422428
b = wrappedStream.read();
423429
} catch (EOFException e) {
424430
return -1;
425431
} catch (SocketTimeoutException e) {
426-
onReadFailure(e, 1, true);
432+
onReadFailure(e, true);
427433
throw e;
428434
} catch (IOException e) {
429-
onReadFailure(e, 1, false);
435+
onReadFailure(e, false);
430436
throw e;
431437
}
432438
return b;
@@ -444,15 +450,12 @@ public synchronized int read() throws IOException {
444450
}
445451

446452
/**
447-
* Handle an IOE on a read by attempting to re-open the stream.
453+
* Close the stream on read failure.
448454
* The filesystem's readException count will be incremented.
449455
* @param ioe exception caught.
450-
* @param length length of data being attempted to read
451-
* @throws IOException any exception thrown on the re-open attempt.
452456
*/
453457
@Retries.OnceTranslated
454-
private void onReadFailure(IOException ioe, int length, boolean forceAbort)
455-
throws IOException {
458+
private void onReadFailure(IOException ioe, boolean forceAbort) {
456459
if (LOG.isDebugEnabled()) {
457460
LOG.debug("Got exception while trying to read from stream {}, " +
458461
"client: {} object: {}, trying to recover: ",
@@ -463,7 +466,7 @@ private void onReadFailure(IOException ioe, int length, boolean forceAbort)
463466
uri, client, object);
464467
}
465468
streamStatistics.readException();
466-
reopen("failure recovery", pos, length, forceAbort);
469+
closeStream("failure recovery", contentRangeFinish, forceAbort);
467470
}
468471

469472
/**
@@ -506,16 +509,22 @@ public synchronized int read(byte[] buf, int off, int len)
506509
int bytesRead = invoker.retry("read", pathStr, true,
507510
() -> {
508511
int bytes;
512+
// When exception happens before re-setting wrappedStream in "reopen" called
513+
// by onReadFailure, then wrappedStream will be null. But the **retry** may
514+
// re-execute this block and cause NPE if we don't check wrappedStream
515+
if (wrappedStream == null) {
516+
reopen("failure recovery", getPos(), 1, false);
517+
}
509518
try {
510519
bytes = wrappedStream.read(buf, off, len);
511520
} catch (EOFException e) {
512521
// the base implementation swallows EOFs.
513522
return -1;
514523
} catch (SocketTimeoutException e) {
515-
onReadFailure(e, len, true);
524+
onReadFailure(e, true);
516525
throw e;
517526
} catch (IOException e) {
518-
onReadFailure(e, len, false);
527+
onReadFailure(e, false);
519528
throw e;
520529
}
521530
return bytes;

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.net.SocketException;
2424
import java.nio.charset.Charset;
2525

26+
import com.amazonaws.SdkClientException;
2627
import com.amazonaws.services.s3.model.GetObjectRequest;
2728
import com.amazonaws.services.s3.model.ObjectMetadata;
2829
import com.amazonaws.services.s3.model.S3Object;
@@ -120,10 +121,28 @@ private S3AInputStream.InputStreamCallbacks getMockedInputStreamCallback() {
120121
return new S3AInputStream.InputStreamCallbacks() {
121122

122123
private final S3Object mockedS3Object = getMockedS3Object();
124+
private Integer mockedS3ObjectIndex = 0;
123125

124126
@Override
125127
public S3Object getObject(GetObjectRequest request) {
126128
// Set s3 client to return mocked s3object with defined read behavior.
129+
mockedS3ObjectIndex++;
130+
// open() -> lazySeek() -> reopen()
131+
// -> getObject (mockedS3ObjectIndex=1) -> getObjectContent(objectInputStreamBad1)
132+
// read() -> objectInputStreamBad1 throws exception
133+
// -> onReadFailure -> close wrappedStream
134+
// -> retry(1) -> wrappedStream==null -> reopen -> getObject (mockedS3ObjectIndex=2)
135+
// -> getObjectContent(objectInputStreamBad2)-> objectInputStreamBad2
136+
// -> wrappedStream.read -> objectInputStreamBad2 throws exception
137+
// -> onReadFailure -> close wrappedStream
138+
// -> retry(2) -> wrappedStream==null -> reopen
139+
// -> getObject (mockedS3ObjectIndex=3) throws exception
140+
// -> retry(3) -> wrappedStream==null -> reopen -> getObject (mockedS3ObjectIndex=4)
141+
// -> getObjectContent(objectInputStreamGood)-> objectInputStreamGood
142+
// -> wrappedStream.read
143+
if (mockedS3ObjectIndex == 3) {
144+
throw new SdkClientException("Failed to get S3Object");
145+
}
127146
return mockedS3Object;
128147
}
129148

0 commit comments

Comments
 (0)