From 19cc9168a2f775bf132b733862bad905a6f4e50f Mon Sep 17 00:00:00 2001 From: diljot grewal Date: Fri, 23 Aug 2024 15:24:02 -0700 Subject: [PATCH 1/6] Integrate PutIfNotExist functionality into S3A --- .../org/apache/hadoop/fs/s3a/Constants.java | 7 ++ .../hadoop/fs/s3a/S3ABlockOutputStream.java | 17 ++- .../hadoop/fs/s3a/WriteOperationHelper.java | 3 +- .../hadoop/fs/s3a/api/RequestFactory.java | 9 +- .../hadoop/fs/s3a/impl/CreateFileBuilder.java | 7 +- .../hadoop/fs/s3a/impl/InternalConstants.java | 3 +- .../fs/s3a/impl/RequestFactoryImpl.java | 17 ++- .../fs/s3a/impl/ITestS3APutIfMatch.java | 113 ++++++++++++++++++ .../fs/s3a/impl/TestCreateFileBuilder.java | 4 +- .../fs/s3a/impl/TestRequestFactory.java | 3 +- 10 files changed, 166 insertions(+), 17 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 078ffaa471aeb..ae2b895145b9f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1390,6 +1390,13 @@ private Constants() { */ public static final String FS_S3A_CREATE_PERFORMANCE = "fs.s3a.create.performance"; + /** + * Flag for commit if none match. + * This can be set in the {code createFile()} builder. + * Value {@value}. + */ + public static final String FS_S3A_CREATE_IF_NONE_MATCH = "fs.s3a.create.header.If-None-Match"; + /** * Default value for create performance in an S3A FS. * Value {@value}. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index 5fe39ac6ea336..db43878495509 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.StringJoiner; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -99,6 +100,8 @@ class S3ABlockOutputStream extends OutputStream implements private static final String E_NOT_SYNCABLE = "S3A streams are not Syncable. See HADOOP-17597."; + public static final String IF_NONE_MATCH_HEADER = "If-None-Match"; + /** Object being uploaded. */ private final String key; @@ -596,7 +599,7 @@ private long putObject() throws IOException { final S3ADataBlocks.DataBlock block = getActiveBlock(); long size = block.dataSize(); final S3ADataBlocks.BlockUploadData uploadData = block.startUpload(); - final PutObjectRequest putObjectRequest = uploadData.hasFile() ? + PutObjectRequest putObjectRequest = uploadData.hasFile() ? writeOperationHelper.createPutObjectRequest( key, uploadData.getFile().length(), @@ -608,6 +611,16 @@ private long putObject() throws IOException { builder.putOptions, false); + PutObjectRequest.Builder maybeModifiedPutIfAbsentRequest = putObjectRequest.toBuilder(); + Map optionHeaders = builder.putOptions.getHeaders(); + + if (optionHeaders != null && optionHeaders.containsKey(IF_NONE_MATCH_HEADER)) { + maybeModifiedPutIfAbsentRequest.overrideConfiguration( + override -> override.putHeader(IF_NONE_MATCH_HEADER, optionHeaders.get(IF_NONE_MATCH_HEADER))); + } + + final PutObjectRequest finalizedRequest = maybeModifiedPutIfAbsentRequest.build(); + BlockUploadProgress progressCallback = new BlockUploadProgress(block, progressListener, now()); statistics.blockUploadQueued(size); @@ -617,7 +630,7 @@ private long putObject() throws IOException { // the putObject call automatically closes the input // stream afterwards. PutObjectResponse response = - writeOperationHelper.putObject(putObjectRequest, builder.putOptions, uploadData, + writeOperationHelper.putObject(finalizedRequest, builder.putOptions, uploadData, uploadData.hasFile(), statistics); progressCallback.progressChanged(REQUEST_BYTE_TRANSFER_EVENT); return response; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java index 3bbe000bf5b6e..ea7787d284f91 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java @@ -320,8 +320,7 @@ private CompleteMultipartUploadResponse finalizeMultipartUpload( retrying, () -> { final CompleteMultipartUploadRequest.Builder requestBuilder = - getRequestFactory().newCompleteMultipartUploadRequestBuilder( - destKey, uploadId, partETags); + getRequestFactory().newCompleteMultipartUploadRequestBuilder(destKey, uploadId, partETags, putOptions); return writeOperationHelperCallbacks.completeMultipartUpload(requestBuilder.build()); }); owner.finishedWrite(destKey, length, uploadResult.eTag(), diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java index 73ad137a86d3c..bfb4605e24038 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java @@ -165,15 +165,16 @@ CreateMultipartUploadRequest.Builder newMultipartUploadRequestBuilder( /** * Complete a multipart upload. - * @param destKey destination object key - * @param uploadId ID of initiated upload - * @param partETags ordered list of etags + * + * @param destKey destination object key + * @param uploadId ID of initiated upload + * @param partETags ordered list of etags * @return the request builder. */ CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestBuilder( String destKey, String uploadId, - List partETags); + List partETags, PutObjectOptions putOptions); /** * Create a HEAD object request builder. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java index ae2945989ddd3..b50ac6aa948d8 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java @@ -224,10 +224,11 @@ public static final class CreateFileOptions { private final Map headers; /** - * @param flags creation flags - * @param recursive create parent dirs? + * @param flags creation flags + * @param recursive create parent dirs? * @param performance performance flag - * @param headers nullable header map. + * @param + * @param headers nullable header map. */ public CreateFileOptions( final EnumSet flags, diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java index 7d23c10d8b5c8..2979da6930067 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java @@ -40,6 +40,7 @@ import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS; import static org.apache.hadoop.fs.s3a.Constants.ENABLE_MULTI_DELETE; import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_IF_NONE_MATCH; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE_ENABLED; import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_AWS_V2; @@ -260,7 +261,7 @@ private InternalConstants() { */ public static final Set CREATE_FILE_KEYS = Collections.unmodifiableSet( - new HashSet<>(Arrays.asList(FS_S3A_CREATE_PERFORMANCE))); + new HashSet<>(Arrays.asList(FS_S3A_CREATE_PERFORMANCE, FS_S3A_CREATE_IF_NONE_MATCH))); /** * Dynamic Path capabilities to be evaluated diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java index df2a6567dbdec..11a1939725953 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java @@ -59,6 +59,7 @@ import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets; import static org.apache.commons.lang3.StringUtils.isNotEmpty; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_IF_NONE_MATCH; import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT; import static org.apache.hadoop.util.Preconditions.checkArgument; @@ -517,12 +518,22 @@ public CreateMultipartUploadRequest.Builder newMultipartUploadRequestBuilder( public CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestBuilder( String destKey, String uploadId, - List partETags) { + List partETags, + PutObjectOptions putOptions) { + // a copy of the list is required, so that the AWS SDK doesn't // attempt to sort an unmodifiable list. - CompleteMultipartUploadRequest.Builder requestBuilder = - CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId) + CompleteMultipartUploadRequest.Builder requestBuilder; + Map optionHeaders = putOptions.getHeaders(); + + if (optionHeaders != null && optionHeaders.containsKey("If-None-Match")) { + requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId) + .overrideConfiguration(override ->override.putHeader("If-None-Match", optionHeaders.get("If-None-Match"))) .multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build()); + } else { + requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId) + .multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build()); + } return prepareRequest(requestBuilder); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java new file mode 100644 index 0000000000000..2b8d98c9a224b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java @@ -0,0 +1,113 @@ +package org.apache.hadoop.fs.s3a.impl; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSDataOutputStreamBuilder; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; +import org.apache.hadoop.fs.s3a.RemoteFileChangedException; +import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.apache.hadoop.io.IOUtils; + +import org.junit.Assert; +import org.junit.Test; +import software.amazon.awssdk.services.s3.model.S3Exception; + +import java.io.IOException; +import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; +import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER; +import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_IF_NONE_MATCH; +import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD; +import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE; +import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT; +import static org.apache.hadoop.fs.s3a.scale.ITestS3AMultipartUploadSizeLimits.MPU_SIZE; +import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB; + + +public class ITestS3APutIfMatch extends AbstractS3ATestBase { + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + S3ATestUtils.disableFilesystemCaching(conf); + removeBaseAndBucketOverrides(conf, + MULTIPART_SIZE, + UPLOAD_PART_COUNT_LIMIT); + conf.setLong(MULTIPART_SIZE, MPU_SIZE); + conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2); + conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE); + conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE); + conf.set(FAST_UPLOAD_BUFFER, getBlockOutputBufferName()); + return conf; + } + + protected String getBlockOutputBufferName() { + return FAST_UPLOAD_BUFFER_ARRAY; + } + + /** + * Create a file using the PutIfMatch feature from S3 + * @param fs filesystem + * @param path path to write + * @param data source dataset. Can be null + * @throws IOException on any problem + */ + private static void createFileWithIfNoneMatchFlag(FileSystem fs, + Path path, + byte[] data, + String ifMatchTag) throws Exception { + FSDataOutputStreamBuilder builder = fs.createFile(path); + builder.must(FS_S3A_CREATE_IF_NONE_MATCH, ifMatchTag); + FSDataOutputStream stream = builder.create().build(); + if (data != null && data.length > 0) { + stream.write(data); + } + stream.close(); + IOUtils.closeStream(stream); + } + + @Test + public void testPutIfAbsentConflict() throws IOException { + FileSystem fs = getFileSystem(); + Path testFile = methodPath(); + + fs.mkdirs(testFile.getParent()); + byte[] fileBytes = dataset(TEST_FILE_LEN, 0, 255); + + try { + createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); + createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); + } catch (Exception e) { + Assert.assertEquals(RemoteFileChangedException.class, e.getClass()); + + S3Exception s3Exception = (S3Exception) e.getCause(); + Assert.assertEquals(s3Exception.statusCode(), 412); + } + } + + + @Test + public void testPutIfAbsentLargeFileConflict() throws IOException { + FileSystem fs = getFileSystem(); + Path testFile = methodPath(); + + fs.mkdirs(testFile.getParent()); + // enough bytes for Multipart Upload + byte[] fileBytes = dataset(6 * _1MB, 'a', 'z' - 'a'); + + try { + createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); + createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); + } catch (Exception e) { + Assert.assertEquals(RemoteFileChangedException.class, e.getClass()); + + // Error gets caught here: + S3Exception s3Exception = (S3Exception) e.getCause(); + Assert.assertEquals(s3Exception.statusCode(), 412); + } + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java index 65d7aa6192dd8..a582a9ce6fc50 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java @@ -89,11 +89,13 @@ public void testPerformanceSupport() throws Throwable { public void testHeaderOptions() throws Throwable { final CreateFileBuilder builder = mkBuilder().create() .must(FS_S3A_CREATE_HEADER + ".retention", "permanent") + .must(FS_S3A_CREATE_HEADER + ".If-None-Match", "*") .opt(FS_S3A_CREATE_HEADER + ".owner", "engineering"); final Map headers = build(builder).getHeaders(); Assertions.assertThat(headers) .containsEntry("retention", "permanent") - .containsEntry("owner", "engineering"); + .containsEntry("owner", "engineering") + .containsEntry("If-None-Match", "*"); } @Test diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java index 9fee2fd63a0ef..7d7d2e95280b7 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.concurrent.atomic.AtomicLong; import software.amazon.awssdk.awscore.AwsRequest; @@ -162,7 +163,7 @@ private void createFactoryObjects(RequestFactory factory) throws IOException { String id = "1"; a(factory.newAbortMultipartUploadRequestBuilder(path, id)); a(factory.newCompleteMultipartUploadRequestBuilder(path, id, - new ArrayList<>())); + new ArrayList<>(), new PutObjectOptions(false, "some class", Collections.emptyMap()))); a(factory.newCopyObjectRequestBuilder(path, path2, HeadObjectResponse.builder().build())); a(factory.newDeleteObjectRequestBuilder(path)); From b5637c075ed3526abc9c9ba9113f112206564b1b Mon Sep 17 00:00:00 2001 From: diljot grewal Date: Thu, 26 Sep 2024 21:31:07 -0700 Subject: [PATCH 2/6] addressing PR feedback --- .../org/apache/hadoop/fs/s3a/Constants.java | 2 +- .../hadoop/fs/s3a/S3ABlockOutputStream.java | 18 ++++++-- .../hadoop/fs/s3a/api/RequestFactory.java | 11 ++--- .../apache/hadoop/fs/s3a/impl/AWSHeaders.java | 1 + .../hadoop/fs/s3a/impl/CreateFileBuilder.java | 30 +++++++++---- .../hadoop/fs/s3a/impl/InternalConstants.java | 4 +- .../fs/s3a/impl/RequestFactoryImpl.java | 12 +++-- .../fs/s3a/impl/ITestS3APutIfMatch.java | 45 +++++++++++++++---- .../fs/s3a/impl/TestCreateFileBuilder.java | 5 ++- 9 files changed, 91 insertions(+), 37 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index ae2b895145b9f..5f0f156a99fe4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1395,7 +1395,7 @@ private Constants() { * This can be set in the {code createFile()} builder. * Value {@value}. */ - public static final String FS_S3A_CREATE_IF_NONE_MATCH = "fs.s3a.create.header.If-None-Match"; + public static final String FS_S3A_CONDITIONAL_FILE_CREATE = "fs.s3a.conditional.file.create"; /** * Default value for create performance in an S3A FS. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index db43878495509..7d68bd0686e50 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -72,6 +72,7 @@ import static org.apache.hadoop.fs.s3a.S3AUtils.*; import static org.apache.hadoop.fs.s3a.Statistic.*; import static org.apache.hadoop.fs.s3a.impl.ProgressListenerEvent.*; +import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH; import static org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext.EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation; @@ -100,8 +101,6 @@ class S3ABlockOutputStream extends OutputStream implements private static final String E_NOT_SYNCABLE = "S3A streams are not Syncable. See HADOOP-17597."; - public static final String IF_NONE_MATCH_HEADER = "If-None-Match"; - /** Object being uploaded. */ private final String key; @@ -614,9 +613,9 @@ private long putObject() throws IOException { PutObjectRequest.Builder maybeModifiedPutIfAbsentRequest = putObjectRequest.toBuilder(); Map optionHeaders = builder.putOptions.getHeaders(); - if (optionHeaders != null && optionHeaders.containsKey(IF_NONE_MATCH_HEADER)) { + if (optionHeaders != null && optionHeaders.containsKey(IF_NONE_MATCH)) { maybeModifiedPutIfAbsentRequest.overrideConfiguration( - override -> override.putHeader(IF_NONE_MATCH_HEADER, optionHeaders.get(IF_NONE_MATCH_HEADER))); + override -> override.putHeader(IF_NONE_MATCH, optionHeaders.get(IF_NONE_MATCH))); } final PutObjectRequest finalizedRequest = maybeModifiedPutIfAbsentRequest.build(); @@ -1185,6 +1184,11 @@ public static final class BlockOutputStreamBuilder { */ private boolean isMultipartUploadEnabled; + /** + * Is conditional create enables. + */ + private boolean isConditionalEnabled; + private BlockOutputStreamBuilder() { } @@ -1341,5 +1345,11 @@ public BlockOutputStreamBuilder withMultipartEnabled( isMultipartUploadEnabled = value; return this; } + + public BlockOutputStreamBuilder withConditionalEnabled( + final boolean value){ + isConditionalEnabled = value; + return this; + } } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java index bfb4605e24038..54784d4c10d2f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java @@ -165,16 +165,17 @@ CreateMultipartUploadRequest.Builder newMultipartUploadRequestBuilder( /** * Complete a multipart upload. - * - * @param destKey destination object key - * @param uploadId ID of initiated upload - * @param partETags ordered list of etags + * @param destKey destination object key + * @param uploadId ID of initiated upload + * @param partETags ordered list of etags + * @param putOptions options for the request * @return the request builder. */ CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestBuilder( String destKey, String uploadId, - List partETags, PutObjectOptions putOptions); + List partETags, + PutObjectOptions putOptions); /** * Create a HEAD object request builder. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java index aaca3b9b194d6..572e3acd70b74 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java @@ -38,6 +38,7 @@ public interface AWSHeaders { String DATE = "Date"; String ETAG = "ETag"; String LAST_MODIFIED = "Last-Modified"; + String IF_NONE_MATCH = "If-None-Match"; /* * Amazon HTTP Headers used by S3A. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java index b50ac6aa948d8..2421bfc17488a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java @@ -63,19 +63,19 @@ public class CreateFileBuilder extends * Classic create file option set: overwriting. */ public static final CreateFileOptions OPTIONS_CREATE_FILE_OVERWRITE = - new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, false, null); + new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, false, false, null); /** * Classic create file option set: no overwrite. */ public static final CreateFileOptions OPTIONS_CREATE_FILE_NO_OVERWRITE = - new CreateFileOptions(CREATE_NO_OVERWRITE_FLAGS, true, false, null); + new CreateFileOptions(CREATE_NO_OVERWRITE_FLAGS, true, false, false, null); /** * Performance create options. */ public static final CreateFileOptions OPTIONS_CREATE_FILE_PERFORMANCE = - new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, true, null); + new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, true, false, null); /** * Callback interface. @@ -144,10 +144,12 @@ public FSDataOutputStream build() throws IOException { final boolean performance = options.getBoolean(Constants.FS_S3A_CREATE_PERFORMANCE, false); + final boolean conditionalCreate = + options.getBoolean(Constants.FS_S3A_CONDITIONAL_FILE_CREATE, false); return callbacks.createFileFromBuilder( path, getProgress(), - new CreateFileOptions(flags, isRecursive(), performance, headers)); + new CreateFileOptions(flags, isRecursive(), performance, conditionalCreate, headers)); } @@ -218,26 +220,33 @@ public static final class CreateFileOptions { */ private final boolean performance; + /** + * conditional flag. + */ + private final boolean conditionalCreate; + /** * Headers; may be null. */ private final Map headers; /** - * @param flags creation flags - * @param recursive create parent dirs? + * @param flags creation flags + * @param recursive create parent dirs? * @param performance performance flag - * @param - * @param headers nullable header map. + * @param conditionalCreate conditional flag + * @param headers nullable header map. */ public CreateFileOptions( final EnumSet flags, final boolean recursive, final boolean performance, + final boolean conditionalCreate, final Map headers) { this.flags = flags; this.recursive = recursive; this.performance = performance; + this.conditionalCreate = conditionalCreate; this.headers = headers; } @@ -247,6 +256,7 @@ public String toString() { "flags=" + flags + ", recursive=" + recursive + ", performance=" + performance + + ", conditionalCreate=" + conditionalCreate + ", headers=" + headers + '}'; } @@ -263,6 +273,10 @@ public boolean isPerformance() { return performance; } + public boolean isConditionalCreate() { + return conditionalCreate; + } + public Map getHeaders() { return headers; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java index 2979da6930067..5b7ab29fe7d77 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java @@ -40,7 +40,7 @@ import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS; import static org.apache.hadoop.fs.s3a.Constants.ENABLE_MULTI_DELETE; import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT; -import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_IF_NONE_MATCH; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_FILE_CREATE; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE_ENABLED; import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_AWS_V2; @@ -261,7 +261,7 @@ private InternalConstants() { */ public static final Set CREATE_FILE_KEYS = Collections.unmodifiableSet( - new HashSet<>(Arrays.asList(FS_S3A_CREATE_PERFORMANCE, FS_S3A_CREATE_IF_NONE_MATCH))); + new HashSet<>(Arrays.asList(FS_S3A_CREATE_PERFORMANCE, FS_S3A_CONDITIONAL_FILE_CREATE))); /** * Dynamic Path capabilities to be evaluated diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java index 11a1939725953..4629fbf8aff2a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java @@ -59,7 +59,7 @@ import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets; import static org.apache.commons.lang3.StringUtils.isNotEmpty; -import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_IF_NONE_MATCH; +import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH; import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT; import static org.apache.hadoop.util.Preconditions.checkArgument; @@ -526,13 +526,11 @@ public CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestB CompleteMultipartUploadRequest.Builder requestBuilder; Map optionHeaders = putOptions.getHeaders(); - if (optionHeaders != null && optionHeaders.containsKey("If-None-Match")) { - requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId) - .overrideConfiguration(override ->override.putHeader("If-None-Match", optionHeaders.get("If-None-Match"))) - .multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build()); - } else { - requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId) + requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId) .multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build()); + if (optionHeaders != null && optionHeaders.containsKey(IF_NONE_MATCH)) { + requestBuilder = CompleteMultipartUploadRequest.builder().overrideConfiguration( + override ->override.putHeader(IF_NONE_MATCH, optionHeaders.get(IF_NONE_MATCH))); } return prepareRequest(requestBuilder); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java index 2b8d98c9a224b..279ec9cdf88da 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.fs.s3a.impl; import org.apache.hadoop.conf.Configuration; @@ -5,11 +23,12 @@ import org.apache.hadoop.fs.FSDataOutputStreamBuilder; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; +import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; import org.apache.hadoop.fs.s3a.RemoteFileChangedException; import org.apache.hadoop.fs.s3a.S3ATestUtils; import org.apache.hadoop.io.IOUtils; +import org.assertj.core.api.Assertions; import org.junit.Assert; import org.junit.Test; import software.amazon.awssdk.services.s3.model.S3Exception; @@ -18,20 +37,23 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER; import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY; -import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_IF_NONE_MATCH; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_FILE_CREATE; import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD; import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE; import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT; import static org.apache.hadoop.fs.s3a.scale.ITestS3AMultipartUploadSizeLimits.MPU_SIZE; import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB; -public class ITestS3APutIfMatch extends AbstractS3ATestBase { +public class ITestS3APutIfMatch extends AbstractS3ACostTest { + + private Configuration conf; @Override - protected Configuration createConfiguration() { + public Configuration createConfiguration() { Configuration conf = super.createConfiguration(); S3ATestUtils.disableFilesystemCaching(conf); removeBaseAndBucketOverrides(conf, @@ -45,6 +67,14 @@ protected Configuration createConfiguration() { return conf; } + @Override + public void setup() throws Exception { + super.setup(); + conf = createConfiguration(); + skipIfNotEnabled(conf, FS_S3A_CONDITIONAL_FILE_CREATE, + "Skipping IfNoneMatch tests"); + } + protected String getBlockOutputBufferName() { return FAST_UPLOAD_BUFFER_ARRAY; } @@ -61,7 +91,7 @@ private static void createFileWithIfNoneMatchFlag(FileSystem fs, byte[] data, String ifMatchTag) throws Exception { FSDataOutputStreamBuilder builder = fs.createFile(path); - builder.must(FS_S3A_CREATE_IF_NONE_MATCH, ifMatchTag); + builder.must(FS_S3A_CONDITIONAL_FILE_CREATE, ifMatchTag); FSDataOutputStream stream = builder.create().build(); if (data != null && data.length > 0) { stream.write(data); @@ -85,7 +115,7 @@ public void testPutIfAbsentConflict() throws IOException { Assert.assertEquals(RemoteFileChangedException.class, e.getClass()); S3Exception s3Exception = (S3Exception) e.getCause(); - Assert.assertEquals(s3Exception.statusCode(), 412); + Assertions.assertThat(s3Exception.statusCode()).isEqualTo(412); } } @@ -95,7 +125,6 @@ public void testPutIfAbsentLargeFileConflict() throws IOException { FileSystem fs = getFileSystem(); Path testFile = methodPath(); - fs.mkdirs(testFile.getParent()); // enough bytes for Multipart Upload byte[] fileBytes = dataset(6 * _1MB, 'a', 'z' - 'a'); @@ -107,7 +136,7 @@ public void testPutIfAbsentLargeFileConflict() throws IOException { // Error gets caught here: S3Exception s3Exception = (S3Exception) e.getCause(); - Assert.assertEquals(s3Exception.statusCode(), 412); + Assertions.assertThat(s3Exception.statusCode()).isEqualTo(412); } } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java index a582a9ce6fc50..9759cf610e8f0 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java @@ -37,6 +37,7 @@ import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_HEADER; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE; import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH; /** * Unit test of {@link CreateFileBuilder}. @@ -89,13 +90,13 @@ public void testPerformanceSupport() throws Throwable { public void testHeaderOptions() throws Throwable { final CreateFileBuilder builder = mkBuilder().create() .must(FS_S3A_CREATE_HEADER + ".retention", "permanent") - .must(FS_S3A_CREATE_HEADER + ".If-None-Match", "*") + .must(FS_S3A_CREATE_HEADER + ".".concat(IF_NONE_MATCH), "*") .opt(FS_S3A_CREATE_HEADER + ".owner", "engineering"); final Map headers = build(builder).getHeaders(); Assertions.assertThat(headers) .containsEntry("retention", "permanent") .containsEntry("owner", "engineering") - .containsEntry("If-None-Match", "*"); + .containsEntry(IF_NONE_MATCH, "*"); } @Test From a36fa159cb27e78314761efc88bcdda0f50b2972 Mon Sep 17 00:00:00 2001 From: Diljot Date: Tue, 1 Oct 2024 23:19:02 -0700 Subject: [PATCH 3/6] addressing a minor issue missed in merge commit --- .../java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index f893b9731b617..5e2b5addb1554 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -691,7 +691,7 @@ private long putObject() throws IOException { final S3ADataBlocks.DataBlock block = getActiveBlock(); final long size = block.dataSize(); final S3ADataBlocks.BlockUploadData uploadData = block.startUpload(); - final PutObjectRequest putObjectRequest = + PutObjectRequest putObjectRequest = writeOperationHelper.createPutObjectRequest( key, uploadData.getSize(), From 0eddecb91d49e79bcce9c11fd7f36f18694167f0 Mon Sep 17 00:00:00 2001 From: diljot grewal Date: Wed, 9 Oct 2024 23:44:04 -0700 Subject: [PATCH 4/6] addressing review comments, added flag to PutObjectOptions --- .../hadoop/fs/s3a/S3ABlockOutputStream.java | 19 +++--- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 6 +- .../commit/magic/S3MagicCommitTracker.java | 2 +- .../hadoop/fs/s3a/impl/PutObjectOptions.java | 20 +++++- .../fs/s3a/impl/RequestFactoryImpl.java | 6 +- .../fs/s3a/impl/ITestS3APutIfMatch.java | 64 +++++++++++-------- .../fs/s3a/impl/TestCreateFileBuilder.java | 2 +- .../fs/s3a/impl/TestRequestFactory.java | 2 +- 8 files changed, 73 insertions(+), 48 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index 5e2b5addb1554..312aaadef10c3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -77,9 +77,9 @@ import static java.util.Objects.requireNonNull; import static org.apache.hadoop.fs.s3a.Statistic.*; +import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH; import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.CONTENT_TYPE_OCTET_STREAM; import static org.apache.hadoop.fs.s3a.impl.ProgressListenerEvent.*; -import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH; import static org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext.EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation; @@ -699,13 +699,10 @@ private long putObject() throws IOException { clearActiveBlock(); PutObjectRequest.Builder maybeModifiedPutIfAbsentRequest = putObjectRequest.toBuilder(); - Map optionHeaders = builder.putOptions.getHeaders(); - - if (optionHeaders != null && optionHeaders.containsKey(IF_NONE_MATCH)) { + if (builder.isConditionalPutEnabled){ maybeModifiedPutIfAbsentRequest.overrideConfiguration( - override -> override.putHeader(IF_NONE_MATCH, optionHeaders.get(IF_NONE_MATCH))); + override -> override.putHeader(IF_NONE_MATCH, "*")); } - final PutObjectRequest finalizedRequest = maybeModifiedPutIfAbsentRequest.build(); BlockUploadProgress progressCallback = @@ -714,7 +711,7 @@ private long putObject() throws IOException { try { progressCallback.progressChanged(PUT_STARTED_EVENT); // the putObject call automatically closes the upload data - writeOperationHelper.putObject(putObjectRequest, + writeOperationHelper.putObject(finalizedRequest, builder.putOptions, uploadData, statistics); @@ -1401,9 +1398,9 @@ public static final class BlockOutputStreamBuilder { private boolean isMultipartUploadEnabled; /** - * Is conditional create enables. + * Is conditional create enabled. */ - private boolean isConditionalEnabled; + private boolean isConditionalPutEnabled; private BlockOutputStreamBuilder() { } @@ -1567,9 +1564,9 @@ public BlockOutputStreamBuilder withMultipartEnabled( return this; } - public BlockOutputStreamBuilder withConditionalEnabled( + public BlockOutputStreamBuilder withConditionalPutEnabled( final boolean value){ - isConditionalEnabled = value; + isConditionalPutEnabled = value; return this; } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 3ab19dfa1914f..cb4aaf866250c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -2194,8 +2194,9 @@ private FSDataOutputStream innerCreateFile( // put options are derived from the path and the // option builder. boolean keep = options.isPerformance() || keepDirectoryMarkers(path); + boolean conditionalCreate = options.isConditionalCreate(); final PutObjectOptions putOptions = - new PutObjectOptions(keep, null, options.getHeaders()); + new PutObjectOptions(keep, false, null, options.getHeaders()); validateOutputStreamConfiguration(path, getConf()); @@ -2223,7 +2224,8 @@ private FSDataOutputStream innerCreateFile( .withPutOptions(putOptions) .withIOStatisticsAggregator( IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator()) - .withMultipartEnabled(isMultipartUploadEnabled); + .withMultipartEnabled(isMultipartUploadEnabled) + .withConditionalPutEnabled(conditionalCreate); return new FSDataOutputStream( new S3ABlockOutputStream(builder), null); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java index 1f6c9123bae62..e76fda67b203d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java @@ -79,7 +79,7 @@ public boolean aboutToComplete(String uploadId, PutObjectRequest originalDestPut = getWriter().createPutObjectRequest( getOriginalDestKey(), 0, - new PutObjectOptions(true, null, headers)); + new PutObjectOptions(true,false, null, headers)); upload(originalDestPut, EMPTY); // build the commit summary diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java index e14285a1ca8b1..2f81745ba56e0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java @@ -31,6 +31,11 @@ public final class PutObjectOptions { */ private final boolean keepMarkers; + /** + * Is this a conditional PUT operation + */ + private final boolean conditionalPutEnabled; + /** * Storage class, if not null. */ @@ -45,13 +50,16 @@ public final class PutObjectOptions { * Constructor. * @param keepMarkers Can the PUT operation skip marker deletion? * @param storageClass Storage class, if not null. + * @param conditionalPutEnabled Is this a conditional Put? * @param headers Headers; may be null. */ public PutObjectOptions( final boolean keepMarkers, + final boolean conditionalPutEnabled, @Nullable final String storageClass, @Nullable final Map headers) { this.keepMarkers = keepMarkers; + this.conditionalPutEnabled = conditionalPutEnabled; this.storageClass = storageClass; this.headers = headers; } @@ -64,6 +72,14 @@ public boolean isKeepMarkers() { return keepMarkers; } + /** + * Get the conditional put flag. + * @return true if it's a conditional put + */ + public boolean isconditionalPutEnabled() { + return conditionalPutEnabled; + } + /** * Headers for the put/post request. * @return headers or null. @@ -80,9 +96,9 @@ public String toString() { '}'; } - private static final PutObjectOptions KEEP_DIRS = new PutObjectOptions(true, + private static final PutObjectOptions KEEP_DIRS = new PutObjectOptions(true, false, null, null); - private static final PutObjectOptions DELETE_DIRS = new PutObjectOptions(false, + private static final PutObjectOptions DELETE_DIRS = new PutObjectOptions(false, false, null, null); /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java index db5caf97bd58e..2ba5b60fe48b9 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java @@ -539,13 +539,13 @@ public CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestB // a copy of the list is required, so that the AWS SDK doesn't // attempt to sort an unmodifiable list. CompleteMultipartUploadRequest.Builder requestBuilder; - Map optionHeaders = putOptions.getHeaders(); requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId) .multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build()); - if (optionHeaders != null && optionHeaders.containsKey(IF_NONE_MATCH)) { + if (putOptions.isconditionalPutEnabled()){ requestBuilder = CompleteMultipartUploadRequest.builder().overrideConfiguration( - override ->override.putHeader(IF_NONE_MATCH, optionHeaders.get(IF_NONE_MATCH))); + override ->override.putHeader(IF_NONE_MATCH, "*")); + } return prepareRequest(requestBuilder); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java index 279ec9cdf88da..9a10ac544cec1 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java @@ -34,6 +34,8 @@ import software.amazon.awssdk.services.s3.model.S3Exception; import java.io.IOException; +import java.nio.file.AccessDeniedException; + import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER; import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY; @@ -46,6 +48,7 @@ import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT; import static org.apache.hadoop.fs.s3a.scale.ITestS3AMultipartUploadSizeLimits.MPU_SIZE; import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; public class ITestS3APutIfMatch extends AbstractS3ACostTest { @@ -56,14 +59,16 @@ public class ITestS3APutIfMatch extends AbstractS3ACostTest { public Configuration createConfiguration() { Configuration conf = super.createConfiguration(); S3ATestUtils.disableFilesystemCaching(conf); - removeBaseAndBucketOverrides(conf, - MULTIPART_SIZE, - UPLOAD_PART_COUNT_LIMIT); + removeBaseAndBucketOverrides( + conf, + MULTIPART_SIZE, + UPLOAD_PART_COUNT_LIMIT, + MIN_MULTIPART_THRESHOLD, + MULTIPART_SIZE); conf.setLong(MULTIPART_SIZE, MPU_SIZE); conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2); conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE); conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE); - conf.set(FAST_UPLOAD_BUFFER, getBlockOutputBufferName()); return conf; } @@ -75,6 +80,14 @@ public void setup() throws Exception { "Skipping IfNoneMatch tests"); } + private static void assertS3ExceptionStatusCode(int code, Exception ex) { + S3Exception s3Exception = (S3Exception) ex.getCause(); + + if (s3Exception.statusCode() != code) { + throw new AssertionError("Expected status code " + code + " from " + ex, ex); + } + } + protected String getBlockOutputBufferName() { return FAST_UPLOAD_BUFFER_ARRAY; } @@ -86,10 +99,11 @@ protected String getBlockOutputBufferName() { * @param data source dataset. Can be null * @throws IOException on any problem */ - private static void createFileWithIfNoneMatchFlag(FileSystem fs, - Path path, - byte[] data, - String ifMatchTag) throws Exception { + private static void createFileWithIfNoneMatchFlag( + FileSystem fs, + Path path, + byte[] data, + String ifMatchTag) throws Exception { FSDataOutputStreamBuilder builder = fs.createFile(path); builder.must(FS_S3A_CONDITIONAL_FILE_CREATE, ifMatchTag); FSDataOutputStream stream = builder.create().build(); @@ -101,42 +115,38 @@ private static void createFileWithIfNoneMatchFlag(FileSystem fs, } @Test - public void testPutIfAbsentConflict() throws IOException { + public void testPutIfAbsentConflict() throws Throwable { FileSystem fs = getFileSystem(); Path testFile = methodPath(); fs.mkdirs(testFile.getParent()); byte[] fileBytes = dataset(TEST_FILE_LEN, 0, 255); - try { - createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); - createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); - } catch (Exception e) { - Assert.assertEquals(RemoteFileChangedException.class, e.getClass()); + RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class, + () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*")); + assertS3ExceptionStatusCode(412, firstException); - S3Exception s3Exception = (S3Exception) e.getCause(); - Assertions.assertThat(s3Exception.statusCode()).isEqualTo(412); - } + RemoteFileChangedException secondException = intercept(RemoteFileChangedException.class, + () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*")); + assertS3ExceptionStatusCode(412, secondException); } @Test - public void testPutIfAbsentLargeFileConflict() throws IOException { + public void testPutIfAbsentLargeFileConflict() throws Throwable { FileSystem fs = getFileSystem(); Path testFile = methodPath(); // enough bytes for Multipart Upload byte[] fileBytes = dataset(6 * _1MB, 'a', 'z' - 'a'); - try { - createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); - createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"); - } catch (Exception e) { - Assert.assertEquals(RemoteFileChangedException.class, e.getClass()); + RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class, + () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*")); + assertS3ExceptionStatusCode(412, firstException); + + RemoteFileChangedException secondException = intercept(RemoteFileChangedException.class, + () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*")); + assertS3ExceptionStatusCode(412, secondException); - // Error gets caught here: - S3Exception s3Exception = (S3Exception) e.getCause(); - Assertions.assertThat(s3Exception.statusCode()).isEqualTo(412); - } } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java index 9759cf610e8f0..c9682abab3e7f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java @@ -90,7 +90,7 @@ public void testPerformanceSupport() throws Throwable { public void testHeaderOptions() throws Throwable { final CreateFileBuilder builder = mkBuilder().create() .must(FS_S3A_CREATE_HEADER + ".retention", "permanent") - .must(FS_S3A_CREATE_HEADER + ".".concat(IF_NONE_MATCH), "*") + .must(FS_S3A_CREATE_HEADER + "." + IF_NONE_MATCH, "*") .opt(FS_S3A_CREATE_HEADER + ".owner", "engineering"); final Map headers = build(builder).getHeaders(); Assertions.assertThat(headers) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java index db1262296428c..787dadf5b7eea 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java @@ -167,7 +167,7 @@ private void createFactoryObjects(RequestFactory factory) throws IOException { String id = "1"; a(factory.newAbortMultipartUploadRequestBuilder(path, id)); a(factory.newCompleteMultipartUploadRequestBuilder(path, id, - new ArrayList<>(), new PutObjectOptions(false, "some class", Collections.emptyMap()))); + new ArrayList<>(), new PutObjectOptions(false, false,"some class", Collections.emptyMap()))); a(factory.newCopyObjectRequestBuilder(path, path2, HeadObjectResponse.builder().build())); a(factory.newDeleteObjectRequestBuilder(path)); From 6f4af937e72ed1fa1925159cd25a10fd9ead4583 Mon Sep 17 00:00:00 2001 From: diljot grewal Date: Tue, 15 Oct 2024 00:44:21 -0700 Subject: [PATCH 5/6] reverting to using optionheaders --- .../java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java | 3 ++- .../org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index 312aaadef10c3..df808aebcd3da 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -699,9 +699,10 @@ private long putObject() throws IOException { clearActiveBlock(); PutObjectRequest.Builder maybeModifiedPutIfAbsentRequest = putObjectRequest.toBuilder(); + Map optionHeaders = builder.putOptions.getHeaders(); if (builder.isConditionalPutEnabled){ maybeModifiedPutIfAbsentRequest.overrideConfiguration( - override -> override.putHeader(IF_NONE_MATCH, "*")); + override -> override.putHeader(IF_NONE_MATCH, optionHeaders.get(IF_NONE_MATCH))); } final PutObjectRequest finalizedRequest = maybeModifiedPutIfAbsentRequest.build(); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java index 2ba5b60fe48b9..6bec51e6c3f88 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java @@ -539,12 +539,12 @@ public CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestB // a copy of the list is required, so that the AWS SDK doesn't // attempt to sort an unmodifiable list. CompleteMultipartUploadRequest.Builder requestBuilder; - + Map optionHeaders = putOptions.getHeaders(); requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId) .multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build()); if (putOptions.isconditionalPutEnabled()){ requestBuilder = CompleteMultipartUploadRequest.builder().overrideConfiguration( - override ->override.putHeader(IF_NONE_MATCH, "*")); + override ->override.putHeader(IF_NONE_MATCH, optionHeaders.get(IF_NONE_MATCH))); } From d66dc791014ae43cca68ce9df644d7b038bf3119 Mon Sep 17 00:00:00 2001 From: diljot grewal Date: Tue, 15 Oct 2024 10:54:22 -0700 Subject: [PATCH 6/6] redundant set for multipart_size --- .../org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java index 9a10ac544cec1..46b5a6c35de7f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java @@ -46,7 +46,6 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT; -import static org.apache.hadoop.fs.s3a.scale.ITestS3AMultipartUploadSizeLimits.MPU_SIZE; import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB; import static org.apache.hadoop.test.LambdaTestUtils.intercept; @@ -63,9 +62,7 @@ public Configuration createConfiguration() { conf, MULTIPART_SIZE, UPLOAD_PART_COUNT_LIMIT, - MIN_MULTIPART_THRESHOLD, - MULTIPART_SIZE); - conf.setLong(MULTIPART_SIZE, MPU_SIZE); + MIN_MULTIPART_THRESHOLD); conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2); conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE); conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);