diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 1cf9401d707c3..9e2c09c97ddbb 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -188,6 +188,7 @@
900
1.12.599
2.24.6
+ 3.1.1
1.0.1
2.7.1
1.11.2
@@ -1149,6 +1150,17 @@
+
+ software.amazon.encryption.s3
+ amazon-s3-encryption-client-java
+ ${amazon-s3-encryption-client-java.version}
+
+
+ io.netty
+ *
+
+
+
org.apache.mina
mina-core
diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml
index 5a0f2356b5e4a..da85cc34ded30 100644
--- a/hadoop-tools/hadoop-aws/pom.xml
+++ b/hadoop-tools/hadoop-aws/pom.xml
@@ -464,6 +464,16 @@
org.apache.hadoop.mapred.**
+
+ false
+ Restrict encryption client imports to encryption client factory
+
+ org.apache.hadoop.fs.s3a.EncryptionS3ClientFactory
+
+
+ software.amazon.encryption.s3.**
+
+
@@ -508,6 +518,11 @@
bundle
compile
+
+ software.amazon.encryption.s3
+ amazon-s3-encryption-client-java
+ provided
+
org.assertj
assertj-core
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
index 284ba8e6ae5c9..17314879a840a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
@@ -38,6 +38,7 @@
import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3BaseClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3Configuration;
@@ -145,11 +146,17 @@ public S3AsyncClient createS3AsyncClient(
.thresholdInBytes(parameters.getMultiPartThreshold())
.build();
- return configureClientBuilder(S3AsyncClient.builder(), parameters, conf, bucket)
- .httpClientBuilder(httpClientBuilder)
- .multipartConfiguration(multipartConfiguration)
- .multipartEnabled(parameters.isMultipartCopy())
- .build();
+ S3AsyncClientBuilder s3AsyncClientBuilder =
+ configureClientBuilder(S3AsyncClient.builder(), parameters, conf, bucket).httpClientBuilder(
+ httpClientBuilder);
+
+ if (!parameters.isClientSideEncryptionEnabled()) {
+ s3AsyncClientBuilder
+ .multipartConfiguration(multipartConfiguration)
+ .multipartEnabled(parameters.isMultipartCopy());
+ }
+
+ return s3AsyncClientBuilder.build();
}
@Override
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/EncryptionS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/EncryptionS3ClientFactory.java
new file mode 100644
index 0000000000000..3036d67f72164
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/EncryptionS3ClientFactory.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.net.URI;
+
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.encryption.s3.S3AsyncEncryptionClient;
+import software.amazon.encryption.s3.S3EncryptionClient;
+
+import org.apache.hadoop.fs.s3a.impl.CSEMaterials;
+
+import static org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unavailable;
+
+public class EncryptionS3ClientFactory extends DefaultS3ClientFactory {
+
+ private static final String ENCRYPTION_CLIENT_CLASSNAME =
+ "software.amazon.encryption.s3.S3EncryptionClient";
+
+ /**
+ * Encryption client availability.
+ */
+ private static final boolean ENCRYPTION_CLIENT_FOUND = checkForEncryptionClient();
+
+ /**
+ * S3Client to be wrapped by encryption client.
+ */
+ private S3Client s3Client;
+
+ /**
+ * S3AsyncClient to be wrapped by encryption client.
+ */
+ private S3AsyncClient s3AsyncClient;
+
+ private static boolean checkForEncryptionClient() {
+ try {
+ ClassLoader cl = EncryptionS3ClientFactory.class.getClassLoader();
+ cl.loadClass(ENCRYPTION_CLIENT_CLASSNAME);
+ LOG.debug("encryption client class {} found", ENCRYPTION_CLIENT_CLASSNAME);
+ return true;
+ } catch (Exception e) {
+ LOG.debug("encryption client class {} not found", ENCRYPTION_CLIENT_CLASSNAME, e);
+ return false;
+ }
+ }
+
+ /**
+ * Is the Encryption client available?
+ * @return true if it was found in the classloader
+ */
+ private static synchronized boolean isEncryptionClientAvailable() {
+ return ENCRYPTION_CLIENT_FOUND;
+ }
+
+
+ @Override
+ public S3Client createS3Client(URI uri, S3ClientCreationParameters parameters) throws IOException {
+
+ if (!isEncryptionClientAvailable()) {
+ throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null, "No encryption client available");
+ }
+
+ s3Client = super.createS3Client(uri, parameters);
+ s3AsyncClient = super.createS3AsyncClient(uri, parameters);
+
+ return createS3EncryptionClient(parameters.getClientSideEncryptionMaterials());
+ }
+
+ @Override
+ public S3AsyncClient createS3AsyncClient(URI uri, S3ClientCreationParameters parameters) throws IOException {
+
+ if (!isEncryptionClientAvailable()) {
+ throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null, "No encryption client available");
+ }
+
+ return createS3AsyncEncryptionClient(parameters.getClientSideEncryptionMaterials());
+ }
+
+ private S3Client createS3EncryptionClient(final CSEMaterials cseMaterials) {
+ S3EncryptionClient.Builder s3EncryptionClientBuilder =
+ S3EncryptionClient.builder().wrappedAsyncClient(s3AsyncClient).wrappedClient(s3Client)
+ .enableLegacyUnauthenticatedModes(true);
+
+ if (cseMaterials.getCseKeyType().equals(CSEMaterials.CSEKeyType.KMS)) {
+ s3EncryptionClientBuilder.kmsKeyId(cseMaterials.getKmsKeyId());
+ }
+
+ return s3EncryptionClientBuilder.build();
+ }
+
+
+ private S3AsyncClient createS3AsyncEncryptionClient(final CSEMaterials cseMaterials) {
+
+ S3AsyncEncryptionClient.Builder s3EncryptionAsyncClientBuilder =
+ S3AsyncEncryptionClient.builder().wrappedClient(s3AsyncClient)
+ .enableLegacyUnauthenticatedModes(true);
+
+ if (cseMaterials.getCseKeyType().equals(CSEMaterials.CSEKeyType.KMS)) {
+ s3EncryptionAsyncClientBuilder.kmsKeyId(cseMaterials.getKmsKeyId());
+ }
+
+ return s3EncryptionAsyncClientBuilder.build();
+ }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
index de0f59154e995..30be4670f6c87 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
@@ -36,6 +36,7 @@
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
+import software.amazon.awssdk.services.s3.model.SdkPartType;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
@@ -876,11 +877,17 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block,
? RequestBody.fromFile(uploadData.getFile())
: RequestBody.fromInputStream(uploadData.getUploadStream(), size);
- request = writeOperationHelper.newUploadPartRequestBuilder(
+ UploadPartRequest.Builder requestBuilder = writeOperationHelper.newUploadPartRequestBuilder(
key,
uploadId,
currentPartNumber,
- size).build();
+ size);
+
+ if (isLast) {
+ requestBuilder.sdkPartType(SdkPartType.LAST);
+ }
+
+ request = requestBuilder.build();
} catch (SdkException aws) {
// catch and translate
IOException e = translateException("upload", key, aws);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 3aec03766dacf..4987f081aea66 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -120,6 +120,7 @@
import org.apache.hadoop.fs.s3a.impl.AWSCannedACL;
import org.apache.hadoop.fs.s3a.impl.AWSHeaders;
import org.apache.hadoop.fs.s3a.impl.BulkDeleteRetryHandler;
+import org.apache.hadoop.fs.s3a.impl.CSEMaterials;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ConfigurationHelper;
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
@@ -1034,6 +1035,21 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
S3ClientFactory.class);
+ S3ClientFactory clientFactory;
+ CSEMaterials cseMaterials = null;
+
+ if (isCSEEnabled) {
+ String kmsKeyId = getS3EncryptionKey(bucket, conf, true);
+
+ cseMaterials = new CSEMaterials()
+ .withCSEKeyType(CSEMaterials.CSEKeyType.KMS)
+ .withKmsKeyId(kmsKeyId);
+
+ clientFactory = ReflectionUtils.newInstance(EncryptionS3ClientFactory.class, conf);
+ } else {
+ clientFactory = ReflectionUtils.newInstance(s3ClientFactoryClass, conf);
+ }
+
S3ClientFactory.S3ClientCreationParameters parameters =
new S3ClientFactory.S3ClientCreationParameters()
.withCredentialSet(credentials)
@@ -1053,9 +1069,10 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
.withExpressCreateSession(
conf.getBoolean(S3EXPRESS_CREATE_SESSION, S3EXPRESS_CREATE_SESSION_DEFAULT))
.withChecksumValidationEnabled(
- conf.getBoolean(CHECKSUM_VALIDATION, CHECKSUM_VALIDATION_DEFAULT));
+ conf.getBoolean(CHECKSUM_VALIDATION, CHECKSUM_VALIDATION_DEFAULT))
+ .withClientSideEncryptionEnabled(isCSEEnabled)
+ .withClientSideEncryptionMaterials(cseMaterials);
- S3ClientFactory clientFactory = ReflectionUtils.newInstance(s3ClientFactoryClass, conf);
s3Client = clientFactory.createS3Client(getUri(), parameters);
createS3AsyncClient(clientFactory, parameters);
transferManager = clientFactory.createS3TransferManager(getS3AsyncClient());
@@ -1070,7 +1087,8 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
* @throws IOException on any IO problem
*/
private void createS3AsyncClient(S3ClientFactory clientFactory,
- S3ClientFactory.S3ClientCreationParameters parameters) throws IOException {
+ S3ClientFactory.S3ClientCreationParameters parameters)
+ throws IOException {
s3AsyncClient = clientFactory.createS3AsyncClient(getUri(), parameters);
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index b7c89c4626a03..85eacc4cabeb0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -78,6 +78,7 @@
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.audit.AuditIntegration.maybeTranslateAuditException;
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
+import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.maybeExtractSdkException;
import static org.apache.hadoop.fs.s3a.impl.InstantiationIOException.instantiationException;
import static org.apache.hadoop.fs.s3a.impl.InstantiationIOException.isAbstract;
import static org.apache.hadoop.fs.s3a.impl.InstantiationIOException.isNotInstanceOf;
@@ -182,6 +183,8 @@ public static IOException translateException(@Nullable String operation,
path = "/";
}
+ exception = maybeExtractSdkException(exception);
+
if (!(exception instanceof AwsServiceException)) {
// exceptions raised client-side: connectivity, auth, network problems...
Exception innerCause = containsInterruptedException(exception);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
index 0b01876ae504f..b7188b60ee904 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.s3a.impl.CSEMaterials;
import org.apache.hadoop.fs.s3a.statistics.StatisticsFromAwsSdk;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ENDPOINT;
@@ -186,6 +187,16 @@ final class S3ClientCreationParameters {
*/
private boolean fipsEnabled;
+ /**
+ * Is client side encryption enabled.
+ */
+ private Boolean isCSEEnabled;
+
+ /**
+ * Client side encryption materials.
+ */
+ private CSEMaterials cseMaterials;
+
/**
* List of execution interceptors to include in the chain
* of interceptors in the SDK.
@@ -504,5 +515,43 @@ public S3ClientCreationParameters withFipsEnabled(final boolean value) {
fipsEnabled = value;
return this;
}
+
+ /**
+ * Set the client side encryption flag.
+ *
+ * @param value new value
+ * @return the builder
+ */
+ public S3ClientCreationParameters withClientSideEncryptionEnabled(final boolean value) {
+ this.isCSEEnabled = value;
+ return this;
+ }
+
+ /**
+ * Get the client side encryption flag.
+ * @return client side encryption flag
+ */
+ public boolean isClientSideEncryptionEnabled() {
+ return this.isCSEEnabled;
+ }
+
+ /**
+ * Set the client side encryption materials.
+ *
+ * @param value new value
+ * @return the builder
+ */
+ public S3ClientCreationParameters withClientSideEncryptionMaterials(final CSEMaterials value) {
+ this.cseMaterials = value;
+ return this;
+ }
+
+ /**
+ * Get the client side encryption materials.
+ * @return client side encryption materials
+ */
+ public CSEMaterials getClientSideEncryptionMaterials() {
+ return this.cseMaterials;
+ }
}
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java
index d1943fa47773f..2170f835bd15e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java
@@ -37,6 +37,7 @@
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.MultipartUpload;
+import software.amazon.awssdk.services.s3.model.SdkPartType;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import org.slf4j.Logger;
@@ -580,14 +581,20 @@ public SinglePendingCommit uploadFileToPendingCommit(File localFile,
for (int partNumber = 1; partNumber <= numParts; partNumber += 1) {
progress.progress();
long size = Math.min(length - offset, uploadPartSize);
- UploadPartRequest part = writeOperations.newUploadPartRequestBuilder(
+ UploadPartRequest.Builder partBuilder = writeOperations.newUploadPartRequestBuilder(
destKey,
uploadId,
partNumber,
- size).build();
+ size);
+
+ if (partNumber == numParts) {
+ partBuilder.sdkPartType(SdkPartType.LAST);
+ }
+
// Read from the file input stream at current position.
RequestBody body = RequestBody.fromInputStream(fileStream, size);
- UploadPartResponse response = writeOperations.uploadPart(part, body, statistics);
+ UploadPartResponse response =
+ writeOperations.uploadPart(partBuilder.build(), body, statistics);
offset += uploadPartSize;
parts.add(CompletedPart.builder()
.partNumber(partNumber)
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEMaterials.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEMaterials.java
new file mode 100644
index 0000000000000..3218d698f37e4
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEMaterials.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+/**
+ * This class is for storing information about key type and corresponding key
+ * to be used for client side encryption.
+ */
+public class CSEMaterials {
+
+ /**
+ * Enum for CSE key types.
+ */
+ public enum CSEKeyType {
+ KMS,
+ AES,
+ RSA
+ }
+
+ /**
+ * The KMS key Id.
+ */
+ private String kmsKeyId;
+
+ /**
+ * The CSE key type to use.
+ */
+ private CSEKeyType cseKeyType;
+
+ /**
+ * Kms key id to use.
+ * @param value new value
+ * @return the builder
+ */
+ public CSEMaterials withKmsKeyId(
+ final String value) {
+ kmsKeyId = value;
+ return this;
+ }
+
+ /**
+ * Get the Kms key id to use.
+ * @return the kms key id.
+ */
+ public String getKmsKeyId() {
+ return kmsKeyId;
+ }
+
+ /**
+ * CSE key type to use.
+ * @param value new value
+ * @return the builder
+ */
+ public CSEMaterials withCSEKeyType(
+ final CSEKeyType value) {
+ cseKeyType = value;
+ return this;
+ }
+
+ /**
+ * Get the CSE key type.
+ * @return CSE key type
+ */
+ public CSEKeyType getCseKeyType() {
+ return cseKeyType;
+ }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java
index 7934a5c7d4d5c..2ea05efd9f2c0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java
@@ -22,6 +22,7 @@
import java.lang.reflect.Constructor;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.core.exception.SdkException;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.s3a.HttpChannelEOFException;
@@ -69,6 +70,9 @@ public final class ErrorTranslation {
private ErrorTranslation() {
}
+ static final String ENCRYPTION_CLIENT_EXCEPTION =
+ "software.amazon.encryption.s3.S3EncryptionClientException";
+
/**
* Does this exception indicate that the AWS Bucket was unknown.
* @param e exception.
@@ -152,6 +156,25 @@ public static IOException maybeExtractIOException(
return wrapWithInnerIOE(path, message, thrown, ioe);
}
+ /**
+ * Extracts the underlying exception from a SdkException.
+ * @param exception amazon exception raised
+ * @return extractedException
+ */
+ public static SdkException maybeExtractSdkException(SdkException exception) {
+ SdkException extractedException = exception;
+ if (exception.toString().contains(ENCRYPTION_CLIENT_EXCEPTION)
+ && exception.getCause() instanceof SdkException) {
+ extractedException = (SdkException) exception.getCause();
+ if (extractedException != null
+ && extractedException.getCause() instanceof AwsServiceException) {
+ extractedException = (SdkException) extractedException.getCause();
+ }
+ }
+
+ return extractedException;
+ }
+
/**
* Given an outer and an inner exception, create a new IOE
* of the inner type, with the outer exception as the cause.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java
index d42dda59caa5f..45a22cc18b4d0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java
@@ -47,6 +47,7 @@
import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_NAMED;
import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_XATTR_GET_NAMED_MAP;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
/**
@@ -322,20 +323,16 @@ private Map retrieveHeaders(
maybeSetHeader(headers, XA_CONTENT_LANGUAGE,
md.contentLanguage());
// If CSE is enabled, use the unencrypted content length.
- // TODO: CSE is not supported yet, add these headers in during CSE work.
-// if (md.getUserMetaDataOf(Headers.CRYPTO_CEK_ALGORITHM) != null
-// && md.getUserMetaDataOf(Headers.UNENCRYPTED_CONTENT_LENGTH) != null) {
-// maybeSetHeader(headers, XA_CONTENT_LENGTH,
-// md.getUserMetaDataOf(Headers.UNENCRYPTED_CONTENT_LENGTH));
-// } else {
-// maybeSetHeader(headers, XA_CONTENT_LENGTH,
-// md.contentLength());
-// }
-// maybeSetHeader(headers, XA_CONTENT_MD5,
-// md.getContentMD5());
- // TODO: Add back in else block during CSE work.
- maybeSetHeader(headers, XA_CONTENT_LENGTH,
- md.contentLength());
+ if (this.getStoreContext().isCSEEnabled() &&
+ md.metadata().get(AWSHeaders.CRYPTO_CEK_ALGORITHM) != null
+ && md.contentLength() >= CSE_PADDING_LENGTH) {
+ maybeSetHeader(headers, XA_CONTENT_LENGTH,
+ md.contentLength() - CSE_PADDING_LENGTH);
+ } else {
+ maybeSetHeader(headers, XA_CONTENT_LENGTH,
+ md.contentLength());
+ }
+
if (md.sdkHttpResponse() != null && md.sdkHttpResponse().headers() != null
&& md.sdkHttpResponse().headers().get("Content-Range") != null) {
maybeSetHeader(headers, XA_CONTENT_RANGE,
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
index 4094b22eb1926..af91225b49448 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.fs.s3a;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -237,8 +238,8 @@ public void testEncryptionEnabledAndDisabledFS() throws Exception {
assertEquals("Mismatch in content length bytes", SMALL_FILE_SIZE,
encryptedFSFileStatus.getLen());
- intercept(SecurityException.class, "",
- "SecurityException should be thrown",
+ intercept(FileNotFoundException.class, "Instruction file not found!",
+ "AWSClientIOException should be thrown",
() -> {
in.read(new byte[SMALL_FILE_SIZE]);
return "Exception should be raised if unencrypted data is read by "
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
index 73bba9d62cbd8..67bae20f8946e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
@@ -370,6 +370,7 @@ public void shouldBeAbleToSwitchOnS3PathStyleAccessViaConfigProperty()
conf = new Configuration();
skipIfCrossRegionClient(conf);
+ unsetEncryption(conf);
conf.set(Constants.PATH_STYLE_ACCESS, Boolean.toString(true));
assertTrue(conf.getBoolean(Constants.PATH_STYLE_ACCESS, false));
@@ -409,6 +410,7 @@ public void shouldBeAbleToSwitchOnS3PathStyleAccessViaConfigProperty()
public void testDefaultUserAgent() throws Exception {
conf = new Configuration();
skipIfCrossRegionClient(conf);
+ unsetEncryption(conf);
fs = S3ATestUtils.createTestFileSystem(conf);
assertNotNull(fs);
S3Client s3 = getS3Client("User Agent");
@@ -423,6 +425,7 @@ public void testDefaultUserAgent() throws Exception {
public void testCustomUserAgent() throws Exception {
conf = new Configuration();
skipIfCrossRegionClient(conf);
+ unsetEncryption(conf);
conf.set(Constants.USER_AGENT_PREFIX, "MyApp");
fs = S3ATestUtils.createTestFileSystem(conf);
assertNotNull(fs);
@@ -437,6 +440,7 @@ public void testCustomUserAgent() throws Exception {
@Test
public void testRequestTimeout() throws Exception {
conf = new Configuration();
+ unsetEncryption(conf);
// remove the safety check on minimum durations.
AWSClientConfig.setMinimumOperationDuration(Duration.ZERO);
try {
@@ -590,6 +594,12 @@ public void testS3SpecificSignerOverride() throws Exception {
.describedAs("Custom STS signer not called").isTrue();
}
+ private void unsetEncryption(Configuration conf) {
+ removeBaseAndBucketOverrides(conf, S3_ENCRYPTION_ALGORITHM);
+ conf.set(Constants.S3_ENCRYPTION_ALGORITHM,
+ S3AEncryptionMethods.NONE.getMethod());
+ }
+
public static final class CustomS3Signer implements Signer {
private static boolean s3SignerCalled = false;
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestErrorTranslation.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestErrorTranslation.java
index 3a4994897a6b9..07d9aed45269f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestErrorTranslation.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestErrorTranslation.java
@@ -30,15 +30,18 @@
import org.junit.Test;
import software.amazon.awssdk.awscore.retry.conditions.RetryOnErrorCodeCondition;
import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.retry.RetryPolicyContext;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+import software.amazon.encryption.s3.S3EncryptionClientException;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.s3a.auth.NoAwsCredentialsException;
import org.apache.hadoop.test.AbstractHadoopTestBase;
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.maybeExtractIOException;
+import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.maybeExtractSdkException;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
-import static org.junit.Assert.assertTrue;
/**
* Unit tests related to the {@link ErrorTranslation} class.
@@ -153,4 +156,28 @@ public void testMultiObjectExceptionFilledIn() throws Throwable {
.describedAs("retry policy of MultiObjectException")
.isFalse();
}
+
+ @Test
+ public void testEncryptionClientExceptionExtraction() throws Throwable {
+ intercept(NoSuchKeyException.class, () -> {
+ throw maybeExtractSdkException(new S3EncryptionClientException("top",
+ new S3EncryptionClientException("middle", NoSuchKeyException.builder().build())));
+ });
+ }
+
+ @Test
+ public void testNonEncryptionClientExceptionExtraction() throws Throwable {
+ intercept(SdkException.class, () -> {
+ throw maybeExtractSdkException(
+ sdkException("top", sdkException("middle", NoSuchKeyException.builder().build())));
+ });
+ }
+
+ @Test
+ public void testEncryptionClientExceptionExtractionWithRTE() throws Throwable {
+ intercept(S3EncryptionClientException.class, () -> {
+ throw maybeExtractSdkException(new S3EncryptionClientException("top",
+ new UnsupportedOperationException()));
+ });
+ }
}