@@ -87,6 +87,14 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
* set
*/
private volatile boolean closed;
+ /**
+ * wrappedStream is associated with an object (instance of S3Object). When
+ * the object is garbage collected, the associated wrappedStream will be
+ * closed. Keep a reference to this object to prevent the wrappedStream
+ * still in use from being closed unexpectedly due to garbage collection.
+ * See HADOOP-17338 for details.
+ */
+ private S3Object object;
private S3ObjectInputStream wrappedStream;
private final S3AReadOpContext context;
private final AmazonS3 client;
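
The new field above exists because a stream that is only reachable through its inner InputStream can have its owner garbage collected, after which the owner's cleanup closes the stream mid-read. The following is a minimal, self-contained sketch of that failure mode and of the fix (keeping a strong reference to the owner); StreamOwner and WrappedStreamDemo are hypothetical stand-ins, not AWS SDK classes, and the cleanup-on-GC behaviour is modelled here with java.lang.ref.Cleaner.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.Cleaner;
import java.lang.ref.Reference;

public class WrappedStreamDemo {
  private static final Cleaner CLEANER = Cleaner.create();

  /** Hypothetical stand-in for S3Object: owns a stream and closes it once unreachable. */
  static class StreamOwner {
    private final InputStream content = new ByteArrayInputStream(new byte[1024]);

    StreamOwner() {
      InputStream toClose = content;
      // Cleanup action runs some time after this owner becomes unreachable.
      CLEANER.register(this, () -> {
        try {
          toClose.close();
        } catch (IOException ignored) {
        }
      });
    }

    InputStream getContent() {
      return content;
    }
  }

  public static void main(String[] args) throws IOException {
    StreamOwner owner = new StreamOwner();     // analogous to the new "object" field
    InputStream wrapped = owner.getContent();  // analogous to wrappedStream

    // If nothing keeps 'owner' strongly reachable, it may be collected after its
    // last use even though 'wrapped' is still being read, and the cleanup action
    // could then close the stream underneath the reader. Holding the owner in a
    // field, as S3AInputStream now does, prevents that.
    while (wrapped.read() != -1) {
      // ... consume data ...
    }
    Reference.reachabilityFence(owner);  // keep the owner alive until reading is done
    wrapped.close();
  }
}
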
@@ -202,7 +210,7 @@ private synchronized void reopen(String reason, long targetPos, long length,
String text = String.format("%s %s at %d",
operation, uri, targetPos);
changeTracker.maybeApplyConstraint(request);
- S3Object object = Invoker.once(text, uri,
+ object = Invoker.once(text, uri,
() -> client.getObject(request));

changeTracker.processResponse(object, operation,
@@ -430,9 +438,15 @@ public synchronized int read() throws IOException {
@Retries.OnceTranslated
private void onReadFailure(IOException ioe, int length, boolean forceAbort)
throws IOException {

LOG.info("Got exception while trying to read from stream {}" +
" trying to recover: " + ioe, uri);
if (LOG.isDebugEnabled()) {
LOG.debug("Got exception while trying to read from stream {}, " +
"client: {} object: {}, trying to recover: ",
uri, client, object, ioe);
} else {
LOG.info("Got exception while trying to read from stream {}, " +
"client: {} object: {}, trying to recover: " + ioe,
uri, client, object);
}
streamStatistics.readException();
reopen("failure recovery", pos, length, forceAbort);
}
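
For reference, the logging change above follows a common SLF4J idiom: when the exception is passed as the final argument (as in the DEBUG branch) the full stack trace is logged, while concatenating it into the message (as in the INFO branch) records only its toString(). Below is a standalone sketch of the same pattern; the class name and example URI are invented for illustration, and the extra client/object fields from the real method are omitted.

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class ReadFailureLogging {

  private static final Logger LOG =
      LoggerFactory.getLogger(ReadFailureLogging.class);

  private ReadFailureLogging() {
  }

  static void logReadFailure(String uri, IOException ioe) {
    if (LOG.isDebugEnabled()) {
      // Throwable as the last argument: SLF4J prints the full stack trace.
      LOG.debug("Got exception while trying to read from stream {}, "
          + "trying to recover: ", uri, ioe);
    } else {
      // Exception concatenated into the message: only its toString(), no stack trace.
      LOG.info("Got exception while trying to read from stream {}, "
          + "trying to recover: " + ioe, uri);
    }
  }

  public static void main(String[] args) {
    logReadFailure("s3a://example-bucket/example-key",
        new IOException("connection reset"));
  }
}
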
@@ -550,14 +564,19 @@ public synchronized void close() throws IOException {
*/
@Retries.OnceRaw
private void closeStream(String reason, long length, boolean forceAbort) {
- if (isObjectStreamOpen()) {
+ if (!isObjectStreamOpen()) {
+ // stream is already closed
+ return;
+ }

- // if the amount of data remaining in the current request is greater
- // than the readahead value: abort.
- long remaining = remainingInCurrentRequest();
- LOG.debug("Closing stream {}: {}", reason,
- forceAbort ? "abort" : "soft");
- boolean shouldAbort = forceAbort || remaining > readahead;

+ // if the amount of data remaining in the current request is greater
+ // than the readahead value: abort.
+ long remaining = remainingInCurrentRequest();
+ LOG.debug("Closing stream {}: {}", reason,
+ forceAbort ? "abort" : "soft");
+ boolean shouldAbort = forceAbort || remaining > readahead;
+ try {
if (!shouldAbort) {
try {
// clean close. This will read to the end of the stream,
Expand All @@ -578,25 +597,33 @@ private void closeStream(String reason, long length, boolean forceAbort) {
streamStatistics.streamClose(false, drained);
} catch (Exception e) {
// exception escalates to an abort
LOG.debug("When closing {} stream for {}", uri, reason, e);
LOG.debug("When closing {} stream for {}, will abort the stream",
uri, reason, e);
shouldAbort = true;
}
}
if (shouldAbort) {
// Abort, rather than just close, the underlying stream. Otherwise, the
// remaining object payload is read from S3 while closing the stream.
LOG.debug("Aborting stream");
wrappedStream.abort();
LOG.debug("Aborting stream {}", uri);
try {
wrappedStream.abort();
} catch (Exception e) {
LOG.warn("When aborting {} stream after failing to close it for {}",
uri, reason, e);
}
streamStatistics.streamClose(true, remaining);
}
LOG.debug("Stream {} {}: {}; remaining={} streamPos={},"
+ " nextReadPos={}," +
" request range {}-{} length={}",
" request range {}-{} length={}",
uri, (shouldAbort ? "aborted" : "closed"), reason,
remaining, pos, nextReadPos,
contentRangeStart, contentRangeFinish,
length);
+ } finally {
+ wrappedStream = null;
+ object = null;
}
}
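
The comments in closeStream() describe the underlying trade-off: when little data is left in the current ranged GET it is cheaper to drain the stream so the HTTP connection can be reused, while aborting avoids downloading a large remaining payload at the cost of the connection. Below is a rough, standalone sketch of that decision, assuming a plain InputStream plus an abort callback; DrainOrAbort and its parameters are illustrative, not S3A APIs, and the real method also records statistics, escalates a failed drain to an abort, and performs the new finally cleanup.

import java.io.IOException;
import java.io.InputStream;

final class DrainOrAbort {

  private DrainOrAbort() {
  }

  /**
   * Close the stream for the current ranged GET.
   *
   * @param in        open stream for the current request
   * @param remaining bytes still unread in the current request
   * @param readahead threshold below which draining is considered cheap
   * @param abort     callback that hard-closes the underlying HTTP connection
   */
  static void close(InputStream in, long remaining, long readahead,
      Runnable abort) throws IOException {
    if (remaining <= readahead) {
      // Cheap enough to drain: read to the end of the response, then close
      // normally so the connection can go back to the pool for reuse.
      byte[] buffer = new byte[8192];
      while (in.read(buffer) != -1) {
        // discard the tail of the response
      }
      in.close();
    } else {
      // Too much data left: abort instead of downloading the rest of the object.
      abort.run();
    }
  }
}
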
