Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,24 @@
// build the referrer up. so as to find/report problems early
initialHeader = buildHttpReferrer();
}

Check failure on line 181 in hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/audit/HttpReferrerAuditHeader.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/audit/HttpReferrerAuditHeader.java#L181

blanks: end of line
/**
* Build the referrer string.
* Copy constructor.
* Creates a deep copy of a HttpReferrerAuditHeader object
*/
public HttpReferrerAuditHeader(HttpReferrerAuditHeader httpReferrerAuditHeader) {

Check failure on line 186 in hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/audit/HttpReferrerAuditHeader.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/audit/HttpReferrerAuditHeader.java#L186

javadoc: warning: no @param for httpReferrerAuditHeader
this.contextId = requireNonNull(httpReferrerAuditHeader.contextId);
this.evaluated = new ConcurrentHashMap<>(httpReferrerAuditHeader.evaluated);
this.filter = ImmutableSet.copyOf(httpReferrerAuditHeader.filter);
this.operationName = requireNonNull(httpReferrerAuditHeader.operationName);
this.path1 = httpReferrerAuditHeader.path1;
this.path2 = httpReferrerAuditHeader.path2;
this.spanId = requireNonNull(httpReferrerAuditHeader.spanId);
this.attributes = new ConcurrentHashMap<>(httpReferrerAuditHeader.attributes);
this.initialHeader = httpReferrerAuditHeader.initialHeader;
}

Check failure on line 197 in hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/audit/HttpReferrerAuditHeader.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/audit/HttpReferrerAuditHeader.java#L197

blanks: end of line
/* Build the referrer string.
* This includes dynamically evaluating all of the evaluated
* attributes.
* If there is an error creating the string it will be logged once
Expand Down
11 changes: 11 additions & 0 deletions hadoop-tools/hadoop-aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,17 @@
<artifactId>amazon-s3-encryption-client-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>software.amazon.s3.analyticsaccelerator</groupId>
<artifactId>analyticsaccelerator-s3</artifactId>
<version>0.0.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk.crt</groupId>
<artifactId>aws-crt</artifactId>
<version>0.29.10</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1760,4 +1760,38 @@ private Constants() {
* Value: {@value}.
*/
public static final String S3A_IO_RATE_LIMIT = "fs.s3a.io.rate.limit";


/**
* Prefix to configure Analytics Accelerator Library.
*/
public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX =
"fs.s3a.analytics.accelerator";

/**
* Config to enable Analytics Accelerator Library for Amazon S3.
* https://github.com/awslabs/analytics-accelerator-s3
*/
public static final String ANALYTICS_ACCELERATOR_ENABLED_KEY =
ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + ".enabled";

/**
* Config to enable usage of crt client with Analytics Accelerator Library.
* It is by default true.
*/
public static final String ANALYTICS_ACCELERATOR_CRT_ENABLED =
"fs.s3a.analytics.accelerator.crt.client";

/**
* Default value for {@link #ANALYTICS_ACCELERATOR_ENABLED_KEY }
* Value {@value}.
*/
public static final boolean ANALYTICS_ACCELERATOR_ENABLED_DEFAULT = false;

/**
* Default value for {@link #ANALYTICS_ACCELERATOR_CRT_ENABLED }
* Value {@value}.
*/
public static final boolean ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT = true;

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,13 @@
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;

import org.apache.hadoop.fs.s3a.audit.impl.ActiveAuditManagerS3A;
import org.apache.hadoop.fs.s3a.audit.impl.LoggingAuditor;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncClient;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest;
Expand Down Expand Up @@ -87,6 +91,11 @@
import software.amazon.awssdk.transfer.s3.model.Copy;
import software.amazon.awssdk.transfer.s3.model.CopyRequest;

import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;

import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -220,6 +229,8 @@
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
import software.amazon.s3.analyticsaccelerator.request.StreamContext;

Check failure on line 232 in hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java#L232

javadoc: error: cannot find symbol
import software.amazon.s3.analyticsaccelerator.request.ObjectClient;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL;
Expand Down Expand Up @@ -317,6 +328,13 @@
*/
private S3Client s3Client;

/**
* CRT-Based S3Client created of analytics accelerator library is enabled
* and managed by the S3AStoreImpl. Analytics accelerator library can be
* enabled with {@link Constants#ANALYTICS_ACCELERATOR_ENABLED_KEY}
*/
private S3AsyncClient s3AsyncClient;

// initial callback policy is fail-once; it's there just to assist
// some mock tests and other codepaths trying to call the low level
// APIs on an uninitialized filesystem.
Expand Down Expand Up @@ -344,6 +362,11 @@
// If true, the prefetching input stream is used for reads.
private boolean prefetchEnabled;

// If true, S3SeekableInputStream from Analytics Accelerator for Amazon S3 will be used.
private boolean analyticsAcceleratorEnabled;

private boolean analyticsAcceleratorCRTEnabled;

// Size in bytes of a single prefetch block.
private int prefetchBlockSize;

Expand Down Expand Up @@ -525,6 +548,11 @@
*/
private boolean s3AccessGrantsEnabled;

/**
* Factory to create S3SeekableInputStream if {@link this#analyticsAcceleratorEnabled} is true.
*/
private S3SeekableInputStreamFactory s3SeekableInputStreamFactory;

/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
private static void addDeprecatedKeys() {
Expand Down Expand Up @@ -680,8 +708,21 @@
this.prefetchBlockSize = (int) prefetchBlockSizeLong;
this.prefetchBlockCount =
intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);

this.analyticsAcceleratorEnabled =
conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT);
this.analyticsAcceleratorCRTEnabled =
conf.getBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED,
ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT);

this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
DEFAULT_MULTIPART_UPLOAD_ENABLED);
DEFAULT_MULTIPART_UPLOAD_ENABLED);

if(this.analyticsAcceleratorEnabled && !analyticsAcceleratorCRTEnabled) {
// Temp change: Analytics Accelerator with S3AsyncClient do not support Multi-part upload.
this.isMultipartUploadEnabled = false;
}

// multipart copy and upload are the same; this just makes it explicit
this.isMultipartCopyEnabled = isMultipartUploadEnabled;

Expand Down Expand Up @@ -809,6 +850,27 @@
// directly through the client manager.
// this is to aid mocking.
s3Client = store.getOrCreateS3Client();

if (this.analyticsAcceleratorEnabled) {
LOG.info("Using S3SeekableInputStream");
if(this.analyticsAcceleratorCRTEnabled) {
LOG.info("Using S3 CRT client for analytics accelerator S3");
this.s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build();
} else {
LOG.info("Using S3 async client for analytics accelerator S3");
this.s3AsyncClient = store.getOrCreateAsyncClient();
}

ConnectorConfiguration configuration = new ConnectorConfiguration(conf,
ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
S3SeekableInputStreamConfiguration seekableInputStreamConfiguration =
S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
this.s3SeekableInputStreamFactory =
new S3SeekableInputStreamFactory(
new S3SdkObjectClient(this.s3AsyncClient),
seekableInputStreamConfiguration);
}

// The filesystem is now ready to perform operations against
// S3
// This initiates a probe against S3 for the bucket existing.
Expand Down Expand Up @@ -1876,6 +1938,8 @@
final Path path,
final OpenFileSupport.OpenFileInformation fileInformation)
throws IOException {


// create the input stream statistics before opening
// the file so that the time to prepare to open the file is included.
S3AInputStreamStatistics inputStreamStats =
Expand All @@ -1892,6 +1956,18 @@
fileInformation.applyOptions(readContext);
LOG.debug("Opening '{}'", readContext);

if (this.analyticsAcceleratorEnabled) {
ActiveAuditManagerS3A.WrappingAuditSpan wrappingAuditSpan = (ActiveAuditManagerS3A.WrappingAuditSpan) auditSpan;
LoggingAuditor.LoggingAuditSpan loggingAuditSpan = (LoggingAuditor.LoggingAuditSpan) wrappingAuditSpan.getSpan();
StreamContext streamContext = new S3AStreamContext(loggingAuditSpan.getReferrer());
return new FSDataInputStream(
new S3ASeekableStream(
this.bucket,
pathToKey(path),
s3SeekableInputStreamFactory,
streamContext));
}

if (this.prefetchEnabled) {
Configuration configuration = getConf();
initLocalDirAllocatorIfNotInitialized(configuration);
Expand Down Expand Up @@ -4421,9 +4497,11 @@
protected synchronized void stopAllServices() {
try {
trackDuration(getDurationTrackerFactory(), FILESYSTEM_CLOSE.getSymbol(), () -> {
closeAutocloseables(LOG, store);
closeAutocloseables(LOG, store, s3SeekableInputStreamFactory);
store = null;
s3Client = null;
s3AsyncClient = null;
s3SeekableInputStreamFactory = null;

// At this point the S3A client is shut down,
// now the executor pools are closed
Expand Down
Loading
Loading