Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ public final class StreamStatisticNames {
*/
public static final String STREAM_READ_OPENED = "stream_read_opened";

/**
* Total count of times an analytics input stream was opened.
*
* Value: {@value}.
*/
public static final String STREAM_READ_ANALYTICS_OPENED = "stream_read_analytics_opened";

/**
* Count of exceptions raised during input stream reads.
* Value: {@value}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ public void testMultipartUpload() throws Exception {
@Test
public void testMultipartUploadEmptyPart() throws Exception {
FileSystem fs = getFileSystem();
Path file = path("testMultipartUpload");
Path file = path("testMultipartUploadEmptyPart");
try (MultipartUploader uploader =
fs.createMultipartUploader(file).build()) {
UploadHandle uploadHandle = uploader.startUpload(file).get();
Expand Down
6 changes: 6 additions & 0 deletions hadoop-project/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@
<aws-java-sdk.version>1.12.720</aws-java-sdk.version>
<aws-java-sdk-v2.version>2.25.53</aws-java-sdk-v2.version>
<amazon-s3-encryption-client-java.version>3.1.1</amazon-s3-encryption-client-java.version>
<amazon-s3-analyticsaccelerator-s3.version>0.0.4</amazon-s3-analyticsaccelerator-s3.version>
<aws.eventstream.version>1.0.1</aws.eventstream.version>
<hsqldb.version>2.7.1</hsqldb.version>
<frontend-maven-plugin.version>1.11.2</frontend-maven-plugin.version>
Expand Down Expand Up @@ -1113,6 +1114,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>software.amazon.s3.analyticsaccelerator</groupId>
<artifactId>analyticsaccelerator-s3</artifactId>
<version>${amazon-s3-analyticsaccelerator-s3.version}</version>
</dependency>
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-core</artifactId>
Expand Down
5 changes: 5 additions & 0 deletions hadoop-tools/hadoop-aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,11 @@
<artifactId>amazon-s3-encryption-client-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>software.amazon.s3.analyticsaccelerator</groupId>
<artifactId>analyticsaccelerator-s3</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1827,4 +1827,13 @@ private Constants() {
* Value: {@value}.
*/
public static final String S3A_IO_RATE_LIMIT = "fs.s3a.io.rate.limit";


/**
* Prefix to configure Analytics Accelerator Library.
* Value: {@value}.
*/
public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX =
"fs.s3a.analytics.accelerator";

}
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public S3AsyncClient createS3AsyncClient(
.httpClientBuilder(httpClientBuilder);

// multipart upload pending with HADOOP-19326.
if (!parameters.isClientSideEncryptionEnabled()) {
if (!parameters.isClientSideEncryptionEnabled() && !parameters.isAnalyticsAcceleratorEnabled()) {
s3AsyncClientBuilder.multipartConfiguration(multipartConfiguration)
.multipartEnabled(parameters.isMultipartCopy());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,11 @@
import org.apache.hadoop.fs.s3a.impl.StoreContextFactory;
import org.apache.hadoop.fs.s3a.impl.UploadContentProviders;
import org.apache.hadoop.fs.s3a.impl.CSEUtils;
import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamCallbacks;
import org.apache.hadoop.fs.s3a.impl.streams.StreamFactoryRequirements;
import org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration;
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations;
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl;
import org.apache.hadoop.fs.statistics.DurationTracker;
Expand Down Expand Up @@ -440,6 +442,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private boolean isCSEEnabled;

/**
* Is this S3A FS instance using analytics accelerator?
*/
private boolean isAnalyticsAccelaratorEnabled;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW I've got a draft plan in my head to move all these bools and other config options into some config class under store...probably start with the Conditional Create stuff where the new flags go in. Microsoft's reflection-based resolution is what I'm looking at.


/**
* Bucket AccessPoint.
*/
Expand Down Expand Up @@ -629,6 +636,8 @@ public void initialize(URI name, Configuration originalConf)
// If encryption method is set to CSE-KMS or CSE-CUSTOM then CSE is enabled.
isCSEEnabled = CSEUtils.isCSEEnabled(getS3EncryptionAlgorithm().getMethod());

isAnalyticsAccelaratorEnabled = StreamIntegration.determineInputStreamType(conf).equals(InputStreamType.Analytics);

// Create the appropriate fsHandler instance using a factory method
fsHandler = createFileSystemHandler();
fsHandler.setCSEGauge((IOStatisticsStore) getIOStatistics());
Expand Down Expand Up @@ -1156,6 +1165,7 @@ private ClientManager createClientManager(URI fsURI, boolean dtEnabled) throws I
conf.getBoolean(CHECKSUM_VALIDATION, CHECKSUM_VALIDATION_DEFAULT))
.withClientSideEncryptionEnabled(isCSEEnabled)
.withClientSideEncryptionMaterials(cseMaterials)
.withAnalyticsAcceleratorEnabled(isAnalyticsAccelaratorEnabled)
.withKMSRegion(conf.get(S3_ENCRYPTION_CSE_KMS_REGION));

// this is where clients and the transfer manager are created on demand.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.impl.WeakRefMetricsSource;
import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
import org.apache.hadoop.fs.s3a.statistics.ChangeTrackerStatistics;
import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
Expand Down Expand Up @@ -840,6 +841,7 @@ private final class InputStreamStatistics
private final AtomicLong closed;
private final AtomicLong forwardSeekOperations;
private final AtomicLong openOperations;
private final AtomicLong analyticsStreamOpenOperations;
private final AtomicLong readExceptions;
private final AtomicLong readsIncomplete;
private final AtomicLong readOperations;
Expand Down Expand Up @@ -888,7 +890,8 @@ private InputStreamStatistics(
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES,
StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE)
StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE,
StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED)
.withGauges(STREAM_READ_GAUGE_INPUT_POLICY,
STREAM_READ_BLOCKS_IN_FILE_CACHE.getSymbol(),
STREAM_READ_ACTIVE_PREFETCH_OPERATIONS.getSymbol(),
Expand Down Expand Up @@ -927,6 +930,9 @@ private InputStreamStatistics(
StreamStatisticNames.STREAM_READ_SEEK_FORWARD_OPERATIONS);
openOperations = st.getCounterReference(
StreamStatisticNames.STREAM_READ_OPENED);
analyticsStreamOpenOperations = st.getCounterReference(
StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED
);
readExceptions = st.getCounterReference(
StreamStatisticNames.STREAM_READ_EXCEPTIONS);
readsIncomplete = st.getCounterReference(
Expand Down Expand Up @@ -1030,6 +1036,17 @@ public long streamOpened() {
return openOperations.getAndIncrement();
}

@Override
public long streamOpened(InputStreamType type) {
long count = openOperations.getAndIncrement();

if (type == InputStreamType.Analytics) {
count = analyticsStreamOpenOperations.getAndIncrement();
}

return count;
}

/**
* {@inheritDoc}.
* If the connection was aborted, increment {@link #aborted}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ final class S3ClientCreationParameters {
*/
private boolean fipsEnabled;

/**
* Is analytics accelerator enabled?
*/
private boolean isAnalyticsAcceleratorEnabled;

/**
* List of execution interceptors to include in the chain
* of interceptors in the SDK.
Expand Down Expand Up @@ -457,6 +462,17 @@ public S3ClientCreationParameters withClientSideEncryptionEnabled(final boolean
return this;
}

/**
* Set the analytics accelerator enabled flag.
*
* @param value new value
* @return the builder
*/
public S3ClientCreationParameters withAnalyticsAcceleratorEnabled(final boolean value) {
this.isAnalyticsAcceleratorEnabled = value;
return this;
}

/**
* Set the KMS client region.
* This is required for CSE-KMS
Expand All @@ -477,6 +493,14 @@ public boolean isClientSideEncryptionEnabled() {
return this.isCSEEnabled;
}

/**
* Get the analytics accelerator enabled flag.
* @return analytics accelerator enabled flag.
*/
public boolean isAnalyticsAcceleratorEnabled() {
return this.isAnalyticsAcceleratorEnabled;
}

/**
* Set the client side encryption materials.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,10 @@ public enum Statistic {
TYPE_COUNTER),

/* Stream Reads */
STREAM_READ_ANALYTICS_OPENED(
StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED,
"Total count of times an analytics input stream to object store data was opened",
TYPE_COUNTER),
STREAM_READ_BYTES(
StreamStatisticNames.STREAM_READ_BYTES,
"Bytes read from an input stream in read() calls",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hadoop.fs.s3a.impl.streams;

import java.io.IOException;

import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import org.apache.hadoop.service.AbstractService;
Expand Down Expand Up @@ -58,7 +60,7 @@ protected AbstractObjectInputStreamFactory(final String name) {
* @param factoryBindingParameters parameters for the factory binding
*/
@Override
public void bind(final FactoryBindingParameters factoryBindingParameters) {
public void bind(final FactoryBindingParameters factoryBindingParameters) throws IOException {
// must be on be invoked during service initialization
Preconditions.checkState(isInState(STATE.INITED),
"Input Stream factory %s is in wrong state: %s",
Expand Down
Loading