From 311c2e09995c8a82440024d082d87719333e3171 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 9 Oct 2024 18:47:02 +0100 Subject: [PATCH 1/5] HADOOP-19105. Improve resilience in vector reads. Add a new releaser method, which then is invoked to release buffers on failure. It is a bit contrived how we try not break external implementations when adding a new default implementation to PositionedReadable: the releaser will be lost unless they do so. Change-Id: Ie2f95a89cfa79d3fdf33c1cc58f63e4ca00c6207 --- .../apache/hadoop/fs/ChecksumFileSystem.java | 13 +++- .../apache/hadoop/fs/PositionedReadable.java | 28 +++++++++ .../apache/hadoop/fs/RawLocalFileSystem.java | 9 +++ .../apache/hadoop/fs/VectoredReadUtils.java | 53 ++++++++++++++++- .../apache/hadoop/fs/s3a/S3AInputStream.java | 59 ++++++++++++++++--- 5 files changed, 150 insertions(+), 12 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java index c9ccc09235ff5..d85f0895a6fb6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java @@ -32,6 +32,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.function.Consumer; import java.util.function.IntFunction; import java.util.zip.CRC32; @@ -425,6 +426,7 @@ static ByteBuffer checkBytes(ByteBuffer sumsBytes, return data; } + /** * Vectored read. * If the file has no checksums: delegate to the underlying stream. @@ -438,6 +440,13 @@ static ByteBuffer checkBytes(ByteBuffer sumsBytes, @Override public void readVectored(List ranges, IntFunction allocate) throws IOException { + readVectored(ranges, allocate, (b) -> { }); + } + + @Override + public void readVectored(final List ranges, + final IntFunction allocate, + final Consumer release) throws IOException { // If the stream doesn't have checksums, just delegate. if (sums == null) { @@ -462,8 +471,8 @@ public void readVectored(List ranges, } List checksumRanges = findChecksumRanges(dataRanges, bytesPerSum, minSeek, maxSize); - sums.readVectored(checksumRanges, allocate); - datas.readVectored(dataRanges, allocate); + sums.readVectored(checksumRanges, allocate, release); + datas.readVectored(dataRanges, allocate, release); for(CombinedFileRange checksumRange: checksumRanges) { for(FileRange dataRange: checksumRange.getUnderlying()) { // when we have both the ranges, validate the checksum diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java index 90009ecb61bb5..9389cb843f3eb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java @@ -21,10 +21,12 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import java.util.function.Consumer; import java.util.function.IntFunction; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.io.ByteBufferPool; /** * Stream that permits positional reading. 
@@ -133,4 +135,30 @@ default void readVectored(List ranges, IntFunction allocate) throws IOException { VectoredReadUtils.readVectored(this, ranges, allocate); } + + /** + * Variant of {@link #readVectored(List, IntFunction)} where a release() function + * may be invoked if problems surface during reads; the release function is called to + * try to return any allocated buffer which has not yet been read. + * Buffers which have successfully been read and returned to the caller are not + * released: this is for failures only. + *

+ * The default implementation delegates to the two-argument + * {@link #readVectored(List, IntFunction)}, so existing stream implementations + * which do not override this method continue to work; the release function is + * simply not invoked for them. + *

+ * Implementations SHOULD override this method if they can release buffers as + * part of their error handling. + * @param ranges the byte ranges to read + * @param allocate the function to allocate ByteBuffer + * @param release the function to release a ByteBuffer. + * @throws IOException any IOE. + * @throws IllegalArgumentException if the any of ranges are invalid, or they overlap. + */ + default void readVectored(List ranges, + IntFunction allocate, + Consumer release) throws IOException { + readVectored(ranges, allocate); + } + } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java index fa5624e67158d..6e0f6bae1c1ab 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java @@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import java.util.function.IntFunction; import org.apache.hadoop.classification.InterfaceAudience; @@ -68,6 +69,7 @@ import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; +import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED; import static org.apache.hadoop.fs.VectoredReadUtils.sortRangeList; import static org.apache.hadoop.fs.VectoredReadUtils.validateRangeRequest; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; @@ -319,6 +321,13 @@ AsynchronousFileChannel getAsyncChannel() throws IOException { @Override public void readVectored(List ranges, IntFunction allocate) throws IOException { + readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED); + } + + @Override + public void readVectored(final List ranges, + final IntFunction allocate, + final Consumer release) throws IOException { // Validate, but do not pass in a file length as it may change. List sortedRanges = sortRangeList(ranges); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java index 2f99edc910c16..edef56f94a8ab 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import java.util.function.IntFunction; import org.slf4j.Logger; @@ -52,6 +53,15 @@ public final class VectoredReadUtils { private static final Logger LOG = LoggerFactory.getLogger(VectoredReadUtils.class); + /** + * This releaser just logs at debug that the buffer + * was released. + */ + public static final Consumer LOG_BYTE_BUFFER_RELEASED = + (buffer) -> { + LOG.debug("release buffer {}", buffer.toString()); + }; + /** * Validate a single range. * @param range range to validate. 
@@ -98,11 +108,29 @@ public static void validateVectoredReadRanges(List ranges) public static void readVectored(PositionedReadable stream, List ranges, IntFunction allocate) throws EOFException { + readVectored(stream, ranges, allocate, LOG_BYTE_BUFFER_RELEASED); + } + + /** + * Variant of {@link #readVectored(PositionedReadable, List, IntFunction)} + * where a release() function is invoked if problems surface during reads. + * @param ranges the byte ranges to read + * @param allocate the function to allocate ByteBuffer + * @param release the function to release a ByteBuffer. + * @throws IllegalArgumentException if the any of ranges are invalid, or they overlap. + * @throws EOFException the range offset is negative + */ + public static void readVectored(PositionedReadable stream, + List ranges, + IntFunction allocate, + Consumer release) throws EOFException { + for (FileRange range: validateAndSortRanges(ranges, Optional.empty())) { - range.setData(readRangeFrom(stream, range, allocate)); + range.setData(readRangeFrom(stream, range, allocate, release)); } } + /** * Synchronously reads a range from the stream dealing with the combinations * of ByteBuffers buffers and PositionedReadable streams. @@ -118,11 +146,31 @@ public static CompletableFuture readRangeFrom( PositionedReadable stream, FileRange range, IntFunction allocate) throws EOFException { + return readRangeFrom(stream, range, allocate, LOG_BYTE_BUFFER_RELEASED); + } + + /** + * Synchronously reads a range from the stream dealing with the combinations + * of ByteBuffers buffers and PositionedReadable streams. + * @param stream the stream to read from + * @param range the range to read + * @param allocate the function to allocate ByteBuffers + * @param release the function to release a ByteBuffer. + * @return the CompletableFuture that contains the read data or an exception. + * @throws IllegalArgumentException the range is invalid other than by offset or being null. + * @throws EOFException the range offset is negative + * @throws NullPointerException if the range is null. 
+ */ + public static CompletableFuture readRangeFrom( + PositionedReadable stream, + FileRange range, + IntFunction allocate, + Consumer release) throws EOFException { validateRangeRequest(range); CompletableFuture result = new CompletableFuture<>(); + ByteBuffer buffer = allocate.apply(range.getLength()); try { - ByteBuffer buffer = allocate.apply(range.getLength()); if (stream instanceof ByteBufferPositionedReadable) { LOG.debug("ByteBufferPositionedReadable.readFully of {}", range); ((ByteBufferPositionedReadable) stream).readFully(range.getOffset(), @@ -136,6 +184,7 @@ public static CompletableFuture readRangeFrom( result.complete(buffer); } catch (IOException ioe) { LOG.debug("Failed to read {}", range, ioe); + release.accept(buffer); result.completeExceptionally(ioe); } return result; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index cfdc361234f9f..10228fb7f6905 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -31,6 +31,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.IntFunction; import software.amazon.awssdk.core.ResponseInputStream; @@ -65,6 +66,7 @@ import static java.util.Objects.requireNonNull; import static org.apache.commons.lang3.StringUtils.isNotEmpty; +import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED; import static org.apache.hadoop.fs.VectoredReadUtils.isOrderedDisjoint; import static org.apache.hadoop.fs.VectoredReadUtils.mergeSortedRanges; import static org.apache.hadoop.fs.VectoredReadUtils.validateAndSortRanges; @@ -898,7 +900,8 @@ public int maxReadSizeForVectorReads() { /** * {@inheritDoc} - * Vectored read implementation for S3AInputStream. + * Pass to {@link #readVectored(List, IntFunction, Consumer)} + * with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser. * @param ranges the byte ranges to read. * @param allocate the function to allocate ByteBuffer. * @throws IOException IOE if any. @@ -906,11 +909,30 @@ public int maxReadSizeForVectorReads() { @Override public synchronized void readVectored(List ranges, IntFunction allocate) throws IOException { + readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED); + } + + /** + * {@inheritDoc} + * Vectored read implementation for S3AInputStream. + * @param ranges the byte ranges to read. + * @param allocate the function to allocate ByteBuffer. + * @param release the function to release a ByteBuffer. + * @throws IOException IOE if any. + */ + @Override + public void readVectored(final List ranges, + final IntFunction allocate, + final Consumer release) throws IOException { LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, ranges); checkNotClosed(); if (stopVectoredIOOperations.getAndSet(false)) { LOG.debug("Reinstating vectored read operation for path {} ", pathStr); } + // fail fast on parameters which would otherwise only be checked + // in threads and/or in failures. 
+ requireNonNull(allocate , "Null allocator"); + requireNonNull(release, "Null releaser"); // prepare to read List sortedRanges = validateAndSortRanges(ranges, @@ -942,7 +964,7 @@ public synchronized void readVectored(List ranges, ranges.size(), combinedFileRanges.size()); for (CombinedFileRange combinedFileRange: combinedFileRanges) { boundedThreadPool.submit( - () -> readCombinedRangeAndUpdateChildren(combinedFileRange, allocate)); + () -> readCombinedRangeAndUpdateChildren(combinedFileRange, allocate, release)); } } LOG.debug("Finished submitting vectored read to threadpool" + @@ -954,16 +976,18 @@ public synchronized void readVectored(List ranges, * underlying ranges. * @param combinedFileRange big combined file range. * @param allocate method to create byte buffers to hold result data. + * @param release the function to release a ByteBuffer. */ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange, - IntFunction allocate) { + IntFunction allocate, + final Consumer release) { LOG.debug("Start reading {} from path {} ", combinedFileRange, pathStr); ResponseInputStream rangeContent = null; try { rangeContent = getS3ObjectInputStream("readCombinedFileRange", combinedFileRange.getOffset(), combinedFileRange.getLength()); - populateChildBuffers(combinedFileRange, rangeContent, allocate); + populateChildBuffers(combinedFileRange, rangeContent, allocate, release); } catch (Exception ex) { LOG.debug("Exception while reading {} from path {} ", combinedFileRange, pathStr, ex); // complete exception all the underlying ranges which have not already @@ -981,17 +1005,21 @@ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRa /** * Populate underlying buffers of the child ranges. - * There is no attempt to recover from any read failures. + * There is no attempt to recover from any read failures, + * but the {@code release} function is invoked to attempt + * to release the buffer. * @param combinedFileRange big combined file range. * @param objectContent data from s3. * @param allocate method to allocate child byte buffers. + * @param release the function to release a ByteBuffer. * @throws IOException any IOE. * @throws EOFException if EOF if read() call returns -1 * @throws InterruptedIOException if vectored IO operation is stopped. 
*/ private void populateChildBuffers(CombinedFileRange combinedFileRange, InputStream objectContent, - IntFunction allocate) throws IOException { + IntFunction allocate, + final Consumer release) throws IOException { // If the combined file range just contains a single child // range, we only have to fill that one child buffer else // we drain the intermediate data between consecutive ranges @@ -999,7 +1027,13 @@ private void populateChildBuffers(CombinedFileRange combinedFileRange, if (combinedFileRange.getUnderlying().size() == 1) { FileRange child = combinedFileRange.getUnderlying().get(0); ByteBuffer buffer = allocate.apply(child.getLength()); - populateBuffer(child, buffer, objectContent); + try { + populateBuffer(child, buffer, objectContent); + } catch (IOException e) { + // release the buffer + release.accept(buffer); + throw e; + } child.getData().complete(buffer); } else { FileRange prev = null; @@ -1016,7 +1050,13 @@ private void populateChildBuffers(CombinedFileRange combinedFileRange, } } ByteBuffer buffer = allocate.apply(child.getLength()); - populateBuffer(child, buffer, objectContent); + try { + populateBuffer(child, buffer, objectContent); + } catch (IOException e) { + // release the buffer + release.accept(buffer); + throw e; + } child.getData().complete(buffer); prev = child; } @@ -1074,6 +1114,7 @@ private void drainUnnecessaryData( * @param range range of data to read. * @param buffer buffer to fill. */ + @Retries.RetryTranslated("GET is retried; reads are not") private void readSingleRange(FileRange range, ByteBuffer buffer) { LOG.debug("Start reading {} from {} ", range, pathStr); if (range.getLength() == 0) { @@ -1130,6 +1171,7 @@ private ResponseInputStream getS3ObjectInputStream( * @throws EOFException if EOF if read() call returns -1 * @throws InterruptedIOException if vectored IO operation is stopped. */ + @Retries.OnceTranslated private void populateBuffer(FileRange range, ByteBuffer buffer, InputStream objectContent) throws IOException { @@ -1162,6 +1204,7 @@ private void populateBuffer(FileRange range, * @throws EOFException if EOF if read() call returns -1 * @throws InterruptedIOException if vectored IO operation is stopped. */ + @Retries.OnceTranslated private void readByteArray(InputStream objectContent, final FileRange range, byte[] dest, From 535bcfb0891b99fdbfad880775d03f11b17ad5f7 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 10 Oct 2024 14:07:13 +0100 Subject: [PATCH 2/5] HADOOP-19105. Improve resilience in vector reads. Trying to implement resilience in the s3a input stream; buffers of incomplete reads are now released on failures. Change-Id: I4b23c6a2ff417687fd19c1e8e9feef6cf733c031 --- .../fs/impl/BufferManagerFromVectorArgs.java | 74 ++++++++++++++++ .../apache/hadoop/fs/s3a/S3AInputStream.java | 88 +++++++++---------- 2 files changed, 115 insertions(+), 47 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/BufferManagerFromVectorArgs.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/BufferManagerFromVectorArgs.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/BufferManagerFromVectorArgs.java new file mode 100644 index 0000000000000..53d686056b7f0 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/BufferManagerFromVectorArgs.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.impl; + +import java.nio.ByteBuffer; +import java.util.function.Consumer; +import java.util.function.IntFunction; + +import org.apache.hadoop.io.ByteBufferPool; + +/** + * A ByteBufferPool implementation that uses a pair of functions to allocate + * and release ByteBuffers; intended for use implementing the VectorIO API + * as it makes the pair of functions easier to pass around and use in + * existing code. + */ +public final class BufferManagerFromVectorArgs implements ByteBufferPool { + + /** The function to allocate a buffer. */ + private final IntFunction allocate; + + /** The function to release a buffer. */ + private final Consumer release; + + /** + * @param allocate the function to allocate ByteBuffer + * @param release the function to release a ByteBuffer. + */ + public BufferManagerFromVectorArgs(IntFunction allocate, + Consumer release) { + this.allocate = allocate; + this.release = release; + } + + /** + * Get a ByteBuffer. + * @param direct heap/direct flag. Unused. + * @param length The minimum length the buffer will have. + * @return a buffer + */ + @Override + public ByteBuffer getBuffer(final boolean direct, final int length) { + return allocate.apply(length); + } + + /** + * Release a buffer. + * Unlike normal ByteBufferPool implementations + * a null buffer is accepted and ignored. + * @param buffer buffer to release; may be null. 
+ */ + @Override + public void putBuffer(final ByteBuffer buffer) { + if (buffer != null) { + release.accept(buffer); + } + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 10228fb7f6905..3c6b614301c0d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -40,6 +40,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.fs.impl.BufferManagerFromVectorArgs; +import org.apache.hadoop.io.ByteBufferPool; import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -934,6 +936,7 @@ public void readVectored(final List ranges, requireNonNull(allocate , "Null allocator"); requireNonNull(release, "Null releaser"); + ByteBufferPool bufferPool = new BufferManagerFromVectorArgs(allocate, release); // prepare to read List sortedRanges = validateAndSortRanges(ranges, fileLength); @@ -951,8 +954,7 @@ public void readVectored(final List ranges, LOG.debug("Not merging the ranges as they are disjoint"); streamStatistics.readVectoredOperationStarted(sortedRanges.size(), sortedRanges.size()); for (FileRange range: sortedRanges) { - ByteBuffer buffer = allocate.apply(range.getLength()); - boundedThreadPool.submit(() -> readSingleRange(range, buffer)); + boundedThreadPool.submit(() -> readSingleRange(range, bufferPool)); } } else { LOG.debug("Trying to merge the ranges as they are not disjoint"); @@ -964,7 +966,7 @@ public void readVectored(final List ranges, ranges.size(), combinedFileRanges.size()); for (CombinedFileRange combinedFileRange: combinedFileRanges) { boundedThreadPool.submit( - () -> readCombinedRangeAndUpdateChildren(combinedFileRange, allocate, release)); + () -> readCombinedRangeAndUpdateChildren(combinedFileRange, bufferPool)); } } LOG.debug("Finished submitting vectored read to threadpool" + @@ -974,21 +976,25 @@ public void readVectored(final List ranges, /** * Read the data from S3 for the bigger combined file range and update all the * underlying ranges. + *

+ * If the read process fails, all underlying ranges which have not already completed + * are completed exceptionally. * @param combinedFileRange big combined file range. - * @param allocate method to create byte buffers to hold result data. - * @param release the function to release a ByteBuffer. + * @param bufferPool buffer pool + * @throws IOException any IOE */ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange, - IntFunction allocate, - final Consumer release) { + ByteBufferPool bufferPool) + throws IOException { LOG.debug("Start reading {} from path {} ", combinedFileRange, pathStr); ResponseInputStream rangeContent = null; try { - rangeContent = getS3ObjectInputStream("readCombinedFileRange", + // this retries internally + rangeContent = getS3Object("readCombinedFileRange", combinedFileRange.getOffset(), combinedFileRange.getLength()); - populateChildBuffers(combinedFileRange, rangeContent, allocate, release); - } catch (Exception ex) { + populateChildBuffers(combinedFileRange, rangeContent, bufferPool); + } catch (IOException ex) { LOG.debug("Exception while reading {} from path {} ", combinedFileRange, pathStr, ex); // complete exception all the underlying ranges which have not already // finished. @@ -997,6 +1003,7 @@ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRa child.getData().completeExceptionally(ex); } } + throw ex; } finally { IOUtils.cleanupWithLogger(LOG, rangeContent); } @@ -1010,34 +1017,34 @@ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRa * to release the buffer. * @param combinedFileRange big combined file range. * @param objectContent data from s3. - * @param allocate method to allocate child byte buffers. - * @param release the function to release a ByteBuffer. + * @param bufferPool buffer pool * @throws IOException any IOE. * @throws EOFException if EOF if read() call returns -1 * @throws InterruptedIOException if vectored IO operation is stopped. */ private void populateChildBuffers(CombinedFileRange combinedFileRange, - InputStream objectContent, - IntFunction allocate, - final Consumer release) throws IOException { + InputStream objectContent, + ByteBufferPool bufferPool) throws IOException { // If the combined file range just contains a single child // range, we only have to fill that one child buffer else // we drain the intermediate data between consecutive ranges // and fill the buffers one by one. if (combinedFileRange.getUnderlying().size() == 1) { FileRange child = combinedFileRange.getUnderlying().get(0); - ByteBuffer buffer = allocate.apply(child.getLength()); + ByteBuffer buffer = bufferPool.getBuffer(false, child.getLength()); try { populateBuffer(child, buffer, objectContent); } catch (IOException e) { // release the buffer - release.accept(buffer); + bufferPool.putBuffer(buffer); throw e; } child.getData().complete(buffer); } else { FileRange prev = null; for (FileRange child : combinedFileRange.getUnderlying()) { + ByteBuffer buffer = null; + checkIfVectoredIOStopped(); if (prev != null) { final long position = prev.getOffset() + prev.getLength(); @@ -1046,15 +1053,16 @@ private void populateChildBuffers(CombinedFileRange combinedFileRange, // work out how much long drainQuantity = child.getOffset() - position; // and drain it. 
+ // TODO: drain failure should switch to recovery mode drainUnnecessaryData(objectContent, position, drainQuantity); } } - ByteBuffer buffer = allocate.apply(child.getLength()); try { + buffer = bufferPool.getBuffer(false, child.getLength()); populateBuffer(child, buffer, objectContent); } catch (IOException e) { // release the buffer - release.accept(buffer); + bufferPool.putBuffer(buffer); throw e; } child.getData().complete(buffer); @@ -1110,13 +1118,14 @@ private void drainUnnecessaryData( } /** - * Read data from S3 for this range and populate the buffer. + * Read data from S3 for this range and populate a buffer. * @param range range of data to read. - * @param buffer buffer to fill. + * @param bufferPool buffer allocator. */ @Retries.RetryTranslated("GET is retried; reads are not") - private void readSingleRange(FileRange range, ByteBuffer buffer) { + private void readSingleRange(FileRange range, ByteBufferPool bufferPool) { LOG.debug("Start reading {} from {} ", range, pathStr); + ByteBuffer buffer = bufferPool.getBuffer(false, range.getLength()); if (range.getLength() == 0) { // a zero byte read. buffer.flip(); @@ -1127,37 +1136,17 @@ private void readSingleRange(FileRange range, ByteBuffer buffer) { try { long position = range.getOffset(); int length = range.getLength(); - objectRange = getS3ObjectInputStream("readSingleRange", position, length); + objectRange = getS3Object("readSingleRange", position, length); populateBuffer(range, buffer, objectRange); range.getData().complete(buffer); - } catch (Exception ex) { - LOG.warn("Exception while reading a range {} from path {} ", range, pathStr, ex); + LOG.debug("Finished reading range {} from path {} ", range, pathStr); + } catch (IOException ex) { + LOG.debug("Exception while reading a range {} from path {} ", range, pathStr, ex); + bufferPool.putBuffer(buffer); range.getData().completeExceptionally(ex); } finally { IOUtils.cleanupWithLogger(LOG, objectRange); } - LOG.debug("Finished reading range {} from path {} ", range, pathStr); - } - - /** - * Get the s3 object input stream for S3 server for a specified range. - * Also checks if the vectored io operation has been stopped before and after - * the http get request such that we don't waste time populating the buffers. - * @param operationName name of the operation for which get object on S3 is called. - * @param position position of the object to be read from S3. - * @param length length from position of the object to be read from S3. - * @return result s3 object. - * @throws IOException exception if any. - * @throws InterruptedIOException if vectored io operation is stopped. - */ - @Retries.RetryTranslated - private ResponseInputStream getS3ObjectInputStream( - final String operationName, final long position, final int length) throws IOException { - checkIfVectoredIOStopped(); - ResponseInputStream objectRange = - getS3Object(operationName, position, length); - checkIfVectoredIOStopped(); - return objectRange; } /** @@ -1220,6 +1209,7 @@ private void readByteArray(InputStream objectContent, length - readBytes); LOG.debug("read {} bytes from stream", readBytesCurr); if (readBytesCurr < 0) { + // TODO: abort the stream. throw new EOFException( String.format("HTTP stream closed before all bytes were read." + " Expected %,d bytes but only read %,d bytes. Current position %,d" @@ -1239,6 +1229,9 @@ private void readByteArray(InputStream objectContent, * This also handles if file has been changed while the * http call is getting executed. 
If the file has been + * changed, a RemoteFileChangedException is thrown. + *

+ * Also checks if the vectored io operation has been stopped before and after + * the http get request such that we don't waste time populating the buffers. * @param operationName name of the operation for which get object on S3 is called. * @param position position of the object to be read from S3. * @param length length from position of the object to be read from S3. @@ -1274,6 +1267,7 @@ private ResponseInputStream getS3Object(String operationName, } changeTracker.processResponse(objectRange.response(), operationName, position); + checkIfVectoredIOStopped(); return objectRange; } From 29e20495659a61ab84db2aed1d0553151818f87a Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 10 Oct 2024 17:53:11 +0100 Subject: [PATCH 3/5] HADOOP-19105. Improve resilience in vector reads. Trying to implement resilience in the s3a input stream; buffers of incomplete reads are now released on failures. Change-Id: I851779383f8d120fe42bc97c5e6ae533c5fe2e23 --- .../apache/hadoop/fs/s3a/S3AInputStream.java | 91 ++++++++++++++----- 1 file changed, 67 insertions(+), 24 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 3c6b614301c0d..a099acf4d4ffe 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.IntFunction; +import java.util.stream.Collectors; import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.services.s3.model.GetObjectRequest; @@ -981,29 +982,56 @@ public void readVectored(final List ranges, * are completed exceptionally. * @param combinedFileRange big combined file range. * @param bufferPool buffer pool - * @throws IOException any IOE */ + @Retries.RetryExceptionsSwallowed private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange, - ByteBufferPool bufferPool) - throws IOException { + ByteBufferPool bufferPool) { LOG.debug("Start reading {} from path {} ", combinedFileRange, pathStr); ResponseInputStream rangeContent = null; try { - // this retries internally + // issue the GET request; this retries internally. rangeContent = getS3Object("readCombinedFileRange", - combinedFileRange.getOffset(), - combinedFileRange.getLength()); + combinedFileRange.getOffset(), + combinedFileRange.getLength(), + true); + } catch (IOException ex) { + // any exception here means that repeated HEAD requests have failed; + // consider the request unrecoverable. + LOG.debug("Failed to initiating GET request to {} ", pathStr, ex); + combinedFileRange.getUnderlying().stream() + .map(FileRange::getData) + .forEach(f -> f.completeExceptionally(ex)); + return; + } + // at this point there is a stream to read from, which + // MUST be closed in the finally block. + try { populateChildBuffers(combinedFileRange, rangeContent, bufferPool); } catch (IOException ex) { LOG.debug("Exception while reading {} from path {} ", combinedFileRange, pathStr, ex); + + // close the ongoing read. + IOUtils.cleanupWithLogger(LOG, rangeContent); + rangeContent = null; // complete exception all the underlying ranges which have not already // finished. 
- for(FileRange child : combinedFileRange.getUnderlying()) { - if (!child.getData().isDone()) { - child.getData().completeExceptionally(ex); - } + + // get all the incomplete reads. + final List incomplete = combinedFileRange.getUnderlying().stream() + .filter(f -> !f.getData().isDone()) + .collect(Collectors.toList()); + // previously these were completed exceptionally; now they are + // recovered from. + // while rebuilding a new combined range is possible, if there are problems + // we just fall back to each range being read individually, + // in sequence. Suboptimal, but simple -especially as this is + // already happening in a worker thread. + + for (FileRange child : incomplete) { + // failure reporting is already handed internally. + readSingleRange(child, bufferPool); } - throw ex; + } finally { IOUtils.cleanupWithLogger(LOG, rangeContent); } @@ -1022,6 +1050,7 @@ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRa * @throws EOFException if EOF if read() call returns -1 * @throws InterruptedIOException if vectored IO operation is stopped. */ + @Retries.OnceTranslated private void populateChildBuffers(CombinedFileRange combinedFileRange, InputStream objectContent, ByteBufferPool bufferPool) throws IOException { @@ -1053,7 +1082,6 @@ private void populateChildBuffers(CombinedFileRange combinedFileRange, // work out how much long drainQuantity = child.getOffset() - position; // and drain it. - // TODO: drain failure should switch to recovery mode drainUnnecessaryData(objectContent, position, drainQuantity); } } @@ -1125,24 +1153,30 @@ private void drainUnnecessaryData( @Retries.RetryTranslated("GET is retried; reads are not") private void readSingleRange(FileRange range, ByteBufferPool bufferPool) { LOG.debug("Start reading {} from {} ", range, pathStr); - ByteBuffer buffer = bufferPool.getBuffer(false, range.getLength()); if (range.getLength() == 0) { + ByteBuffer buffer = bufferPool.getBuffer(false, range.getLength()); // a zero byte read. buffer.flip(); range.getData().complete(buffer); return; } + ByteBuffer buffer = null; ResponseInputStream objectRange = null; try { long position = range.getOffset(); int length = range.getLength(); - objectRange = getS3Object("readSingleRange", position, length); + objectRange = getS3Object("readSingleRange", position, length, true); + buffer = bufferPool.getBuffer(false, range.getLength()); + + // TODO: error handling if the read fails. populateBuffer(range, buffer, objectRange); range.getData().complete(buffer); LOG.debug("Finished reading range {} from path {} ", range, pathStr); } catch (IOException ex) { LOG.debug("Exception while reading a range {} from path {} ", range, pathStr, ex); - bufferPool.putBuffer(buffer); + if (buffer != null) { + bufferPool.putBuffer(buffer); + } range.getData().completeExceptionally(ex); } finally { IOUtils.cleanupWithLogger(LOG, objectRange); @@ -1235,15 +1269,17 @@ private void readByteArray(InputStream objectContent, * @param operationName name of the operation for which get object on S3 is called. * @param position position of the object to be read from S3. * @param length length from position of the object to be read from S3. + * @param shouldRetry should GET requests be retried. * @return S3Object result s3 object. - * @throws IOException exception if any. + * @throws IOException exception if the S3 call fails. * @throws InterruptedIOException if vectored io operation is stopped. * @throws RemoteFileChangedException if file has changed on the store. 
*/ - @Retries.RetryTranslated + @Retries.RetryTranslated("if shouldRetry is true; OnceTranslated otherwise") private ResponseInputStream getS3Object(String operationName, - long position, - int length) + long position, + int length, + boolean shouldRetry) throws IOException { final GetObjectRequest request = client.newGetRequestBuilder(key) .range(S3AUtils.formatRange(position, position + length - 1)) @@ -1253,11 +1289,18 @@ private ResponseInputStream getS3Object(String operationName, ResponseInputStream objectRange; Invoker invoker = context.getReadInvoker(); try { - objectRange = invoker.retry(operationName, pathStr, true, - () -> { - checkIfVectoredIOStopped(); - return client.getObject(request); - }); + // the operation to invoke + CallableRaisingIOE> operation = () -> { + checkIfVectoredIOStopped(); + return client.getObject(request); + }; + // should this be retried? + if (shouldRetry) { + objectRange = invoker.retry(operationName, pathStr, true, + operation); + } else { + objectRange = Invoker.once(operationName, pathStr, operation); + } } catch (IOException ex) { tracker.failed(); From fef176208ad4dbe6153a1944ddb545204c7a6537 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 29 Oct 2024 18:31:35 +0000 Subject: [PATCH 4/5] HADOOP-19105. s3a input stream codepath looks good. Now: how do I pull this out so it can be used with the prefetch stream? Change-Id: I98e350ab3b28d9adc7b458dee4ceee56a0562f17 --- ...ectorArgs.java => VectorIOBufferPool.java} | 16 +++++-- .../hadoop/fs/impl/TestVectoredReadUtils.java | 47 +++++++++++++++++++ .../apache/hadoop/fs/s3a/S3AInputStream.java | 36 +++++++------- 3 files changed, 75 insertions(+), 24 deletions(-) rename hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/{BufferManagerFromVectorArgs.java => VectorIOBufferPool.java} (83%) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/BufferManagerFromVectorArgs.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectorIOBufferPool.java similarity index 83% rename from hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/BufferManagerFromVectorArgs.java rename to hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectorIOBufferPool.java index 53d686056b7f0..22f1dce2b1f91 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/BufferManagerFromVectorArgs.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectorIOBufferPool.java @@ -24,13 +24,18 @@ import org.apache.hadoop.io.ByteBufferPool; +import static java.util.Objects.requireNonNull; + /** * A ByteBufferPool implementation that uses a pair of functions to allocate * and release ByteBuffers; intended for use implementing the VectorIO API * as it makes the pair of functions easier to pass around and use in * existing code. + *

+ * No matter what kind of buffer is requested, the allocation function + * is invoked; that is: the direct flag is ignored. */ -public final class BufferManagerFromVectorArgs implements ByteBufferPool { +public final class VectorIOBufferPool implements ByteBufferPool { /** The function to allocate a buffer. */ private final IntFunction allocate; @@ -42,10 +47,11 @@ public final class BufferManagerFromVectorArgs implements ByteBufferPool { * @param allocate the function to allocate ByteBuffer * @param release the function to release a ByteBuffer. */ - public BufferManagerFromVectorArgs(IntFunction allocate, - Consumer release) { - this.allocate = allocate; - this.release = release; + public VectorIOBufferPool( + IntFunction allocate, + Consumer release) { + this.allocate = requireNonNull(allocate); + this.release = requireNonNull(release); } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java index b08fc95279a82..e232c53f66213 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import java.util.function.IntFunction; import org.assertj.core.api.Assertions; @@ -40,6 +41,7 @@ import org.apache.hadoop.fs.FileRange; import org.apache.hadoop.fs.PositionedReadable; import org.apache.hadoop.fs.VectoredReadUtils; +import org.apache.hadoop.io.ElasticByteBufferPool; import org.apache.hadoop.test.HadoopTestBase; import static java.util.Arrays.asList; @@ -823,4 +825,49 @@ public void testReadOverEOFRejected() throws Exception { asList(createFileRange(length - 1, 2)), Optional.of(length))); } + + @Test + public void testVectorIOBufferPool() throws Throwable { + ElasticByteBufferPool elasticByteBufferPool = new ElasticByteBufferPool(); + + // inlined lambda to assert the pool size + Consumer assertPoolSize = (size) -> { + Assertions.assertThat(elasticByteBufferPool.size(false)) + .describedAs("Pool size") + .isEqualTo(size); + }; + + // build vector pool from the buffer pool operations converted to + // allocate and release lambda expressions + VectorIOBufferPool vectorBuffers = new VectorIOBufferPool( + r -> elasticByteBufferPool.getBuffer(false, r), + elasticByteBufferPool::putBuffer); + + assertPoolSize.accept(0); + + final ByteBuffer b1 = vectorBuffers.getBuffer(false, 100); + final ByteBuffer b2 = vectorBuffers.getBuffer(false, 50); + + // return the first buffer for a pool size of 1 + vectorBuffers.putBuffer(b1); + assertPoolSize.accept(1); + + // expect the returned buffer back + ByteBuffer b3 = vectorBuffers.getBuffer(true, 100); + Assertions.assertThat(b3) + .describedAs("buffer returned from a get after a previous one was returned") + .isSameAs(b1); + assertPoolSize.accept(0); + + // return them all + vectorBuffers.putBuffer(b2); + vectorBuffers.putBuffer(b3); + assertPoolSize.accept(2); + + // release does not propagate + vectorBuffers.release(); + assertPoolSize.accept(2); + + elasticByteBufferPool.release(); + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 
a099acf4d4ffe..4d08aaa6fea24 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -41,7 +41,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.fs.impl.BufferManagerFromVectorArgs; +import org.apache.hadoop.fs.impl.VectorIOBufferPool; import org.apache.hadoop.io.ByteBufferPool; import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; @@ -937,7 +937,7 @@ public void readVectored(final List ranges, requireNonNull(allocate , "Null allocator"); requireNonNull(release, "Null releaser"); - ByteBufferPool bufferPool = new BufferManagerFromVectorArgs(allocate, release); + ByteBufferPool bufferPool = new VectorIOBufferPool(allocate, release); // prepare to read List sortedRanges = validateAndSortRanges(ranges, fileLength); @@ -955,6 +955,8 @@ public void readVectored(final List ranges, LOG.debug("Not merging the ranges as they are disjoint"); streamStatistics.readVectoredOperationStarted(sortedRanges.size(), sortedRanges.size()); for (FileRange range: sortedRanges) { + // submit the read operation to the threadpool + // TODO: track wait time. boundedThreadPool.submit(() -> readSingleRange(range, bufferPool)); } } else { @@ -997,7 +999,7 @@ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRa } catch (IOException ex) { // any exception here means that repeated HEAD requests have failed; // consider the request unrecoverable. - LOG.debug("Failed to initiating GET request to {} ", pathStr, ex); + LOG.debug("Failed initiating GET request to {}; failing all ranges", pathStr, ex); combinedFileRange.getUnderlying().stream() .map(FileRange::getData) .forEach(f -> f.completeExceptionally(ex)); @@ -1010,25 +1012,18 @@ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRa } catch (IOException ex) { LOG.debug("Exception while reading {} from path {} ", combinedFileRange, pathStr, ex); - // close the ongoing read. + // close the ongoing read. No attempt is made to train the stream. IOUtils.cleanupWithLogger(LOG, rangeContent); rangeContent = null; - // complete exception all the underlying ranges which have not already - // finished. // get all the incomplete reads. - final List incomplete = combinedFileRange.getUnderlying().stream() + final List unreadRanges = combinedFileRange.getUnderlying().stream() .filter(f -> !f.getData().isDone()) .collect(Collectors.toList()); - // previously these were completed exceptionally; now they are - // recovered from. - // while rebuilding a new combined range is possible, if there are problems - // we just fall back to each range being read individually, - // in sequence. Suboptimal, but simple -especially as this is - // already happening in a worker thread. - - for (FileRange child : incomplete) { - // failure reporting is already handed internally. + + // Attempt to recover from the failure by reading each range individually. + // Suboptimal, but pragmatic. + for (FileRange child : unreadRanges) { readSingleRange(child, bufferPool); } @@ -1165,16 +1160,18 @@ private void readSingleRange(FileRange range, ByteBufferPool bufferPool) { try { long position = range.getOffset(); int length = range.getLength(); + // a GET request, which has risk of failing if the file is gone, changed etc. 
objectRange = getS3Object("readSingleRange", position, length, true); buffer = bufferPool.getBuffer(false, range.getLength()); - // TODO: error handling if the read fails. + // read in the data and declare this range successfully read. populateBuffer(range, buffer, objectRange); range.getData().complete(buffer); - LOG.debug("Finished reading range {} from path {} ", range, pathStr); + LOG.debug("Finished reading range {} from path {}", range, pathStr); } catch (IOException ex) { - LOG.debug("Exception while reading a range {} from path {} ", range, pathStr, ex); + LOG.debug("Exception while reading a range {} from path {}", range, pathStr, ex); if (buffer != null) { + // return any buffer to the pool bufferPool.putBuffer(buffer); } range.getData().completeExceptionally(ex); @@ -1201,6 +1198,7 @@ private void populateBuffer(FileRange range, int length = range.getLength(); if (buffer.isDirect()) { + // direct buffer so copy the data into the buffer from the stream. VectoredReadUtils.readInDirectBuffer(range, buffer, (position, tmp, offset, currentLength) -> { readByteArray(objectContent, range, tmp, offset, currentLength); From e11b0a9dafdbeb823d5d7f2a5bdd699dd1aadb2f Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 31 Dec 2024 12:40:11 +0000 Subject: [PATCH 5/5] HADOOP-19105. Improve resilience in vector reads. Rigorous aborting of streams where appropriate, and if one range read fails in the fallback sequential read, so do all the others. factor out start/stop. This is preparing to pull this into a superclass so that all s3a streams get it. Change-Id: I8961d6737764bb696aa45338bb0300b2f6efeec6 --- .../apache/hadoop/fs/VectoredReadUtils.java | 2 + .../apache/hadoop/fs/s3a/S3AInputStream.java | 164 ++++++++++++++---- .../hadoop/fs/s3a/impl/ErrorTranslation.java | 12 ++ .../hadoop/fs/s3a/impl/SDKStreamDrainer.java | 30 ++++ 4 files changed, 171 insertions(+), 37 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java index edef56f94a8ab..26a92fcd60a11 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java @@ -196,6 +196,8 @@ public static CompletableFuture readRangeFrom( * @param range file range * @param buffer destination buffer * @throws IOException IO problems. 
+ * @throws EOFException the end of the data was reached before + * the read operation completed */ private static void readNonByteBufferPositionedReadable( PositionedReadable stream, diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index f3d0604c55d46..a90f577fc38d7 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -77,6 +77,8 @@ import static org.apache.hadoop.fs.VectoredReadUtils.mergeSortedRanges; import static org.apache.hadoop.fs.VectoredReadUtils.validateAndSortRanges; import static org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration; +import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.shouldInputStreamBeAborted; +import static org.apache.hadoop.fs.s3a.impl.SDKStreamDrainer.abortSdkStream; import static org.apache.hadoop.util.StringUtils.toLowerCase; import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; @@ -571,11 +573,8 @@ public synchronized int read() throws IOException { } try { b = wrappedStream.read(); - } catch (HttpChannelEOFException | SocketTimeoutException e) { - onReadFailure(e, true); - throw e; } catch (IOException e) { - onReadFailure(e, false); + onReadFailure(e, shouldInputStreamBeAborted(e)); throw e; } return b; @@ -718,7 +717,7 @@ public synchronized void close() throws IOException { if (!closed) { closed = true; try { - stopVectoredIOOperations.set(true); + stopVectorOperations(); // close or abort the stream; blocking closeStream("close() operation", false, true); // end the client+audit span. @@ -994,9 +993,7 @@ public void readVectored(final List ranges, final Consumer release) throws IOException { LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, ranges); checkNotClosed(); - if (stopVectoredIOOperations.getAndSet(false)) { - LOG.debug("Reinstating vectored read operation for path {} ", pathStr); - } + maybeStartVectorOperations(); // fail fast on parameters which would otherwise only be checked // in threads and/or in failures. requireNonNull(allocate , "Null allocator"); @@ -1021,8 +1018,7 @@ public void readVectored(final List ranges, streamStatistics.readVectoredOperationStarted(sortedRanges.size(), sortedRanges.size()); for (FileRange range: sortedRanges) { // submit the read operation to the threadpool - // TODO: track wait time. - boundedThreadPool.submit(() -> readSingleRange(range, bufferPool)); + boundedThreadPool.submit(() -> readSingleRangeWithRetries(range, bufferPool)); } } else { LOG.debug("Trying to merge the ranges as they are not disjoint"); @@ -1041,6 +1037,24 @@ public void readVectored(final List ranges, " on path {} for ranges {} ", pathStr, ranges); } + /** + * Start/restart vector operations if not active. + * In particular, after an unbuffer(), this performs any + * initialization required. + */ + private void maybeStartVectorOperations() { + if (stopVectoredIOOperations.getAndSet(false)) { + LOG.debug("Reinstating vectored read operation for path {} ", pathStr); + } + } + + /** + * Stop vector operations. + */ + private void stopVectorOperations() { + stopVectoredIOOperations.set(true); + } + /** * Read the data from S3 for the bigger combined file range and update all the * underlying ranges. 
@@ -1056,11 +1070,13 @@ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRa LOG.debug("Start reading {} from path {} ", combinedFileRange, pathStr); ResponseInputStream rangeContent = null; try { - // issue the GET request; this retries internally. + // issue the GET request; this retries GET but not reads internally. rangeContent = getS3Object("readCombinedFileRange", combinedFileRange.getOffset(), combinedFileRange.getLength(), true); + // GET has succeeded, make sure request is good to continue + checkIfVectoredIOStopped(); } catch (IOException ex) { // any exception here means that repeated HEAD requests have failed; // consider the request unrecoverable. @@ -1070,6 +1086,7 @@ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRa .forEach(f -> f.completeExceptionally(ex)); return; } + // at this point there is a stream to read from, which // MUST be closed in the finally block. try { @@ -1086,12 +1103,23 @@ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRa .filter(f -> !f.getData().isDone()) .collect(Collectors.toList()); - // Attempt to recover from the failure by reading each range individually. - // Suboptimal, but pragmatic. + LOG.debug("There are {} remaining vector ranges to retrieve", unreadRanges.size()); + + // Attempt to recover from the failure by reading each range individually + // within the current thread. + // If a single read is unrecoverable, all subsequent range reads are failed + // with the same exception. + // this is to process unrecoverable failures faster. + IOException lastIOE = null; for (FileRange child : unreadRanges) { - readSingleRange(child, bufferPool); + if (lastIOE == null) { + // all good so far: request the next range + lastIOE = readSingleRangeWithRetries(child, bufferPool); + } else { + // a predecessor failed, do not attempt to recover. + child.getData().completeExceptionally(lastIOE); + } } - } finally { IOUtils.cleanupWithLogger(LOG, rangeContent); } @@ -1112,7 +1140,7 @@ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRa */ @Retries.OnceTranslated private void populateChildBuffers(CombinedFileRange combinedFileRange, - InputStream objectContent, + ResponseInputStream objectContent, ByteBufferPool bufferPool) throws IOException { // If the combined file range just contains a single child // range, we only have to fill that one child buffer else @@ -1142,6 +1170,7 @@ private void populateChildBuffers(CombinedFileRange combinedFileRange, // work out how much long drainQuantity = child.getOffset() - position; // and drain it. + // this will raise EOFException if a -1 was returned. 
 drainUnnecessaryData(objectContent, position, drainQuantity);
 }
 }
@@ -1151,6 +1180,7 @@ private void populateChildBuffers(CombinedFileRange combinedFileRange,
 } catch (IOException e) {
 // release the buffer
 bufferPool.putBuffer(buffer);
+ // rethrow
 throw e;
 }
 child.getData().complete(buffer);
@@ -1170,7 +1200,7 @@ private void populateChildBuffers(CombinedFileRange combinedFileRange,
 */
 @Retries.OnceTranslated
 private void drainUnnecessaryData(
- final InputStream objectContent,
+ final ResponseInputStream<GetObjectResponse> objectContent,
 final long position,
 long drainQuantity) throws IOException {
@@ -1194,11 +1224,18 @@ private void drainUnnecessaryData(
 "End of stream reached draining data between ranges; expected %,d bytes;" + " only drained %,d bytes before -1 returned (position=%,d)", drainQuantity, drainBytes, position + drainBytes);
+ LOG.debug(s);
 throw new EOFException(s);
 }
 drainBytes += readCount;
 remaining -= readCount;
 }
+ } catch (IOException ex) {
+ if (shouldInputStreamBeAborted(ex)) {
+ // abort the stream if the exception indicates this is needed.
+ abortSdkStream(uri, objectContent, streamStatistics, "drain failure");
+ }
+ throw ex;
 } finally {
 streamStatistics.readVectoredBytesDiscarded(drainBytes);
 LOG.debug("{} bytes drained from stream ", drainBytes);
@@ -1207,11 +1244,49 @@ private void drainUnnecessaryData(
 /**
 * Read data from S3 for this range and populate a buffer.
+ * The GET request and single range reads are retried.
+ * Any IOException which is propagated by the retry logic is
+ * attached to the range as an exceptional failure.
 * @param range range of data to read.
 * @param bufferPool buffer allocator.
+ * @return any IOE which resulted in the read being unsuccessful; null on success.
 */
- @Retries.RetryTranslated("GET is retried; reads are not")
- private void readSingleRange(FileRange range, ByteBufferPool bufferPool) {
+ @Retries.RetryTranslated
+ private IOException readSingleRangeWithRetries(
+ FileRange range,
+ ByteBufferPool bufferPool) {
+ try {
+ context.getReadInvoker().retry("vector read", uri, true, () ->
+ readSingleRange(range, bufferPool));
+ return null;
+ } catch (IOException ex) {
+ // the retry mechanism has stopped retrying, so mark the request as a failure.
+ range.getData().completeExceptionally(ex);
+ return ex;
+ }
+ }
+
+ /**
+ * Read data from S3 for this range and populate a buffer.
+ * If the full read was successful, the range's future is declared
+ * complete.
+ * <p>
+ * If an exception is raised,
+ * <ol>
+ *   <li>The buffer is returned to the pool.</li>
+ *   <li>The HTTP connection will be aborted if deemed to have failed.</li>
+ *   <li>The relevant statistics will be updated.</li>
+ *   <li>The exception is rethrown.</li>
+ * </ol>
+ * This is to allow the operation to be invoked in a retry() operation.
+ * @param range range of data to read.
+ * @param bufferPool buffer allocator.
+ * @throws IOException failure to GET or read the data.
+ */
+ @Retries.OnceTranslated
+ private void readSingleRange(FileRange range,
+ ByteBufferPool bufferPool) throws IOException {
+ LOG.debug("Start reading {} from {} ", range, pathStr);
 if (range.getLength() == 0) {
 ByteBuffer buffer = bufferPool.getBuffer(false, range.getLength());
@@ -1220,13 +1295,21 @@ private void readSingleRange(FileRange range, ByteBufferPool bufferPool) {
 range.getData().complete(buffer);
 return;
 }
+ // buffer which will be fetched from the buffer pool and populated.
+ // On successful reads this will be returned in the response;
+ // on failures it must be returned to the pool.
 ByteBuffer buffer = null;
+ // the contents of the ranged object request.
 ResponseInputStream<GetObjectResponse> objectRange = null;
 try {
 long position = range.getOffset();
 int length = range.getLength();
 // a GET request, which has risk of failing if the file is gone, changed etc.
- objectRange = getS3Object("readSingleRange", position, length, true);
+ objectRange = getS3Object("readSingleRange", position, length, false);
+
+ // GET has succeeded, make sure request is good to continue
+ checkIfVectoredIOStopped();
+
 buffer = bufferPool.getBuffer(false, range.getLength());
 // read in the data and declare this range successfully read.
@@ -1234,12 +1317,18 @@ private void readSingleRange(FileRange range, ByteBufferPool bufferPool) {
 range.getData().complete(buffer);
 LOG.debug("Finished reading range {} from path {}", range, pathStr);
 } catch (IOException ex) {
+ // any failure.
+ // log the error, return the buffer to the pool, and report a failure.
 LOG.debug("Exception while reading a range {} from path {}", range, pathStr, ex);
 if (buffer != null) {
 // return any buffer to the pool
 bufferPool.putBuffer(buffer);
 }
- range.getData().completeExceptionally(ex);
+ if (shouldInputStreamBeAborted(ex)) {
+ // abort the stream if the exception indicates this is needed.
+ abortSdkStream(uri, objectRange, streamStatistics, "read failure");
+ }
+ throw ex;
 } finally {
 IOUtils.cleanupWithLogger(LOG, objectRange);
 }
@@ -1252,7 +1341,7 @@ private void readSingleRange(FileRange range, ByteBufferPool bufferPool) {
 * @param range vector range to populate.
 * @param buffer buffer to fill.
 * @param objectContent result retrieved from S3 store.
- * @throws IOException any IOE.
+ * @throws IOException any IOE raised reading the input stream.
 * @throws EOFException if EOF if read() call returns -1
 * @throws InterruptedIOException if vectored IO operation is stopped.
 */
@@ -1306,12 +1395,12 @@ private void readByteArray(InputStream objectContent,
 length - readBytes);
 LOG.debug("read {} bytes from stream", readBytesCurr);
 if (readBytesCurr < 0) {
- // TODO: abort the stream.
- throw new EOFException(
-     String.format("HTTP stream closed before all bytes were read."
-     + " Expected %,d bytes but only read %,d bytes. Current position %,d"
-     + " (%s)",
-     length, readBytes, position, range));
+ final String message = String.format("HTTP stream closed before all bytes were read."
+     + " Expected %,d bytes but only read %,d bytes. Current position %,d"
+     + " (%s)",
+     length, readBytes, position, range);
+ LOG.warn(message);
+ throw new EOFException(message);
 }
 readBytes += readBytesCurr;
 position += readBytesCurr;
@@ -1322,13 +1411,15 @@ private void readByteArray(InputStream objectContent,
 }

 /**
- * Read data from S3 with retries for the GET request
- * This also handles if file has been changed while the
+ * Read data from S3 with retries for the GET request, as part of a vector IO
+ * operation.
+ * <p>
+ * This also handles the file being changed while the
 * http call is getting executed. If the file has been
 * changed RemoteFileChangedException is thrown.
 * <p>
- * Also checks if the vectored io operation has been stopped before and after
- * the http get request such that we don't waste time populating the buffers.
+ * It checks if the vectored io operation has been stopped before
+ * the http GET request such that we don't waste time populating the buffers.
 * @param operationName name of the operation for which get object on S3 is called.
 * @param position position of the object to be read from S3.
 * @param length length from position of the object to be read from S3.
@@ -1373,20 +1464,19 @@ private ResponseInputStream<GetObjectResponse> getS3Object(String operationName,
 }
 changeTracker.processResponse(objectRange.response(), operationName, position);
- checkIfVectoredIOStopped();
 return objectRange;
 }

 /**
- * Check if vectored io operation has been stooped. This happens
- * when the stream is closed or unbuffer is called.
+ * Check if vectored io operation has been stopped. This happens
+ * when the stream is closed or unbuffer() was called during the read.
 * @throws InterruptedIOException throw InterruptedIOException such
 * that all running vectored io is
 * terminated thus releasing resources.
 */
 private void checkIfVectoredIOStopped() throws InterruptedIOException {
 if (stopVectoredIOOperations.get()) {
- throw new InterruptedIOException("Stream closed or unbuffer is called");
+ throw new InterruptedIOException("Stream closed or unbuffer() was called during the read");
 }
 }
@@ -1484,7 +1574,7 @@ public static long validateReadahead(@Nullable Long readahead) {
 @Override
 public synchronized void unbuffer() {
 try {
- stopVectoredIOOperations.set(true);
+ stopVectorOperations();
 closeStream("unbuffer()", false, false);
 } finally {
 streamStatistics.unbuffered();
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java
index cd24b61340c3f..ccf81fa4d39e5 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.net.SocketTimeoutException;

 import software.amazon.awssdk.awscore.exception.AwsServiceException;
 import software.amazon.awssdk.core.exception.SdkException;
@@ -270,6 +271,17 @@ public static HttpChannelEOFException maybeExtractChannelException(
 return null;
 }

+ /**
+ * Is the exception to be considered an unrecoverable channel failure,
+ * such that the stream should be aborted rather than closed?
+ * @param t caught exception.
+ * @return true if the stream must not be returned to the pool.
+ */
+ public static boolean shouldInputStreamBeAborted(Throwable t) {
+ return (t instanceof HttpChannelEOFException)
+ || (t instanceof SocketTimeoutException);
+ }
+
 /**
 * AWS error codes explicitly recognized and processes specially;
 * kept in their own class for isolation.
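
To make the intended use of the new classification concrete, here is a minimal, self-contained sketch of the abort-or-close decision in a read failure path. It is illustrative only: HypotheticalAbortableStream, shouldAbort() and the plain EOFException/SocketTimeoutException checks are stand-ins for the SDK stream type, shouldInputStreamBeAborted() and HttpChannelEOFException, not the real S3A classes.

    import java.io.EOFException;
    import java.io.IOException;
    import java.net.SocketTimeoutException;

    final class AbortDecisionSketch {

      /** Stand-in for an SDK stream which is both readable and abortable. */
      interface HypotheticalAbortableStream {
        int read(byte[] buf) throws IOException;
        void abort();
        void close() throws IOException;
      }

      /** Same shape as shouldInputStreamBeAborted(): channel failures mean "abort". */
      static boolean shouldAbort(Throwable t) {
        return t instanceof EOFException || t instanceof SocketTimeoutException;
      }

      /**
       * Read into a buffer; on a channel failure abort the stream so the broken
       * HTTP connection is not returned to the connection pool, then rethrow.
       */
      static int readOnce(HypotheticalAbortableStream in, byte[] buf) throws IOException {
        try {
          return in.read(buf);
        } catch (IOException e) {
          if (shouldAbort(e)) {
            // connection state is unknown/broken: drop it rather than recycle it
            in.abort();
          }
          // always propagate so the caller can fail the vector range
          throw e;
        } finally {
          try {
            in.close();
          } catch (IOException ignored) {
            // closing an aborted stream may fail; nothing more can be done here
          }
        }
      }

      private AbortDecisionSketch() {
      }
    }
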
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/SDKStreamDrainer.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/SDKStreamDrainer.java
index a8aa532ac024d..48ea99ce30f61 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/SDKStreamDrainer.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/SDKStreamDrainer.java
@@ -21,6 +21,8 @@
 import java.io.InputStream;
 import java.util.concurrent.atomic.AtomicBoolean;

+import javax.annotation.Nullable;
+
 import software.amazon.awssdk.http.Abortable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +40,7 @@
 * Drains/aborts s3 or other AWS SDK streams.
 * It is callable so can be passed directly to a submitter
 * for async invocation.
+ * @param <TStream> type of stream to drain/abort.
 */
 public class SDKStreamDrainer<TStream extends InputStream & Abortable>
     implements CallableRaisingIOE<Boolean> {
@@ -300,4 +303,31 @@ public String toString() {
 ", thrown=" + thrown +
 '}';
 }
+
+ /**
+ * Abort a stream, always.
+ * @param <TStream> type of stream to drain/abort.
+ * @param uri URI for messages
+ * @param sdkStream stream to close. Can be null.
+ * @param streamStatistics stats to update
+ * @param reason reason for stream being closed; used in messages
+ * @return true if the abort was successful.
+ */
+ public static <TStream extends InputStream & Abortable> boolean abortSdkStream(
+ final String uri,
+ @Nullable final TStream sdkStream,
+ final S3AInputStreamStatistics streamStatistics,
+ final String reason) {
+ if (sdkStream == null) {
+ return false;
+ }
+ return new SDKStreamDrainer<>(
+ uri,
+ sdkStream,
+ true,
+ 0,
+ streamStatistics,
+ reason)
+ .apply();
+ }
 }
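
For reference, the per-range fallback behaviour which the readCombinedRangeAndUpdateChildren() change earlier in this patch switches to (retry each unread range individually, but once one range fails unrecoverably, fail every remaining range with the same exception) can be sketched in isolation as follows. HypotheticalRange, RangeReader and readRemainingRanges() are illustrative stand-ins, not the S3A FileRange and readSingleRangeWithRetries() code.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    final class RangeFallbackSketch {

      /** Stand-in for FileRange: an offset/length plus the future for its data. */
      static final class HypotheticalRange {
        final long offset;
        final int length;
        final CompletableFuture<ByteBuffer> data = new CompletableFuture<>();

        HypotheticalRange(long offset, int length) {
          this.offset = offset;
          this.length = length;
        }
      }

      /** Whatever fetches a single range; retries are assumed to happen inside it. */
      interface RangeReader {
        ByteBuffer readRange(HypotheticalRange range) throws IOException;
      }

      /**
       * Retry each unread range individually; after the first unrecoverable
       * failure, fail every remaining range with the same exception rather
       * than issuing further reads.
       */
      static void readRemainingRanges(List<HypotheticalRange> unread, RangeReader reader) {
        IOException lastIOE = null;
        for (HypotheticalRange range : unread) {
          if (lastIOE == null) {
            try {
              range.data.complete(reader.readRange(range));
            } catch (IOException e) {
              // record the failure and fail this range
              lastIOE = e;
              range.data.completeExceptionally(e);
            }
          } else {
            // a predecessor already failed: fail fast instead of re-reading
            range.data.completeExceptionally(lastIOE);
          }
        }
      }

      private RangeFallbackSketch() {
      }
    }
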