diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 67854e65720fd..bfeb25b474584 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -1439,6 +1439,13 @@ private Constants() {
    */
   public static final String FS_S3A_CREATE_PERFORMANCE = "fs.s3a.create.performance";
 
+  /**
+   * Flag for commit-if-none-match (conditional create).
+   * This can be set in the {@code createFile()} builder.
+   * Value {@value}.
+   */
+  public static final String FS_S3A_CONDITIONAL_FILE_CREATE = "fs.s3a.conditional.file.create";
+
   /**
    * Default value for create performance in an S3A FS.
    * Value {@value}.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
index 741a78a0537f2..df808aebcd3da 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
@@ -77,6 +77,7 @@
 import static java.util.Objects.requireNonNull;
 import static org.apache.hadoop.fs.s3a.Statistic.*;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH;
 import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.CONTENT_TYPE_OCTET_STREAM;
 import static org.apache.hadoop.fs.s3a.impl.ProgressListenerEvent.*;
 import static org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext.EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS;
@@ -690,20 +691,28 @@ private long putObject() throws IOException {
     final S3ADataBlocks.DataBlock block = getActiveBlock();
     final long size = block.dataSize();
     final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
-    final PutObjectRequest putObjectRequest =
+    PutObjectRequest putObjectRequest =
         writeOperationHelper.createPutObjectRequest(
             key, uploadData.getSize(), builder.putOptions);
     clearActiveBlock();
 
+    PutObjectRequest.Builder maybeModifiedPutIfAbsentRequest = putObjectRequest.toBuilder();
+    Map<String, String> optionHeaders = builder.putOptions.getHeaders();
+    if (builder.isConditionalPutEnabled) {
+      maybeModifiedPutIfAbsentRequest.overrideConfiguration(
+          override -> override.putHeader(IF_NONE_MATCH, optionHeaders.get(IF_NONE_MATCH)));
+    }
+    final PutObjectRequest finalizedRequest = maybeModifiedPutIfAbsentRequest.build();
+
     BlockUploadProgress progressCallback =
         new BlockUploadProgress(block, progressListener, now());
     statistics.blockUploadQueued(size);
     try {
       progressCallback.progressChanged(PUT_STARTED_EVENT);
       // the putObject call automatically closes the upload data
-      writeOperationHelper.putObject(putObjectRequest,
+      writeOperationHelper.putObject(finalizedRequest,
           builder.putOptions,
           uploadData,
           statistics);
@@ -1389,6 +1398,11 @@ public static final class BlockOutputStreamBuilder {
      */
     private boolean isMultipartUploadEnabled;
 
+    /**
+     * Is conditional create enabled.
+     */
+    private boolean isConditionalPutEnabled;
+
     private BlockOutputStreamBuilder() {
     }
@@ -1550,5 +1564,11 @@ public BlockOutputStreamBuilder withMultipartEnabled(
       isMultipartUploadEnabled = value;
       return this;
     }
+
+    public BlockOutputStreamBuilder withConditionalPutEnabled(
+        final boolean value) {
+      isConditionalPutEnabled = value;
+      return this;
+    }
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 3ab19dfa1914f..cb4aaf866250c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -2194,8 +2194,9 @@ private FSDataOutputStream innerCreateFile(
     // put options are derived from the path and the
     // option builder.
     boolean keep = options.isPerformance() || keepDirectoryMarkers(path);
+    boolean conditionalCreate = options.isConditionalCreate();
     final PutObjectOptions putOptions =
-        new PutObjectOptions(keep, null, options.getHeaders());
+        new PutObjectOptions(keep, conditionalCreate, null, options.getHeaders());
 
     validateOutputStreamConfiguration(path, getConf());
@@ -2223,7 +2224,8 @@ private FSDataOutputStream innerCreateFile(
         .withPutOptions(putOptions)
         .withIOStatisticsAggregator(
             IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator())
-        .withMultipartEnabled(isMultipartUploadEnabled);
+        .withMultipartEnabled(isMultipartUploadEnabled)
+        .withConditionalPutEnabled(conditionalCreate);
     return new FSDataOutputStream(
         new S3ABlockOutputStream(builder),
         null);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
index b7387fc12e140..5784c2b797a91 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
@@ -320,8 +320,8 @@ private CompleteMultipartUploadResponse finalizeMultipartUpload(
           retrying,
           () -> {
             final CompleteMultipartUploadRequest.Builder requestBuilder =
                 getRequestFactory().newCompleteMultipartUploadRequestBuilder(
-                    destKey, uploadId, partETags);
+                    destKey, uploadId, partETags, putOptions);
             return writeOperationHelperCallbacks.completeMultipartUpload(requestBuilder.build());
           });
       writeOperationHelperCallbacks.finishedWrite(destKey, length,
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java
index 73ad137a86d3c..54784d4c10d2f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java
@@ -168,12 +168,14 @@ CreateMultipartUploadRequest.Builder newMultipartUploadRequestBuilder(
    * @param destKey destination object key
    * @param uploadId ID of initiated upload
    * @param partETags ordered list of etags
+   * @param putOptions options for the request
    * @return the request builder.
    */
   CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestBuilder(
       String destKey,
       String uploadId,
-      List<CompletedPart> partETags);
+      List<CompletedPart> partETags,
+      PutObjectOptions putOptions);
 
   /**
    * Create a HEAD object request builder.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java
index 1f6c9123bae62..e76fda67b203d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java
@@ -79,7 +79,7 @@ public boolean aboutToComplete(String uploadId,
     PutObjectRequest originalDestPut = getWriter().createPutObjectRequest(
         getOriginalDestKey(),
         0,
-        new PutObjectOptions(true, null, headers));
+        new PutObjectOptions(true, false, null, headers));
     upload(originalDestPut, EMPTY);
 
     // build the commit summary
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java
index aaca3b9b194d6..572e3acd70b74 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java
@@ -38,6 +38,7 @@ public interface AWSHeaders {
   String DATE = "Date";
   String ETAG = "ETag";
   String LAST_MODIFIED = "Last-Modified";
+  String IF_NONE_MATCH = "If-None-Match";
 
   /*
    * Amazon HTTP Headers used by S3A.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java
index ae2945989ddd3..2421bfc17488a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CreateFileBuilder.java
@@ -63,19 +63,19 @@ public class CreateFileBuilder extends
    * Classic create file option set: overwriting.
    */
   public static final CreateFileOptions OPTIONS_CREATE_FILE_OVERWRITE =
-      new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, false, null);
+      new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, false, false, null);
 
   /**
    * Classic create file option set: no overwrite.
    */
   public static final CreateFileOptions OPTIONS_CREATE_FILE_NO_OVERWRITE =
-      new CreateFileOptions(CREATE_NO_OVERWRITE_FLAGS, true, false, null);
+      new CreateFileOptions(CREATE_NO_OVERWRITE_FLAGS, true, false, false, null);
 
   /**
    * Performance create options.
    */
   public static final CreateFileOptions OPTIONS_CREATE_FILE_PERFORMANCE =
-      new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, true, null);
+      new CreateFileOptions(CREATE_OVERWRITE_FLAGS, true, true, false, null);
 
   /**
    * Callback interface.
@@ -144,10 +144,12 @@ public FSDataOutputStream build() throws IOException {
     final boolean performance =
         options.getBoolean(Constants.FS_S3A_CREATE_PERFORMANCE, false);
+    final boolean conditionalCreate =
+        options.getBoolean(Constants.FS_S3A_CONDITIONAL_FILE_CREATE, false);
     return callbacks.createFileFromBuilder(
         path,
         getProgress(),
-        new CreateFileOptions(flags, isRecursive(), performance, headers));
+        new CreateFileOptions(flags, isRecursive(), performance, conditionalCreate, headers));
 
   }
@@ -218,6 +220,11 @@ public static final class CreateFileOptions {
      */
     private final boolean performance;
 
+    /**
+     * Conditional create flag.
+     */
+    private final boolean conditionalCreate;
+
     /**
      * Headers; may be null.
      */
@@ -227,16 +234,19 @@ public static final class CreateFileOptions {
      * @param flags creation flags
      * @param recursive create parent dirs?
      * @param performance performance flag
+     * @param conditionalCreate conditional create flag
      * @param headers nullable header map.
      */
     public CreateFileOptions(
         final EnumSet<CreateFlag> flags,
         final boolean recursive,
         final boolean performance,
+        final boolean conditionalCreate,
         final Map<String, String> headers) {
       this.flags = flags;
       this.recursive = recursive;
       this.performance = performance;
+      this.conditionalCreate = conditionalCreate;
       this.headers = headers;
     }
@@ -246,6 +256,7 @@ public String toString() {
           "flags=" + flags +
           ", recursive=" + recursive +
           ", performance=" + performance +
+          ", conditionalCreate=" + conditionalCreate +
           ", headers=" + headers +
           '}';
     }
@@ -262,6 +273,10 @@ public boolean isPerformance() {
       return performance;
     }
 
+    public boolean isConditionalCreate() {
+      return conditionalCreate;
+    }
+
     public Map<String, String> getHeaders() {
       return headers;
     }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
index 5bb64ddc28920..6c9db6dfd6a9f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
@@ -40,6 +40,7 @@
 import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS;
 import static org.apache.hadoop.fs.s3a.Constants.ENABLE_MULTI_DELETE;
 import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_FILE_CREATE;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE_ENABLED;
 import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_AWS_V2;
@@ -260,7 +261,7 @@ private InternalConstants() {
    */
   public static final Set<String> CREATE_FILE_KEYS =
       Collections.unmodifiableSet(
-          new HashSet<>(Arrays.asList(FS_S3A_CREATE_PERFORMANCE)));
+          new HashSet<>(Arrays.asList(FS_S3A_CREATE_PERFORMANCE, FS_S3A_CONDITIONAL_FILE_CREATE)));
 
   /**
    * Dynamic Path capabilities to be evaluated
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java
index e14285a1ca8b1..2f81745ba56e0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java
@@ -31,6 +31,11 @@ public final class PutObjectOptions {
    */
   private final boolean keepMarkers;
 
+  /**
+   * Is this a conditional PUT operation?
+   */
+  private final boolean conditionalPutEnabled;
+
   /**
    * Storage class, if not null.
    */
@@ -45,13 +50,16 @@ public final class PutObjectOptions {
    * Constructor.
    * @param keepMarkers Can the PUT operation skip marker deletion?
+   * @param conditionalPutEnabled Is this a conditional PUT?
    * @param storageClass Storage class, if not null.
    * @param headers Headers; may be null.
    */
   public PutObjectOptions(
       final boolean keepMarkers,
+      final boolean conditionalPutEnabled,
       @Nullable final String storageClass,
       @Nullable final Map<String, String> headers) {
     this.keepMarkers = keepMarkers;
+    this.conditionalPutEnabled = conditionalPutEnabled;
     this.storageClass = storageClass;
     this.headers = headers;
   }
@@ -64,6 +72,14 @@ public boolean isKeepMarkers() {
     return keepMarkers;
   }
 
+  /**
+   * Get the conditional PUT flag.
+   * @return true if this is a conditional PUT
+   */
+  public boolean isConditionalPutEnabled() {
+    return conditionalPutEnabled;
+  }
+
   /**
    * Headers for the put/post request.
    * @return headers or null.
@@ -80,9 +96,9 @@ public String toString() {
         '}';
   }
 
-  private static final PutObjectOptions KEEP_DIRS = new PutObjectOptions(true,
+  private static final PutObjectOptions KEEP_DIRS = new PutObjectOptions(true, false,
       null, null);
 
-  private static final PutObjectOptions DELETE_DIRS = new PutObjectOptions(false,
+  private static final PutObjectOptions DELETE_DIRS = new PutObjectOptions(false, false,
       null, null);
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
index e9f7d707286a8..6bec51e6c3f88 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
@@ -60,6 +60,7 @@
 import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
 
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH;
 import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.setRequestTimeout;
@@ -532,12 +533,19 @@ public CreateMultipartUploadRequest.Builder newMultipartUploadRequestBuilder(
   public CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestBuilder(
       String destKey,
       String uploadId,
-      List<CompletedPart> partETags) {
+      List<CompletedPart> partETags,
+      PutObjectOptions putOptions) {
+
+    Map<String, String> optionHeaders = putOptions.getHeaders();
     // a copy of the list is required, so that the AWS SDK doesn't
     // attempt to sort an unmodifiable list.
     CompleteMultipartUploadRequest.Builder requestBuilder =
         CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId)
         .multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build());
+    if (putOptions.isConditionalPutEnabled()) {
+      requestBuilder.overrideConfiguration(
+          override -> override.putHeader(IF_NONE_MATCH, optionHeaders.get(IF_NONE_MATCH)));
+    }
     return prepareRequest(requestBuilder);
   }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java
new file mode 100644
index 0000000000000..46b5a6c35de7f
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.io.IOUtils;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Test;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import java.io.IOException;
+import java.nio.file.AccessDeniedException;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_FILE_CREATE;
+import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
+import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+
+public class ITestS3APutIfMatch extends AbstractS3ACostTest {
+
+  private Configuration conf;
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    S3ATestUtils.disableFilesystemCaching(conf);
+    removeBaseAndBucketOverrides(
+        conf,
+        MULTIPART_SIZE,
+        UPLOAD_PART_COUNT_LIMIT,
+        MIN_MULTIPART_THRESHOLD);
+    conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2);
+    conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
+    conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
+    return conf;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    conf = createConfiguration();
+    skipIfNotEnabled(conf, FS_S3A_CONDITIONAL_FILE_CREATE,
+        "Skipping IfNoneMatch tests");
+  }
+
+  private static void assertS3ExceptionStatusCode(int code, Exception ex) {
+    S3Exception s3Exception = (S3Exception) ex.getCause();
+
+    if (s3Exception.statusCode() != code) {
+      throw new AssertionError("Expected status code " + code + " from " + ex, ex);
+    }
+  }
+
+  protected String getBlockOutputBufferName() {
+    return FAST_UPLOAD_BUFFER_ARRAY;
+  }
+
+  /**
+   * Create a file using the PutIfMatch (If-None-Match) feature of S3.
+   * @param fs filesystem
+   * @param path path to write
+   * @param data source dataset. Can be null
+   * @param ifMatchTag value to set on the conditional create option
+   * @throws Exception on any problem
+   */
+  private static void createFileWithIfNoneMatchFlag(
+      FileSystem fs,
+      Path path,
+      byte[] data,
+      String ifMatchTag) throws Exception {
+    FSDataOutputStreamBuilder builder = fs.createFile(path);
+    builder.must(FS_S3A_CONDITIONAL_FILE_CREATE, ifMatchTag);
+    FSDataOutputStream stream = builder.create().build();
+    if (data != null && data.length > 0) {
+      stream.write(data);
+    }
+    stream.close();
+    IOUtils.closeStream(stream);
+  }
+
+  @Test
+  public void testPutIfAbsentConflict() throws Throwable {
+    FileSystem fs = getFileSystem();
+    Path testFile = methodPath();
+
+    fs.mkdirs(testFile.getParent());
+    byte[] fileBytes = dataset(TEST_FILE_LEN, 0, 255);
+
+    RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class,
+        () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"));
+    assertS3ExceptionStatusCode(412, firstException);
+
+    RemoteFileChangedException secondException = intercept(RemoteFileChangedException.class,
+        () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"));
+    assertS3ExceptionStatusCode(412, secondException);
+  }
+
+
+  @Test
+  public void testPutIfAbsentLargeFileConflict() throws Throwable {
+    FileSystem fs = getFileSystem();
+    Path testFile = methodPath();
+
+    // enough bytes for Multipart Upload
+    byte[] fileBytes = dataset(6 * _1MB, 'a', 'z' - 'a');
+
+    RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class,
+        () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"));
+    assertS3ExceptionStatusCode(412, firstException);
+
+    RemoteFileChangedException secondException = intercept(RemoteFileChangedException.class,
+        () -> createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*"));
+    assertS3ExceptionStatusCode(412, secondException);
+
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java
index 65d7aa6192dd8..c9682abab3e7f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestCreateFileBuilder.java
@@ -37,6 +37,7 @@
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_HEADER;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH;
 
 /**
  * Unit test of {@link CreateFileBuilder}.
@@ -89,11 +90,13 @@ public void testPerformanceSupport() throws Throwable {
   public void testHeaderOptions() throws Throwable {
     final CreateFileBuilder builder = mkBuilder().create()
         .must(FS_S3A_CREATE_HEADER + ".retention", "permanent")
+        .must(FS_S3A_CREATE_HEADER + "." + IF_NONE_MATCH, "*")
         .opt(FS_S3A_CREATE_HEADER + ".owner", "engineering");
     final Map<String, String> headers = build(builder).getHeaders();
     Assertions.assertThat(headers)
         .containsEntry("retention", "permanent")
-        .containsEntry("owner", "engineering");
+        .containsEntry("owner", "engineering")
+        .containsEntry(IF_NONE_MATCH, "*");
   }
 
   @Test
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java
index b864cd3b63982..787dadf5b7eea 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.concurrent.atomic.AtomicLong;
 
 import software.amazon.awssdk.awscore.AwsRequest;
@@ -166,7 +167,7 @@ private void createFactoryObjects(RequestFactory factory) throws IOException {
     String id = "1";
     a(factory.newAbortMultipartUploadRequestBuilder(path, id));
     a(factory.newCompleteMultipartUploadRequestBuilder(path, id,
-        new ArrayList<>()));
+        new ArrayList<>(), new PutObjectOptions(false, false, "some class", Collections.emptyMap())));
     a(factory.newCopyObjectRequestBuilder(path, path2,
         HeadObjectResponse.builder().build()));
     a(factory.newDeleteObjectRequestBuilder(path));
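
Reviewer note: the sketch below is not part of the patch. It shows how a client could be expected to request conditional create through the createFile() builder once this change lands. It is an illustrative sketch only: the bucket name and object path are placeholders, and it mirrors what the new tests exercise (the fs.s3a.conditional.file.create option plus an If-None-Match create header) rather than documenting a finalised public API.

// ConditionalCreateExample.java -- illustrative sketch, assumes this patch is applied.
// "s3a://example-bucket" is a placeholder bucket name, not part of the patch.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ConditionalCreateExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path("s3a://example-bucket/data/part-0000");
    FileSystem fs = FileSystem.get(path.toUri(), conf);

    // Ask for a conditional create: the PUT should carry If-None-Match: "*",
    // so it fails if an object already exists at the destination.
    try (FSDataOutputStream out = fs.createFile(path)
        .create()
        .must("fs.s3a.conditional.file.create", "*")
        .must("fs.s3a.create.header.If-None-Match", "*")
        .build()) {
      out.writeUTF("first writer wins");
    }
    // A second writer running the same sequence against the same path is
    // expected to get a 412 Precondition Failed from S3, which the patch's
    // tests surface as a RemoteFileChangedException.
  }
}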