diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 6c0efa6e5c309..361806545403b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -25,6 +25,7 @@ import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; import java.time.Duration; +import java.util.Locale; import java.util.concurrent.TimeUnit; import static org.apache.hadoop.io.Sizes.S_128K; @@ -1339,6 +1340,37 @@ private Constants() { public static final String AWS_SERVICE_IDENTIFIER_DDB = "DDB"; public static final String AWS_SERVICE_IDENTIFIER_STS = "STS"; + /** Prefix for S3A client-specific properties. + * value: {@value} + */ + public static final String FS_S3A_CLIENT_PREFIX = "fs.s3a.client."; + + /** Custom headers postfix. + * value: {@value} + */ + public static final String CUSTOM_HEADERS_POSTFIX = ".custom.headers"; + + /** + * List of custom headers to be set on the service client. + * Multiple parameters can be used to specify custom headers. + *
+   * Usage:
+   * fs.s3a.client.s3.custom.headers - Headers to add on all the S3 requests.
+   * fs.s3a.client.sts.custom.headers - Headers to add on all the STS requests.
+   *
+   * Examples:
+   * CustomHeader {@literal ->} 'Header1:Value1'
+   * CustomHeaders {@literal ->} 'Header1=Value1;Value2,Header2=Value1'
+   * 
+ */ + public static final String CUSTOM_HEADERS_STS = + FS_S3A_CLIENT_PREFIX + AWS_SERVICE_IDENTIFIER_STS.toLowerCase(Locale.ROOT) + + CUSTOM_HEADERS_POSTFIX; + + public static final String CUSTOM_HEADERS_S3 = + FS_S3A_CLIENT_PREFIX + AWS_SERVICE_IDENTIFIER_S3.toLowerCase(Locale.ROOT) + + CUSTOM_HEADERS_POSTFIX; + /** * How long to wait for the thread pool to terminate when cleaning up. * Value: {@value} seconds. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSClientConfig.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSClientConfig.java index 4ea43e7a66eef..274da46e7074a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSClientConfig.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSClientConfig.java @@ -22,7 +22,11 @@ import java.net.URI; import java.net.URISyntaxException; import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,6 +80,8 @@ import static org.apache.hadoop.fs.s3a.Constants.SIGNING_ALGORITHM_STS; import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT; import static org.apache.hadoop.fs.s3a.Constants.USER_AGENT_PREFIX; +import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_S3; +import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_STS; import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.enforceMinimumDuration; import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.getDuration; import static org.apache.hadoop.util.Preconditions.checkArgument; @@ -120,6 +126,8 @@ public static ClientOverrideConfiguration.Builder createClientConfigBuilder(Conf initUserAgent(conf, overrideConfigBuilder); + initRequestHeaders(conf, overrideConfigBuilder, awsServiceIdentifier); + String signer = conf.getTrimmed(SIGNING_ALGORITHM, ""); if (!signer.isEmpty()) { LOG.debug("Signer override = {}", signer); @@ -412,6 +420,44 @@ private static void initSigner(Configuration conf, } } + /** + * Initialize custom request headers for AWS clients. + * @param conf hadoop configuration + * @param clientConfig client configuration to update + * @param awsServiceIdentifier service name + */ + private static void initRequestHeaders(Configuration conf, + ClientOverrideConfiguration.Builder clientConfig, String awsServiceIdentifier) { + String configKey = null; + switch (awsServiceIdentifier) { + case AWS_SERVICE_IDENTIFIER_S3: + configKey = CUSTOM_HEADERS_S3; + break; + case AWS_SERVICE_IDENTIFIER_STS: + configKey = CUSTOM_HEADERS_STS; + break; + default: + // No known service. + } + if (configKey != null) { + Map awsClientCustomHeadersMap = + S3AUtils.getTrimmedStringCollectionSplitByEquals(conf, configKey); + awsClientCustomHeadersMap.forEach((header, valueString) -> { + List headerValues = Arrays.stream(valueString.split(";")) + .map(String::trim) + .filter(v -> !v.isEmpty()) + .collect(Collectors.toList()); + if (!headerValues.isEmpty()) { + clientConfig.putHeader(header, headerValues); + } else { + LOG.warn("Ignoring header '{}' for {} client because no values were provided", + header, awsServiceIdentifier); + } + }); + LOG.debug("headers for {} client = {}", awsServiceIdentifier, clientConfig.headers()); + } + } + /** * Configures request timeout in the client configuration. * This is independent of the timeouts set in the sync and async HTTP clients; diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md index 79bcf55f92953..1f65caeb5e219 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md @@ -947,6 +947,31 @@ The switch to turn S3A auditing on or off. ``` + +### Configuring Custom Headers for AWS Service Clients + +You can set custom headers for S3 and STS requests. These headers are set on client level, and will be sent for all requests made to these services. + +**Configuration Properties:** +- `fs.s3a.client.s3.custom.headers`: Custom headers for S3 service requests. +- `fs.s3a.client.sts.custom.headers`: Sets custom headers for all requests to AWS STS. + +**Header Format:** +Custom headers should be specified as key-value pairs, separated by `=`. Multiple values for a single header can be separated by `;`. Multiple headers can be separated by `,`. + + +```xml + + fs.s3a.client.s3.custom.headers + Header1=Value1 + + + +fs.s3a.client.sts.custom.headers +Header1=Value1;Value2,Header2=Value1 + +``` + ## Retry and Recovery The S3A client makes a best-effort attempt at recovering from network failures; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestAwsClientConfig.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestAwsClientConfig.java index eacff90ea4c8a..a199e5bc33541 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestAwsClientConfig.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestAwsClientConfig.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.s3a.impl; +import java.io.IOException; import java.time.Duration; import java.util.Arrays; @@ -29,11 +30,16 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.test.AbstractHadoopTestBase; +import org.apache.hadoop.util.Lists; +import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_S3; +import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_STS; import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_ACQUISITION_TIMEOUT; import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_IDLE_TIME; import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_KEEPALIVE; import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_TTL; +import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_S3; +import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_STS; import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_DURATION; import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_IDLE_TIME_DURATION; import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_KEEPALIVE; @@ -48,6 +54,7 @@ import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT; import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT; import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.createApiConnectionSettings; +import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.createClientConfigBuilder; import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.createConnectionSettings; import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.enforceMinimumDuration; @@ -201,4 +208,122 @@ public void testCreateApiConnectionSettingsDefault() { private void setOptionsToValue(String value, Configuration conf, String... keys) { Arrays.stream(keys).forEach(key -> conf.set(key, value)); } + + /** + * if {@link org.apache.hadoop.fs.s3a.Constants#CUSTOM_HEADERS_STS} is set, + * verify that returned client configuration has desired headers set. + */ + @Test + public void testInitRequestHeadersForSTS() throws IOException { + final Configuration conf = new Configuration(); + conf.set(CUSTOM_HEADERS_STS, "header1=value1;value2,header2=value3"); + + Assertions.assertThat(conf.get(CUSTOM_HEADERS_S3)) + .describedAs("Custom client headers for s3 %s", CUSTOM_HEADERS_S3) + .isNull(); + + Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3) + .headers().size()) + .describedAs("Count of S3 client headers") + .isEqualTo(0); + + Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_STS) + .headers().size()) + .describedAs("Count of STS client headers") + .isEqualTo(2); + + Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_STS) + .headers().get("header1")) + .describedAs("STS client 'header1' header value") + .isEqualTo(Lists.newArrayList("value1", "value2")); + + Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_STS) + .headers().get("header2")) + .describedAs("STS client 'header2' header value") + .isEqualTo(Lists.newArrayList("value3")); + } + + /** + * if {@link org.apache.hadoop.fs.s3a.Constants#CUSTOM_HEADERS_S3} is set, + * verify that returned client configuration has desired headers set. + */ + @Test + public void testInitRequestHeadersForS3() throws IOException { + final Configuration conf = new Configuration(); + conf.set(CUSTOM_HEADERS_S3, "header1=value1;value2,header2=value3"); + + Assertions.assertThat(conf.get(CUSTOM_HEADERS_STS)) + .describedAs("Custom client headers for STS %s", CUSTOM_HEADERS_STS) + .isNull(); + + Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_STS) + .headers().size()) + .describedAs("Count of STS client headers") + .isEqualTo(0); + + Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3) + .headers().size()) + .describedAs("Count of S3 client headers") + .isEqualTo(2); + + Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3) + .headers().get("header1")) + .describedAs("S3 client 'header1' header value") + .isEqualTo(Lists.newArrayList("value1", "value2")); + + Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3) + .headers().get("header2")) + .describedAs("S3 client 'header2' header value") + .isEqualTo(Lists.newArrayList("value3")); + } + + /** + * if {@link org.apache.hadoop.fs.s3a.Constants#CUSTOM_HEADERS_S3} is set, + * verify that returned client configuration has desired headers set with + * whitespaces trimmed for headers and values. + */ + @Test + public void testInitRequestHeadersForS3WithWhitespace() throws IOException { + final Configuration conf = new Configuration(); + conf.set(CUSTOM_HEADERS_S3, " header1 = value1 ; value2 , header2= value3 "); + + Assertions.assertThat(conf.get(CUSTOM_HEADERS_STS)) + .describedAs("Custom client headers for STS %s", CUSTOM_HEADERS_STS) + .isNull(); + + Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_STS) + .headers().size()) + .describedAs("Count of STS client headers") + .isEqualTo(0); + + Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3) + .headers().size()) + .describedAs("Count of S3 client headers") + .isEqualTo(2); + + Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3) + .headers().get("header1")) + .describedAs("S3 client 'header1' header value") + .isEqualTo(Lists.newArrayList("value1", "value2")); + + Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3) + .headers().get("header2")) + .describedAs("S3 client 'header2' header value") + .isEqualTo(Lists.newArrayList("value3")); + } + + /** + * if {@link org.apache.hadoop.fs.s3a.Constants#CUSTOM_HEADERS_S3} is set with duplicate values, + * verify that returned client configuration has desired headers with both values. + */ + @Test + public void testInitRequestHeadersForS3WithDuplicateValues() throws IOException { + Configuration conf = new Configuration(); + conf.set(CUSTOM_HEADERS_S3, "header1=duplicate;duplicate"); + + Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3) + .headers().get("header1")) + .describedAs("S3 client 'header1' header value") + .isEqualTo(Lists.newArrayList("duplicate", "duplicate")); + } }