diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java index c9ccc09235ff5..d85f0895a6fb6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java @@ -32,6 +32,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.function.Consumer; import java.util.function.IntFunction; import java.util.zip.CRC32; @@ -425,6 +426,7 @@ static ByteBuffer checkBytes(ByteBuffer sumsBytes, return data; } + /** * Vectored read. * If the file has no checksums: delegate to the underlying stream. @@ -438,6 +440,13 @@ static ByteBuffer checkBytes(ByteBuffer sumsBytes, @Override public void readVectored(List ranges, IntFunction allocate) throws IOException { + readVectored(ranges, allocate, (b) -> { }); + } + + @Override + public void readVectored(final List ranges, + final IntFunction allocate, + final Consumer release) throws IOException { // If the stream doesn't have checksums, just delegate. if (sums == null) { @@ -462,8 +471,8 @@ public void readVectored(List ranges, } List checksumRanges = findChecksumRanges(dataRanges, bytesPerSum, minSeek, maxSize); - sums.readVectored(checksumRanges, allocate); - datas.readVectored(dataRanges, allocate); + sums.readVectored(checksumRanges, allocate, release); + datas.readVectored(dataRanges, allocate, release); for(CombinedFileRange checksumRange: checksumRanges) { for(FileRange dataRange: checksumRange.getUnderlying()) { // when we have both the ranges, validate the checksum diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java index 90009ecb61bb5..9389cb843f3eb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java @@ -21,10 +21,12 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import java.util.function.Consumer; import java.util.function.IntFunction; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.io.ByteBufferPool; /** * Stream that permits positional reading. @@ -133,4 +135,30 @@ default void readVectored(List ranges, IntFunction allocate) throws IOException { VectoredReadUtils.readVectored(this, ranges, allocate); } + + /** + * Variant of {@link #readVectored(List, IntFunction)} where a release() function + * may be invoked if problems surface during reads -this method is called to + * try to return any allocated buffer which has not been read yet. + * Buffers which have successfully been read and returned to the caller do not + * get released: this is for failures only. + *
+ * <p>
+ * The default implementation delegates to {@link #readVectored(List, IntFunction)},
+ * so existing stream implementations which do not override this method
+ * continue to work unchanged.
+ * <p>
+ * Implementations SHOULD override this method if they can release buffers as + * part of their error handling. + * @param ranges the byte ranges to read + * @param allocate the function to allocate ByteBuffer + * @param release the function to release a ByteBuffer. + * @throws IOException any IOE. + * @throws IllegalArgumentException if the any of ranges are invalid, or they overlap. + */ + default void readVectored(List ranges, + IntFunction allocate, + Consumer release) throws IOException { + readVectored(ranges, allocate); + } + } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java index fa5624e67158d..6e0f6bae1c1ab 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java @@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import java.util.function.IntFunction; import org.apache.hadoop.classification.InterfaceAudience; @@ -68,6 +69,7 @@ import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; +import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED; import static org.apache.hadoop.fs.VectoredReadUtils.sortRangeList; import static org.apache.hadoop.fs.VectoredReadUtils.validateRangeRequest; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; @@ -319,6 +321,13 @@ AsynchronousFileChannel getAsyncChannel() throws IOException { @Override public void readVectored(List ranges, IntFunction allocate) throws IOException { + readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED); + } + + @Override + public void readVectored(final List ranges, + final IntFunction allocate, + final Consumer release) throws IOException { // Validate, but do not pass in a file length as it may change. List sortedRanges = sortRangeList(ranges); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java index 2f99edc910c16..26a92fcd60a11 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import java.util.function.IntFunction; import org.slf4j.Logger; @@ -52,6 +53,15 @@ public final class VectoredReadUtils { private static final Logger LOG = LoggerFactory.getLogger(VectoredReadUtils.class); + /** + * This releaser just logs at debug that the buffer + * was released. + */ + public static final Consumer LOG_BYTE_BUFFER_RELEASED = + (buffer) -> { + LOG.debug("release buffer {}", buffer.toString()); + }; + /** * Validate a single range. * @param range range to validate. 
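Reviewer note: the intended caller-side pattern for the new three-argument `readVectored()` is sketched below. This is an illustrative example only, not part of the patch; the class and method names are hypothetical, and it simply mirrors the allocate/release wiring exercised by the new `VectorIOBufferPool` test, using `ElasticByteBufferPool` as the backing pool.

```java
// Hypothetical usage sketch (not part of this patch): wire an ElasticByteBufferPool
// into the new readVectored(ranges, allocate, release) overload so that buffers
// allocated for ranges which fail are handed back to the pool.
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.io.ElasticByteBufferPool;

public final class VectoredReadWithReleaseExample {

  public static void readWithPool(FSDataInputStream in) throws Exception {
    ElasticByteBufferPool pool = new ElasticByteBufferPool();
    List<FileRange> ranges = Arrays.asList(
        FileRange.createFileRange(0, 4096),
        FileRange.createFileRange(1_048_576, 8192));

    // allocate buffers from the pool; the release callback returns buffers
    // of failed ranges so they are not leaked.
    in.readVectored(ranges,
        length -> pool.getBuffer(false, length),
        pool::putBuffer);

    for (FileRange range : ranges) {
      // block until this range has been read (or has failed).
      ByteBuffer buffer = range.getData().get();
      // ... consume the data, then return the buffer to the pool.
      pool.putBuffer(buffer);
    }
  }
}
```

As the javadoc above states, buffers of ranges which complete successfully remain owned by the caller; only buffers of failed ranges are passed to the release callback.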
@@ -98,11 +108,29 @@ public static void validateVectoredReadRanges(List ranges) public static void readVectored(PositionedReadable stream, List ranges, IntFunction allocate) throws EOFException { + readVectored(stream, ranges, allocate, LOG_BYTE_BUFFER_RELEASED); + } + + /** + * Variant of {@link #readVectored(PositionedReadable, List, IntFunction)} + * where a release() function is invoked if problems surface during reads. + * @param ranges the byte ranges to read + * @param allocate the function to allocate ByteBuffer + * @param release the function to release a ByteBuffer. + * @throws IllegalArgumentException if the any of ranges are invalid, or they overlap. + * @throws EOFException the range offset is negative + */ + public static void readVectored(PositionedReadable stream, + List ranges, + IntFunction allocate, + Consumer release) throws EOFException { + for (FileRange range: validateAndSortRanges(ranges, Optional.empty())) { - range.setData(readRangeFrom(stream, range, allocate)); + range.setData(readRangeFrom(stream, range, allocate, release)); } } + /** * Synchronously reads a range from the stream dealing with the combinations * of ByteBuffers buffers and PositionedReadable streams. @@ -118,11 +146,31 @@ public static CompletableFuture readRangeFrom( PositionedReadable stream, FileRange range, IntFunction allocate) throws EOFException { + return readRangeFrom(stream, range, allocate, LOG_BYTE_BUFFER_RELEASED); + } + + /** + * Synchronously reads a range from the stream dealing with the combinations + * of ByteBuffers buffers and PositionedReadable streams. + * @param stream the stream to read from + * @param range the range to read + * @param allocate the function to allocate ByteBuffers + * @param release the function to release a ByteBuffer. + * @return the CompletableFuture that contains the read data or an exception. + * @throws IllegalArgumentException the range is invalid other than by offset or being null. + * @throws EOFException the range offset is negative + * @throws NullPointerException if the range is null. + */ + public static CompletableFuture readRangeFrom( + PositionedReadable stream, + FileRange range, + IntFunction allocate, + Consumer release) throws EOFException { validateRangeRequest(range); CompletableFuture result = new CompletableFuture<>(); + ByteBuffer buffer = allocate.apply(range.getLength()); try { - ByteBuffer buffer = allocate.apply(range.getLength()); if (stream instanceof ByteBufferPositionedReadable) { LOG.debug("ByteBufferPositionedReadable.readFully of {}", range); ((ByteBufferPositionedReadable) stream).readFully(range.getOffset(), @@ -136,6 +184,7 @@ public static CompletableFuture readRangeFrom( result.complete(buffer); } catch (IOException ioe) { LOG.debug("Failed to read {}", range, ioe); + release.accept(buffer); result.completeExceptionally(ioe); } return result; @@ -147,6 +196,8 @@ public static CompletableFuture readRangeFrom( * @param range file range * @param buffer destination buffer * @throws IOException IO problems. 
+ * @throws EOFException the end of the data was reached before + * the read operation completed */ private static void readNonByteBufferPositionedReadable( PositionedReadable stream, diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectorIOBufferPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectorIOBufferPool.java new file mode 100644 index 0000000000000..22f1dce2b1f91 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectorIOBufferPool.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.impl; + +import java.nio.ByteBuffer; +import java.util.function.Consumer; +import java.util.function.IntFunction; + +import org.apache.hadoop.io.ByteBufferPool; + +import static java.util.Objects.requireNonNull; + +/** + * A ByteBufferPool implementation that uses a pair of functions to allocate + * and release ByteBuffers; intended for use implementing the VectorIO API + * as it makes the pair of functions easier to pass around and use in + * existing code. + *
+ * <p>
+ * No matter what kind of buffer is requested, the allocation function + * is invoked; that is: the direct flag is ignored. + */ +public final class VectorIOBufferPool implements ByteBufferPool { + + /** The function to allocate a buffer. */ + private final IntFunction allocate; + + /** The function to release a buffer. */ + private final Consumer release; + + /** + * @param allocate the function to allocate ByteBuffer + * @param release the function to release a ByteBuffer. + */ + public VectorIOBufferPool( + IntFunction allocate, + Consumer release) { + this.allocate = requireNonNull(allocate); + this.release = requireNonNull(release); + } + + /** + * Get a ByteBuffer. + * @param direct heap/direct flag. Unused. + * @param length The minimum length the buffer will have. + * @return a buffer + */ + @Override + public ByteBuffer getBuffer(final boolean direct, final int length) { + return allocate.apply(length); + } + + /** + * Release a buffer. + * Unlike normal ByteBufferPool implementations + * a null buffer is accepted and ignored. + * @param buffer buffer to release; may be null. + */ + @Override + public void putBuffer(final ByteBuffer buffer) { + if (buffer != null) { + release.accept(buffer); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java index b08fc95279a82..e232c53f66213 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import java.util.function.IntFunction; import org.assertj.core.api.Assertions; @@ -40,6 +41,7 @@ import org.apache.hadoop.fs.FileRange; import org.apache.hadoop.fs.PositionedReadable; import org.apache.hadoop.fs.VectoredReadUtils; +import org.apache.hadoop.io.ElasticByteBufferPool; import org.apache.hadoop.test.HadoopTestBase; import static java.util.Arrays.asList; @@ -823,4 +825,49 @@ public void testReadOverEOFRejected() throws Exception { asList(createFileRange(length - 1, 2)), Optional.of(length))); } + + @Test + public void testVectorIOBufferPool() throws Throwable { + ElasticByteBufferPool elasticByteBufferPool = new ElasticByteBufferPool(); + + // inlined lambda to assert the pool size + Consumer assertPoolSize = (size) -> { + Assertions.assertThat(elasticByteBufferPool.size(false)) + .describedAs("Pool size") + .isEqualTo(size); + }; + + // build vector pool from the buffer pool operations converted to + // allocate and release lambda expressions + VectorIOBufferPool vectorBuffers = new VectorIOBufferPool( + r -> elasticByteBufferPool.getBuffer(false, r), + elasticByteBufferPool::putBuffer); + + assertPoolSize.accept(0); + + final ByteBuffer b1 = vectorBuffers.getBuffer(false, 100); + final ByteBuffer b2 = vectorBuffers.getBuffer(false, 50); + + // return the first buffer for a pool size of 1 + vectorBuffers.putBuffer(b1); + assertPoolSize.accept(1); + + // expect the returned buffer back + ByteBuffer b3 = vectorBuffers.getBuffer(true, 100); + Assertions.assertThat(b3) + .describedAs("buffer returned from a get after a previous one was returned") + .isSameAs(b1); + assertPoolSize.accept(0); + + // return them all + vectorBuffers.putBuffer(b2); + 
vectorBuffers.putBuffer(b3); + assertPoolSize.accept(2); + + // release does not propagate + vectorBuffers.release(); + assertPoolSize.accept(2); + + elasticByteBufferPool.release(); + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index c620ca042dc82..a90f577fc38d7 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -32,7 +32,9 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.IntFunction; +import java.util.stream.Collectors; import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.services.s3.model.GetObjectRequest; @@ -41,7 +43,9 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.impl.LeakReporter; +import org.apache.hadoop.fs.impl.VectorIOBufferPool; import org.apache.hadoop.fs.statistics.StreamStatisticNames; +import org.apache.hadoop.io.ByteBufferPool; import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -68,10 +72,13 @@ import static java.util.Objects.requireNonNull; import static org.apache.commons.lang3.StringUtils.isNotEmpty; +import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED; import static org.apache.hadoop.fs.VectoredReadUtils.isOrderedDisjoint; import static org.apache.hadoop.fs.VectoredReadUtils.mergeSortedRanges; import static org.apache.hadoop.fs.VectoredReadUtils.validateAndSortRanges; import static org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration; +import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.shouldInputStreamBeAborted; +import static org.apache.hadoop.fs.s3a.impl.SDKStreamDrainer.abortSdkStream; import static org.apache.hadoop.util.StringUtils.toLowerCase; import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; @@ -566,11 +573,8 @@ public synchronized int read() throws IOException { } try { b = wrappedStream.read(); - } catch (HttpChannelEOFException | SocketTimeoutException e) { - onReadFailure(e, true); - throw e; } catch (IOException e) { - onReadFailure(e, false); + onReadFailure(e, shouldInputStreamBeAborted(e)); throw e; } return b; @@ -713,7 +717,7 @@ public synchronized void close() throws IOException { if (!closed) { closed = true; try { - stopVectoredIOOperations.set(true); + stopVectorOperations(); // close or abort the stream; blocking closeStream("close() operation", false, true); // end the client+audit span. @@ -963,7 +967,8 @@ public int maxReadSizeForVectorReads() { /** * {@inheritDoc} - * Vectored read implementation for S3AInputStream. + * Pass to {@link #readVectored(List, IntFunction, Consumer)} + * with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser. * @param ranges the byte ranges to read. * @param allocate the function to allocate ByteBuffer. * @throws IOException IOE if any. @@ -971,12 +976,30 @@ public int maxReadSizeForVectorReads() { @Override public synchronized void readVectored(List ranges, IntFunction allocate) throws IOException { + readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED); + } + + /** + * {@inheritDoc} + * Vectored read implementation for S3AInputStream. 
+ * @param ranges the byte ranges to read. + * @param allocate the function to allocate ByteBuffer. + * @param release the function to release a ByteBuffer. + * @throws IOException IOE if any. + */ + @Override + public void readVectored(final List ranges, + final IntFunction allocate, + final Consumer release) throws IOException { LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, ranges); checkNotClosed(); - if (stopVectoredIOOperations.getAndSet(false)) { - LOG.debug("Reinstating vectored read operation for path {} ", pathStr); - } + maybeStartVectorOperations(); + // fail fast on parameters which would otherwise only be checked + // in threads and/or in failures. + requireNonNull(allocate , "Null allocator"); + requireNonNull(release, "Null releaser"); + ByteBufferPool bufferPool = new VectorIOBufferPool(allocate, release); // prepare to read List sortedRanges = validateAndSortRanges(ranges, fileLength); @@ -994,8 +1017,8 @@ public synchronized void readVectored(List ranges, LOG.debug("Not merging the ranges as they are disjoint"); streamStatistics.readVectoredOperationStarted(sortedRanges.size(), sortedRanges.size()); for (FileRange range: sortedRanges) { - ByteBuffer buffer = allocate.apply(range.getLength()); - boundedThreadPool.submit(() -> readSingleRange(range, buffer)); + // submit the read operation to the threadpool + boundedThreadPool.submit(() -> readSingleRangeWithRetries(range, bufferPool)); } } else { LOG.debug("Trying to merge the ranges as they are not disjoint"); @@ -1007,35 +1030,94 @@ public synchronized void readVectored(List ranges, ranges.size(), combinedFileRanges.size()); for (CombinedFileRange combinedFileRange: combinedFileRanges) { boundedThreadPool.submit( - () -> readCombinedRangeAndUpdateChildren(combinedFileRange, allocate)); + () -> readCombinedRangeAndUpdateChildren(combinedFileRange, bufferPool)); } } LOG.debug("Finished submitting vectored read to threadpool" + " on path {} for ranges {} ", pathStr, ranges); } + /** + * Start/restart vector operations if not active. + * In particular, after an unbuffer(), this performs any + * initialization required. + */ + private void maybeStartVectorOperations() { + if (stopVectoredIOOperations.getAndSet(false)) { + LOG.debug("Reinstating vectored read operation for path {} ", pathStr); + } + } + + /** + * Stop vector operations. + */ + private void stopVectorOperations() { + stopVectoredIOOperations.set(true); + } + /** * Read the data from S3 for the bigger combined file range and update all the * underlying ranges. + *
+ * <p>
+ * If the read process fails, all underlying ranges which have not already completed + * are completed exceptionally. * @param combinedFileRange big combined file range. - * @param allocate method to create byte buffers to hold result data. + * @param bufferPool buffer pool */ + @Retries.RetryExceptionsSwallowed private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange, - IntFunction allocate) { + ByteBufferPool bufferPool) { LOG.debug("Start reading {} from path {} ", combinedFileRange, pathStr); ResponseInputStream rangeContent = null; try { - rangeContent = getS3ObjectInputStream("readCombinedFileRange", - combinedFileRange.getOffset(), - combinedFileRange.getLength()); - populateChildBuffers(combinedFileRange, rangeContent, allocate); - } catch (Exception ex) { + // issue the GET request; this retries GET but not reads internally. + rangeContent = getS3Object("readCombinedFileRange", + combinedFileRange.getOffset(), + combinedFileRange.getLength(), + true); + // GET has succeeded, make sure request is good to continue + checkIfVectoredIOStopped(); + } catch (IOException ex) { + // any exception here means that repeated HEAD requests have failed; + // consider the request unrecoverable. + LOG.debug("Failed initiating GET request to {}; failing all ranges", pathStr, ex); + combinedFileRange.getUnderlying().stream() + .map(FileRange::getData) + .forEach(f -> f.completeExceptionally(ex)); + return; + } + + // at this point there is a stream to read from, which + // MUST be closed in the finally block. + try { + populateChildBuffers(combinedFileRange, rangeContent, bufferPool); + } catch (IOException ex) { LOG.debug("Exception while reading {} from path {} ", combinedFileRange, pathStr, ex); - // complete exception all the underlying ranges which have not already - // finished. - for(FileRange child : combinedFileRange.getUnderlying()) { - if (!child.getData().isDone()) { - child.getData().completeExceptionally(ex); + + // close the ongoing read. No attempt is made to train the stream. + IOUtils.cleanupWithLogger(LOG, rangeContent); + rangeContent = null; + + // get all the incomplete reads. + final List unreadRanges = combinedFileRange.getUnderlying().stream() + .filter(f -> !f.getData().isDone()) + .collect(Collectors.toList()); + + LOG.debug("There are {} remaining vector ranges to retrieve", unreadRanges.size()); + + // Attempt to recover from the failure by reading each range individually + // within the current thread. + // If a single read is unrecoverable, all subsequent range reads are failed + // with the same exception. + // this is to process unrecoverable failures faster. + IOException lastIOE = null; + for (FileRange child : unreadRanges) { + if (lastIOE == null) { + // all good so far: request the next range + lastIOE = readSingleRangeWithRetries(child, bufferPool); + } else { + // a predecessor failed, do not attempt to recover. + child.getData().completeExceptionally(lastIOE); } } } finally { @@ -1046,29 +1128,40 @@ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRa /** * Populate underlying buffers of the child ranges. - * There is no attempt to recover from any read failures. + * There is no attempt to recover from any read failures, + * but the {@code release} function is invoked to attempt + * to release the buffer. * @param combinedFileRange big combined file range. * @param objectContent data from s3. - * @param allocate method to allocate child byte buffers. 
+ * @param bufferPool buffer pool * @throws IOException any IOE. * @throws EOFException if EOF if read() call returns -1 * @throws InterruptedIOException if vectored IO operation is stopped. */ + @Retries.OnceTranslated private void populateChildBuffers(CombinedFileRange combinedFileRange, - InputStream objectContent, - IntFunction allocate) throws IOException { + ResponseInputStream objectContent, + ByteBufferPool bufferPool) throws IOException { // If the combined file range just contains a single child // range, we only have to fill that one child buffer else // we drain the intermediate data between consecutive ranges // and fill the buffers one by one. if (combinedFileRange.getUnderlying().size() == 1) { FileRange child = combinedFileRange.getUnderlying().get(0); - ByteBuffer buffer = allocate.apply(child.getLength()); - populateBuffer(child, buffer, objectContent); + ByteBuffer buffer = bufferPool.getBuffer(false, child.getLength()); + try { + populateBuffer(child, buffer, objectContent); + } catch (IOException e) { + // release the buffer + bufferPool.putBuffer(buffer); + throw e; + } child.getData().complete(buffer); } else { FileRange prev = null; for (FileRange child : combinedFileRange.getUnderlying()) { + ByteBuffer buffer = null; + checkIfVectoredIOStopped(); if (prev != null) { final long position = prev.getOffset() + prev.getLength(); @@ -1077,11 +1170,19 @@ private void populateChildBuffers(CombinedFileRange combinedFileRange, // work out how much long drainQuantity = child.getOffset() - position; // and drain it. + // this will raise EOFException if a -1 was returned. drainUnnecessaryData(objectContent, position, drainQuantity); } } - ByteBuffer buffer = allocate.apply(child.getLength()); - populateBuffer(child, buffer, objectContent); + try { + buffer = bufferPool.getBuffer(false, child.getLength()); + populateBuffer(child, buffer, objectContent); + } catch (IOException e) { + // release the buffer + bufferPool.putBuffer(buffer); + // rethrow + throw e; + } child.getData().complete(buffer); prev = child; } @@ -1099,7 +1200,7 @@ private void populateChildBuffers(CombinedFileRange combinedFileRange, */ @Retries.OnceTranslated private void drainUnnecessaryData( - final InputStream objectContent, + final ResponseInputStream objectContent, final long position, long drainQuantity) throws IOException { @@ -1123,11 +1224,18 @@ private void drainUnnecessaryData( "End of stream reached draining data between ranges; expected %,d bytes;" + " only drained %,d bytes before -1 returned (position=%,d)", drainQuantity, drainBytes, position + drainBytes); + LOG.debug(s); throw new EOFException(s); } drainBytes += readCount; remaining -= readCount; } + } catch(IOException ex) { + if (shouldInputStreamBeAborted(ex)) { + // abort the stream if the exception indicates this is needed. + abortSdkStream(uri, objectContent, streamStatistics, "drain failure"); + } + throw ex; } finally { streamStatistics.readVectoredBytesDiscarded(drainBytes); LOG.debug("{} bytes drained from stream ", drainBytes); @@ -1135,53 +1243,95 @@ private void drainUnnecessaryData( } /** - * Read data from S3 for this range and populate the buffer. + * Read data from S3 for this range and populate a buffer. + * The GET request and single range reads are retried. + * Any IOException which is propagated by the retry logic is + * attached to the range as an exceptional failure. * @param range range of data to read. - * @param buffer buffer to fill. + * @param bufferPool buffer allocator. 
+ * @return any IOE which resulted in the read being unsuccessful; null on success. + */ + @Retries.RetryTranslated + private IOException readSingleRangeWithRetries( + FileRange range, + ByteBufferPool bufferPool) { + try { + context.getReadInvoker().retry("vector read", uri, true, () -> + readSingleRange(range, bufferPool)); + return null; + } catch (IOException ex) { + // the retry mechanism has stopped retrying, so mark the request as a failure. + range.getData().completeExceptionally(ex); + return ex; + } + } + + /** + * Read data from S3 for this range and populate a buffer. + * If the full read was succesful, the range's future is declared + * complete. + *
+ * <p>
+ * If an exception is raised,
+ * <ol>
+ *   <li>The buffer is returned to the pool.</li>
+ *   <li>The HTTP connection will be aborted if deemed to have failed.</li>
+ *   <li>The relevant statistics will be updated.</li>
+ *   <li>The exception is rethrown.</li>
+ * </ol>
+ * This is to allow the operation to be invoked in a retry() operation. + * @param range range of data to read. + * @param bufferPool buffer allocator. + * @throws IOException failure to GET or read the data. */ - private void readSingleRange(FileRange range, ByteBuffer buffer) { + @Retries.OnceTranslated + private void readSingleRange(FileRange range, + ByteBufferPool bufferPool) throws IOException { + LOG.debug("Start reading {} from {} ", range, pathStr); if (range.getLength() == 0) { + ByteBuffer buffer = bufferPool.getBuffer(false, range.getLength()); // a zero byte read. buffer.flip(); range.getData().complete(buffer); return; } + // buffer which will be fetched from the buffer pool and populated, + // on successful reads this will be returned in the response. + // on failures it must be returned to the pool. + ByteBuffer buffer = null; + // the contents of the ranged object request. ResponseInputStream objectRange = null; try { long position = range.getOffset(); int length = range.getLength(); - objectRange = getS3ObjectInputStream("readSingleRange", position, length); + // a GET request, which has risk of failing if the file is gone, changed etc. + objectRange = getS3Object("readSingleRange", position, length, false); + + // GET has succeeded, make sure request is good to continue + checkIfVectoredIOStopped(); + + buffer = bufferPool.getBuffer(false, range.getLength()); + + // read in the data and declare this range successfully read. populateBuffer(range, buffer, objectRange); range.getData().complete(buffer); - } catch (Exception ex) { - LOG.warn("Exception while reading a range {} from path {} ", range, pathStr, ex); - range.getData().completeExceptionally(ex); + LOG.debug("Finished reading range {} from path {}", range, pathStr); + } catch (IOException ex) { + // any failure. + // log, the error, return the buffer to the pool, and report a failure. + LOG.debug("Exception while reading a range {} from path {}", range, pathStr, ex); + if (buffer != null) { + // return any buffer to the pool + bufferPool.putBuffer(buffer); + } + if (shouldInputStreamBeAborted(ex)) { + // abort the stream if the exception indicates this is needed. + abortSdkStream(uri, objectRange, streamStatistics, "read failure"); + } + throw ex; } finally { IOUtils.cleanupWithLogger(LOG, objectRange); } - LOG.debug("Finished reading range {} from path {} ", range, pathStr); - } - - /** - * Get the s3 object input stream for S3 server for a specified range. - * Also checks if the vectored io operation has been stopped before and after - * the http get request such that we don't waste time populating the buffers. - * @param operationName name of the operation for which get object on S3 is called. - * @param position position of the object to be read from S3. - * @param length length from position of the object to be read from S3. - * @return result s3 object. - * @throws IOException exception if any. - * @throws InterruptedIOException if vectored io operation is stopped. - */ - @Retries.RetryTranslated - private ResponseInputStream getS3ObjectInputStream( - final String operationName, final long position, final int length) throws IOException { - checkIfVectoredIOStopped(); - ResponseInputStream objectRange = - getS3Object(operationName, position, length); - checkIfVectoredIOStopped(); - return objectRange; } /** @@ -1191,16 +1341,18 @@ private ResponseInputStream getS3ObjectInputStream( * @param range vector range to populate. * @param buffer buffer to fill. * @param objectContent result retrieved from S3 store. 
- * @throws IOException any IOE. + * @throws IOException any IOE raised reading the input stream. * @throws EOFException if EOF if read() call returns -1 * @throws InterruptedIOException if vectored IO operation is stopped. */ + @Retries.OnceTranslated private void populateBuffer(FileRange range, ByteBuffer buffer, InputStream objectContent) throws IOException { int length = range.getLength(); if (buffer.isDirect()) { + // direct buffer so copy the data into the buffer from the stream. VectoredReadUtils.readInDirectBuffer(range, buffer, (position, tmp, offset, currentLength) -> { readByteArray(objectContent, range, tmp, offset, currentLength); @@ -1227,6 +1379,7 @@ private void populateBuffer(FileRange range, * @throws EOFException if EOF if read() call returns -1 * @throws InterruptedIOException if vectored IO operation is stopped. */ + @Retries.OnceTranslated private void readByteArray(InputStream objectContent, final FileRange range, byte[] dest, @@ -1242,11 +1395,12 @@ private void readByteArray(InputStream objectContent, length - readBytes); LOG.debug("read {} bytes from stream", readBytesCurr); if (readBytesCurr < 0) { - throw new EOFException( - String.format("HTTP stream closed before all bytes were read." - + " Expected %,d bytes but only read %,d bytes. Current position %,d" - + " (%s)", - length, readBytes, position, range)); + final String message = String.format("HTTP stream closed before all bytes were read." + + " Expected %,d bytes but only read %,d bytes. Current position %,d" + + " (%s)", + length, readBytes, position, range); + LOG.warn(message); + throw new EOFException(message); } readBytes += readBytesCurr; position += readBytesCurr; @@ -1257,22 +1411,29 @@ private void readByteArray(InputStream objectContent, } /** - * Read data from S3 with retries for the GET request - * This also handles if file has been changed while the + * Read data from S3 with retries for the GET request, as part of a vector IO + * operation. + *
+ * <p>
+ * This also handles the file being changed while the * http call is getting executed. If the file has been * changed RemoteFileChangedException is thrown. + *
+ * <p>
+ * It checks if the vectored io operation has been stopped before + * the http GET request such that we don't waste time populating the buffers. * @param operationName name of the operation for which get object on S3 is called. * @param position position of the object to be read from S3. * @param length length from position of the object to be read from S3. + * @param shouldRetry should GET requests be retried. * @return S3Object result s3 object. - * @throws IOException exception if any. + * @throws IOException exception if the S3 call fails. * @throws InterruptedIOException if vectored io operation is stopped. * @throws RemoteFileChangedException if file has changed on the store. */ - @Retries.RetryTranslated + @Retries.RetryTranslated("if shouldRetry is true; OnceTranslated otherwise") private ResponseInputStream getS3Object(String operationName, - long position, - int length) + long position, + int length, + boolean shouldRetry) throws IOException { final GetObjectRequest request = client.newGetRequestBuilder(key) .range(S3AUtils.formatRange(position, position + length - 1)) @@ -1282,11 +1443,18 @@ private ResponseInputStream getS3Object(String operationName, ResponseInputStream objectRange; Invoker invoker = context.getReadInvoker(); try { - objectRange = invoker.retry(operationName, pathStr, true, - () -> { - checkIfVectoredIOStopped(); - return client.getObject(request); - }); + // the operation to invoke + CallableRaisingIOE> operation = () -> { + checkIfVectoredIOStopped(); + return client.getObject(request); + }; + // should this be retried? + if (shouldRetry) { + objectRange = invoker.retry(operationName, pathStr, true, + operation); + } else { + objectRange = Invoker.once(operationName, pathStr, operation); + } } catch (IOException ex) { tracker.failed(); @@ -1300,15 +1468,15 @@ private ResponseInputStream getS3Object(String operationName, } /** - * Check if vectored io operation has been stooped. This happens - * when the stream is closed or unbuffer is called. + * Check if vectored io operation has been stopped. This happens + * when the stream is closed or unbuffer() was called during the read. * @throws InterruptedIOException throw InterruptedIOException such * that all running vectored io is * terminated thus releasing resources. 
*/ private void checkIfVectoredIOStopped() throws InterruptedIOException { if (stopVectoredIOOperations.get()) { - throw new InterruptedIOException("Stream closed or unbuffer is called"); + throw new InterruptedIOException("Stream closed or unbuffer() was called during the read"); } } @@ -1406,7 +1574,7 @@ public static long validateReadahead(@Nullable Long readahead) { @Override public synchronized void unbuffer() { try { - stopVectoredIOOperations.set(true); + stopVectorOperations(); closeStream("unbuffer()", false, false); } finally { streamStatistics.unbuffered(); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java index cd24b61340c3f..ccf81fa4d39e5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.lang.reflect.Constructor; +import java.net.SocketTimeoutException; import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.core.exception.SdkException; @@ -270,6 +271,17 @@ public static HttpChannelEOFException maybeExtractChannelException( return null; } + /** + * Is the exception to be considered as an unrecoverable channel failure + * -and that the stream should be aborted if so. + * @param t caught exception. + * @return true if the stream must not be returned to the pool. + */ + public static boolean shouldInputStreamBeAborted(Throwable t) { + return (t instanceof HttpChannelEOFException) + || (t instanceof SocketTimeoutException); + } + /** * AWS error codes explicitly recognized and processes specially; * kept in their own class for isolation. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/SDKStreamDrainer.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/SDKStreamDrainer.java index a8aa532ac024d..48ea99ce30f61 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/SDKStreamDrainer.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/SDKStreamDrainer.java @@ -21,6 +21,8 @@ import java.io.InputStream; import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; + import software.amazon.awssdk.http.Abortable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +40,7 @@ * Drains/aborts s3 or other AWS SDK streams. * It is callable so can be passed directly to a submitter * for async invocation. + * @param type of stream to drain/abort. */ public class SDKStreamDrainer implements CallableRaisingIOE { @@ -300,4 +303,31 @@ public String toString() { ", thrown=" + thrown + '}'; } + + /** + * Abort a stream, always. + * @param type of stream to drain/abort. + * @param uri URI for messages + * @param sdkStream stream to close. Can be null. + * @param streamStatistics stats to update + * @param reason reason for stream being closed; used in messages + * @return true if the abort was successful. + */ + public static boolean abortSdkStream( + final String uri, + @Nullable final TStream sdkStream, + final S3AInputStreamStatistics streamStatistics, + final String reason) { + if (sdkStream == null) { + return false; + } + return new SDKStreamDrainer<>( + uri, + sdkStream, + true, + 0, + streamStatistics, + reason) + .apply(); + } }
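Reviewer note: a small illustrative sketch (not part of the patch; the class name is hypothetical) of the stream-abort decision added to `ErrorTranslation`. Channel-level failures such as `HttpChannelEOFException` and `SocketTimeoutException` leave the HTTP connection in an unknown state, so the stream must be aborted rather than drained and returned to the connection pool; ordinary IOExceptions leave the connection reusable.

```java
// Hypothetical sketch showing how shouldInputStreamBeAborted() classifies failures.
import java.io.IOException;
import java.net.SocketTimeoutException;

import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.shouldInputStreamBeAborted;

public final class AbortDecisionExample {

  public static void main(String[] args) {
    // a timed-out socket may still have unread data in flight: abort the stream,
    // as the vector read paths now do via SDKStreamDrainer.abortSdkStream().
    System.out.println(
        shouldInputStreamBeAborted(new SocketTimeoutException("read timed out")));  // true

    // a generic IO failure does not force an abort; the stream can be drained
    // and the HTTP connection returned to the pool for reuse.
    System.out.println(
        shouldInputStreamBeAborted(new IOException("generic read failure")));       // false
  }
}
```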