diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
index 57c775721384e..ab4838618dabe 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
@@ -97,6 +97,13 @@ public final class StreamStatisticNames {
*/
public static final String STREAM_READ_OPENED = "stream_read_opened";
+ /**
+ * Total count of times an analytics input stream was opened.
+ *
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_ANALYTICS_OPENED = "stream_read_analytics_opened";
+
/**
* Count of exceptions raised during input stream reads.
* Value: {@value}.
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
index 16482915bdf7f..4c4514b249c28 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java
@@ -507,7 +507,7 @@ public void testMultipartUpload() throws Exception {
@Test
public void testMultipartUploadEmptyPart() throws Exception {
FileSystem fs = getFileSystem();
- Path file = path("testMultipartUpload");
+ Path file = path("testMultipartUploadEmptyPart");
try (MultipartUploader uploader =
fs.createMultipartUploader(file).build()) {
UploadHandle uploadHandle = uploader.startUpload(file).get();
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 6dac90a58146b..88a481ac30da4 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -206,6 +206,7 @@
1.12.720
2.25.53
3.1.1
+ <amazon-s3-analyticsaccelerator-s3.version>0.0.4</amazon-s3-analyticsaccelerator-s3.version>
1.0.1
2.7.1
1.11.2
@@ -1113,6 +1114,11 @@
+ <dependency>
+ <groupId>software.amazon.s3.analyticsaccelerator</groupId>
+ <artifactId>analyticsaccelerator-s3</artifactId>
+ <version>${amazon-s3-analyticsaccelerator-s3.version}</version>
+ </dependency>
org.apache.mina
mina-core
diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml
index 4c36d5b4653f8..f10eea7a4d3bf 100644
--- a/hadoop-tools/hadoop-aws/pom.xml
+++ b/hadoop-tools/hadoop-aws/pom.xml
@@ -484,6 +484,11 @@
amazon-s3-encryption-client-java
provided
+ <dependency>
+ <groupId>software.amazon.s3.analyticsaccelerator</groupId>
+ <artifactId>analyticsaccelerator-s3</artifactId>
+ <scope>compile</scope>
+ </dependency>
org.assertj
assertj-core
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 6796c29d34874..2b019e1fe4caa 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -1827,4 +1827,13 @@ private Constants() {
* Value: {@value}.
*/
public static final String S3A_IO_RATE_LIMIT = "fs.s3a.io.rate.limit";
+
+
+ /**
+ * Prefix to configure Analytics Accelerator Library.
+ * Value: {@value}.
+ */
+ public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX =
+ "fs.s3a.analytics.accelerator";
+
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
index 556bd3510752e..a19e5f155f00b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
@@ -166,7 +166,7 @@ public S3AsyncClient createS3AsyncClient(
.httpClientBuilder(httpClientBuilder);
// multipart upload pending with HADOOP-19326.
- if (!parameters.isClientSideEncryptionEnabled()) {
+ if (!parameters.isClientSideEncryptionEnabled()
+ && !parameters.isAnalyticsAcceleratorEnabled()) {
s3AsyncClientBuilder.multipartConfiguration(multipartConfiguration)
.multipartEnabled(parameters.isMultipartCopy());
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 5ac3dca08e3b6..cd34e17f34663 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -147,9 +147,11 @@
import org.apache.hadoop.fs.s3a.impl.StoreContextFactory;
import org.apache.hadoop.fs.s3a.impl.UploadContentProviders;
import org.apache.hadoop.fs.s3a.impl.CSEUtils;
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamCallbacks;
import org.apache.hadoop.fs.s3a.impl.streams.StreamFactoryRequirements;
+import org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration;
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations;
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl;
import org.apache.hadoop.fs.statistics.DurationTracker;
@@ -440,6 +442,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private boolean isCSEEnabled;
+ /**
+ * Is this S3A FS instance using analytics accelerator?
+ */
+ private boolean isAnalyticsAcceleratorEnabled;
+
/**
* Bucket AccessPoint.
*/
@@ -629,6 +636,8 @@ public void initialize(URI name, Configuration originalConf)
// If encryption method is set to CSE-KMS or CSE-CUSTOM then CSE is enabled.
isCSEEnabled = CSEUtils.isCSEEnabled(getS3EncryptionAlgorithm().getMethod());
+ isAnalyticsAcceleratorEnabled =
+ StreamIntegration.determineInputStreamType(conf).equals(InputStreamType.Analytics);
+
// Create the appropriate fsHandler instance using a factory method
fsHandler = createFileSystemHandler();
fsHandler.setCSEGauge((IOStatisticsStore) getIOStatistics());
@@ -1156,6 +1165,7 @@ private ClientManager createClientManager(URI fsURI, boolean dtEnabled) throws I
conf.getBoolean(CHECKSUM_VALIDATION, CHECKSUM_VALIDATION_DEFAULT))
.withClientSideEncryptionEnabled(isCSEEnabled)
.withClientSideEncryptionMaterials(cseMaterials)
+ .withAnalyticsAcceleratorEnabled(isAnalyticsAcceleratorEnabled)
.withKMSRegion(conf.get(S3_ENCRYPTION_CSE_KMS_REGION));
// this is where clients and the transfer manager are created on demand.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index b84f19fcd8773..1d26eb6275021 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.impl.WeakRefMetricsSource;
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
import org.apache.hadoop.fs.s3a.statistics.ChangeTrackerStatistics;
import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
@@ -840,6 +841,7 @@ private final class InputStreamStatistics
private final AtomicLong closed;
private final AtomicLong forwardSeekOperations;
private final AtomicLong openOperations;
+ private final AtomicLong analyticsStreamOpenOperations;
private final AtomicLong readExceptions;
private final AtomicLong readsIncomplete;
private final AtomicLong readOperations;
@@ -888,7 +890,8 @@ private InputStreamStatistics(
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES,
- StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE)
+ StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE,
+ StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED)
.withGauges(STREAM_READ_GAUGE_INPUT_POLICY,
STREAM_READ_BLOCKS_IN_FILE_CACHE.getSymbol(),
STREAM_READ_ACTIVE_PREFETCH_OPERATIONS.getSymbol(),
@@ -927,6 +930,9 @@ private InputStreamStatistics(
StreamStatisticNames.STREAM_READ_SEEK_FORWARD_OPERATIONS);
openOperations = st.getCounterReference(
StreamStatisticNames.STREAM_READ_OPENED);
+ analyticsStreamOpenOperations = st.getCounterReference(
+ StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED);
readExceptions = st.getCounterReference(
StreamStatisticNames.STREAM_READ_EXCEPTIONS);
readsIncomplete = st.getCounterReference(
@@ -1030,6 +1036,17 @@ public long streamOpened() {
return openOperations.getAndIncrement();
}
+ @Override
+ public long streamOpened(InputStreamType type) {
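+ // Increment the generic stream-open counter; analytics streams also
+ // update their own counter, whose previous value is returned instead.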
+ long count = openOperations.getAndIncrement();
+
+ if (type == InputStreamType.Analytics) {
+ count = analyticsStreamOpenOperations.getAndIncrement();
+ }
+
+ return count;
+ }
+
/**
* {@inheritDoc}.
* If the connection was aborted, increment {@link #aborted}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
index cda09b622eaff..559cd49c34582 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
@@ -202,6 +202,11 @@ final class S3ClientCreationParameters {
*/
private boolean fipsEnabled;
+ /**
+ * Is analytics accelerator enabled?
+ */
+ private boolean isAnalyticsAcceleratorEnabled;
+
/**
* List of execution interceptors to include in the chain
* of interceptors in the SDK.
@@ -457,6 +462,17 @@ public S3ClientCreationParameters withClientSideEncryptionEnabled(final boolean
return this;
}
+ /**
+ * Set the analytics accelerator enabled flag.
+ *
+ * @param value new value
+ * @return the builder
+ */
+ public S3ClientCreationParameters withAnalyticsAcceleratorEnabled(final boolean value) {
+ this.isAnalyticsAcceleratorEnabled = value;
+ return this;
+ }
+
/**
* Set the KMS client region.
* This is required for CSE-KMS
@@ -477,6 +493,14 @@ public boolean isClientSideEncryptionEnabled() {
return this.isCSEEnabled;
}
+ /**
+ * Get the analytics accelerator enabled flag.
+ * @return analytics accelerator enabled flag.
+ */
+ public boolean isAnalyticsAcceleratorEnabled() {
+ return this.isAnalyticsAcceleratorEnabled;
+ }
+
/**
* Set the client side encryption materials.
*
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index 07133a48e8e35..ffd3f5e115519 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -322,6 +322,10 @@ public enum Statistic {
TYPE_COUNTER),
/* Stream Reads */
+ STREAM_READ_ANALYTICS_OPENED(
+ StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED,
+ "Total count of times an analytics input stream to object store data was opened",
+ TYPE_COUNTER),
STREAM_READ_BYTES(
StreamStatisticNames.STREAM_READ_BYTES,
"Bytes read from an input stream in read() calls",
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java
index 09dfa27ff4b66..205a4aaa858e7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.fs.s3a.impl.streams;
+import java.io.IOException;
+
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import org.apache.hadoop.service.AbstractService;
@@ -58,7 +60,7 @@ protected AbstractObjectInputStreamFactory(final String name) {
* @param factoryBindingParameters parameters for the factory binding
*/
@Override
- public void bind(final FactoryBindingParameters factoryBindingParameters) {
+ public void bind(final FactoryBindingParameters factoryBindingParameters) throws IOException {
// must be on be invoked during service initialization
Preconditions.checkState(isInState(STATE.INITED),
"Input Stream factory %s is in wrong state: %s",
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
new file mode 100644
index 0000000000000..6c18b7477ed8e
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.streams;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
+import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
+import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
+import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
+import software.amazon.s3.analyticsaccelerator.util.S3URI;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+
+/**
+ * An input stream backed by the Analytics Accelerator Library for Amazon S3
+ * (aws-analytics-accelerator-s3). The stream supports parquet-specific optimisations
+ * such as parquet-aware prefetching. For more details, see
+ * https://github.com/awslabs/analytics-accelerator-s3.
+ */
+public class AnalyticsStream extends ObjectInputStream implements StreamCapabilities {
+
+ private S3SeekableInputStream inputStream;
+ private long lastReadCurrentPos = 0;
+ private volatile boolean closed;
+
+ public static final Logger LOG = LoggerFactory.getLogger(AnalyticsStream.class);
+
+ public AnalyticsStream(final ObjectReadParameters parameters,
+ final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException {
+ super(InputStreamType.Analytics, parameters);
+ S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
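+ // Build the underlying seekable stream through the shared factory, passing the
+ // bucket/key plus the read policy and any known length/ETag via OpenStreamInformation.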
+ this.inputStream = s3SeekableInputStreamFactory.createStream(
+ S3URI.of(s3Attributes.getBucket(), s3Attributes.getKey()),
+ buildOpenStreamInformation(parameters));
+ getS3AStreamStatistics().streamOpened(InputStreamType.Analytics);
+ }
+
+ @Override
+ public int read() throws IOException {
+ throwIfClosed();
+ int bytesRead;
+ try {
+ bytesRead = inputStream.read();
+ } catch (IOException ioe) {
+ onReadFailure(ioe);
+ throw ioe;
+ }
+ return bytesRead;
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ throwIfClosed();
+ if (pos < 0) {
+ throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK
+ + " " + pos);
+ }
+ inputStream.seek(pos);
+ }
+
+ @Override
+ public synchronized long getPos() {
+ if (!closed) {
+ lastReadCurrentPos = inputStream.getPos();
+ }
+ return lastReadCurrentPos;
+ }
+
+ /**
+ * Reads the last n bytes from the stream into a byte buffer. Blocks until end of stream is
+ * reached. Leaves the position of the stream unaltered.
+ *
+ * @param buf buffer to read data into
+ * @param off start position in buffer at which data is written
+ * @param len the number of bytes to read; the n-th byte should be the last byte of the stream.
+ * @return the total number of bytes read into the buffer
+ * @throws IOException if an I/O error occurs
+ */
+ public int readTail(byte[] buf, int off, int len) throws IOException {
+ throwIfClosed();
+ int bytesRead;
+ try {
+ bytesRead = inputStream.readTail(buf, off, len);
+ } catch (IOException ioe) {
+ onReadFailure(ioe);
+ throw ioe;
+ }
+ return bytesRead;
+ }
+
+ @Override
+ public int read(byte[] buf, int off, int len) throws IOException {
+ throwIfClosed();
+ int bytesRead;
+ try {
+ bytesRead = inputStream.read(buf, off, len);
+ } catch (IOException ioe) {
+ onReadFailure(ioe);
+ throw ioe;
+ }
+ return bytesRead;
+ }
+
+ @Override
+ public boolean seekToNewSource(long l) throws IOException {
+ return false;
+ }
+
+ @Override
+ public int available() throws IOException {
+ throwIfClosed();
+ return super.available();
+ }
+
+ @Override
+ protected boolean isStreamOpen() {
+ return !isClosed();
+ }
+
+ protected boolean isClosed() {
+ return inputStream == null;
+ }
+
+ @Override
+ protected void abortInFinalizer() {
+ try {
+ close();
+ } catch (IOException ignored) {
+
+ }
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if(!closed) {
+ closed = true;
+ try {
+ inputStream.close();
+ inputStream = null;
+ super.close();
+ } catch (IOException ioe) {
+ LOG.debug("Failure closing stream {}: ", getKey());
+ throw ioe;
+ }
+ }
+ }
+
+ /**
+ * Close the stream on read failure.
+ * No attempt is made to recover from the failure.
+ *
+ * @param ioe exception caught.
+ * @throws IOException failure closing the stream.
+ */
+ @Retries.OnceTranslated
+ private void onReadFailure(IOException ioe) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got exception while trying to read from stream {}, " +
+ "not trying to recover:",
+ getKey(), ioe);
+ } else {
+ LOG.info("Got exception while trying to read from stream {}, " +
+ "not trying to recover:",
+ getKey(), ioe);
+ }
+ this.close();
+ }
+
+ private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters parameters) {
+ OpenStreamInformation.OpenStreamInformationBuilder openStreamInformationBuilder =
+ OpenStreamInformation.builder()
+ .inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext()
+ .getInputPolicy()));
+
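+ // Only pass the object length and ETag through when an ETag is known;
+ // otherwise the OpenStreamInformation builder defaults are used.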
+ if (parameters.getObjectAttributes().getETag() != null) {
+ openStreamInformationBuilder.objectMetadata(ObjectMetadata.builder()
+ .contentLength(parameters.getObjectAttributes().getLen())
+ .etag(parameters.getObjectAttributes().getETag()).build());
+ }
+
+ return openStreamInformationBuilder.build();
+ }
+
+ /**
+ * If S3A's input policy is Sequential, that is, if the file format being read is sequential
+ * (CSV, JSON), or the read policy passed down is WHOLE_FILE, then AAL's parquet-specific
+ * optimisations are turned off, regardless of the file extension. This allows
+ * applications such as DistCp that read parquet files, but read them whole, and so do not
+ * follow the typical parquet access pattern of reading the footer first, to skip
+ * optimisations from which they will not benefit.
+ * Otherwise, AAL decides which optimisations to apply based on the file extension:
+ * if the file ends in .par or .parquet, parquet-specific optimisations are used.
+ *
+ * @param inputPolicy S3A's input file policy passed down when opening the file
+ * @return the AAL read policy
+ */
+ private InputPolicy mapS3AInputPolicyToAAL(S3AInputPolicy inputPolicy) {
+ switch (inputPolicy) {
+ case Sequential:
+ return InputPolicy.Sequential;
+ default:
+ return InputPolicy.None;
+ }
+ }
+
+ protected void throwIfClosed() throws IOException {
+ if (closed) {
+ throw new IOException(getKey() + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
new file mode 100644
index 0000000000000..a35d59c956a6a
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.streams;
+
+import java.io.IOException;
+
+import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.VectoredIOContext;
+import org.apache.hadoop.util.functional.CallableRaisingIOE;
+import org.apache.hadoop.util.functional.LazyAutoCloseableReference;
+
+import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
+import static org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration.populateVectoredIOContext;
+
+/**
+ * A factory for {@link AnalyticsStream}. This class is instantiated during initialization of
+ * {@code S3AStore}, if fs.s3a.input.stream.type is set to Analytics.
+ */
+public class AnalyticsStreamFactory extends AbstractObjectInputStreamFactory {
+
+ private S3SeekableInputStreamConfiguration seekableInputStreamConfiguration;
+ private LazyAutoCloseableReference<S3SeekableInputStreamFactory> s3SeekableInputStreamFactory;
+ private boolean requireCrt;
+
+ public AnalyticsStreamFactory() {
+ super("AnalyticsStreamFactory");
+ }
+
+ @Override
+ protected void serviceInit(final Configuration conf) throws Exception {
+ super.serviceInit(conf);
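+ // Settings under the fs.s3a.analytics.accelerator. prefix are passed to the
+ // library through ConnectorConfiguration, for example (illustrative keys,
+ // exercised in ITestS3AAnalyticsAcceleratorStreamReading):
+ // fs.s3a.analytics.accelerator.logicalio.prefetching.mode
+ // fs.s3a.analytics.accelerator.physicalio.blobstore.capacity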
+ ConnectorConfiguration configuration = new ConnectorConfiguration(conf,
+ ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
+ this.seekableInputStreamConfiguration =
+ S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
+ this.requireCrt = false;
+ }
+
+ @Override
+ public void bind(final FactoryBindingParameters factoryBindingParameters) throws IOException {
+ super.bind(factoryBindingParameters);
+ this.s3SeekableInputStreamFactory =
+ new LazyAutoCloseableReference<>(createS3SeekableInputStreamFactory());
+ }
+
+ @Override
+ public ObjectInputStream readObject(final ObjectReadParameters parameters) throws IOException {
+ return new AnalyticsStream(
+ parameters,
+ getOrCreateS3SeekableInputStreamFactory());
+ }
+
+ @Override
+ public InputStreamType streamType() {
+ return InputStreamType.Analytics;
+ }
+
+ /**
+ * Calculate the requirements of this stream factory.
+ * @return the stream factory requirements.
+ */
+ @Override
+ public StreamFactoryRequirements factoryRequirements() {
+ // fill in the vector context
+ final VectoredIOContext vectorContext = populateVectoredIOContext(getConfig());
+ // and then disable range merging.
+ // this ensures that no reads are made for data which is then discarded...
+ // so the prefetch and block read code doesn't ever do wasteful fetches.
+ vectorContext.setMinSeekForVectoredReads(0);
+
+ return new StreamFactoryRequirements(0,
+ 0, vectorContext,
+ StreamFactoryRequirements.Requirements.ExpectUnauditedGetRequests);
+ }
+
+ private S3SeekableInputStreamFactory getOrCreateS3SeekableInputStreamFactory()
+ throws IOException {
+ return s3SeekableInputStreamFactory.eval();
+ }
+
+ private CallableRaisingIOE<S3SeekableInputStreamFactory> createS3SeekableInputStreamFactory() {
+ return () -> new S3SeekableInputStreamFactory(
+ new S3SdkObjectClient(callbacks().getOrCreateAsyncClient(requireCrt)),
+ seekableInputStreamConfiguration);
+ }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java
index 1775fa5f05c25..a9c33a60fc61c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java
@@ -45,13 +45,11 @@ public enum InputStreamType {
*/
Prefetch(StreamIntegration.PREFETCH, 2, c ->
new PrefetchingInputStreamFactory()),
-
/**
* The analytics input stream.
*/
- Analytics(StreamIntegration.ANALYTICS, 3, c -> {
- throw new IllegalArgumentException("not yet supported");
- }),
+ Analytics(StreamIntegration.ANALYTICS, 3, c ->
+ new AnalyticsStreamFactory()),
/**
* The a custom input stream.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java
index 63f7053e0e41d..bb35a0580a20e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java
@@ -113,7 +113,6 @@ private StreamIntegration() {
* @throws RuntimeException any binding/loading/instantiation problem
*/
public static ObjectInputStreamFactory factoryFromConfig(final Configuration conf) {
-
// Construct the factory.
return determineInputStreamType(conf)
.factory()
@@ -135,7 +134,7 @@ public static ObjectInputStreamFactory factoryFromConfig(final Configuration con
* @param conf configuration
* @return a stream factory.
*/
- static InputStreamType determineInputStreamType(final Configuration conf) {
+ public static InputStreamType determineInputStreamType(final Configuration conf) {
// work out the default stream; this includes looking at the
// deprecated prefetch enabled key to see if it is set.
if (conf.getBoolean(PREFETCH_ENABLED_KEY, false)) {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
index 1e28a27e7088c..7ad7cf75367e2 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.fs.s3a.statistics;
import org.apache.hadoop.fs.impl.prefetch.PrefetchingStatistics;
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
import org.apache.hadoop.fs.statistics.DurationTracker;
/**
@@ -53,6 +54,13 @@ public interface S3AInputStreamStatistics extends AutoCloseable,
*/
long streamOpened();
+ /**
+ * A stream of the given type was opened.
+ * @param type type of input stream
+ * @return the previous count or zero if this is the first opening.
+ */
+ long streamOpened(InputStreamType type);
+
/**
* The inner stream was closed.
* @param abortedConnection flag to indicate the stream was aborted,
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
index e47656efd1800..a5d20095ba5cc 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
@@ -22,6 +22,7 @@
import java.time.Duration;
import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
import org.apache.hadoop.fs.s3a.statistics.ChangeTrackerStatistics;
import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
@@ -164,6 +165,11 @@ public long streamOpened() {
return 0;
}
+ @Override
+ public long streamOpened(InputStreamType type) {
+ return 0;
+ }
+
@Override
public void streamClose(final boolean abortedConnection,
final long remainingInCurrentRequest) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java
new file mode 100644
index 0000000000000..f532cc77ea209
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
+
+/**
+ * S3A contract tests for vectored reads with the Analytics stream. The analytics stream does
+ * not explicitly implement the vectoredRead() method, or currently do any vectored-read specific
+ * optimisations (such as range coalescing). However, this test ensures that the base implementation
+ * of readVectored in {@link org.apache.hadoop.fs.PositionedReadable} still works.
+ */
+public class ITestS3AContractAnalyticsStreamVectoredRead extends AbstractContractVectoredReadTest {
+
+ public ITestS3AContractAnalyticsStreamVectoredRead(String bufferType) {
+ super(bufferType);
+ }
+
+ /**
+ * Create a configuration.
+ * @return a configuration
+ */
+ @Override
+ protected Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ enableAnalyticsAccelerator(conf);
+ conf.set("fs.contract.vector-io-early-eof-check", "false");
+ return conf;
+ }
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new S3AContract(conf);
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java
index 033c2d94c7bf8..f346064d4d291 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java
@@ -33,6 +33,7 @@
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_EXPECT_CONTINUE;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
/**
@@ -93,6 +94,17 @@ protected Configuration createConfiguration() {
return conf;
}
+ @Override
+ public void testOverwriteExistingFile() throws Throwable {
+ // Currently analytics accelerator does not support reading of files that have been overwritten.
+ // This is because the analytics accelerator library caches metadata, and when a file is
+ // overwritten, the old metadata continues to be used, until it is removed from the cache over
+ // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218.
+ skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
+ "Analytics Accelerator currently does not support reading of over written files");
+ super.testOverwriteExistingFile();
+ }
+
@Override
public void testOverwriteNonEmptyDirectory() throws Throwable {
try {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
index e761e0d14bf83..e014945a89c71 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
@@ -20,6 +20,7 @@
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.SCALE_TEST_TIMEOUT_MILLIS;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageStatistics;
@@ -78,6 +79,19 @@ public void testNonDirectWrite() throws Exception {
getRenameOperationCount() - renames);
}
+ @Override
+ public void testDistCpUpdateCheckFileSkip() throws Exception {
+ // Currently analytics accelerator does not support reading of files that have been overwritten.
+ // This is because the analytics accelerator library caches metadata and data, and when a file is
+ // overwritten, the old data continues to be used, until it is removed from the cache over
+ // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218.
+ // In this test case, the remote file is created, read, then deleted, and then created again
+ // with different contents, and read again, which leads to assertions failing.
+ skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
+ "Analytics Accelerator Library does not support update to existing files");
+ super.testDistCpUpdateCheckFileSkip();
+ }
+
private long getRenameOperationCount() {
return getFileSystem().getStorageStatistics()
.getLong(StorageStatistics.CommonStatisticNames.OP_RENAME);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
index 5a92066e06db4..54aca14798b97 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
@@ -32,6 +32,7 @@
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeNotS3ExpressFileSystem;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBytes;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles.DEFAULT_HUGE_PARTITION_SIZE;
/**
@@ -127,6 +128,12 @@ public void testMultipartUploadReverseOrderNonContiguousPartNumbers() throws Exc
@Override
public void testConcurrentUploads() throws Throwable {
assumeNotS3ExpressFileSystem(getFileSystem());
+ // Currently analytics accelerator does not support reading of files that have been overwritten.
+ // This is because the analytics accelerator library caches metadata and data, and when a file is
+ // overwritten, the old data continues to be used, until it is removed from the cache over
+ // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218.
+ skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
+ "Analytics Accelerator currently does not support reading of over written files");
super.testConcurrentUploads();
}
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
index fbb6d5a04d27a..bf489fc44a5ff 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
@@ -63,6 +63,7 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.io.Sizes.S_1M;
@@ -88,6 +89,17 @@ protected AbstractFSContract createContract(Configuration conf) {
return new S3AContract(conf);
}
+ /**
+ * Analytics Accelerator Library for Amazon S3 does not support Vectored Reads.
+ * @throws Exception failure during setup.
+ */
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
+ "Analytics Accelerator does not support vectored reads");
+ }
+
/**
* Verify response to a vector read request which is beyond the
* real length of the file.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
index e76b304604836..940e23026af46 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
@@ -54,10 +54,11 @@ public abstract class AbstractS3AMockTest {
protected S3AFileSystem fs;
protected S3Client s3;
+ protected Configuration conf;
@Before
public void setup() throws Exception {
- Configuration conf = createConfiguration();
+ conf = createConfiguration();
fs = new S3AFileSystem();
URI uri = URI.create(FS_S3A + "://" + BUCKET);
// unset S3CSE property from config to avoid pathIOE.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
new file mode 100644
index 0000000000000..2b8d83cc69b6a
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.junit.Test;
+import org.assertj.core.api.Assertions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
+import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
+
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Tests integration of the analytics accelerator library.
+ *
+ * Certain tests in this class rely on reading local parquet files stored in resources.
+ * These files are copied from local to S3 and then read via the analytics stream.
+ * This is done to ensure AAL can read the parquet format and handles exceptions from
+ * malformed parquet files.
+ *
+ */
+public class ITestS3AAnalyticsAcceleratorStreamReading extends AbstractS3ATestBase {
+
+ private static final String PHYSICAL_IO_PREFIX = "physicalio";
+ private static final String LOGICAL_IO_PREFIX = "logicalio";
+
+ private Path externalTestFile;
+
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ externalTestFile = getExternalData(getConfiguration());
+ }
+
+ @Override
+ public Configuration createConfiguration() {
+ Configuration configuration = super.createConfiguration();
+ enableAnalyticsAccelerator(configuration);
+ return configuration;
+ }
+
+ @Test
+ public void testConnectorFrameWorkIntegration() throws Throwable {
+ describe("Verify S3 connector framework integration");
+
+ S3AFileSystem fs =
+ (S3AFileSystem) FileSystem.get(externalTestFile.toUri(), getConfiguration());
+ byte[] buffer = new byte[500];
+ IOStatistics ioStats;
+
+ try (FSDataInputStream inputStream =
+ fs.openFile(externalTestFile)
+ .must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+ .build().get()) {
+ ioStats = inputStream.getIOStatistics();
+ inputStream.seek(5);
+ inputStream.read(buffer, 0, 500);
+
+ final InputStream wrappedStream = inputStream.getWrappedStream();
+ ObjectInputStream objectInputStream = (ObjectInputStream) wrappedStream;
+
+ Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics);
+ Assertions.assertThat(objectInputStream.getInputPolicy()).isEqualTo(S3AInputPolicy.Sequential);
+ }
+
+ verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+ }
+
+ @Test
+ public void testMalformedParquetFooter() throws IOException {
+ describe("Reading a malformed parquet file should not throw an exception");
+
+ // File with malformed footer taken from https://github.com/apache/parquet-testing/blob/master/bad_data/PARQUET-1481.parquet.
+ // This test ensures AAL does not throw exceptions if footer parsing fails. It will only emit a WARN log,
+ // "Unable to parse parquet footer for test/malformedFooter.parquet, parquet prefetch optimisations will be disabled for this key."
+ Path dest = path("malformed_footer.parquet");
+
+ File file = new File("src/test/resources/malformed_footer.parquet");
+
+ Path sourcePath = new Path(file.toURI().getPath());
+ getFileSystem().copyFromLocalFile(false, true, sourcePath, dest);
+
+ byte[] buffer = new byte[500];
+ IOStatistics ioStats;
+
+ try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+ ioStats = inputStream.getIOStatistics();
+ inputStream.seek(5);
+ inputStream.read(buffer, 0, 500);
+ }
+
+ verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+ }
+
+ /**
+ * This test reads a multi-row group parquet file. Each parquet file consists of at least one
+ * row group, which contains the column data for a subset of rows. A single parquet file
+ * can contain multiple row groups; this allows for further parallelisation, as each row group
+ * can be processed independently.
+ */
+ @Test
+ public void testMultiRowGroupParquet() throws Throwable {
+ describe("A parquet file is read successfully");
+
+ Path dest = path("multi_row_group.parquet");
+
+ File file = new File("src/test/resources/multi_row_group.parquet");
+ Path sourcePath = new Path(file.toURI().getPath());
+ getFileSystem().copyFromLocalFile(false, true, sourcePath, dest);
+
+ FileStatus fileStatus = getFileSystem().getFileStatus(dest);
+
+ byte[] buffer = new byte[3000];
+ IOStatistics ioStats;
+
+ try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+ ioStats = inputStream.getIOStatistics();
+ inputStream.readFully(buffer, 0, (int) fileStatus.getLen());
+ }
+
+ verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+
+ try (FSDataInputStream inputStream = getFileSystem().openFile(dest)
+ .must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_PARQUET)
+ .build().get()) {
+ ioStats = inputStream.getIOStatistics();
+ inputStream.readFully(buffer, 0, (int) fileStatus.getLen());
+ }
+
+ verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+ }
+
+ @Test
+ public void testConnectorFrameworkConfigurable() {
+ describe("Verify S3 connector framework reads configuration");
+
+ Configuration conf = new Configuration(getConfiguration());
+
+ //Set the prefetching mode to ALL
+ conf.set(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
+ "." + LOGICAL_IO_PREFIX + ".prefetching.mode", "all");
+
+ //Set Blobstore Capacity
+ conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
+ "." + PHYSICAL_IO_PREFIX + ".blobstore.capacity", 1);
+
+ ConnectorConfiguration connectorConfiguration =
+ new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
+
+ S3SeekableInputStreamConfiguration configuration =
+ S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration);
+
+ Assertions.assertThat(configuration.getLogicalIOConfiguration().getPrefetchingMode())
+ .as("AnalyticsStream configuration is not set to expected value")
+ .isSameAs(PrefetchMode.ALL);
+
+ Assertions.assertThat(configuration.getPhysicalIOConfiguration().getBlobStoreCapacity())
+ .as("AnalyticsStream configuration is not set to expected value")
+ .isEqualTo(1);
+ }
+
+ @Test
+ public void testInvalidConfigurationThrows() throws Exception {
+ describe("Verify S3 connector framework throws with invalid configuration");
+
+ Configuration conf = new Configuration(getConfiguration());
+ removeBaseAndBucketOverrides(conf);
+ //Set an invalid blobstore capacity
+ conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
+ "." + PHYSICAL_IO_PREFIX + ".blobstore.capacity", -1);
+
+ ConnectorConfiguration connectorConfiguration =
+ new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
+
+ intercept(IllegalArgumentException.class,
+ () -> S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
+ }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
index d22de3b06d81b..8671d962175f7 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
@@ -38,9 +38,11 @@
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
+
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeSkipRootTests;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
@@ -93,6 +95,8 @@ protected Configuration createConfiguration() {
@Override
public void setup() throws Exception {
super.setup();
+ skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+ "Analytics Accelerator currently does not support SSE-C");
assumeEnabled();
// although not a root dir test, this confuses paths enough it shouldn't be run in
// parallel with other jobs
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java
index 0281c57f5cbce..3c405cb7c5101 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java
@@ -32,6 +32,7 @@
import static org.apache.hadoop.fs.s3a.S3ATestUtils.createTestPath;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.isCreatePerformanceEnabled;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
/**
* S3A Test suite for the FSMainOperationsBaseTest tests.
@@ -78,6 +79,28 @@ public void testCopyToLocalWithUseRawLocalFileSystemOption()
throws Exception {
}
+ @Override
+ public void testWriteReadAndDeleteOneAndAHalfBlocks() throws Exception {
+ // Currently analytics accelerator does not support reading of files that have been overwritten.
+ // This is because the analytics accelerator library caches metadata, and when a file is
+ // overwritten, the old metadata continues to be used, until it is removed from the cache over
+ // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218.
+ skipIfAnalyticsAcceleratorEnabled(this.contract.getConf(),
+ "Analytics Accelerator currently does not support reading of over written files");
+ super.testWriteReadAndDeleteOneAndAHalfBlocks();
+ }
+
+ @Override
+ public void testWriteReadAndDeleteTwoBlocks() throws Exception {
+ // Currently analytics accelerator does not support reading of files that have been overwritten.
+ // This is because the analytics accelerator library caches metadata, and when a file is
+ // overwritten, the old metadata continues to be used, until it is removed from the cache over
+ // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218.
+ skipIfAnalyticsAcceleratorEnabled(this.contract.getConf(),
+ "Analytics Accelerator currently does not support reading of over written files");
+ super.testWriteReadAndDeleteTwoBlocks();
+ }
+
@Override
public void testOverwrite() throws IOException {
boolean createPerformance = isCreatePerformanceEnabled(fSys);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java
index 4808145765822..02d56795890d7 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java
@@ -34,8 +34,10 @@
import org.apache.hadoop.fs.Path;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+
import static org.apache.hadoop.fs.s3a.S3ATestUtils.isCreatePerformanceEnabled;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.junit.Assume.*;
import static org.junit.Assert.*;
@@ -160,4 +162,15 @@ public void testOverwrite() throws IOException {
}
}
}
+
+ @Override
+ public void testOverWriteAndRead() throws Exception {
+ // Currently analytics accelerator does not support reading of files that have been overwritten.
+ // This is because the analytics accelerator library caches metadata, and when a file is
+ // overwritten, the old metadata continues to be used, until it is removed from the cache over
+ // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218.
+ skipIfAnalyticsAcceleratorEnabled(fs.getConf(),
+ "Analytics Accelerator currently does not support reading of over written files");
+ super.testOverWriteAndRead();
+ }
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java
index 70dc5ee476c47..daf5306dc399e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java
@@ -43,6 +43,7 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disablePrefetching;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
@@ -77,6 +78,11 @@ protected Configuration createConfiguration() {
public void setup() throws Exception {
super.setup();
executor = HadoopExecutors.newFixedThreadPool(SMALL_THREADS);
+ // Analytics accelerator currently does not support IOStatisticsContext; this will be added as
+ // part of https://issues.apache.org/jira/browse/HADOOP-19364
+ skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+ "Analytics Accelerator currently does not support IOStatisticsContext");
+
}
@Override
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java
index 72c75162c9fda..f26a585776a21 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java
@@ -36,6 +36,7 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_LEAKS;
import static org.apache.hadoop.test.GenericTestUtils.LogCapturer.captureLogs;
@@ -87,6 +88,11 @@ public void setup() throws Exception {
@Test
public void testFinalizer() throws Throwable {
Path path = methodPath();
+ // Analytics accelerator currently does not support stream leak detection. This work is tracked
+ // in https://issues.apache.org/jira/browse/HADOOP-19451
+ skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+ "Analytics Accelerator currently does not support leak detection");
+
final S3AFileSystem fs = getFileSystem();
ContractTestUtils.createFile(fs, path, true, DATASET);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java
index 3bfe69c2bca91..4ec579ce4f649 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java
@@ -28,6 +28,7 @@
import java.io.IOException;
import java.io.InputStream;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
/**
@@ -51,6 +52,11 @@ public void testMetricsRegister()
@Test
public void testStreamStatistics() throws IOException {
+ // Analytics Accelerator currently does not support IOStatistics; this will be added
+ // as part of https://issues.apache.org/jira/browse/HADOOP-19364
+ skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+ "Analytics Accelerator currently does not support stream statistics");
+
S3AFileSystem fs = getFileSystem();
Path file = path("testStreamStatistics");
byte[] data = "abcdefghijklmnopqrstuvwxyz".getBytes();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java
index 049a698d20889..eadc398e61ab1 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java
@@ -59,6 +59,7 @@ protected Configuration createConfiguration() {
public void testRequesterPaysOptionSuccess() throws Throwable {
describe("Test requester pays enabled case by reading last then first byte");
skipIfClientSideEncryption();
+
Configuration conf = this.createConfiguration();
conf.setBoolean(ALLOW_REQUESTER_PAYS, true);
// Enable bucket exists check, the first failure point people may encounter
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 14ab2b953d1a8..f1823330e7adf 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -105,6 +105,8 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.impl.FlagSet.createFlagSet;
+import static org.apache.hadoop.fs.s3a.impl.streams.InputStreamType.Analytics;
+import static org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration.DEFAULT_STREAM_TYPE;
import static org.apache.hadoop.fs.s3a.impl.streams.InputStreamType.Prefetch;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
@@ -577,6 +579,27 @@ public static boolean isS3ExpressTestBucket(final Configuration conf) {
return S3ExpressStorage.isS3ExpressStore(getTestBucketName(conf), "");
}
+ /**
+ * Skip a test if the Analytics Accelerator Library for Amazon S3 is enabled.
+ * @param configuration configuration to probe
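+ * @param message message to use in the skip/assumption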
+ */
+ public static void skipIfAnalyticsAcceleratorEnabled(
+ Configuration configuration, String message) {
+ assume(message,
+ !isAnalyticsAcceleratorEnabled(configuration));
+ }
+
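+ /**
+ * Probe for the Analytics Accelerator Library being the configured input stream type.
+ * @param conf configuration to probe
+ * @return true if the analytics input stream type is selected
+ */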
+ public static boolean isAnalyticsAcceleratorEnabled(final Configuration conf) {
+ return conf.get(INPUT_STREAM_TYPE,
+ INPUT_STREAM_TYPE_CLASSIC).equals(INPUT_STREAM_TYPE_ANALYTICS);
+ }
+
/**
* Skip a test if the filesystem lacks a required capability.
* @param fs filesystem
@@ -1804,6 +1821,18 @@ public static Configuration enablePrefetching(Configuration conf) {
return conf;
}
+ /**
+ * Enable the analytics input stream for the S3A filesystem in tests.
+ * @param conf Configuration to update
+ * @return patched config
+ */
+ public static Configuration enableAnalyticsAccelerator(Configuration conf) {
+ removeBaseAndBucketOverrides(conf,
+ INPUT_STREAM_TYPE);
+ conf.setEnum(INPUT_STREAM_TYPE, Analytics);
+ return conf;
+ }
+
/**
* Probe for a filesystem having a specific stream type;
* this is done through filesystem capabilities.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java
index 643db02087b46..eb7d5af9300b3 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java
@@ -44,6 +44,8 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
+
/**
* Uses mocks to check that the {@link ResponseInputStream} is
* closed when {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer} is called.
@@ -55,6 +57,8 @@ public class TestS3AUnbuffer extends AbstractS3AMockTest {
@Test
public void testUnbuffer() throws IOException {
+ skipIfAnalyticsAcceleratorEnabled(conf,
+ "Analytics Accelerator currently does not support unbuffer");
// Create mock ObjectMetadata for getFileStatus()
Path path = new Path("/file");
HeadObjectResponse objectMetadata = HeadObjectResponse.builder()
.contentLength(1L)
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
index 8132b44cdb438..02f0251e8f055 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
@@ -40,6 +40,7 @@
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.s3a.Statistic.ACTION_HTTP_GET_REQUEST;
import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_FILES_CREATED;
import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT;
@@ -167,7 +168,12 @@ private void abortActiveStream() throws IOException {
@Test
public void testCostOfCreatingMagicFile() throws Throwable {
- describe("Files created under magic paths skip existence checks");
+ describe("Files created under magic paths skip existence checks and marker deletes");
+
+ // Analytics Accelerator currently does not support IOStatistics; this will be added
+ // as part of https://issues.apache.org/jira/browse/HADOOP-19364
+ skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+ "Analytics Accelerator currently does not support stream statistics");
S3AFileSystem fs = getFileSystem();
Path destFile = methodSubPath("file.txt");
fs.delete(destFile.getParent(), true);
@@ -245,6 +251,10 @@ public void testCostOfCreatingMagicFile() throws Throwable {
public void testCostOfSavingLoadingPendingFile() throws Throwable {
describe("Verify costs of saving .pending file under a magic path");
+ // Analytics Accelerator currently does not support IOStatistics; this will be added
+ // as part of https://issues.apache.org/jira/browse/HADOOP-19364
+ skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+ "Analytics Accelerator currently does not support stream statistics");
S3AFileSystem fs = getFileSystem();
Path partDir = methodSubPath("file.pending");
Path destFile = new Path(partDir, "file.pending");
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
index 1724006a83198..5b489c1c39c7a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
@@ -30,6 +30,8 @@
import org.junit.Assert;
import org.junit.Before;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
+
/**
* S3a implementation of FCStatisticsBaseTest.
*/
@@ -44,6 +46,10 @@ public class ITestS3AFileContextStatistics extends FCStatisticsBaseTest {
@Before
public void setUp() throws Exception {
conf = new Configuration();
+ // Analytics Accelerator currently does not support IOStatistics; this will be added
+ // as part of https://issues.apache.org/jira/browse/HADOOP-19364
+ skipIfAnalyticsAcceleratorEnabled(conf,
+ "Analytics Accelerator currently does not support stream statistics");
fc = S3ATestUtils.createTestFileContext(conf);
testRootPath = fileContextTestHelper.getTestRootPath(fc, "test");
fc.mkdir(testRootPath,
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
index e94bb156cd2ef..febc6bb82c410 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
@@ -57,6 +57,7 @@
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.streamType;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
@@ -110,6 +111,10 @@ public Configuration createConfiguration() {
@Override
public void setup() throws Exception {
super.setup();
+ // Analytics Accelerator currently does not support IOStatistics; this will be added
+ // as part of https://issues.apache.org/jira/browse/HADOOP-19364
+ skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+ "Analytics Accelerator currently does not support stream statistics");
S3AFileSystem fs = getFileSystem();
testFile = methodPath();
@@ -388,7 +393,6 @@ public void testPositionedReadableReadPastEOF() throws Throwable {
describe("PositionedReadable.read() past the end of the file");
assumeNoPrefetching();
-
verifyMetrics(() -> {
try (FSDataInputStream in =
openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
index caf723b95dedd..e68ea9a031521 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
@@ -117,7 +117,6 @@ public Configuration createConfiguration() {
@Override
public void setup() throws Exception {
super.setup();
-
// now create a new FS with minimal http capacity and recovery
// a separate one is used to avoid test teardown suffering
// from the lack of http connections and short timeouts.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java
index 6f19ba15c1c9a..0203b00caab69 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java
@@ -31,6 +31,7 @@
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
/**
@@ -54,6 +55,9 @@ public class ITestS3AHugeFilesSSECDiskBlocks
public void setup() throws Exception {
try {
super.setup();
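+ // Analytics Accelerator currently does not support SSE-C encrypted objects; skip when enabled.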
+ skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+ "Analytics Accelerator currently does not support SSE-C");
} catch (AccessDeniedException | AWSUnsupportedFeatureException e) {
skip("Bucket does not allow " + S3AEncryptionMethods.SSE_C + " encryption method");
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java
index 0f6b69cd54d89..da2a39a986ea4 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.fs.contract.s3a.S3AContract;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.*;
/**
@@ -78,4 +79,12 @@ public List<String> outputStreamStatisticKeys() {
STREAM_WRITE_EXCEPTIONS);
}
+ @Override
+ public void testInputStreamStatisticRead() throws Throwable {
+ // Analytics Accelerator currently does not support IOStatistics; this will be added
+ // as part of https://issues.apache.org/jira/browse/HADOOP-19364
+ skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
+ "Analytics Accelerator currently does not support stream statistics");
+ super.testInputStreamStatisticRead();
+ }
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
index 0d5d2a789a02a..376dcdf727fa8 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
@@ -31,6 +31,8 @@
import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
+
public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase {
private static final int ONE_KB = 1024;
@@ -42,6 +44,10 @@ public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase {
*/
@Test
public void testBytesReadWithStream() throws IOException {
+ // Analytics Accelerator currently does not support IOStatistics; this will be added
+ // as part of https://issues.apache.org/jira/browse/HADOOP-19364
+ skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+ "Analytics Accelerator currently does not support stream statistics");
S3AFileSystem fs = getFileSystem();
Path filePath = path(getMethodName());
byte[] oneKbBuf = new byte[ONE_KB];
diff --git a/hadoop-tools/hadoop-aws/src/test/resources/malformed_footer.parquet b/hadoop-tools/hadoop-aws/src/test/resources/malformed_footer.parquet
new file mode 100644
index 0000000000000..614912f630bce
Binary files /dev/null and b/hadoop-tools/hadoop-aws/src/test/resources/malformed_footer.parquet differ
diff --git a/hadoop-tools/hadoop-aws/src/test/resources/multi_row_group.parquet b/hadoop-tools/hadoop-aws/src/test/resources/multi_row_group.parquet
new file mode 100644
index 0000000000000..b8d6a95cd4418
Binary files /dev/null and b/hadoop-tools/hadoop-aws/src/test/resources/multi_row_group.parquet differ