LICENSE-binary: 1 addition & 1 deletion
@@ -361,7 +361,7 @@ org.objenesis:objenesis:2.6
org.xerial.snappy:snappy-java:1.1.10.4
org.yaml:snakeyaml:2.0
org.wildfly.openssl:wildfly-openssl:2.1.4.Final
-software.amazon.awssdk:bundle:2.25.53
+software.amazon.awssdk:bundle:2.29.52


--------------------------------------------------------------------------------
hadoop-project/pom.xml: 1 addition & 1 deletion
@@ -204,7 +204,7 @@
<make-maven-plugin.version>1.0-beta-1</make-maven-plugin.version>
<surefire.fork.timeout>900</surefire.fork.timeout>
<aws-java-sdk.version>1.12.720</aws-java-sdk.version>
-<aws-java-sdk-v2.version>2.25.53</aws-java-sdk-v2.version>
+<aws-java-sdk-v2.version>2.29.52</aws-java-sdk-v2.version>
<amazon-s3-encryption-client-java.version>3.1.1</amazon-s3-encryption-client-java.version>
<amazon-s3-analyticsaccelerator-s3.version>0.0.4</amazon-s3-analyticsaccelerator-s3.version>
<aws.eventstream.version>1.0.1</aws.eventstream.version>
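(Qualification tip, not part of this change: since `aws-java-sdk-v2.version` is an ordinary Maven property, a candidate SDK can also be tried without editing the POM, e.g. `mvn -Daws-java-sdk-v2.version=2.29.52 verify` from `hadoop-tools/hadoop-aws`.)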
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
@@ -38,6 +38,7 @@
import software.amazon.awssdk.http.auth.spi.scheme.AuthScheme;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
+import software.amazon.awssdk.metrics.LoggingMetricPublisher;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
@@ -201,12 +202,20 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> Build
final ClientOverrideConfiguration.Builder override =
createClientOverrideConfiguration(parameters, conf);

-S3BaseClientBuilder s3BaseClientBuilder = builder
+S3BaseClientBuilder<BuilderT, ClientT> s3BaseClientBuilder = builder
.overrideConfiguration(override.build())
.credentialsProvider(parameters.getCredentialSet())
.disableS3ExpressSessionAuth(!parameters.isExpressCreateSession())
.serviceConfiguration(serviceConfiguration);

+if (LOG.isTraceEnabled()) {
+// if this log is set to "trace" then logging of the SDK metrics is enabled.
+// The metrics themselves log at INFO; reflection would be needed to change
+// that setting safely for both the shaded and unshaded AWS artifacts.
+s3BaseClientBuilder.overrideConfiguration(o ->
+o.addMetricPublisher(LoggingMetricPublisher.create()));
+}
+
if (conf.getBoolean(HTTP_SIGNER_ENABLED, HTTP_SIGNER_ENABLED_DEFAULT)) {
// use an http signer through an AuthScheme
final AuthScheme<AwsCredentialsIdentity> signer =
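Side note for anyone trying the new metrics logging outside the connector: the same publisher attaches to any standalone v2 client. A minimal sketch, assuming only the AWS SDK v2 bundle on the classpath; the class name and region are invented for illustration:

```java
import software.amazon.awssdk.metrics.LoggingMetricPublisher;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

public class MetricsLoggingSketch {
  public static void main(String[] args) {
    // Attach the SDK's own metric publisher; it logs each request's metrics
    // at INFO under software.amazon.awssdk.metrics.LoggingMetricPublisher.
    try (S3Client s3 = S3Client.builder()
        .region(Region.EU_WEST_2)
        .overrideConfiguration(o ->
            o.addMetricPublisher(LoggingMetricPublisher.create()))
        .build()) {
      // any request now emits a MetricCollection like the sample in the docs
      s3.listBuckets();
    }
  }
}
```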
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AwsSdkWorkarounds.java
@@ -18,9 +18,10 @@

package org.apache.hadoop.fs.s3a.impl;

+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.classification.VisibleForTesting;
-import org.apache.hadoop.fs.s3a.impl.logging.LogControl;
-import org.apache.hadoop.fs.s3a.impl.logging.LogControllerFactory;

/**
* This class exists to support workarounds for parts of the AWS SDK
@@ -35,16 +36,20 @@ public final class AwsSdkWorkarounds {
public static final String TRANSFER_MANAGER =
"software.amazon.awssdk.transfer.s3.S3TransferManager";

+private static final Logger LOG = LoggerFactory.getLogger(AwsSdkWorkarounds.class);
+
private AwsSdkWorkarounds() {
}

/**
* Prepare logging before creating AWS clients.
+* There is currently no logging to require tuning,
+* so this only logs at trace that it was invoked.
* @return true if the log tuning operation took place.
*/
public static boolean prepareLogging() {
-return LogControllerFactory.createController().
-setLogLevel(TRANSFER_MANAGER, LogControl.LogLevel.ERROR);
+LOG.trace("prepareLogging()");
+return true;
}

/**
@@ -53,7 +58,6 @@ public static boolean prepareLogging() {
*/
@VisibleForTesting
static boolean restoreNoisyLogging() {
-return LogControllerFactory.createController().
-setLogLevel(TRANSFER_MANAGER, LogControl.LogLevel.INFO);
+return true;
}
}
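Both methods are now deliberate no-ops: as the updated javadoc notes, there is currently no SDK logging which needs tuning after the upgrade, so the transfer-manager log workaround can be retired while the existing call sites and tests keep compiling.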
@@ -286,7 +286,9 @@ public final InputStream newStream() {
// the stream has been recreated for the first time.
// notify only once for this stream, so as not to flood
// the logs.
-LOG.info("Stream recreated: {}", this);
+// originally logged at info; logs at debug because HADOOP-19516
+// means that this message is very common with S3 Express stores.
+LOG.debug("Stream recreated: {}", this);
}
return setCurrentStream(createNewStream());
}
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md
@@ -40,6 +40,7 @@ The features which may be unavailable include:
This is now the default - do not change it.
* List API to use (`fs.s3a.list.version = 1`)
* Bucket lifecycle rules to clean up pending uploads.
+* Support for multipart uploads.

### Disabling Change Detection

@@ -409,7 +410,7 @@ which is a subset of the AWS API.
To get a compatible access and secret key, follow the instructions of
[Simple migration from Amazon S3 to Cloud Storage](https://cloud.google.com/storage/docs/aws-simple-migration#defaultproj).

-Here are the per-bucket setings for an example bucket "gcs-container"
+Here are the per-bucket settings for an example bucket "gcs-container"
in Google Cloud Storage. Note the multiobject delete option must be disabled;
this makes renaming and deleting significantly slower.

@@ -452,11 +453,21 @@ this makes renaming and deleting significantly slower.
<value>true</value>
</property>

+<!-- any value is allowed here, using "gcs" is more informative -->
<property>
<name>fs.s3a.bucket.gcs-container.endpoint.region</name>
-<value>dummy</value>
+<value>gcs</value>
</property>

+<!-- multipart uploads trigger a 400 response -->
+<property>
+<name>fs.s3a.multipart.uploads.enabled</name>
+<value>false</value>
+</property>
+<property>
+<name>fs.s3a.optimized.copy.from.local.enabled</name>
+<value>false</value>
+</property>
</configuration>
```
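Once these settings are in place, a quick check such as `hadoop fs -ls s3a://gcs-container/` against the bucket above should succeed; a simple listing exercises neither multipart uploads nor the optimized local copy.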

hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
@@ -1359,6 +1359,48 @@ execchain.MainClientExec (MainClientExec.java:execute(284)) - Connection can be

```


+To log the output of the AWS SDK metrics, set the log
+`org.apache.hadoop.fs.s3a.DefaultS3ClientFactory` to `TRACE`.
+This will then turn on logging of the internal SDK metrics.
+
+The metrics themselves are logged at INFO in the log
+```
+software.amazon.awssdk.metrics.LoggingMetricPublisher
+```
+
+```text
+INFO metrics.LoggingMetricPublisher (LoggerAdapter.java:info(165)) - Metrics published:
+MetricCollection(name=ApiCall, metrics=[
+MetricRecord(metric=MarshallingDuration, value=PT0.000092041S),
+MetricRecord(metric=RetryCount, value=0),
+MetricRecord(metric=ApiCallSuccessful, value=true),
+MetricRecord(metric=OperationName, value=DeleteObject),
+MetricRecord(metric=EndpointResolveDuration, value=PT0.000132792S),
+MetricRecord(metric=ApiCallDuration, value=PT0.064890875S),
+MetricRecord(metric=CredentialsFetchDuration, value=PT0.000017458S),
+MetricRecord(metric=ServiceEndpoint, value=https://buckets3.eu-west-2.amazonaws.com),
+MetricRecord(metric=ServiceId, value=S3)], children=[
+MetricCollection(name=ApiCallAttempt, metrics=[
+MetricRecord(metric=TimeToFirstByte, value=PT0.06260225S),
+MetricRecord(metric=SigningDuration, value=PT0.000293083S),
+MetricRecord(metric=ReadThroughput, value=0.0),
+MetricRecord(metric=ServiceCallDuration, value=PT0.06260225S),
+MetricRecord(metric=HttpStatusCode, value=204),
+MetricRecord(metric=BackoffDelayDuration, value=PT0S),
+MetricRecord(metric=TimeToLastByte, value=PT0.064313667S),
+MetricRecord(metric=AwsRequestId, value=RKZD44SE5DW91K1G)], children=[
+MetricCollection(name=HttpClient, metrics=[
+MetricRecord(metric=AvailableConcurrency, value=1),
+MetricRecord(metric=LeasedConcurrency, value=0),
+MetricRecord(metric=ConcurrencyAcquireDuration, value=PT0S),
+MetricRecord(metric=PendingConcurrencyAcquires, value=0),
+MetricRecord(metric=MaxConcurrency, value=512),
+MetricRecord(metric=HttpClientName, value=Apache)], children=[])
+])
+])
+```
+
### <a name="audit-logging"></a> Enable S3 Server-side Logging

The [Auditing](auditing) feature of the S3A connector can be used to generate
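In the log4j properties format that Hadoop uses by default, the `TRACE` setting above corresponds to a line such as `log4j.logger.org.apache.hadoop.fs.s3a.DefaultS3ClientFactory=TRACE` in the relevant `log4j.properties`.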
@@ -589,6 +589,8 @@ public void testS3SpecificSignerOverride() throws Exception {
config.set(AWS_REGION, EU_WEST_1);
disableFilesystemCaching(config);
fs = S3ATestUtils.createTestFileSystem(config);
+assumeStoreAwsHosted(fs);
+

S3Client s3Client = getS3Client("testS3SpecificSignerOverride");

@@ -39,6 +39,7 @@
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;

+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeStoreAwsHosted;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeSkipRootTests;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
@@ -101,6 +102,7 @@ public void setup() throws Exception {
// although not a root dir test, this confuses paths enough it shouldn't be run in
// parallel with other jobs
maybeSkipRootTests(getConfiguration());
+assumeStoreAwsHosted(getFileSystem());
}

@Override
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEndpointRegion.java
@@ -24,9 +24,9 @@
import java.nio.file.AccessDeniedException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;

import org.assertj.core.api.Assertions;
-import org.junit.Ignore;
import org.junit.Test;
import software.amazon.awssdk.awscore.AwsExecutionAttribute;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
@@ -55,6 +55,7 @@
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
import static org.apache.hadoop.fs.s3a.DefaultS3ClientFactory.ERROR_ENDPOINT_WITH_FIPS;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeStoreAwsHosted;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.DEFAULT_REQUESTER_PAYS_BUCKET_NAME;
import static org.apache.hadoop.io.IOUtils.closeStream;
@@ -106,6 +107,10 @@ public class ITestS3AEndpointRegion extends AbstractS3ATestBase {

public static final String EXCEPTION_THROWN_BY_INTERCEPTOR = "Exception thrown by interceptor";

+/**
+* Text to include in assertions.
+*/
+private static final AtomicReference<String> EXPECTED_MESSAGE = new AtomicReference<>();
/**
* New FS instance which will be closed in teardown.
*/
@@ -477,6 +482,7 @@ public void testCentralEndpointAndNullRegionWithCRUD() throws Throwable {
describe("Access the test bucket using central endpoint and"
+ " null region, perform file system CRUD operations");
final Configuration conf = getConfiguration();
+assumeStoreAwsHosted(getFileSystem());

final Configuration newConf = new Configuration(conf);

@@ -499,6 +505,7 @@ public void testCentralEndpointAndNullRegionWithCRUD() throws Throwable {
public void testCentralEndpointAndNullRegionFipsWithCRUD() throws Throwable {
describe("Access the test bucket using central endpoint and"
+ " null region and fips enabled, perform file system CRUD operations");
+assumeStoreAwsHosted(getFileSystem());

final String bucketLocation = getFileSystem().getBucketLocation();
assume("FIPS can be enabled to access buckets from US or Canada endpoints only",
@@ -576,7 +583,7 @@ private void assertOpsUsingNewFs() throws IOException {
.isFalse();
}

-private final class RegionInterceptor implements ExecutionInterceptor {
+private static final class RegionInterceptor implements ExecutionInterceptor {
private final String endpoint;
private final String region;
private final boolean isFips;
@@ -591,28 +598,49 @@ private final class RegionInterceptor implements ExecutionInterceptor {
public void beforeExecution(Context.BeforeExecution context,
ExecutionAttributes executionAttributes) {

-if (endpoint != null && !endpoint.endsWith(CENTRAL_ENDPOINT)) {
-Assertions.assertThat(
-executionAttributes.getAttribute(AwsExecutionAttribute.ENDPOINT_OVERRIDDEN))
-.describedAs("Endpoint not overridden").isTrue();
-
-Assertions.assertThat(
-executionAttributes.getAttribute(AwsExecutionAttribute.CLIENT_ENDPOINT).toString())
-.describedAs("There is an endpoint mismatch").isEqualTo("https://" + endpoint);
+// extract state from the execution attributes.
+final Boolean endpointOveridden =
+executionAttributes.getAttribute(AwsExecutionAttribute.ENDPOINT_OVERRIDDEN);
+final String clientEndpoint =
+executionAttributes.getAttribute(AwsExecutionAttribute.CLIENT_ENDPOINT).toString();
+final Boolean fipsEnabled = executionAttributes.getAttribute(
+AwsExecutionAttribute.FIPS_ENDPOINT_ENABLED);
+final String reg = executionAttributes.getAttribute(AwsExecutionAttribute.AWS_REGION).
+toString();
+
+String state = "SDK beforeExecution callback; "
++ "endpointOveridden=" + endpointOveridden
++ "; clientEndpoint=" + clientEndpoint
++ "; fipsEnabled=" + fipsEnabled
++ "; region=" + reg;
+
+if (endpoint != null && !endpoint.endsWith(CENTRAL_ENDPOINT)) {
+Assertions.assertThat(endpointOveridden)
+.describedAs("Endpoint not overridden in %s. Client Config=%s",
+state, EXPECTED_MESSAGE.get())
+.isTrue();
+
+Assertions.assertThat(clientEndpoint)
+.describedAs("There is an endpoint mismatch in %s. Client Config=%s",
+state, EXPECTED_MESSAGE.get())
+.isEqualTo("https://" + endpoint);
} else {
-Assertions.assertThat(
-executionAttributes.getAttribute(AwsExecutionAttribute.ENDPOINT_OVERRIDDEN))
-.describedAs("Endpoint is overridden").isEqualTo(null);
+Assertions.assertThat(endpointOveridden)
+.describedAs("Attribute endpointOveridden is false in %s. Client Config=%s",
+state, EXPECTED_MESSAGE.get())
+.isEqualTo(false);
}

-Assertions.assertThat(
-executionAttributes.getAttribute(AwsExecutionAttribute.AWS_REGION).toString())
-.describedAs("Incorrect region set").isEqualTo(region);
+Assertions.assertThat(reg)
+.describedAs("Incorrect region set in %s. Client Config=%s",
+state, EXPECTED_MESSAGE.get())
+.isEqualTo(region);

// verify the fips state matches expectation.
-Assertions.assertThat(executionAttributes.getAttribute(
-AwsExecutionAttribute.FIPS_ENDPOINT_ENABLED))
-.describedAs("Incorrect FIPS flag set in execution attributes")
+Assertions.assertThat(fipsEnabled)
+.describedAs("Incorrect FIPS flag set in %s; Client Config=%s",
+state, EXPECTED_MESSAGE.get())
+.isNotNull()
.isEqualTo(isFips);
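For anyone writing similar tests: the SDK calls every registered `ExecutionInterceptor` around each request, and the execution attributes carry the resolved endpoint, region and FIPS state asserted on above. A minimal registration sketch, assuming only the AWS SDK v2 bundle; `PrintingInterceptor` is invented for illustration:

```java
import software.amazon.awssdk.awscore.AwsExecutionAttribute;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
import software.amazon.awssdk.services.s3.S3Client;

public class PrintingInterceptor implements ExecutionInterceptor {
  @Override
  public void beforeExecution(Context.BeforeExecution context,
      ExecutionAttributes executionAttributes) {
    // the same attribute the test's RegionInterceptor asserts on
    System.out.println("region = "
        + executionAttributes.getAttribute(AwsExecutionAttribute.AWS_REGION));
  }

  public static void main(String[] args) {
    try (S3Client s3 = S3Client.builder()
        .overrideConfiguration(o ->
            o.addExecutionInterceptor(new PrintingInterceptor()))
        .build()) {
      s3.listBuckets();
    }
  }
}
```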

@@ -637,6 +665,11 @@ private S3Client createS3Client(Configuration conf,
String endpoint, String configuredRegion, String expectedRegion, boolean isFips)
throws IOException {

+String expected =
+"endpoint=" + endpoint + "; region=" + configuredRegion
++ "; expectedRegion=" + expectedRegion + "; isFips=" + isFips;
+LOG.info("Creating S3 client with {}", expected);
+EXPECTED_MESSAGE.set(expected);
List<ExecutionInterceptor> interceptors = new ArrayList<>();
interceptors.add(new RegionInterceptor(endpoint, expectedRegion, isFips));

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -1170,7 +1170,7 @@ public static void assumeNotS3ExpressFileSystem(final FileSystem fs) {
*/
public static void assumeStoreAwsHosted(final FileSystem fs) {
assume("store is not AWS S3",
-!NetworkBinding.isAwsEndpoint(fs.getConf()
+NetworkBinding.isAwsEndpoint(fs.getConf()
.getTrimmed(ENDPOINT, DEFAULT_ENDPOINT)));
}
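The dropped `!` is the whole fix here: `assume()` skips a test when its condition is false, so the old guard skipped whenever the store was AWS-hosted, the opposite of what the method name promises, and the new `assumeStoreAwsHosted()` calls added in the tests above would never have run against real S3.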
