Skip to content

Commit 4085a8d

Browse files
committed
updates getObjectMetadata, putObject & copyObject operations
1 parent 77c5cf0 commit 4085a8d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+934
-820
lines changed

hadoop-project/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1128,6 +1128,11 @@
11281128
</exclusion>
11291129
</exclusions>
11301130
</dependency>
1131+
<dependency>
1132+
<groupId>software.amazon.awssdk</groupId>
1133+
<artifactId>s3-transfer-manager</artifactId>
1134+
<version>2.17.196-PREVIEW</version>
1135+
</dependency>
11311136
<dependency>
11321137
<groupId>org.apache.mina</groupId>
11331138
<artifactId>mina-core</artifactId>

hadoop-tools/hadoop-aws/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,11 @@
497497
<artifactId>bundle</artifactId>
498498
<scope>compile</scope>
499499
</dependency>
500+
<dependency>
501+
<groupId>software.amazon.awssdk</groupId>
502+
<artifactId>s3-transfer-manager</artifactId>
503+
<scope>compile</scope>
504+
</dependency>
500505
<dependency>
501506
<groupId>org.assertj</groupId>
502507
<artifactId>assertj-core</artifactId>

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSClientConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@
3131
import software.amazon.awssdk.core.retry.RetryPolicy;
3232
import software.amazon.awssdk.http.apache.ApacheHttpClient;
3333
import software.amazon.awssdk.http.apache.ProxyConfiguration;
34+
import software.amazon.awssdk.thirdparty.org.apache.http.client.utils.URIBuilder;
3435

3536
import org.apache.hadoop.conf.Configuration;
3637
import org.apache.hadoop.util.VersionInfo;
37-
import org.apache.http.client.utils.URIBuilder;
3838

3939
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ESTABLISH_TIMEOUT;
4040
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_MAXIMUM_CONNECTIONS;

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import com.amazonaws.SdkBaseException;
2929
import org.slf4j.Logger;
3030
import org.slf4j.LoggerFactory;
31+
import software.amazon.awssdk.awscore.exception.AwsServiceException;
32+
import software.amazon.awssdk.core.exception.SdkException;
3133

3234
import org.apache.commons.lang3.StringUtils;
3335
import org.apache.hadoop.classification.InterfaceAudience;
@@ -120,8 +122,13 @@ public static <T> T once(String action, String path,
120122
throws IOException {
121123
try (DurationInfo ignored = new DurationInfo(LOG, false, "%s", action)) {
122124
return operation.apply();
123-
} catch (AmazonClientException e) {
124-
throw S3AUtils.translateException(action, path, e);
125+
} catch (AmazonClientException | AwsServiceException e) {
126+
// TODO: This is temporary, and will be updated during error translation work.
127+
if (e instanceof AmazonClientException) {
128+
throw S3AUtils.translateException(action, path, (SdkBaseException) e);
129+
} else {
130+
throw S3AUtils.translateExceptionV2(action, path, (SdkException) e);
131+
}
125132
}
126133
}
127134

@@ -466,19 +473,24 @@ public <T> T retryUntranslated(
466473
}
467474
// execute the operation, returning if successful
468475
return operation.apply();
469-
} catch (IOException | SdkBaseException e) {
476+
} catch (IOException | SdkBaseException | AwsServiceException e) {
470477
caught = e;
471478
}
472479
// you only get here if the operation didn't complete
473480
// normally, hence caught != null
474481

475482
// translate the exception into an IOE for the retry logic
476483
IOException translated;
484+
// TODO: Update during error translation work. This is a temporary fix to allow
485+
// getObjectMetadata to throw FNFE.
477486
if (caught instanceof IOException) {
478487
translated = (IOException) caught;
479-
} else {
488+
} else if (caught instanceof SdkBaseException) {
480489
translated = S3AUtils.translateException(text, "",
481490
(SdkBaseException)caught);
491+
} else {
492+
translated = S3AUtils.translateExceptionV2(text, "",
493+
(AwsServiceException)caught);
482494
}
483495

484496
try {
@@ -516,8 +528,11 @@ public <T> T retryUntranslated(
516528

517529
if (caught instanceof IOException) {
518530
throw (IOException) caught;
519-
} else {
531+
// TODO: This is temporary, and will be updated during error translation work.
532+
} else if (caught instanceof SdkBaseException) {
520533
throw (SdkBaseException) caught;
534+
} else {
535+
throw (AwsServiceException) caught;
521536
}
522537
}
523538

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ProgressableProgressListener.java

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,59 +18,56 @@
1818

1919
package org.apache.hadoop.fs.s3a;
2020

21-
import com.amazonaws.event.ProgressEvent;
22-
import com.amazonaws.event.ProgressEventType;
23-
import com.amazonaws.event.ProgressListener;
24-
import com.amazonaws.services.s3.transfer.Upload;
25-
import org.apache.hadoop.util.Progressable;
2621
import org.slf4j.Logger;
2722

28-
import static com.amazonaws.event.ProgressEventType.TRANSFER_COMPLETED_EVENT;
29-
import static com.amazonaws.event.ProgressEventType.TRANSFER_PART_STARTED_EVENT;
23+
import software.amazon.awssdk.transfer.s3.ObjectTransfer;
24+
import software.amazon.awssdk.transfer.s3.progress.TransferListener;
25+
26+
import org.apache.hadoop.util.Progressable;
3027

3128
/**
3229
* Listener to progress from AWS regarding transfers.
3330
*/
34-
public class ProgressableProgressListener implements ProgressListener {
31+
public class ProgressableProgressListener implements TransferListener {
3532
private static final Logger LOG = S3AFileSystem.LOG;
3633
private final S3AFileSystem fs;
3734
private final String key;
3835
private final Progressable progress;
3936
private long lastBytesTransferred;
40-
private final Upload upload;
4137

4238
/**
4339
* Instantiate.
4440
* @param fs filesystem: will be invoked with statistics updates
4541
* @param key key for the upload
46-
* @param upload source of events
4742
* @param progress optional callback for progress.
4843
*/
4944
public ProgressableProgressListener(S3AFileSystem fs,
5045
String key,
51-
Upload upload,
5246
Progressable progress) {
5347
this.fs = fs;
5448
this.key = key;
55-
this.upload = upload;
5649
this.progress = progress;
5750
this.lastBytesTransferred = 0;
5851
}
5952

6053
@Override
61-
public void progressChanged(ProgressEvent progressEvent) {
62-
if (progress != null) {
63-
progress.progress();
64-
}
54+
public void transferInitiated(TransferListener.Context.TransferInitiated context) {
55+
fs.incrementWriteOperations();
56+
}
6557

66-
// There are 3 http ops here, but this should be close enough for now
67-
ProgressEventType pet = progressEvent.getEventType();
68-
if (pet == TRANSFER_PART_STARTED_EVENT ||
69-
pet == TRANSFER_COMPLETED_EVENT) {
70-
fs.incrementWriteOperations();
58+
@Override
59+
public void transferComplete(TransferListener.Context.TransferComplete context) {
60+
fs.incrementWriteOperations();
61+
}
62+
63+
@Override
64+
public void bytesTransferred(TransferListener.Context.BytesTransferred context) {
65+
66+
if(progress != null) {
67+
progress.progress();
7168
}
7269

73-
long transferred = upload.getProgress().getBytesTransferred();
70+
long transferred = context.progressSnapshot().bytesTransferred();
7471
long delta = transferred - lastBytesTransferred;
7572
fs.incrementPutProgressStatistics(key, delta);
7673
lastBytesTransferred = transferred;
@@ -81,9 +78,10 @@ public void progressChanged(ProgressEvent progressEvent) {
8178
* This can handle race conditions in setup/teardown.
8279
* @return the number of bytes which were transferred after the notification
8380
*/
84-
public long uploadCompleted() {
85-
long delta = upload.getProgress().getBytesTransferred() -
86-
lastBytesTransferred;
81+
public long uploadCompleted(ObjectTransfer upload) {
82+
83+
long delta =
84+
upload.progress().snapshot().bytesTransferred() - lastBytesTransferred;
8785
if (delta > 0) {
8886
LOG.debug("S3A write delta changed after finished: {} bytes", delta);
8987
fs.incrementPutProgressStatistics(key, delta);

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@
3636
import com.amazonaws.event.ProgressEventType;
3737
import com.amazonaws.event.ProgressListener;
3838
import com.amazonaws.services.s3.model.PartETag;
39-
import com.amazonaws.services.s3.model.PutObjectRequest;
40-
import com.amazonaws.services.s3.model.PutObjectResult;
4139
import com.amazonaws.services.s3.model.UploadPartRequest;
4240

4341
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
@@ -50,6 +48,9 @@
5048
import org.slf4j.Logger;
5149
import org.slf4j.LoggerFactory;
5250

51+
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
52+
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
53+
5354
import org.apache.hadoop.classification.InterfaceAudience;
5455
import org.apache.hadoop.classification.InterfaceStability;
5556
import org.apache.hadoop.fs.Abortable;
@@ -575,24 +576,30 @@ private int putObject() throws IOException {
575576
final PutObjectRequest putObjectRequest = uploadData.hasFile() ?
576577
writeOperationHelper.createPutObjectRequest(
577578
key,
578-
uploadData.getFile(),
579-
builder.putOptions)
579+
uploadData.getFile().length(),
580+
builder.putOptions,
581+
true)
580582
: writeOperationHelper.createPutObjectRequest(
581583
key,
582-
uploadData.getUploadStream(),
583584
size,
584-
builder.putOptions);
585-
BlockUploadProgress callback =
586-
new BlockUploadProgress(
587-
block, progressListener, now());
588-
putObjectRequest.setGeneralProgressListener(callback);
585+
builder.putOptions,
586+
false);
587+
588+
// TODO: Progress listeners cannot currently be added to requests that are not made via the TM.
589+
// There is an open ticket for this with the SDK team, but we need to check how important
590+
// this is for us.
591+
// BlockUploadProgress callback =
592+
// new BlockUploadProgress(
593+
// block, progressListener, now());
594+
// putObjectRequest.setGeneralProgressListener(callback);
589595
statistics.blockUploadQueued(size);
590-
ListenableFuture<PutObjectResult> putObjectResult =
596+
ListenableFuture<PutObjectResponse> putObjectResult =
591597
executorService.submit(() -> {
592598
try {
593599
// the putObject call automatically closes the input
594600
// stream afterwards.
595-
return writeOperationHelper.putObject(putObjectRequest, builder.putOptions);
601+
return writeOperationHelper.putObject(putObjectRequest, builder.putOptions, uploadData,
602+
uploadData.hasFile());
596603
} finally {
597604
cleanupWithLogger(LOG, uploadData, block);
598605
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
* Set of classes to support output streaming into blocks which are then
4848
* uploaded to S3 as a single PUT, or as part of a multipart request.
4949
*/
50-
final class S3ADataBlocks {
50+
public final class S3ADataBlocks {
5151

5252
private static final Logger LOG =
5353
LoggerFactory.getLogger(S3ADataBlocks.class);
@@ -101,15 +101,15 @@ static BlockFactory createFactory(S3AFileSystem owner,
101101
* It can be one of a file or an input stream.
102102
* When closed, any stream is closed. Any source file is untouched.
103103
*/
104-
static final class BlockUploadData implements Closeable {
104+
public static final class BlockUploadData implements Closeable {
105105
private final File file;
106106
private final InputStream uploadStream;
107107

108108
/**
109109
* File constructor; input stream will be null.
110110
* @param file file to upload
111111
*/
112-
BlockUploadData(File file) {
112+
public BlockUploadData(File file) {
113113
Preconditions.checkArgument(file.exists(), "No file: " + file);
114114
this.file = file;
115115
this.uploadStream = null;
@@ -119,7 +119,7 @@ static final class BlockUploadData implements Closeable {
119119
* Stream constructor, file field will be null.
120120
* @param uploadStream stream to upload
121121
*/
122-
BlockUploadData(InputStream uploadStream) {
122+
public BlockUploadData(InputStream uploadStream) {
123123
Preconditions.checkNotNull(uploadStream, "rawUploadStream");
124124
this.uploadStream = uploadStream;
125125
this.file = null;

0 commit comments

Comments
 (0)