2222import java .io .Closeable ;
2323import java .io .EOFException ;
2424import java .io .IOException ;
25+ import java .io .InputStream ;
2526import java .io .InterruptedIOException ;
2627import java .net .SocketTimeoutException ;
2728import java .nio .ByteBuffer ;
3132import java .util .concurrent .atomic .AtomicBoolean ;
3233import java .util .function .IntFunction ;
3334
34- import com .amazonaws .services .s3 .model .GetObjectRequest ;
35- import com .amazonaws .services .s3 .model .S3Object ;
36- import com .amazonaws .services .s3 .model .S3ObjectInputStream ;
3735import org .slf4j .Logger ;
3836import org .slf4j .LoggerFactory ;
3937
4644import org .apache .hadoop .fs .FSExceptionMessages ;
4745import org .apache .hadoop .fs .FSInputStream ;
4846import org .apache .hadoop .fs .FileRange ;
49- import org .apache .hadoop .fs .PathIOException ;
5047import org .apache .hadoop .fs .StreamCapabilities ;
5148import org .apache .hadoop .fs .impl .CombinedFileRange ;
5249import org .apache .hadoop .fs .VectoredReadUtils ;
6158import org .apache .hadoop .io .IOUtils ;
6259import org .apache .hadoop .util .functional .CallableRaisingIOE ;
6360
61+ import software .amazon .awssdk .core .ResponseInputStream ;
62+ import software .amazon .awssdk .services .s3 .model .GetObjectRequest ;
63+ import software .amazon .awssdk .services .s3 .model .GetObjectResponse ;
64+
6465import static java .util .Objects .requireNonNull ;
6566import static org .apache .commons .lang3 .StringUtils .isNotEmpty ;
6667import static org .apache .hadoop .fs .VectoredReadUtils .isOrderedDisjoint ;
@@ -125,14 +126,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
125126 */
126127 private volatile boolean closed ;
127128 /**
128- * wrappedStream is associated with an object (instance of S3Object). When
129- * the object is garbage collected, the associated wrappedStream will be
130- * closed. Keep a reference to this object to prevent the wrapperStream
131- * still in use from being closed unexpectedly due to garbage collection.
132- * See HADOOP-17338 for details.
129+ * Input stream returned by a getObject call.
133130 */
134- private S3Object object ;
135- private S3ObjectInputStream wrappedStream ;
131+ private ResponseInputStream <GetObjectResponse > wrappedStream ;
136132 private final S3AReadOpContext context ;
137133 private final InputStreamCallbacks client ;
138134
@@ -271,28 +267,22 @@ private synchronized void reopen(String reason, long targetPos, long length,
271267 uri , reason , targetPos , contentRangeFinish , length , pos , nextReadPos ,
272268 inputPolicy );
273269
270+ GetObjectRequest request = client .newGetRequestBuilder (key )
271+ .range (S3AUtils .formatRange (targetPos , contentRangeFinish - 1 ))
272+ .applyMutation (changeTracker ::maybeApplyConstraint )
273+ .build ();
274274 long opencount = streamStatistics .streamOpened ();
275- GetObjectRequest request = client .newGetRequest (key )
276- .withRange (targetPos , contentRangeFinish - 1 );
277275 String operation = opencount == 0 ? OPERATION_OPEN : OPERATION_REOPEN ;
278276 String text = String .format ("%s %s at %d" ,
279277 operation , uri , targetPos );
280- changeTracker .maybeApplyConstraint (request );
281-
282- object = onceTrackingDuration (text , uri ,
278+ wrappedStream = onceTrackingDuration (text , uri ,
283279 streamStatistics .initiateGetRequest (), () ->
284280 client .getObject (request ));
285281
286-
287- changeTracker .processResponse (object , operation ,
282+ changeTracker .processResponse (wrappedStream .response (), operation ,
288283 targetPos );
289- wrappedStream = object .getObjectContent ();
290- contentRangeStart = targetPos ;
291- if (wrappedStream == null ) {
292- throw new PathIOException (uri ,
293- "Null IO stream from " + operation + " of (" + reason + ") " );
294- }
295284
285+ contentRangeStart = targetPos ;
296286 this .pos = targetPos ;
297287 }
298288
@@ -505,14 +495,15 @@ public synchronized int read() throws IOException {
505495 */
506496 @ Retries .OnceTranslated
507497 private void onReadFailure (IOException ioe , boolean forceAbort ) {
498+ GetObjectResponse objectResponse = wrappedStream == null ? null : wrappedStream .response ();
508499 if (LOG .isDebugEnabled ()) {
509500 LOG .debug ("Got exception while trying to read from stream {}, " +
510501 "client: {} object: {}, trying to recover: " ,
511- uri , client , object , ioe );
502+ uri , client , objectResponse , ioe );
512503 } else {
513504 LOG .info ("Got exception while trying to read from stream {}, " +
514505 "client: {} object: {}, trying to recover: " + ioe ,
515- uri , client , object );
506+ uri , client , objectResponse );
516507 }
517508 streamStatistics .readException ();
518509 closeStream ("failure recovery" , forceAbort , false );
@@ -672,7 +663,6 @@ private CompletableFuture<Boolean> closeStream(
672663 CompletableFuture <Boolean > operation ;
673664 SDKStreamDrainer drainer = new SDKStreamDrainer (
674665 uri ,
675- object ,
676666 wrappedStream ,
677667 shouldAbort ,
678668 (int ) remaining ,
@@ -694,7 +684,6 @@ private CompletableFuture<Boolean> closeStream(
694684 // either the stream is closed in the blocking call or the async call is
695685 // submitted with its own copy of the references
696686 wrappedStream = null ;
697- object = null ;
698687 return operation ;
699688 }
700689
@@ -910,29 +899,21 @@ public void readVectored(List<? extends FileRange> ranges,
910899 private void readCombinedRangeAndUpdateChildren (CombinedFileRange combinedFileRange ,
911900 IntFunction <ByteBuffer > allocate ) {
912901 LOG .debug ("Start reading combined range {} from path {} " , combinedFileRange , pathStr );
913- // This reference is must be kept till all buffers are populated as this is a
914- // finalizable object which closes the internal stream when gc triggers.
915- S3Object objectRange = null ;
916- S3ObjectInputStream objectContent = null ;
902+ ResponseInputStream <GetObjectResponse > rangeContent = null ;
917903 try {
918904 checkIfVectoredIOStopped ();
919905 final String operationName = "readCombinedFileRange" ;
920- objectRange = getS3Object (operationName ,
906+ rangeContent = getS3Object (operationName ,
921907 combinedFileRange .getOffset (),
922908 combinedFileRange .getLength ());
923- objectContent = objectRange .getObjectContent ();
924- if (objectContent == null ) {
925- throw new PathIOException (uri ,
926- "Null IO stream received during " + operationName );
927- }
928- populateChildBuffers (combinedFileRange , objectContent , allocate );
909+ populateChildBuffers (combinedFileRange , rangeContent , allocate );
929910 } catch (Exception ex ) {
930911 LOG .debug ("Exception while reading a range {} from path {} " , combinedFileRange , pathStr , ex );
931912 for (FileRange child : combinedFileRange .getUnderlying ()) {
932913 child .getData ().completeExceptionally (ex );
933914 }
934915 } finally {
935- IOUtils .cleanupWithLogger (LOG , objectRange , objectContent );
916+ IOUtils .cleanupWithLogger (LOG , rangeContent );
936917 }
937918 LOG .debug ("Finished reading range {} from path {} " , combinedFileRange , pathStr );
938919 }
@@ -945,7 +926,7 @@ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRa
945926 * @throws IOException any IOE.
946927 */
947928 private void populateChildBuffers (CombinedFileRange combinedFileRange ,
948- S3ObjectInputStream objectContent ,
929+ InputStream objectContent ,
949930 IntFunction <ByteBuffer > allocate ) throws IOException {
950931 // If the combined file range just contains a single child
951932 // range, we only have to fill that one child buffer else
@@ -977,7 +958,7 @@ private void populateChildBuffers(CombinedFileRange combinedFileRange,
977958 * @param drainQuantity how many bytes to drain.
978959 * @throws IOException any IOE.
979960 */
980- private void drainUnnecessaryData (S3ObjectInputStream objectContent , long drainQuantity )
961+ private void drainUnnecessaryData (InputStream objectContent , long drainQuantity )
981962 throws IOException {
982963 int drainBytes = 0 ;
983964 int readCount ;
@@ -1019,26 +1000,20 @@ private void validateRangeRequest(FileRange range) throws EOFException {
10191000 */
10201001 private void readSingleRange (FileRange range , ByteBuffer buffer ) {
10211002 LOG .debug ("Start reading range {} from path {} " , range , pathStr );
1022- S3Object objectRange = null ;
1023- S3ObjectInputStream objectContent = null ;
1003+ ResponseInputStream <GetObjectResponse > objectRange = null ;
10241004 try {
10251005 checkIfVectoredIOStopped ();
10261006 long position = range .getOffset ();
10271007 int length = range .getLength ();
10281008 final String operationName = "readRange" ;
10291009 objectRange = getS3Object (operationName , position , length );
1030- objectContent = objectRange .getObjectContent ();
1031- if (objectContent == null ) {
1032- throw new PathIOException (uri ,
1033- "Null IO stream received during " + operationName );
1034- }
1035- populateBuffer (length , buffer , objectContent );
1010+ populateBuffer (length , buffer , objectRange );
10361011 range .getData ().complete (buffer );
10371012 } catch (Exception ex ) {
10381013 LOG .warn ("Exception while reading a range {} from path {} " , range , pathStr , ex );
10391014 range .getData ().completeExceptionally (ex );
10401015 } finally {
1041- IOUtils .cleanupWithLogger (LOG , objectRange , objectContent );
1016+ IOUtils .cleanupWithLogger (LOG , objectRange );
10421017 }
10431018 LOG .debug ("Finished reading range {} from path {} " , range , pathStr );
10441019 }
@@ -1053,7 +1028,7 @@ private void readSingleRange(FileRange range, ByteBuffer buffer) {
10531028 */
10541029 private void populateBuffer (int length ,
10551030 ByteBuffer buffer ,
1056- S3ObjectInputStream objectContent ) throws IOException {
1031+ InputStream objectContent ) throws IOException {
10571032
10581033 if (buffer .isDirect ()) {
10591034 VectoredReadUtils .readInDirectBuffer (length , buffer ,
@@ -1078,7 +1053,7 @@ private void populateBuffer(int length,
10781053 * @param length number of bytes to fill in dest.
10791054 * @throws IOException any IOE.
10801055 */
1081- private void readByteArray (S3ObjectInputStream objectContent ,
1056+ private void readByteArray (InputStream objectContent ,
10821057 byte [] dest ,
10831058 int offset ,
10841059 int length ) throws IOException {
@@ -1105,13 +1080,16 @@ private void readByteArray(S3ObjectInputStream objectContent,
11051080 * @return S3Object result s3 object.
11061081 * @throws IOException exception if any.
11071082 */
1108- private S3Object getS3Object (String operationName , long position ,
1109- int length ) throws IOException {
1110- final GetObjectRequest request = client .newGetRequest (key )
1111- .withRange (position , position + length - 1 );
1112- changeTracker .maybeApplyConstraint (request );
1083+ private ResponseInputStream <GetObjectResponse > getS3Object (String operationName ,
1084+ long position ,
1085+ int length )
1086+ throws IOException {
1087+ final GetObjectRequest request = client .newGetRequestBuilder (key )
1088+ .range (S3AUtils .formatRange (position , position + length - 1 ))
1089+ .applyMutation (changeTracker ::maybeApplyConstraint )
1090+ .build ();
11131091 DurationTracker tracker = streamStatistics .initiateGetRequest ();
1114- S3Object objectRange ;
1092+ ResponseInputStream < GetObjectResponse > objectRange ;
11151093 Invoker invoker = context .getReadInvoker ();
11161094 try {
11171095 objectRange = invoker .retry (operationName , pathStr , true ,
@@ -1126,7 +1104,7 @@ private S3Object getS3Object(String operationName, long position,
11261104 } finally {
11271105 tracker .close ();
11281106 }
1129- changeTracker .processResponse (objectRange , operationName ,
1107+ changeTracker .processResponse (objectRange . response () , operationName ,
11301108 position );
11311109 return objectRange ;
11321110 }
@@ -1279,19 +1257,19 @@ public IOStatistics getIOStatistics() {
12791257 public interface InputStreamCallbacks extends Closeable {
12801258
12811259 /**
1282- * Create a GET request.
1260+ * Create a GET request builder .
12831261 * @param key object key
1284- * @return the request
1262+ * @return the request builder
12851263 */
1286- GetObjectRequest newGetRequest (String key );
1264+ GetObjectRequest . Builder newGetRequestBuilder (String key );
12871265
12881266 /**
12891267 * Execute the request.
12901268 * @param request the request
12911269 * @return the response
12921270 */
12931271 @ Retries .OnceRaw
1294- S3Object getObject (GetObjectRequest request );
1272+ ResponseInputStream < GetObjectResponse > getObject (GetObjectRequest request );
12951273
12961274 /**
12971275 * Submit some asynchronous work, for example, draining a stream.
0 commit comments