From 64a18b0dea86d3d35339bc9a2599abbb368fa709 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 9 Sep 2020 16:43:54 +0100 Subject: [PATCH] HADOOP-17195. ABFS Store thread pool for stream IO. Change-Id: I6915539cfafe7164c404dfc153653710280d9bf6 --- .../hadoop/fs/azurebfs/AbfsConfiguration.java | 6 +-- .../fs/azurebfs/AzureBlobFileSystemStore.java | 49 +++++++++++++++++++ .../azurebfs/constants/ConfigurationKeys.java | 5 ++ .../azurebfs/services/AbfsOutputStream.java | 34 ++++++------- .../services/AbfsOutputStreamContext.java | 14 ++++++ .../services/TestAbfsOutputStream.java | 15 +++++- 6 files changed, 100 insertions(+), 23 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 66d485317c9ba..c5d0a9cf67d82 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -158,9 +158,9 @@ public class AbfsConfiguration{ DefaultValue = AZURE_BLOCK_LOCATION_HOST_DEFAULT) private String azureBlockLocationHost; - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CONCURRENT_CONNECTION_VALUE_OUT, - MinValue = 1, - DefaultValue = MAX_CONCURRENT_WRITE_THREADS) + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_MAX_THREADS, + MinValue = -256, + DefaultValue = -4) private int maxConcurrentWriteThreads; @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_LIST_MAX_RESULTS, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 23d2b5a3d63fb..da8dd3b15ea6b 100644 --- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -47,10 +47,14 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.util.concurrent.ListeningExecutorService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -111,8 +115,11 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.BlockingThreadPoolExecutorService; +import org.apache.hadoop.util.SemaphoredDelegatingExecutor; import org.apache.http.client.utils.URIBuilder; +import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_EQUALS; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_FORWARD_SLASH; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_HYPHEN; @@ -155,6 +162,17 @@ public class AzureBlobFileSystemStore implements Closeable { */ private Set appendBlobDirSet; + /** + * Unbounded executor pool. + */ + private ListeningExecutorService executorService; + + /** + * Capacity for creating an executor for a specific stream + * or other component which takes an executor service. 
+   */
+  private int executorCapacity;
+
   public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
                                   Configuration configuration,
                                   AbfsCounters abfsCounters) throws IOException {
@@ -216,6 +234,38 @@ public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
       this.appendBlobDirSet = new HashSet<>(Arrays.asList(
           abfsConfiguration.getAppendBlobDirs().split(AbfsHttpConstants.COMMA)));
     }
+    initThreadPools();
+  }
+
+  /**
+   * Initialize the executor service shared by the streams of this store.
+   * A negative configured thread count is treated as a multiplier of the
+   * available processor count; the result is clamped to at least one thread.
+   * The work queue is unbounded, so the pool never grows beyond its core
+   * size: core and maximum size are set to the same value, and idle
+   * workers are allowed to time out.
+   */
+  void initThreadPools() {
+    int maxThreads = abfsConfiguration.getMaxConcurrentWriteThreads();
+    if (maxThreads < 0) {
+      // negative: multiply the core count by the absolute value
+      maxThreads = Runtime.getRuntime().availableProcessors() * (-maxThreads);
+    }
+    // a configured value of 0 must still leave one worker thread
+    maxThreads = Math.max(1, maxThreads);
+    // TODO: configurable
+    long keepAliveTime = 60;
+    ThreadPoolExecutor pool = new ThreadPoolExecutor(
+        maxThreads, maxThreads,
+        keepAliveTime, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(),
+        BlockingThreadPoolExecutorService.newDaemonThreadFactory(
+            "abfs-worker"));
+    // without this the keep-alive time would never apply to core threads
+    pool.allowCoreThreadTimeOut(true);
+    executorService = listeningDecorator(pool);
+    // TODO: configurable
+    executorCapacity = 8;
   }
 
   /**
@@ -243,6 +293,9 @@ public String getPrimaryGroup() {
   @Override
   public void close() throws IOException {
     IOUtils.cleanupWithLogger(LOG, client);
+    if (executorService != null) {
+      executorService.shutdown();
+    }
   }
 
   byte[] encodeAttribute(String value) throws UnsupportedEncodingException {
@@ -492,6 +545,11 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppend
             .withAppendBlob(isAppendBlob)
             .withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
             .withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
+            .withExecutorService(
+                new SemaphoredDelegatingExecutor(
+                    executorService,
+                    executorCapacity,
+                    true))
             .build();
   }
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 
681390c019873..6d036c805b4ee 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -58,6 +58,11 @@ public final class ConfigurationKeys { public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size"; public static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size"; public static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME = "fs.azure.block.location.impersonatedhost"; + + /** + * Thread pool max size. Negative == multiply by core count. + */ + public static final String AZURE_MAX_THREADS = "fs.azure.threads.max"; public static final String AZURE_CONCURRENT_CONNECTION_VALUE_OUT = "fs.azure.concurrentRequestCount.out"; public static final String AZURE_CONCURRENT_CONNECTION_VALUE_IN = "fs.azure.concurrentRequestCount.in"; public static final String AZURE_TOLERATE_CONCURRENT_APPEND = "fs.azure.io.read.tolerate.concurrent.append"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 1991638a66703..f61f14a415e27 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -26,15 +26,13 @@ import java.nio.ByteBuffer; import java.util.Locale; import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.Callable; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; import 
com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListeningExecutorService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,7 +71,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa private final int maxRequestsThatCanBeQueued; private ConcurrentLinkedDeque writeOperations; - private final ThreadPoolExecutor threadExecutor; + private final ListeningExecutorService threadExecutor; private final ExecutorCompletionService completionService; // SAS tokens can be re-used until they expire @@ -125,12 +123,7 @@ public AbfsOutputStream( } this.maxRequestsThatCanBeQueued = abfsOutputStreamContext .getMaxWriteRequestsToQueue(); - this.threadExecutor - = new ThreadPoolExecutor(maxConcurrentRequestCount, - maxConcurrentRequestCount, - 10L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>()); + this.threadExecutor = abfsOutputStreamContext.getExecutorService(); this.completionService = new ExecutorCompletionService<>(this.threadExecutor); this.cachedSasToken = new CachedSASToken( abfsOutputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds()); @@ -270,8 +263,7 @@ public void hflush() throws IOException { /** * Force all data in the output stream to be written to Azure storage. - * Wait to return until this is complete. Close the access to the stream and - * shutdown the upload thread pool. + * Wait to return until this is complete. Close the access to the stream. * If the blob was created, its lease will be released. * Any error encountered caught in threads and stored will be rethrown here * after cleanup. @@ -284,7 +276,11 @@ public synchronized void close() throws IOException { try { flushInternal(true); - threadExecutor.shutdown(); + // block until all uploads through the thread pool are complete. + // TODO: wait for all queued uploads to finish, rather + // than just one. This was implicitly + // done in shutdown/shutdownNow. 
+      waitForTaskToComplete();
     } catch (IOException e) {
       // Problems surface in try-with-resources clauses if
       // the exception thrown in a close == the one already thrown
@@ -297,12 +293,9 @@ public synchronized void close() throws IOException {
       bufferIndex = 0;
       closed = true;
       writeOperations.clear();
-      if (!threadExecutor.isShutdown()) {
-        threadExecutor.shutdownNow();
-      }
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Closing AbfsOutputStream ", toString());
+      LOG.debug("Closing AbfsOutputStream {}", this);
     }
   }
 
@@ -375,12 +368,9 @@ private synchronized void writeCurrentBufferToService() throws IOException {
     final long offset = position;
     position += bytesLength;
 
-    if (threadExecutor.getQueue().size() >= maxRequestsThatCanBeQueued) {
-      long start = System.currentTimeMillis();
-      waitForTaskToComplete();
-      outputStreamStatistics.timeSpentTaskWait(start, System.currentTimeMillis());
-    }
-
+    // measure how long submit() takes; the elapsed time is recorded as
+    // task wait time once the call returns.
+    long start = System.currentTimeMillis();
     final Future job = completionService.submit(new Callable() {
       @Override
       public Void call() throws Exception {
@@ -397,6 +387,7 @@ public Void call() throws Exception {
         }
       }
     });
+    outputStreamStatistics.timeSpentTaskWait(start, System.currentTimeMillis());
 
     if (job.isCancelled()) {
       outputStreamStatistics.uploadFailed(bytesLength);
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
index 2dce5dc2c77a7..a91dec46bd2eb 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
+import com.google.common.util.concurrent.ListeningExecutorService;
+
 /**
  * Class to hold 
extra output stream configs.
  */
@@ -37,6 +39,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
 
   private int maxWriteRequestsToQueue;
 
+  private ListeningExecutorService executorService;
+
   public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
     super(sasTokenRenewPeriodForStreamsInSeconds);
   }
@@ -87,6 +91,17 @@ public AbfsOutputStreamContext withMaxWriteRequestsToQueue(
     return this;
   }
 
+  /**
+   * Set the executor service to be handed to the output stream.
+   * @param executorService executor service to use
+   * @return this context, for call chaining
+   */
+  public AbfsOutputStreamContext withExecutorService(
+      final ListeningExecutorService executorService) {
+    this.executorService = executorService;
+    return this;
+  }
+
   public int getWriteBufferSize() {
     return writeBufferSize;
   }
@@ -114,4 +129,12 @@ public int getWriteMaxConcurrentRequestCount() {
   public int getMaxWriteRequestsToQueue() {
     return this.maxWriteRequestsToQueue;
   }
+
+  /**
+   * Get the executor service set through
+   * {@link #withExecutorService(ListeningExecutorService)}.
+   * @return the executor service; null if none was set
+   */
+  public ListeningExecutorService getExecutorService() {
+    return executorService;
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
index aab0248c407ed..5cf5793a222cf 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
@@ -22,13 +22,16 @@
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.ListeningExecutorService;
 import org.junit.Test;
 
 import org.mockito.ArgumentCaptor;
 
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -58,6 +61,15 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(int writeBufferS 
boolean isAppendBlob) throws IOException, IllegalAccessException { AbfsConfiguration abfsConf = new AbfsConfiguration(new Configuration(), accountName1); + int maxThreads = Runtime.getRuntime().availableProcessors() * 2; + int totalTasks = maxThreads * 4; + long keepAliveTime = 60; + ListeningExecutorService executorService + = BlockingThreadPoolExecutorService.newInstance( + maxThreads, + maxThreads + totalTasks, + keepAliveTime, TimeUnit.SECONDS, + "abfs-worker"); return new AbfsOutputStreamContext(2) .withWriteBufferSize(writeBufferSize) .enableFlush(isFlushEnabled) @@ -66,7 +78,8 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(int writeBufferS .withAppendBlob(isAppendBlob) .withWriteMaxConcurrentRequestCount(abfsConf.getWriteMaxConcurrentRequestCount()) .withMaxWriteRequestsToQueue(abfsConf.getMaxWriteRequestsToQueue()) - .build(); + .withExecutorService(executorService) + .build(); } /**