diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 80f803d80dab0..6c3c9eed0ecaa 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -121,6 +121,14 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED) private boolean accountThrottlingEnabled; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_VECTOR_READS_MIN_SEEK_SIZE, + DefaultValue = DEFAULT_FS_AZURE_VECTOR_READS_MIN_SEEK_SIZE) + private int vectoredReadMinSeekSize; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_VECTOR_READS_MAX_SEEK_SIZE, + DefaultValue = DEFAULT_FS_AZURE_VECTOR_READS_MAX_MERGED_READ_SIZE) + private int vectoredReadMaxReadSize; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_READ_BUFFER_SIZE, MinValue = MIN_BUFFER_SIZE, MaxValue = MAX_BUFFER_SIZE, @@ -710,6 +718,14 @@ public boolean accountThrottlingEnabled() { return accountThrottlingEnabled; } + public int getVectoredReadMinSeekSize() { + return vectoredReadMinSeekSize; + } + + public int getVectoredReadMaxReadSize() { + return vectoredReadMaxReadSize; + } + public String getAzureInfiniteLeaseDirs() { return this.azureInfiniteLeaseDirs; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index e5e7056126564..e35112e600ad8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -52,6 +52,8 @@ import java.util.WeakHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.hadoop.classification.VisibleForTesting; @@ -132,6 +134,7 @@ import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.http.client.utils.URIBuilder; +import static java.util.Objects.requireNonNull; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.METADATA_INCOMPLETE_RENAME_FAILURES; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_RECOVERY; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_EQUALS; @@ -164,6 +167,9 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { private static final String TOKEN_DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'"; private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1"; private static final int GET_SET_AGGREGATE_COUNT = 2; + private ThreadPoolExecutor unboundedThreadPool; + /** Vectored IO context. 
*/ + private VectoredIOContext vectoredIOContext; private final Map leaseRefs; @@ -266,6 +272,29 @@ public AzureBlobFileSystemStore( abfsConfiguration.getMaxWriteRequestsToQueue(), 10L, TimeUnit.SECONDS, "abfs-bounded"); + this.unboundedThreadPool = new ThreadPoolExecutor( + abfsConfiguration.getWriteMaxConcurrentRequestCount(), Integer.MAX_VALUE, + 10L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + BlockingThreadPoolExecutorService.newDaemonThreadFactory( + "abfs-unbounded")); + unboundedThreadPool.allowCoreThreadTimeOut(true); + this.vectoredIOContext = populateVectoredIOContext(abfsConfiguration); + } + + /** + * Populates the configurations related to vectored IO operation + * in the context which has to passed down to input streams. + * @param configuration configuration object. + * @return VectoredIOContext. + */ + private VectoredIOContext populateVectoredIOContext(AbfsConfiguration configuration) { + final int minSeekVectored = configuration.getVectoredReadMinSeekSize(); + final int maxReadSizeVectored = configuration.getVectoredReadMaxReadSize(); + return new VectoredIOContext() + .setMinSeekForVectoredReads(minSeekVectored) + .setMaxReadSizeForVectoredReads(maxReadSizeVectored) + .build(); } /** @@ -795,7 +824,7 @@ public AbfsInputStream openFileForRead(Path path, return new AbfsInputStream(client, statistics, relativePath, contentLength, populateAbfsInputStreamContext( parameters.map(OpenFileParameters::getOptions)), - eTag, tracingContext); + eTag, tracingContext, unboundedThreadPool); } } @@ -817,6 +846,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext( abfsConfiguration.shouldReadBufferSizeAlways()) .withReadAheadBlockSize(abfsConfiguration.getReadAheadBlockSize()) .withBufferedPreadDisabled(bufferedPreadDisabled) + .withVectoredIOContext(requireNonNull(vectoredIOContext, "vectoredIOContext")) .build(); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/VectoredIOContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/VectoredIOContext.java new file mode 100644 index 0000000000000..9f15e88ffa3dd --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/VectoredIOContext.java @@ -0,0 +1,59 @@ +package org.apache.hadoop.fs.azurebfs; + +import java.util.List; +import java.util.function.IntFunction; + +/** + * Context related to vectored IO operation. + */ +public class VectoredIOContext { + + /** + * What is the smallest reasonable seek that we should group + * ranges together during vectored read operation. + */ + private int minSeekForVectorReads; + + /** + * What is the largest size that we should group ranges + * together during vectored read operation. + * Setting this value 0 will disable merging of ranges. + */ + private int maxReadSizeForVectorReads; + + /** + * Default no arg constructor. 
+ */ + public VectoredIOContext() { + } + + public VectoredIOContext setMinSeekForVectoredReads(int minSeek) { + this.minSeekForVectorReads = minSeek; + return this; + } + + public VectoredIOContext setMaxReadSizeForVectoredReads(int maxSize) { + this.maxReadSizeForVectorReads = maxSize; + return this; + } + + public VectoredIOContext build() { + return this; + } + + public int getMinSeekForVectorReads() { + return minSeekForVectorReads; + } + + public int getMaxReadSizeForVectorReads() { + return maxReadSizeForVectorReads; + } + + @Override + public String toString() { + return "VectoredIOContext{" + + "minSeekForVectorReads=" + minSeekForVectorReads + + ", maxReadSizeForVectorReads=" + maxReadSizeForVectorReads + + '}'; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index a59f76b6d0fe0..734919583b3e5 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -39,6 +39,9 @@ public final class ConfigurationKeys { public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX = "fs\\.azure\\.account\\.key\\.(.*)"; public static final String FS_AZURE_SECURE_MODE = "fs.azure.secure.mode"; public static final String FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = "fs.azure.account.throttling.enabled"; + public static final String FS_AZURE_VECTOR_READS_MIN_SEEK_SIZE = "fs.azure.vector.reads.min.seek.size"; + public static final String FS_AZURE_VECTOR_READS_MAX_SEEK_SIZE = "fs.azure.vector.reads.max.seek.size"; + // Retry strategy defined by the user public static final String AZURE_MIN_BACKOFF_INTERVAL = "fs.azure.io.retry.min.backoff.interval"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 9994d9f5207f3..d9c70476d57e9 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -95,6 +95,14 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true; public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true; public static final boolean DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = true; + /** + * Default minimum seek in bytes during vectored reads : {@value}. + */ + public static final int DEFAULT_FS_AZURE_VECTOR_READS_MIN_SEEK_SIZE = 4896; // 4K + /** + * Default maximum read size in bytes during vectored reads : {@value}. 
+ */ + public static final int DEFAULT_FS_AZURE_VECTOR_READS_MAX_MERGED_READ_SIZE = 1253376; //1M public static final int DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS = 60_000; public static final int DEFAULT_ANALYSIS_PERIOD_MS = 10_000; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index fdeaf70177571..c69a9c72cd483 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -21,11 +21,22 @@ import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InterruptedIOException; import java.net.HttpURLConnection; +import java.nio.ByteBuffer; +import java.util.List; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.IntFunction; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.VectoredReadUtils; +import org.apache.hadoop.fs.azurebfs.VectoredIOContext; +import org.apache.hadoop.fs.impl.CombinedFileRange; import org.apache.hadoop.util.Preconditions; import org.slf4j.Logger; @@ -48,6 +59,10 @@ import static java.lang.Math.max; import static java.lang.Math.min; +import static org.apache.hadoop.fs.VectoredReadUtils.isOrderedDisjoint; +import static org.apache.hadoop.fs.VectoredReadUtils.mergeSortedRanges; +import static org.apache.hadoop.fs.VectoredReadUtils.validateNonOverlappingAndReturnSortedRanges; +import static org.apache.hadoop.fs.VectoredReadUtils.validateRangeRequest; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STREAM_ID_LEN; import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD; @@ -121,6 +136,21 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, */ private long nextReadPos; + /** + * Thread pool used for vectored IO operation. + */ + private final ThreadPoolExecutor unboundedThreadPool; + + /** Vectored IO context. */ + private final VectoredIOContext vectoredIOContext; + + /** + * Atomic boolean variable to stop all ongoing vectored read operation + * for this input stream. This will be set to true when the stream is + * closed or unbuffer is called. 
+ */ + private final AtomicBoolean stopVectoredIOOperations = new AtomicBoolean(false); + public AbfsInputStream( final AbfsClient client, final Statistics statistics, @@ -128,7 +158,8 @@ public AbfsInputStream( final long contentLength, final AbfsInputStreamContext abfsInputStreamContext, final String eTag, - TracingContext tracingContext) { + TracingContext tracingContext, + ThreadPoolExecutor unboundedThreadPool) { this.client = client; this.statistics = statistics; this.path = path; @@ -151,6 +182,8 @@ public AbfsInputStream( this.tracingContext.setOperation(FSOperationType.READ); this.tracingContext.setStreamID(inputStreamId); this.context = abfsInputStreamContext; + this.unboundedThreadPool = unboundedThreadPool; + this.vectoredIOContext = context.getVectoredIOContext(); readAheadBlockSize = abfsInputStreamContext.getReadAheadBlockSize(); // Propagate the config values to ReadBufferManager so that the first instance @@ -169,6 +202,22 @@ private String createInputStreamId() { return StringUtils.right(UUID.randomUUID().toString(), STREAM_ID_LEN); } + /** + * {@inheritDoc}. + */ + @Override + public int minSeekForVectorReads() { + return vectoredIOContext.getMinSeekForVectorReads(); + } + + /** + * {@inheritDoc}. + */ + @Override + public int maxReadSizeForVectorReads() { + return vectoredIOContext.getMaxReadSizeForVectorReads(); + } + @Override public int read(long position, byte[] buffer, int offset, int length) throws IOException { @@ -275,6 +324,152 @@ public synchronized int read(final byte[] b, final int off, final int len) throw return totalReadBytes > 0 ? totalReadBytes : lastReadBytes; } + /** + * {@inheritDoc} + * Vectored read implementation for AbfsInputStream. + * @param ranges the byte ranges to read. + * @param allocate the function to allocate ByteBuffer. + * @throws IOException IOE if any. + */ + @Override + public void readVectored(List ranges, + IntFunction allocate) throws IOException { + LOG.debug("Starting vectored read on path {} for ranges {} ", path, ranges); + if (closed) { + throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + } + if (stopVectoredIOOperations.getAndSet(false)) { + LOG.debug("Reinstating vectored read operation for path {} ", path); + } + List sortedRanges = validateNonOverlappingAndReturnSortedRanges(ranges); + for (FileRange range : ranges) { + validateRangeRequest(range); + CompletableFuture result = new CompletableFuture<>(); + range.setData(result); + } + + if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) { + LOG.debug("Not merging the ranges as they are disjoint"); + for (FileRange range: sortedRanges) { + ByteBuffer buffer = allocate.apply(range.getLength()); + unboundedThreadPool.submit(() -> readSingleRange(range, buffer)); + } + } else { + LOG.debug("Trying to merge the ranges as they are not disjoint"); + List combinedFileRanges = mergeSortedRanges(sortedRanges, + 1, minSeekForVectorReads(), + maxReadSizeForVectorReads()); + LOG.debug("Number of original ranges size {} , Number of combined ranges {} ", + ranges.size(), combinedFileRanges.size()); + for (CombinedFileRange combinedFileRange: combinedFileRanges) { + unboundedThreadPool.submit( + () -> readCombinedRangeAndUpdateChildren(combinedFileRange, allocate)); + } + } + LOG.debug("Finished submitting vectored read to threadpool" + + " on path {} for ranges {} ", path, ranges); + } + + /** + * Read the data for the bigger combined file range and update all the + * underlying ranges. + * @param combinedFileRange big combined file range. 
+ * @param allocate method to create byte buffers to hold result data. + */ + private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange, + IntFunction allocate) { + LOG.debug("Start reading combined range {} from path {} ", combinedFileRange, path); + try { + checkIfVectoredIOStopped(); + populateChildBuffers(combinedFileRange, allocate); + } catch (Exception ex) { + LOG.debug("Exception while reading a range {} from path {} ", combinedFileRange, path, ex); + for(FileRange child : combinedFileRange.getUnderlying()) { + child.getData().completeExceptionally(ex); + } + } + LOG.debug("Finished reading range {} from path {} ", combinedFileRange, path); + } + + /** + * Populate underlying buffers of the child ranges. + * @param combinedFileRange big combined file range. + * @param allocate method to allocate child byte buffers. + * @throws IOException any IOE. + */ + private void populateChildBuffers(CombinedFileRange combinedFileRange, + IntFunction allocate) throws IOException { + // If the combined file range just contains a single child + // range, we only have to fill that one child buffer else + // we drain the intermediate data between consecutive ranges + // and fill the buffers one by one. + int combinedLengthToRead = (int) (combinedFileRange.getLength() + combinedFileRange.getOffset()); + ByteBuffer buffer = allocate.apply(combinedLengthToRead); + readInternal(0, buffer.array(), 0, combinedLengthToRead, true); + if (combinedFileRange.getUnderlying().size() == 1) { + FileRange child = combinedFileRange.getUnderlying().get(0); + ByteBuffer buffer_child = allocate.apply(child.getLength()); + System.arraycopy(buffer.array(), (int) child.getOffset(), buffer_child.array(), 0, child.getLength()); + child.getData().complete(buffer_child); + } else { + for (FileRange child : combinedFileRange.getUnderlying()) { + ByteBuffer buffer_child = allocate.apply(child.getLength()); + System.arraycopy(buffer.array(), (int) child.getOffset(), buffer_child.array(), 0, child.getLength()); + child.getData().complete(buffer_child); + } + } + } + + /** + * Read data for this range and populate the buffer. + * @param range range of data to read. + * @param buffer buffer to fill. + */ + private void readSingleRange(FileRange range, ByteBuffer buffer) { + LOG.debug("Start reading range {} from path {} ", range, path); + int offset = (int) range.getOffset(); + int length = range.getLength(); + try { + checkIfVectoredIOStopped(); + readInternal(offset, buffer.array(), 0, length, true); + range.getData().complete(buffer); + }catch (Exception ex) { + LOG.warn("Exception while reading a range {} from path {} ", range, path, ex); + range.getData().completeExceptionally(ex); + } + LOG.debug("Finished reading range {} from path {} ", range, path); + } + + /** + * Validates range parameters. + * In case of S3 we already have contentLength from the first GET request + * during an open file operation so failing fast here. + * @param range requested range. + * @throws EOFException end of file exception. + */ + private void validateRangeRequest(FileRange range) throws EOFException { + VectoredReadUtils.validateRangeRequest(range); + if(range.getOffset() + range.getLength() > contentLength) { + LOG.warn("Requested range [{}, {}) is beyond EOF for path {}", + range.getOffset(), range.getLength(), path); + throw new EOFException("Requested range [" + range.getOffset() +", " + + range.getLength() + ") is beyond EOF for path " + path); + } + } + + /** + * Check if vectored io operation has been stooped. 
+   * This happens when the stream is closed or unbuffer is called.
+   * @throws InterruptedIOException if the operations have been stopped, so
+   *         that all in-flight vectored reads terminate and release their
+   *         resources.
+   */
+  private void checkIfVectoredIOStopped() throws InterruptedIOException {
+    if (stopVectoredIOOperations.get()) {
+      throw new InterruptedIOException("Stream closed or unbuffer is called");
+    }
+  }
+
   private boolean shouldReadFully() {
     return this.firstRead && this.context.readSmallFilesCompletely()
         && this.contentLength <= this.bufferSize;
@@ -694,6 +889,7 @@ public boolean seekToNewSource(long l) throws IOException {
   @Override
   public synchronized void close() throws IOException {
+    stopVectoredIOOperations.set(true);
     LOG.debug("Closing {}", this);
     closed = true;
     buffer = null; // de-reference the buffer so it can be GC'ed sooner
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
index e258958b1a111..6c77e030751d3 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
@@ -20,6 +20,8 @@
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.azurebfs.VectoredIOContext;
 import org.apache.hadoop.util.Preconditions;
 
 /**
@@ -51,10 +53,21 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
 
   private boolean bufferedPreadDisabled;
 
+  /**
+   * Vectored IO context for the vectored read API in
+   * {@code AbfsInputStream#readVectored(List, IntFunction)}.
+   */
+  private VectoredIOContext vectoredIOContext;
+
   public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
     super(sasTokenRenewPeriodForStreamsInSeconds);
   }
 
+  public AbfsInputStreamContext withVectoredIOContext(final VectoredIOContext vectoredIOContext) {
+    this.vectoredIOContext = vectoredIOContext;
+    return this;
+  }
+
   public AbfsInputStreamContext withReadBufferSize(final int readBufferSize) {
     this.readBufferSize = readBufferSize;
     return this;
@@ -180,4 +193,8 @@ public int getReadAheadBlockSize() {
   public boolean isBufferedPreadDisabled() {
     return bufferedPreadDisabled;
   }
+
+  public VectoredIOContext getVectoredIOContext() {
+    return vectoredIOContext;
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
index d96f1a283609f..1c68192e2e6df 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
@@ -19,9 +19,11 @@
 package org.apache.hadoop.fs.azurebfs;
 
 import java.io.IOException;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -286,7 +290,7 @@ public void testWithNullStreamStatistics() throws IOException {
 
       in = new AbfsInputStream(fs.getAbfsClient(), null,
           nullStatFilePath.toUri().getPath(), ONE_KB, abfsInputStreamContext,
abfsRestOperation.getResult().getResponseHeader("ETag"), - getTestTracingContext(fs, false)); + getTestTracingContext(fs, false), Mockito.mock(ThreadPoolExecutor.class)); // Verifying that AbfsInputStream Operations works with null statistics. assertNotEquals("AbfsInputStream read() with null statistics should " diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java index 66f072501dc4d..3c1da465fc0f7 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java @@ -20,11 +20,17 @@ import java.io.IOException; import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.function.IntFunction; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileRange; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; @@ -35,6 +41,7 @@ import org.junit.Test; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; +import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; @@ -60,6 +67,154 @@ public void testWithNoOptimization() throws Exception { } } + /** + * These ranges should not be merged as difference in offsets is greater than 4 KB. + * @throws Throwable + */ + @Test + public void testDisjointRangesWithVectoredRead() throws Throwable { + int fileSize = ONE_MB; + final AzureBlobFileSystem fs = getFileSystem(false, false, fileSize); + String fileName = methodName.getMethodName() + 1; + byte[] fileContent = getRandomBytesArray(fileSize); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); + + List rangeList = new ArrayList<>(); + rangeList.add(FileRange.createFileRange(100, 10000)); + rangeList.add(FileRange.createFileRange(15000, 27000)); + IntFunction allocate = ByteBuffer::allocate; + CompletableFuture builder = fs.openFile(testFilePath).build(); + + try (FSDataInputStream in = builder.get()) { + in.readVectored(rangeList, allocate); + byte[] readFullRes = new byte[(int)fileSize]; + in.readFully(0, readFullRes); + // Comparing vectored read results with read fully. + validateVectoredReadResult(rangeList, readFullRes); + } + } + + /** + * The last two ranges should get merged, so it covers the case of combinedFileRanges + * where there is a single underlying child and multiple as well. 
+ * @throws Throwable + */ + @Test + public void testMultipleDisjointRangesWithVectoredRead() throws Throwable { + int fileSize = ONE_MB; + final AzureBlobFileSystem fs = getFileSystem(false, false, fileSize); + String fileName = methodName.getMethodName() + 1; + byte[] fileContent = getRandomBytesArray(fileSize); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); + + List rangeList = new ArrayList<>(); + rangeList.add(FileRange.createFileRange(100, 10000)); + rangeList.add(FileRange.createFileRange(15000, 27000)); + rangeList.add(FileRange.createFileRange(42500, 40000)); + IntFunction allocate = ByteBuffer::allocate; + CompletableFuture builder = fs.openFile(testFilePath).build(); + + try (FSDataInputStream in = builder.get()) { + in.readVectored(rangeList, allocate); + byte[] readFullRes = new byte[(int)fileSize]; + in.readFully(0, readFullRes); + // Comparing vectored read results with read fully. + validateVectoredReadResult(rangeList, readFullRes); + } + } + + /** + * Case which covers multiple disjoint ranges. + * @throws Throwable + */ + @Test + public void testMultipleRangesWithVectoredRead() throws Throwable { + int fileSize = ONE_MB; + final AzureBlobFileSystem fs = getFileSystem(false, false, fileSize); + String fileName = methodName.getMethodName() + 1; + byte[] fileContent = getRandomBytesArray(fileSize); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); + + List rangeList = new ArrayList<>(); + rangeList.add(FileRange.createFileRange(100, 10000)); + rangeList.add(FileRange.createFileRange(15000, 27000)); + rangeList.add(FileRange.createFileRange(47500, 27000)); + + IntFunction allocate = ByteBuffer::allocate; + CompletableFuture builder = fs.openFile(testFilePath).build(); + + try (FSDataInputStream in = builder.get()) { + in.readVectored(rangeList, allocate); + byte[] readFullRes = new byte[(int)fileSize]; + in.readFully(0, readFullRes); + // Comparing vectored read results with read fully. + validateVectoredReadResult(rangeList, readFullRes); + } + } + + /** + * Case to verify two ranges with offset difference less than 4 KB will get merged. + * @throws Throwable + */ + @Test + public void testMergedRangesWithVectoredRead() throws Throwable { + int fileSize = ONE_MB; + final AzureBlobFileSystem fs = getFileSystem(false, false, fileSize); + String fileName = methodName.getMethodName() + 1; + byte[] fileContent = getRandomBytesArray(fileSize); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); + + List rangeList = new ArrayList<>(); + rangeList.add(FileRange.createFileRange(100, 10000)); + rangeList.add(FileRange.createFileRange(12000, 27000)); + IntFunction allocate = ByteBuffer::allocate; + CompletableFuture builder = fs.openFile(testFilePath).build(); + + try (FSDataInputStream in = builder.get()) { + in.readVectored(rangeList, allocate); + byte[] readFullRes = new byte[(int)fileSize]; + in.readFully(0, readFullRes); + // Comparing vectored read results with read fully. + validateVectoredReadResult(rangeList, readFullRes); + } + } + + /** + * Verify read ranges for a huge file. 
+ * @throws Throwable + */ + @Test + public void test_045_vectoredIOHugeFile() throws Throwable { + int fileSize = 100 * ONE_MB; + final AzureBlobFileSystem fs = getFileSystem(false, false, fileSize); + String fileName = methodName.getMethodName() + 1; + byte[] fileContent = getRandomBytesArray(fileSize); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); + + List rangeList = new ArrayList<>(); + rangeList.add(FileRange.createFileRange(5856368, 116770)); + rangeList.add(FileRange.createFileRange(3520861, 116770)); + rangeList.add(FileRange.createFileRange(8191913, 116770)); + rangeList.add(FileRange.createFileRange(1520861, 116770)); + rangeList.add(FileRange.createFileRange(2520861, 116770)); + rangeList.add(FileRange.createFileRange(9191913, 116770)); + rangeList.add(FileRange.createFileRange(2820861, 156770)); + IntFunction allocate = ByteBuffer::allocate; + + CompletableFuture builder = + fs.openFile(testFilePath).build(); + try (FSDataInputStream in = builder.get()) { + long timeMilli1 = System.currentTimeMillis(); + in.readVectored(rangeList, allocate); + byte[] readFullRes = new byte[(int)fileSize]; + in.readFully(0, readFullRes); + // Comparing vectored read results with read fully. + validateVectoredReadResult(rangeList, readFullRes); + long timeMilli2 = System.currentTimeMillis(); + System.out.println("Time taken for the code to execute: " + (timeMilli2 - timeMilli1) + " milliseconds"); + } + } + protected void testWithNoOptimization(final FileSystem fs, final Path testFilePath, final int seekPos, final byte[] fileContent) throws IOException { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 0395c4183b9b7..24ec75e51f030 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -23,6 +23,7 @@ import java.util.Optional; import java.util.Random; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadPoolExecutor; import org.assertj.core.api.Assertions; import org.junit.Assert; @@ -109,6 +110,11 @@ private AbfsClient getMockAbfsClient() { return client; } + private ThreadPoolExecutor getThreadPoolExecutor() { + ThreadPoolExecutor threadPoolExecutor = Mockito.mock(ThreadPoolExecutor.class); + return threadPoolExecutor; + } + private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient, String fileName) throws IOException { AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1); @@ -120,7 +126,7 @@ private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient, THREE_KB, inputStreamContext.withReadBufferSize(ONE_KB).withReadAheadQueueDepth(10).withReadAheadBlockSize(ONE_KB), "eTag", - getTestTracingContext(null, false)); + getTestTracingContext(null, false), getThreadPoolExecutor()); inputStream.setCachedSasToken( TestCachedSASToken.getTestCachedSASTokenInstance()); @@ -148,7 +154,7 @@ public AbfsInputStream getAbfsInputStream(AbfsClient abfsClient, .withShouldReadBufferSizeAlways(alwaysReadBufferSize) .withReadAheadBlockSize(readAheadBlockSize), eTag, - getTestTracingContext(getFileSystem(), false)); + getTestTracingContext(getFileSystem(), false), getThreadPoolExecutor()); inputStream.setCachedSasToken( TestCachedSASToken.getTestCachedSASTokenInstance()); 
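The merging behaviour exercised by the tests above is delegated to the hadoop-common helpers that AbfsInputStream imports (VectoredReadUtils and CombinedFileRange); the two new settings only supply the minimum-seek and maximum-merged-size arguments. A minimal standalone sketch of that interaction, reusing the offsets from testMergedRangesWithVectoredRead and the default values from this patch (the class name is illustrative, not part of the change):

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.VectoredReadUtils;
import org.apache.hadoop.fs.impl.CombinedFileRange;

public class AbfsVectoredMergeSketch {
  public static void main(String[] args) {
    // Values fed from fs.azure.vector.reads.min.seek.size and
    // fs.azure.vector.reads.max.seek.size (defaults from this patch).
    int minSeek = 4896;
    int maxMergedReadSize = 1253376;
    // Ranges (100, 10000) and (12000, 27000): the 1900-byte gap between them
    // is smaller than minSeek, so they collapse into one combined request.
    List<? extends FileRange> sorted =
        VectoredReadUtils.validateNonOverlappingAndReturnSortedRanges(
            Arrays.asList(FileRange.createFileRange(100, 10_000),
                FileRange.createFileRange(12_000, 27_000)));
    List<CombinedFileRange> combined =
        VectoredReadUtils.mergeSortedRanges(sorted, 1, minSeek, maxMergedReadSize);
    System.out.println(combined.size() + " combined range(s), length "
        + combined.get(0).getLength());
  }
}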
diff --git a/hadoop-tools/hadoop-azure/src/test/resources/readVectored.plantuml b/hadoop-tools/hadoop-azure/src/test/resources/readVectored.plantuml new file mode 100644 index 0000000000000..57d17d545cfc6 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/resources/readVectored.plantuml @@ -0,0 +1,28 @@ +@startuml +'https://plantuml.com/sequence-diagram + +autonumber +Developer -> AbfsInputStream: readVectored(List ranges, IntFunction allocate) +AbfsInputStream -> VectoredReadUtils: getFinalRanges(ranges) +VectoredReadUtils -> AbfsInputStream : List getFinalRanges +loop range in finalRanges: +AbfsInputStream -> VectoredReadThreadUtils : getThread() +VectoredReadThreadUtils -> AbfsInputStream : thread +AbfsInputStream -> thread : invoke(range) +thread -> AbfsInputStream : control back to main thread +par threadExec: +loop buffer:multipleBuffersInRange +thread -> BufferManager : checkIfAlreadyQueued() +alt bufferIsNotQueued: +thread -> AbfsInputStream : read(bufferRange) +AbfsInputStream -> AbfsClient : execute() +AbfsClient -> AbfsInputStream : result +end +end +end +end +AbfsInputStream -> VectoredReadUtils : convertToDevUnderstandableData() +VectoredReadUtils -> AbfsInputStream : List> result +AbfsInputStream -> Developer : result + +@enduml \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/test/resources/readVectored.png b/hadoop-tools/hadoop-azure/src/test/resources/readVectored.png new file mode 100644 index 0000000000000..e121eea052b31 Binary files /dev/null and b/hadoop-tools/hadoop-azure/src/test/resources/readVectored.png differ
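The read path added by this patch is driven entirely through the public FileSystem/FSDataInputStream API, the same way the integration tests above exercise it. An illustrative client-side sketch (the account/container URI, file name, and the 4 KB tuning value are placeholders):

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AbfsReadVectoredExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Optional tuning of the setting introduced by this patch.
    conf.setInt("fs.azure.vector.reads.min.seek.size", 4 * 1024);
    Path file = new Path("abfs://container@account.dfs.core.windows.net/data/file.bin");
    try (FileSystem fs = FileSystem.get(file.toUri(), conf);
         FSDataInputStream in = fs.openFile(file).build().get()) {
      List<FileRange> ranges = Arrays.asList(
          FileRange.createFileRange(100, 10_000),
          FileRange.createFileRange(15_000, 27_000));
      // Ranges are read asynchronously on the store's unbounded thread pool;
      // each range's future completes once its buffer has been filled.
      in.readVectored(ranges, ByteBuffer::allocate);
      for (FileRange r : ranges) {
        ByteBuffer data = r.getData().get();
        System.out.println("range @" + r.getOffset() + " read "
            + data.capacity() + " bytes");
      }
    }
  }
}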