Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ public class AbfsConfiguration{
DefaultValue = AZURE_BLOCK_LOCATION_HOST_DEFAULT)
private String azureBlockLocationHost;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CONCURRENT_CONNECTION_VALUE_OUT,
MinValue = 1,
DefaultValue = MAX_CONCURRENT_WRITE_THREADS)
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_MAX_THREADS,
MinValue = -256,
DefaultValue = -4)
private int maxConcurrentWriteThreads;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_LIST_MAX_RESULTS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,14 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -111,8 +115,11 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.apache.http.client.utils.URIBuilder;

import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_EQUALS;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_FORWARD_SLASH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_HYPHEN;
Expand Down Expand Up @@ -155,6 +162,17 @@ public class AzureBlobFileSystemStore implements Closeable {
*/
private Set<String> appendBlobDirSet;

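/**
* Shared executor pool for this store. Its work queue is unbounded;
* per-stream limits are applied by wrapping it (see executorCapacity).
*/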
private ListeningExecutorService executorService;

/**
* Capacity for creating an executor for a specific stream
* or other component which takes an executor service.
*/
private int executorCapacity;

public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
Configuration configuration,
AbfsCounters abfsCounters) throws IOException {
Expand Down Expand Up @@ -216,6 +234,29 @@ public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
this.appendBlobDirSet = new HashSet<>(Arrays.asList(
abfsConfiguration.getAppendBlobDirs().split(AbfsHttpConstants.COMMA)));
}
initThreadPools();
}

/**
 * Initialize the executor service shared by this store.
 * <p>
 * The thread count is read from
 * {@code abfsConfiguration.getMaxConcurrentWriteThreads()}. A negative
 * value is treated as a multiplier of the available processor count, and
 * the result is clamped to at least one thread.
 * <p>
 * The work queue is unbounded, and a {@link ThreadPoolExecutor} only adds
 * threads beyond its core size when the queue rejects a task. The pool
 * therefore never grows past the core size, so core and maximum are set
 * to the same value. Core threads are allowed to time out so that the
 * keep-alive period actually releases idle workers.
 */
void initThreadPools() {
  int configuredThreads = abfsConfiguration.getMaxConcurrentWriteThreads();
  if (configuredThreads < 0) {
    // make a multiple of the core count
    configuredThreads =
        Runtime.getRuntime().availableProcessors() * (-configuredThreads);
  }
  // a configured value of 0 would otherwise create a pool without any
  // core threads; always keep at least one worker available.
  final int maxThreads = Math.max(1, configuredThreads);
  // TODO: configurable
  final long keepAliveTime = 60;
  final ThreadPoolExecutor pool = new ThreadPoolExecutor(
      maxThreads, maxThreads,
      keepAliveTime, TimeUnit.SECONDS,
      new LinkedBlockingQueue<>(),
      BlockingThreadPoolExecutorService.newDaemonThreadFactory(
          "abfs-worker"));
  // keep-alive only applies to core threads when this is enabled
  pool.allowCoreThreadTimeOut(true);
  executorService = listeningDecorator(pool);
  // TODO: configurable
  executorCapacity = 8;
}

/**
Expand Down Expand Up @@ -243,6 +284,9 @@ public String getPrimaryGroup() {
/**
 * Release the resources held by this store: the client is handed to
 * {@code IOUtils.cleanupWithLogger}, then the executor pool, if one was
 * created, is asked to shut down.
 *
 * @throws IOException declared by the {@code Closeable} contract
 */
@Override
public void close() throws IOException {
  IOUtils.cleanupWithLogger(LOG, client);
  final ListeningExecutorService pool = executorService;
  if (pool == null) {
    // thread pools were never initialized; nothing more to release
    return;
  }
  pool.shutdown();
}

byte[] encodeAttribute(String value) throws UnsupportedEncodingException {
Expand Down Expand Up @@ -492,6 +536,11 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppend
.withAppendBlob(isAppendBlob)
.withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
.withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
.withExecutorService(
new SemaphoredDelegatingExecutor(
executorService,
executorCapacity,
true))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public final class ConfigurationKeys {
public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";
public static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";
public static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME = "fs.azure.block.location.impersonatedhost";

/**
* Maximum size of the thread pool. A negative value is multiplied
* by the number of available processors (for example, -4 means 4 x cores).
*/
public static final String AZURE_MAX_THREADS = "fs.azure.threads.max";
public static final String AZURE_CONCURRENT_CONNECTION_VALUE_OUT = "fs.azure.concurrentRequestCount.out";
public static final String AZURE_CONCURRENT_CONNECTION_VALUE_IN = "fs.azure.concurrentRequestCount.in";
public static final String AZURE_TOLERATE_CONCURRENT_APPEND = "fs.azure.io.read.tolerate.concurrent.append";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,13 @@
import java.nio.ByteBuffer;
import java.util.Locale;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -73,7 +71,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
private final int maxRequestsThatCanBeQueued;

private ConcurrentLinkedDeque<WriteOperation> writeOperations;
private final ThreadPoolExecutor threadExecutor;
private final ListeningExecutorService threadExecutor;
private final ExecutorCompletionService<Void> completionService;

// SAS tokens can be re-used until they expire
Expand Down Expand Up @@ -125,12 +123,7 @@ public AbfsOutputStream(
}
this.maxRequestsThatCanBeQueued = abfsOutputStreamContext
.getMaxWriteRequestsToQueue();
this.threadExecutor
= new ThreadPoolExecutor(maxConcurrentRequestCount,
maxConcurrentRequestCount,
10L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>());
this.threadExecutor = abfsOutputStreamContext.getExecutorService();
this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
this.cachedSasToken = new CachedSASToken(
abfsOutputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
Expand Down Expand Up @@ -270,8 +263,7 @@ public void hflush() throws IOException {

/**
* Force all data in the output stream to be written to Azure storage.
* Wait to return until this is complete. Close the access to the stream and
* shutdown the upload thread pool.
* Wait to return until this is complete. Close the access to the stream.
* If the blob was created, its lease will be released.
* Any error encountered caught in threads and stored will be rethrown here
* after cleanup.
Expand All @@ -284,7 +276,11 @@ public synchronized void close() throws IOException {

try {
flushInternal(true);
threadExecutor.shutdown();
// block until all uploads through the thread pool are complete.
// TODO: wait for all queued uploads to finish, rather
// than just one. This was implicitly
// done in shutdown/shutdownNow.
waitForTaskToComplete();
} catch (IOException e) {
// Problems surface in try-with-resources clauses if
// the exception thrown in a close == the one already thrown
Expand All @@ -297,12 +293,9 @@ public synchronized void close() throws IOException {
bufferIndex = 0;
closed = true;
writeOperations.clear();
if (!threadExecutor.isShutdown()) {
threadExecutor.shutdownNow();
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Closing AbfsOutputStream ", toString());
LOG.debug("Closing AbfsOutputStream {}", this);
}
}

Expand Down Expand Up @@ -375,12 +368,14 @@ private synchronized void writeCurrentBufferToService() throws IOException {
final long offset = position;
position += bytesLength;

/*
if (threadExecutor.getQueue().size() >= maxRequestsThatCanBeQueued) {
if (executorService.getQueue().size() >= maxConcurrentRequestCount * 2) {
long start = System.currentTimeMillis();
waitForTaskToComplete();
outputStreamStatistics.timeSpentTaskWait(start, System.currentTimeMillis());
}

*/
long start = System.currentTimeMillis();
final Future<Void> job = completionService.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
Expand All @@ -397,6 +392,7 @@ public Void call() throws Exception {
}
}
});
outputStreamStatistics.timeSpentTaskWait(start, System.currentTimeMillis());

if (job.isCancelled()) {
outputStreamStatistics.uploadFailed(bytesLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hadoop.fs.azurebfs.services;

import com.google.common.util.concurrent.ListeningExecutorService;

/**
* Class to hold extra output stream configs.
*/
Expand All @@ -37,6 +39,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {

private int maxWriteRequestsToQueue;

private ListeningExecutorService executorService;

/**
 * Create an output stream context.
 *
 * @param sasTokenRenewPeriodForStreamsInSeconds value passed through to
 *        the superclass constructor
 */
public AbfsOutputStreamContext(
    final long sasTokenRenewPeriodForStreamsInSeconds) {
  super(sasTokenRenewPeriodForStreamsInSeconds);
}
Expand Down Expand Up @@ -87,6 +91,12 @@ public AbfsOutputStreamContext withMaxWriteRequestsToQueue(
return this;
}

/**
 * Set the executor service that the output stream will submit work to.
 *
 * @param executorService executor to store in this context
 * @return this context, for call chaining
 */
public AbfsOutputStreamContext withExecutorService(
    final ListeningExecutorService executorService) {
  this.executorService = executorService;
  return this;
}

/**
 * @return the write buffer size held by this context
 */
public int getWriteBufferSize() {
  return this.writeBufferSize;
}
Expand Down Expand Up @@ -114,4 +124,8 @@ public int getWriteMaxConcurrentRequestCount() {
/**
 * @return the maximum number of write requests to queue, as held by
 *         this context
 */
public int getMaxWriteRequestsToQueue() {
  return maxWriteRequestsToQueue;
}

/**
 * @return the executor service held by this context; {@code null} if
 *         none has been set
 */
public ListeningExecutorService getExecutorService() {
  return this.executorService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.ListeningExecutorService;
import org.junit.Test;

import org.mockito.ArgumentCaptor;

import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -58,6 +61,15 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(int writeBufferS
boolean isAppendBlob) throws IOException, IllegalAccessException {
AbfsConfiguration abfsConf = new AbfsConfiguration(new Configuration(),
accountName1);
int maxThreads = Runtime.getRuntime().availableProcessors() * 2;
int totalTasks = maxThreads * 4;
long keepAliveTime = 60;
ListeningExecutorService executorService
= BlockingThreadPoolExecutorService.newInstance(
maxThreads,
maxThreads + totalTasks,
keepAliveTime, TimeUnit.SECONDS,
"abfs-worker");
return new AbfsOutputStreamContext(2)
.withWriteBufferSize(writeBufferSize)
.enableFlush(isFlushEnabled)
Expand All @@ -66,7 +78,8 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(int writeBufferS
.withAppendBlob(isAppendBlob)
.withWriteMaxConcurrentRequestCount(abfsConf.getWriteMaxConcurrentRequestCount())
.withMaxWriteRequestsToQueue(abfsConf.getMaxWriteRequestsToQueue())
.build();
.withExecutorService(executorService)
.build();
}

/**
Expand Down