Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED)
private boolean accountThrottlingEnabled;

/**
 * Minimum seek size used when grouping ranges during vectored reads.
 * Bound to the configuration key FS_AZURE_VECTOR_READS_MIN_SEEK_SIZE,
 * defaulting to DEFAULT_FS_AZURE_VECTOR_READS_MIN_SEEK_SIZE.
 */
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_VECTOR_READS_MIN_SEEK_SIZE,
DefaultValue = DEFAULT_FS_AZURE_VECTOR_READS_MIN_SEEK_SIZE)
private int vectoredReadMinSeekSize;

/**
 * Maximum read size used when grouping ranges during vectored reads.
 * Bound to the configuration key FS_AZURE_VECTOR_READS_MAX_SEEK_SIZE,
 * defaulting to DEFAULT_FS_AZURE_VECTOR_READS_MAX_MERGED_READ_SIZE.
 * NOTE(review): the key constant says "max seek size" while the field and
 * the default both describe a max (merged) read size - confirm the key
 * name is intended before it becomes a public setting.
 */
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_VECTOR_READS_MAX_SEEK_SIZE,
DefaultValue = DEFAULT_FS_AZURE_VECTOR_READS_MAX_MERGED_READ_SIZE)
private int vectoredReadMaxReadSize;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_READ_BUFFER_SIZE,
MinValue = MIN_BUFFER_SIZE,
MaxValue = MAX_BUFFER_SIZE,
Expand Down Expand Up @@ -710,6 +718,14 @@ public boolean accountThrottlingEnabled() {
return accountThrottlingEnabled;
}

/**
 * Gets the minimum seek size configured for vectored reads.
 * @return the value held in {@code vectoredReadMinSeekSize}.
 */
public int getVectoredReadMinSeekSize() {
  return this.vectoredReadMinSeekSize;
}

/**
 * Gets the maximum read size configured for vectored reads.
 * @return the value held in {@code vectoredReadMaxReadSize}.
 */
public int getVectoredReadMaxReadSize() {
  return this.vectoredReadMaxReadSize;
}

public String getAzureInfiniteLeaseDirs() {
return this.azureInfiniteLeaseDirs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import java.util.WeakHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.classification.VisibleForTesting;
Expand Down Expand Up @@ -132,6 +134,7 @@
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.http.client.utils.URIBuilder;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.METADATA_INCOMPLETE_RENAME_FAILURES;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_RECOVERY;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_EQUALS;
Expand Down Expand Up @@ -164,6 +167,9 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
private static final String TOKEN_DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'";
private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1";
private static final int GET_SET_AGGREGATE_COUNT = 2;
/**
 * Thread pool handed to the AbfsInputStream instances created in
 * openFileForRead.
 * NOTE(review): the constructor builds it on an unbounded
 * LinkedBlockingQueue; per the ThreadPoolExecutor queuing rules extra
 * threads are only started when the queue rejects a task, so the pool
 * will not grow past its core size even though the maximum is
 * Integer.MAX_VALUE - confirm this is the intended behaviour.
 * NOTE(review): no shutdown of this pool is visible in this hunk - verify
 * that close() releases it.
 */
private ThreadPoolExecutor unboundedThreadPool;
/**
 * Vectored IO context, built in the constructor from the ABFS
 * configuration and attached to each input stream context.
 */
private VectoredIOContext vectoredIOContext;

private final Map<AbfsLease, Object> leaseRefs;

Expand Down Expand Up @@ -266,6 +272,29 @@ public AzureBlobFileSystemStore(
abfsConfiguration.getMaxWriteRequestsToQueue(),
10L, TimeUnit.SECONDS,
"abfs-bounded");
this.unboundedThreadPool = new ThreadPoolExecutor(
abfsConfiguration.getWriteMaxConcurrentRequestCount(), Integer.MAX_VALUE,
10L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
BlockingThreadPoolExecutorService.newDaemonThreadFactory(
"abfs-unbounded"));
unboundedThreadPool.allowCoreThreadTimeOut(true);
this.vectoredIOContext = populateVectoredIOContext(abfsConfiguration);
}

/**
 * Builds the vectored IO context from the store configuration so that
 * it can be passed down to the input streams.
 * @param configuration configuration object supplying the vectored read
 * min seek size and max read size.
 * @return the populated VectoredIOContext.
 */
private VectoredIOContext populateVectoredIOContext(AbfsConfiguration configuration) {
  final VectoredIOContext context = new VectoredIOContext();
  context.setMinSeekForVectoredReads(configuration.getVectoredReadMinSeekSize());
  context.setMaxReadSizeForVectoredReads(configuration.getVectoredReadMaxReadSize());
  return context.build();
}

/**
Expand Down Expand Up @@ -795,7 +824,7 @@ public AbfsInputStream openFileForRead(Path path,
return new AbfsInputStream(client, statistics, relativePath,
contentLength, populateAbfsInputStreamContext(
parameters.map(OpenFileParameters::getOptions)),
eTag, tracingContext);
eTag, tracingContext, unboundedThreadPool);
}
}

Expand All @@ -817,6 +846,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext(
abfsConfiguration.shouldReadBufferSizeAlways())
.withReadAheadBlockSize(abfsConfiguration.getReadAheadBlockSize())
.withBufferedPreadDisabled(bufferedPreadDisabled)
.withVectoredIOContext(requireNonNull(vectoredIOContext, "vectoredIOContext"))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.apache.hadoop.fs.azurebfs;

import java.util.List;
import java.util.function.IntFunction;

/**
 * Holder for the settings that govern a vectored IO operation.
 * Instances are configured through the chained setters and
 * finished with {@link #build()}.
 */
public class VectoredIOContext {

  /**
   * Smallest reasonable seek for which ranges should be grouped
   * together during a vectored read operation.
   */
  private int minSeek;

  /**
   * Largest size up to which ranges should be grouped together
   * during a vectored read operation.
   * A value of 0 disables merging of ranges.
   */
  private int maxReadSize;

  /**
   * Creates a context with both settings left at zero.
   */
  public VectoredIOContext() {
  }

  /**
   * Sets the minimum seek for vectored reads.
   * @param minSeek minimum seek size.
   * @return this context, for chaining.
   */
  public VectoredIOContext setMinSeekForVectoredReads(int minSeek) {
    this.minSeek = minSeek;
    return this;
  }

  /**
   * Sets the maximum read size for vectored reads.
   * @param maxSize maximum read size.
   * @return this context, for chaining.
   */
  public VectoredIOContext setMaxReadSizeForVectoredReads(int maxSize) {
    this.maxReadSize = maxSize;
    return this;
  }

  /**
   * Completes the chained configuration.
   * @return this same instance; no copy is made.
   */
  public VectoredIOContext build() {
    return this;
  }

  /**
   * @return the minimum seek for vectored reads.
   */
  public int getMinSeekForVectorReads() {
    return this.minSeek;
  }

  /**
   * @return the maximum read size for vectored reads.
   */
  public int getMaxReadSizeForVectorReads() {
    return this.maxReadSize;
  }

  @Override
  public String toString() {
    final StringBuilder text = new StringBuilder("VectoredIOContext{");
    text.append("minSeekForVectorReads=").append(this.minSeek);
    text.append(", maxReadSizeForVectorReads=").append(this.maxReadSize);
    text.append('}');
    return text.toString();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX = "fs\\.azure\\.account\\.key\\.(.*)";
public static final String FS_AZURE_SECURE_MODE = "fs.azure.secure.mode";
public static final String FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = "fs.azure.account.throttling.enabled";
/** Configuration key for the minimum seek size used by vectored reads: {@value}. */
public static final String FS_AZURE_VECTOR_READS_MIN_SEEK_SIZE = "fs.azure.vector.reads.min.seek.size";
/**
 * Configuration key whose value AbfsConfiguration stores as the vectored
 * read max read size: {@value}.
 * NOTE(review): the key is named "max.seek.size" but it is paired with a
 * max merged read size default - confirm the name before release.
 */
public static final String FS_AZURE_VECTOR_READS_MAX_SEEK_SIZE = "fs.azure.vector.reads.max.seek.size";


// Retry strategy defined by the user
public static final String AZURE_MIN_BACKOFF_INTERVAL = "fs.azure.io.retry.min.backoff.interval";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true;
public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true;
public static final boolean DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = true;
/**
 * Default minimum seek in bytes during vectored reads : {@value}.
 * Written as a product so the 4 KiB intent is checked by the compiler
 * rather than by a trailing comment (the previous literal 4896 did not
 * equal 4K).
 */
public static final int DEFAULT_FS_AZURE_VECTOR_READS_MIN_SEEK_SIZE = 4 * 1024; // 4K
/**
 * Default maximum read size in bytes during vectored reads : {@value}.
 * Written as a product so the 1 MiB intent is explicit (the previous
 * literal 1253376 did not equal 1M).
 */
public static final int DEFAULT_FS_AZURE_VECTOR_READS_MAX_MERGED_READ_SIZE = 1024 * 1024; // 1M
public static final int DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS = 60_000;
public static final int DEFAULT_ANALYSIS_PERIOD_MS = 10_000;

Expand Down
Loading