Skip to content

Commit 120620c

Browse files
HADOOP-18888. S3A. createS3AsyncClient() always enables multipart uploads (#6056)
* The multipart flag fs.s3a.multipart.uploads.enabled is passed to the async client when it is created. * The S3A connector bypasses the transfer manager entirely if multipart is disabled or for small files. Contributed by Steve Loughran
1 parent 510a7dc commit 120620c

File tree

6 files changed

+117
-64
lines changed

6 files changed

+117
-64
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public S3AsyncClient createS3AsyncClient(
112112
return configureClientBuilder(S3AsyncClient.builder(), parameters, conf, bucket)
113113
.httpClientBuilder(httpClientBuilder)
114114
.multipartConfiguration(multipartConfiguration)
115-
.multipartEnabled(true)
115+
.multipartEnabled(parameters.isMultipartCopy())
116116
.build();
117117
}
118118

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 63 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
440440
*/
441441
private boolean isMultipartUploadEnabled = DEFAULT_MULTIPART_UPLOAD_ENABLED;
442442

443+
/**
444+
* Should file copy operations use the S3 transfer manager?
445+
* True unless multipart upload is disabled.
446+
*/
447+
private boolean isMultipartCopyEnabled;
448+
443449
/**
444450
* A cache of files that should be deleted when the FileSystem is closed
445451
* or the JVM is exited.
@@ -576,6 +582,9 @@ public void initialize(URI name, Configuration originalConf)
576582
intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
577583
this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
578584
DEFAULT_MULTIPART_UPLOAD_ENABLED);
585+
// multipart copy and upload are the same; this just makes it explicit
586+
this.isMultipartCopyEnabled = isMultipartUploadEnabled;
587+
579588
initThreadPools(conf);
580589

581590
int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION);
@@ -982,6 +991,7 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
982991
.withRequesterPays(conf.getBoolean(ALLOW_REQUESTER_PAYS, DEFAULT_ALLOW_REQUESTER_PAYS))
983992
.withExecutionInterceptors(auditManager.createExecutionInterceptors())
984993
.withMinimumPartSize(partSize)
994+
.withMultipartCopyEnabled(isMultipartCopyEnabled)
985995
.withMultipartThreshold(multiPartThreshold)
986996
.withTransferManagerExecutor(unboundedThreadPool)
987997
.withRegion(region);
@@ -1468,6 +1478,11 @@ public AWSCredentialProviderList shareCredentials(final String purpose) {
14681478
LOG.debug("Sharing credentials for: {}", purpose);
14691479
return credentials.share();
14701480
}
1481+
1482+
@Override
1483+
public boolean isMultipartCopyEnabled() {
1484+
return S3AFileSystem.this.isMultipartUploadEnabled;
1485+
}
14711486
}
14721487

14731488
/**
@@ -4436,37 +4451,56 @@ private CopyObjectResponse copyFile(String srcKey, String dstKey, long size,
44364451
e);
44374452
}
44384453

4439-
return readInvoker.retry(
4440-
action, srcKey,
4441-
true,
4442-
() -> {
4443-
CopyObjectRequest.Builder copyObjectRequestBuilder =
4444-
getRequestFactory().newCopyObjectRequestBuilder(srcKey, dstKey, srcom);
4445-
changeTracker.maybeApplyConstraint(copyObjectRequestBuilder);
4446-
incrementStatistic(OBJECT_COPY_REQUESTS);
4447-
4448-
Copy copy = transferManager.copy(
4449-
CopyRequest.builder()
4450-
.copyObjectRequest(copyObjectRequestBuilder.build())
4451-
.build());
4454+
CopyObjectRequest.Builder copyObjectRequestBuilder =
4455+
getRequestFactory().newCopyObjectRequestBuilder(srcKey, dstKey, srcom);
4456+
changeTracker.maybeApplyConstraint(copyObjectRequestBuilder);
4457+
CopyObjectResponse response;
44524458

4453-
try {
4454-
CompletedCopy completedCopy = copy.completionFuture().join();
4455-
CopyObjectResponse result = completedCopy.response();
4456-
changeTracker.processResponse(result);
4457-
incrementWriteOperations();
4458-
instrumentation.filesCopied(1, size);
4459-
return result;
4460-
} catch (CompletionException e) {
4461-
Throwable cause = e.getCause();
4462-
if (cause instanceof SdkException) {
4463-
SdkException awsException = (SdkException)cause;
4464-
changeTracker.processException(awsException, "copy");
4465-
throw awsException;
4459+
// transfer manager is skipped if disabled or the file is too small to worry about
4460+
final boolean useTransferManager = isMultipartCopyEnabled && size >= multiPartThreshold;
4461+
if (useTransferManager) {
4462+
// use transfer manager
4463+
response = readInvoker.retry(
4464+
action, srcKey,
4465+
true,
4466+
() -> {
4467+
incrementStatistic(OBJECT_COPY_REQUESTS);
4468+
4469+
Copy copy = transferManager.copy(
4470+
CopyRequest.builder()
4471+
.copyObjectRequest(copyObjectRequestBuilder.build())
4472+
.build());
4473+
4474+
try {
4475+
CompletedCopy completedCopy = copy.completionFuture().join();
4476+
return completedCopy.response();
4477+
} catch (CompletionException e) {
4478+
Throwable cause = e.getCause();
4479+
if (cause instanceof SdkException) {
4480+
SdkException awsException = (SdkException)cause;
4481+
changeTracker.processException(awsException, "copy");
4482+
throw awsException;
4483+
}
4484+
throw extractException(action, srcKey, e);
44664485
}
4467-
throw extractException(action, srcKey, e);
4468-
}
4469-
});
4486+
});
4487+
} else {
4488+
// single part copy bypasses transfer manager
4489+
// note: this helps with some mock testing, e.g. HBoss, as there is less to mock.
4490+
response = readInvoker.retry(
4491+
action, srcKey,
4492+
true,
4493+
() -> {
4494+
LOG.debug("copyFile: single part copy {} -> {} of size {}", srcKey, dstKey, size);
4495+
incrementStatistic(OBJECT_COPY_REQUESTS);
4496+
return s3Client.copyObject(copyObjectRequestBuilder.build());
4497+
});
4498+
}
4499+
4500+
changeTracker.processResponse(response);
4501+
incrementWriteOperations();
4502+
instrumentation.filesCopied(1, size);
4503+
return response;
44704504
}
44714505

44724506
/**

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,4 +115,10 @@ public interface S3AInternals {
115115
@AuditEntryPoint
116116
@Retries.RetryTranslated
117117
HeadBucketResponse getBucketMetadata() throws IOException;
118+
119+
/**
120+
* Is multipart copy enabled?
121+
* @return true if the transfer manager is used to copy files.
122+
*/
123+
boolean isMultipartCopyEnabled();
118124
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,11 @@ final class S3ClientCreationParameters {
156156
*/
157157
private long multiPartThreshold;
158158

159+
/**
160+
* Is multipart copy enabled? Defaults to true.
161+
*/
162+
private boolean multipartCopy = true;
163+
159164
/**
160165
* Executor that the transfer manager will use to execute background tasks.
161166
*/
@@ -399,5 +404,24 @@ public S3ClientCreationParameters withRegion(
399404
public Region getRegion() {
400405
return region;
401406
}
407+
408+
/**
409+
* Set the multipart copy flag.
410+
*
411+
* @param value new value
412+
* @return the builder
413+
*/
414+
public S3ClientCreationParameters withMultipartCopyEnabled(final boolean value) {
415+
this.multipartCopy = value;
416+
return this;
417+
}
418+
419+
/**
420+
* Get the multipart flag.
421+
* @return multipart flag
422+
*/
423+
public boolean isMultipartCopy() {
424+
return multipartCopy;
425+
}
402426
}
403427
}

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -355,10 +355,10 @@ public long getFilesize() {
355355
/**
356356
* Is this expected to be a multipart upload?
357357
* Assertions will change if not.
358-
* @return true by default.
358+
* @return what the filesystem expects.
359359
*/
360360
protected boolean expectMultipartUpload() {
361-
return true;
361+
return getFileSystem().getS3AInternals().isMultipartCopyEnabled();
362362
}
363363

364364
/**

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java

Lines changed: 21 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,10 @@
1818

1919
package org.apache.hadoop.fs.s3a.scale;
2020

21+
import org.assertj.core.api.Assertions;
22+
2123
import org.apache.hadoop.conf.Configuration;
22-
import org.apache.hadoop.fs.FileSystem;
23-
import org.apache.hadoop.fs.Path;
2424
import org.apache.hadoop.fs.s3a.Constants;
25-
import org.apache.hadoop.fs.s3a.S3AFileSystem;
26-
import org.apache.hadoop.fs.s3a.S3ATestUtils;
27-
import org.apache.hadoop.fs.s3a.api.UnsupportedRequestException;
2825

2926
import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
3027
import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
@@ -33,19 +30,13 @@
3330
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
3431
import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
3532
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
36-
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
3733

3834
/**
3935
* Use a single PUT for the whole upload/rename/delete workflow; include verification
4036
* that multipart copy is disabled in the filesystem.
4137
*/
4238
public class ITestS3AHugeFilesNoMultipart extends AbstractSTestS3AHugeFiles {
4339

44-
/**
45-
* Size to ensure MPUs don't happen in transfer manager.
46-
*/
47-
public static final String S_1T = "1T";
48-
4940
public static final String SINGLE_PUT_REQUEST_TIMEOUT = "1h";
5041

5142
/**
@@ -56,11 +47,23 @@ protected String getBlockOutputBufferName() {
5647
return Constants.FAST_UPLOAD_BUFFER_DISK;
5748
}
5849

50+
/**
51+
* Multipart upload is always disabled.
52+
* @return false
53+
*/
5954
@Override
6055
protected boolean expectMultipartUpload() {
6156
return false;
6257
}
6358

59+
/**
60+
* Is multipart copy enabled?
61+
* @return true if the transfer manager is used to copy files.
62+
*/
63+
private boolean isMultipartCopyEnabled() {
64+
return getFileSystem().getS3AInternals().isMultipartCopyEnabled();
65+
}
66+
6467
/**
6568
* Create a configuration without multipart upload,
6669
* and a long request timeout to allow for a very slow
@@ -77,35 +80,21 @@ protected Configuration createScaleConfiguration() {
7780
MULTIPART_SIZE,
7881
REQUEST_TIMEOUT);
7982
conf.setInt(IO_CHUNK_BUFFER_SIZE, 655360);
80-
conf.set(MIN_MULTIPART_THRESHOLD, S_1T);
81-
conf.set(MULTIPART_SIZE, S_1T);
83+
conf.setInt(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
84+
conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
8285
conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);
8386
conf.set(REQUEST_TIMEOUT, SINGLE_PUT_REQUEST_TIMEOUT);
8487
return conf;
8588
}
8689

8790
/**
88-
* After the file is created, attempt a rename with an FS
89-
* instance with a small multipart threshold;
90-
* this MUST be rejected.
91+
* Verify multipart copy is disabled.
9192
*/
9293
@Override
9394
public void test_030_postCreationAssertions() throws Throwable {
94-
assumeHugeFileExists();
95-
final Path hugefile = getHugefile();
96-
final Path hugefileRenamed = getHugefileRenamed();
97-
describe("renaming %s to %s", hugefile, hugefileRenamed);
98-
S3AFileSystem fs = getFileSystem();
99-
fs.delete(hugefileRenamed, false);
100-
// create a new fs with a small multipart threshold; expect rename failure.
101-
final Configuration conf = new Configuration(fs.getConf());
102-
conf.setInt(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
103-
conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
104-
S3ATestUtils.disableFilesystemCaching(conf);
105-
106-
try (FileSystem fs2 = FileSystem.get(fs.getUri(), conf)) {
107-
intercept(UnsupportedRequestException.class, () ->
108-
fs2.rename(hugefile, hugefileRenamed));
109-
}
95+
super.test_030_postCreationAssertions();
96+
Assertions.assertThat(isMultipartCopyEnabled())
97+
.describedAs("Multipart copy should be disabled in %s", getFileSystem())
98+
.isFalse();
11099
}
111100
}

0 commit comments

Comments
 (0)