diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 66d485317c9ba..72a8a435717e3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -181,6 +181,10 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES) private String azureAtomicDirs; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE, + DefaultValue = DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE) + private boolean enableConditionalCreateOverwrite; + @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_APPEND_BLOB_KEY, DefaultValue = DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES) private String azureAppendBlobDirs; @@ -573,6 +577,10 @@ public String getAzureAtomicRenameDirs() { return this.azureAtomicDirs; } + public boolean isConditionalCreateOverwriteEnabled() { + return this.enableConditionalCreateOverwrite; + } + public String getAppendBlobDirs() { return this.azureAppendBlobDirs; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 23d2b5a3d63fb..d2a1d538f6380 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -66,6 +66,7 @@ import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnhandledException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException; @@ -464,10 +465,32 @@ public OutputStream createFile(final Path path, isAppendBlob = true; } - final AbfsRestOperation op = client.createPath(relativePath, true, overwrite, - isNamespaceEnabled ? getOctalNotation(permission) : null, - isNamespaceEnabled ? getOctalNotation(umask) : null, - isAppendBlob); + // if "fs.azure.enable.conditional.create.overwrite" is enabled and + // is a create request with overwrite=true, create will follow different + // flow. + boolean triggerConditionalCreateOverwrite = false; + if (overwrite + && abfsConfiguration.isConditionalCreateOverwriteEnabled()) { + triggerConditionalCreateOverwrite = true; + } + + AbfsRestOperation op; + if (triggerConditionalCreateOverwrite) { + op = conditionalCreateOverwriteFile(relativePath, + statistics, + isNamespaceEnabled ? getOctalNotation(permission) : null, + isNamespaceEnabled ? getOctalNotation(umask) : null, + isAppendBlob + ); + + } else { + op = client.createPath(relativePath, true, + overwrite, + isNamespaceEnabled ? getOctalNotation(permission) : null, + isNamespaceEnabled ? 
getOctalNotation(umask) : null,
+          isAppendBlob,
+          null);
+    }
     perfInfo.registerResult(op.getResult()).registerSuccess(true);

     return new AbfsOutputStream(
@@ -479,6 +502,74 @@ public OutputStream createFile(final Path path,
     }
   }

+  /**
+   * Conditional create overwrite flow ensures that create overwrite is done
+   * only if there is a match on the eTag of the existing file.
+   * @param relativePath relative path of the file to be created
+   * @param statistics filesystem statistics
+   * @param permission permission in octal notation, or null
+   * @param umask umask in octal notation, or null
+   * @param isAppendBlob true when the file is created as an append blob
+   * @return the executed rest operation containing the response from the server
+   * @throws AzureBlobFileSystemException if the create fails
+   */
+  private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePath,
+      final FileSystem.Statistics statistics,
+      final String permission,
+      final String umask,
+      final boolean isAppendBlob) throws AzureBlobFileSystemException {
+    AbfsRestOperation op;
+
+    try {
+      // Trigger a create with overwrite=false first so that eTag fetch can be
+      // avoided for cases when no pre-existing file is present (major portion
+      // of create file traffic falls into the case of no pre-existing file).
+      op = client.createPath(relativePath, true,
+          false, permission, umask, isAppendBlob, null);
+    } catch (AbfsRestOperationException e) {
+      if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
+        // File pre-exists, fetch eTag
+        try {
+          op = client.getPathStatus(relativePath, false);
+        } catch (AbfsRestOperationException ex) {
+          if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
+            // Is a parallel access case, as file which was found to be
+            // present went missing by this request.
+            throw new ConcurrentWriteOperationDetectedException(
+                "Parallel access to the create path detected. Failing request "
+                    + "to honor single writer semantics");
+          } else {
+            throw ex;
+          }
+        }
+
+        String eTag = op.getResult()
+            .getResponseHeader(HttpHeaderConfigurations.ETAG);
+
+        try {
+          // overwrite only if eTag matches with the file properties fetched before
+          op = client.createPath(relativePath, true,
+              true, permission, umask, isAppendBlob, eTag);
+        } catch (AbfsRestOperationException ex) {
+          if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
+            // Is a parallel access case, as file with eTag was just queried
+            // and precondition failure can happen only when another file with
+            // different etag got created.
+            throw new ConcurrentWriteOperationDetectedException(
+                "Parallel access to the create path detected. Failing request "
+                    + "to honor single writer semantics");
+          } else {
+            throw ex;
+          }
+        }
+      } else {
+        throw e;
+      }
+    }
+
+    return op;
+  }
+
   private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob) {
     int bufferSize = abfsConfiguration.getWriteBufferSize();
     if (isAppendBlob && bufferSize > FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE) {
@@ -508,7 +599,7 @@ public void createDirectory(final Path path, final FsPermission permission, fina

     final AbfsRestOperation op = client.createPath(getRelativePath(path), false, true,
         isNamespaceEnabled ? getOctalNotation(permission) : null,
-        isNamespaceEnabled ? getOctalNotation(umask) : null, false);
+        isNamespaceEnabled ?
getOctalNotation(umask) : null, false, null); perfInfo.registerResult(op.getResult()).registerSuccess(true); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 681390c019873..c15c470a4455f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -67,6 +67,10 @@ public final class ConfigurationKeys { public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling"; public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https"; public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key"; + /** This config ensures that during create overwrite an existing file will be + * overwritten only if there is a match on the eTag of existing file. + */ + public static final String FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE = "fs.azure.enable.conditional.create.overwrite"; /** Provides a config to provide comma separated path prefixes on which Appendblob based files are created * Default is empty. **/ public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.directories"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index f70d102c1d905..fa0ee6a89212d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -70,6 +70,7 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = false; public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase"; + public static final boolean DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE = true; public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = ""; public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java new file mode 100644 index 0000000000000..79813ddfe6400 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.contracts.exceptions; + +/** + * Thrown when a concurrent write operation is detected. + */ +@org.apache.hadoop.classification.InterfaceAudience.Public +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class ConcurrentWriteOperationDetectedException + extends AzureBlobFileSystemException { + + public ConcurrentWriteOperationDetectedException(String message) { + super(message); + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 45c1948a0ec7a..0415857745ba6 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -265,7 +265,7 @@ public AbfsRestOperation deleteFilesystem() throws AzureBlobFileSystemException public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite, final String permission, final String umask, - final boolean isAppendBlob) throws AzureBlobFileSystemException { + final boolean isAppendBlob, final String eTag) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); if (!overwrite) { requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR)); @@ -279,6 +279,10 @@ public AbfsRestOperation createPath(final String path, final boolean isFile, fin requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_UMASK, umask)); } + if (eTag != null && !eTag.isEmpty()) { + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag)); + } + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY); if (isAppendBlob) { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java index f6ee7a90faa31..c2dbe937b812b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java @@ -110,9 +110,18 @@ public void testAbfsHttpSendStatistics() throws IOException { connectionsMade++; requestsSent++; + try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs, sendRequestPath)) { + // Is a file overwrite case + long createRequestCalls = 1; + long createTriggeredGFSForETag = 0; + if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) { + createRequestCalls += 1; + createTriggeredGFSForETag = 1; + } + for (int i = 0; i < LARGE_OPERATIONS; i++) { out.write(testNetworkStatsString.getBytes()); @@ -141,17 +150,20 @@ public void testAbfsHttpSendStatistics() throws IOException { * wrote each time). 
       *
       */
+
+      connectionsMade += createRequestCalls + createTriggeredGFSForETag;
+      requestsSent += createRequestCalls;
       if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) {
         // no network calls are made for hflush in case of appendblob
         assertAbfsStatistics(CONNECTIONS_MADE,
-            connectionsMade + 1 + LARGE_OPERATIONS, metricMap);
+            connectionsMade + LARGE_OPERATIONS, metricMap);
         assertAbfsStatistics(SEND_REQUESTS,
-            requestsSent + 1 + LARGE_OPERATIONS, metricMap);
+            requestsSent + LARGE_OPERATIONS, metricMap);
       } else {
         assertAbfsStatistics(CONNECTIONS_MADE,
-            connectionsMade + 1 + LARGE_OPERATIONS * 2, metricMap);
+            connectionsMade + LARGE_OPERATIONS * 2, metricMap);
         assertAbfsStatistics(SEND_REQUESTS,
-            requestsSent + 1 + LARGE_OPERATIONS * 2, metricMap);
+            requestsSent + LARGE_OPERATIONS * 2, metricMap);
       }
       assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
           bytesSent + LARGE_OPERATIONS * (testNetworkStatsString.getBytes().length),
@@ -237,13 +249,21 @@ public void testAbfsHttpResponseStatistics() throws IOException {
     try {
       /*
-       * Creating a file and writing buffer into it. Also recording the
-       * buffer for future read() call.
+       * Creating a file and writing buffer into it.
+       * This is a file recreate, so with conditional create overwrite
+       * enabled (the default) it will trigger 2 extra calls.
+       * Also recording the buffer for future read() call.
        * This creating outputStream and writing requires 2 *
        * (LARGE_OPERATIONS) get requests.
        */
      StringBuilder largeBuffer = new StringBuilder();
      out = fs.create(getResponsePath);
+
+      long createRequestCalls = 1;
+      if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) {
+        createRequestCalls += 2;
+      }
+
      for (int i = 0; i < LARGE_OPERATIONS; i++) {
        out.write(testResponseString.getBytes());
        out.hflush();
@@ -268,7 +288,8 @@ public void testAbfsHttpResponseStatistics() throws IOException {
        *
        * get_response : get_responses(Last assertion) + 1
        * (OutputStream) + 2 * LARGE_OPERATIONS(Writing and flushing
-       * LARGE_OPERATIONS times) + 1(open()) + 1(read()).
+       * LARGE_OPERATIONS times) + 1(open()) + 1(read()) +
+       * 1 (createOverwriteTriggeredGetForETag).
       *
       * bytes_received : bytes_received(Last assertion) + LARGE_OPERATIONS *
       * bytes wrote each time (bytes_received is equal to bytes wrote in the
@@ -284,7 +305,8 @@ public void testAbfsHttpResponseStatistics() throws IOException {
             getResponses + 3 + LARGE_OPERATIONS, metricMap);
       } else {
         assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
-            getResponses + 3 + 2 * LARGE_OPERATIONS, metricMap);
+            getResponses + 2 + createRequestCalls + 2 * LARGE_OPERATIONS,
+            metricMap);
       }

     } finally {
@@ -319,4 +341,4 @@ public void testAbfsHttpResponseFailure() throws IOException {
       IOUtils.cleanupWithLogger(LOG, out);
     }
   }
-}
+}
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
index 4b8f071b998de..981ed25cee75b 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
@@ -21,18 +21,44 @@
 import java.io.FileNotFoundException;
 import java.io.FilterOutputStream;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.EnumSet;
+import java.util.UUID;

 import org.junit.Test;

+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
+import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
+
+import static java.net.HttpURLConnection.HTTP_CONFLICT;
+import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.net.HttpURLConnection.HTTP_OK;
+import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;

 /**
  * Test create operation.
@@ -188,4 +214,250 @@ public void testFilterFSWriteAfterClose() throws Throwable {
     });
   }

+  /**
+   * Tests if the number of connections made for:
+   * 1. create overwrite=false of a file that doesn't pre-exist
+   * 2. create overwrite=false of a file that pre-exists
+   * 3. create overwrite=true of a file that doesn't pre-exist
+   * 4.
create overwrite=true of a file that pre-exists + * matches the expectation when run against both combinations of + * fs.azure.enable.conditional.create.overwrite=true and + * fs.azure.enable.conditional.create.overwrite=false + * @throws Throwable + */ + @Test + public void testDefaultCreateOverwriteFileTest() throws Throwable { + testCreateFileOverwrite(true); + testCreateFileOverwrite(false); + } + + public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite) + throws Throwable { + final AzureBlobFileSystem currentFs = getFileSystem(); + Configuration config = new Configuration(this.getRawConfiguration()); + config.set("fs.azure.enable.conditional.create.overwrite", + Boolean.toString(enableConditionalCreateOverwrite)); + + final AzureBlobFileSystem fs = + (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(), + config); + + long totalConnectionMadeBeforeTest = fs.getInstrumentationMap() + .get(CONNECTIONS_MADE.getStatName()); + + int createRequestCount = 0; + final Path nonOverwriteFile = new Path("/NonOverwriteTest_FileName_" + + UUID.randomUUID().toString()); + + // Case 1: Not Overwrite - File does not pre-exist + // create should be successful + fs.create(nonOverwriteFile, false); + + // One request to server to create path should be issued + createRequestCount++; + + assertAbfsStatistics( + CONNECTIONS_MADE, + totalConnectionMadeBeforeTest + createRequestCount, + fs.getInstrumentationMap()); + + // Case 2: Not Overwrite - File pre-exists + intercept(FileAlreadyExistsException.class, + () -> fs.create(nonOverwriteFile, false)); + + // One request to server to create path should be issued + createRequestCount++; + + assertAbfsStatistics( + CONNECTIONS_MADE, + totalConnectionMadeBeforeTest + createRequestCount, + fs.getInstrumentationMap()); + + final Path overwriteFilePath = new Path("/OverwriteTest_FileName_" + + UUID.randomUUID().toString()); + + // Case 3: Overwrite - File does not pre-exist + // create should be successful + fs.create(overwriteFilePath, true); + + // One request to server to create path should be issued + createRequestCount++; + + assertAbfsStatistics( + CONNECTIONS_MADE, + totalConnectionMadeBeforeTest + createRequestCount, + fs.getInstrumentationMap()); + + // Case 4: Overwrite - File pre-exists + fs.create(overwriteFilePath, true); + + if (enableConditionalCreateOverwrite) { + // Three requests will be sent to server to create path, + // 1. create without overwrite + // 2. GetFileStatus to get eTag + // 3. create with overwrite + createRequestCount += 3; + } else { + createRequestCount++; + } + + assertAbfsStatistics( + CONNECTIONS_MADE, + totalConnectionMadeBeforeTest + createRequestCount, + fs.getInstrumentationMap()); + } + + /** + * Test negative scenarios with Create overwrite=false as default + * With create overwrite=true ending in 3 calls: + * A. Create overwrite=false + * B. GFS + * C. 
Create overwrite=true + * + * Scn1: A fails with HTTP409, leading to B which fails with HTTP404, + * detect parallel access + * Scn2: A fails with HTTP409, leading to B which fails with HTTP500, + * fail create with HTTP500 + * Scn3: A fails with HTTP409, leading to B and then C, + * which fails with HTTP412, detect parallel access + * Scn4: A fails with HTTP409, leading to B and then C, + * which fails with HTTP500, fail create with HTTP500 + * Scn5: A fails with HTTP500, fail create with HTTP500 + */ + @Test + public void testNegativeScenariosForCreateOverwriteDisabled() + throws Throwable { + + final AzureBlobFileSystem currentFs = getFileSystem(); + Configuration config = new Configuration(this.getRawConfiguration()); + config.set("fs.azure.enable.conditional.create.overwrite", + Boolean.toString(true)); + + final AzureBlobFileSystem fs = + (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(), + config); + + // Get mock AbfsClient with current config + AbfsClient + mockClient + = TestAbfsClient.getMockAbfsClient( + fs.getAbfsStore().getClient(), + fs.getAbfsStore().getAbfsConfiguration()); + + AzureBlobFileSystemStore abfsStore = fs.getAbfsStore(); + abfsStore = setAzureBlobSystemStoreField(abfsStore, "client", mockClient); + + AbfsRestOperation successOp = mock( + AbfsRestOperation.class); + AbfsHttpOperation http200Op = mock( + AbfsHttpOperation.class); + when(http200Op.getStatusCode()).thenReturn(HTTP_OK); + when(successOp.getResult()).thenReturn(http200Op); + + AbfsRestOperationException conflictResponseEx + = getMockAbfsRestOperationException(HTTP_CONFLICT); + AbfsRestOperationException serverErrorResponseEx + = getMockAbfsRestOperationException(HTTP_INTERNAL_ERROR); + AbfsRestOperationException fileNotFoundResponseEx + = getMockAbfsRestOperationException(HTTP_NOT_FOUND); + AbfsRestOperationException preConditionResponseEx + = getMockAbfsRestOperationException(HTTP_PRECON_FAILED); + + doThrow(conflictResponseEx) // Scn1: GFS fails with Http404 + .doThrow(conflictResponseEx) // Scn2: GFS fails with Http500 + .doThrow( + conflictResponseEx) // Scn3: create overwrite=true fails with Http412 + .doThrow( + conflictResponseEx) // Scn4: create overwrite=true fails with Http500 + .doThrow( + serverErrorResponseEx) // Scn5: create overwrite=false fails with Http500 + .when(mockClient) + .createPath(any(String.class), eq(true), eq(false), any(String.class), + any(String.class), any(boolean.class), eq(null)); + + doThrow(fileNotFoundResponseEx) // Scn1: GFS fails with Http404 + .doThrow(serverErrorResponseEx) // Scn2: GFS fails with Http500 + .doReturn(successOp) // Scn3: create overwrite=true fails with Http412 + .doReturn(successOp) // Scn4: create overwrite=true fails with Http500 + .when(mockClient) + .getPathStatus(any(String.class), eq(false)); + + doThrow( + preConditionResponseEx) // Scn3: create overwrite=true fails with Http412 + .doThrow( + serverErrorResponseEx) // Scn4: create overwrite=true fails with Http500 + .when(mockClient) + .createPath(any(String.class), eq(true), eq(true), any(String.class), + any(String.class), any(boolean.class), eq(null)); + + // Scn1: GFS fails with Http404 + // Sequence of events expected: + // 1. create overwrite=false - fail with conflict + // 2. GFS - fail with File Not found + // Create will fail with ConcurrentWriteOperationDetectedException + validateCreateFileException(ConcurrentWriteOperationDetectedException.class, + abfsStore); + + // Scn2: GFS fails with Http500 + // Sequence of events expected: + // 1. 
create overwrite=false - fail with conflict + // 2. GFS - fail with Server error + // Create will fail with 500 + validateCreateFileException(AbfsRestOperationException.class, abfsStore); + + // Scn3: create overwrite=true fails with Http412 + // Sequence of events expected: + // 1. create overwrite=false - fail with conflict + // 2. GFS - pass + // 3. create overwrite=true - fail with Pre-Condition + // Create will fail with ConcurrentWriteOperationDetectedException + validateCreateFileException(ConcurrentWriteOperationDetectedException.class, + abfsStore); + + // Scn4: create overwrite=true fails with Http500 + // Sequence of events expected: + // 1. create overwrite=false - fail with conflict + // 2. GFS - pass + // 3. create overwrite=true - fail with Server error + // Create will fail with 500 + validateCreateFileException(AbfsRestOperationException.class, abfsStore); + + // Scn5: create overwrite=false fails with Http500 + // Sequence of events expected: + // 1. create overwrite=false - fail with server error + // Create will fail with 500 + validateCreateFileException(AbfsRestOperationException.class, abfsStore); + } + + private AzureBlobFileSystemStore setAzureBlobSystemStoreField( + final AzureBlobFileSystemStore abfsStore, + final String fieldName, + Object fieldObject) throws Exception { + + Field abfsClientField = AzureBlobFileSystemStore.class.getDeclaredField( + fieldName); + abfsClientField.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(abfsClientField, + abfsClientField.getModifiers() & ~java.lang.reflect.Modifier.FINAL); + abfsClientField.set(abfsStore, fieldObject); + return abfsStore; + } + + private void validateCreateFileException(final Class exceptionClass, final AzureBlobFileSystemStore abfsStore) + throws Exception { + FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, + FsAction.ALL); + FsPermission umask = new FsPermission(FsAction.NONE, FsAction.NONE, + FsAction.NONE); + Path testPath = new Path("testFile"); + intercept( + exceptionClass, + () -> abfsStore.createFile(testPath, null, true, permission, umask)); + } + + private AbfsRestOperationException getMockAbfsRestOperationException(int status) { + return new AbfsRestOperationException(status, "", "", new Exception()); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java index 382d3966485f1..de476a6abce9b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java @@ -18,12 +18,18 @@ package org.apache.hadoop.fs.azurebfs; +import java.util.UUID; + import org.junit.Test; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE; + /** * Test mkdir operation. 
 */
@@ -45,4 +51,58 @@ public void testCreateDirWithExistingDir() throws Exception {
   public void testCreateRoot() throws Exception {
     assertMkdirs(getFileSystem(), new Path("/"));
   }
+
+  /**
+   * Test mkdir for possible values of fs.azure.enable.conditional.create.overwrite
+   * @throws Throwable
+   */
+  @Test
+  public void testDefaultCreateOverwriteDirTest() throws Throwable {
+    // the config fs.azure.enable.conditional.create.overwrite should have no
+    // effect on mkdirs
+    testCreateDirOverwrite(true);
+    testCreateDirOverwrite(false);
+  }
+
+  public void testCreateDirOverwrite(boolean enableConditionalCreateOverwrite)
+      throws Throwable {
+    final AzureBlobFileSystem currentFs = getFileSystem();
+    Configuration config = new Configuration(this.getRawConfiguration());
+    config.set("fs.azure.enable.conditional.create.overwrite",
+        Boolean.toString(enableConditionalCreateOverwrite));
+
+    final AzureBlobFileSystem fs =
+        (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
+            config);
+
+    long totalConnectionMadeBeforeTest = fs.getInstrumentationMap()
+        .get(CONNECTIONS_MADE.getStatName());
+
+    int mkdirRequestCount = 0;
+    final Path dirPath = new Path("/DirPath_"
+        + UUID.randomUUID().toString());
+
+    // Case 1: Dir does not pre-exist
+    fs.mkdirs(dirPath);
+
+    // One request to server
+    mkdirRequestCount++;
+
+    assertAbfsStatistics(
+        CONNECTIONS_MADE,
+        totalConnectionMadeBeforeTest + mkdirRequestCount,
+        fs.getInstrumentationMap());
+
+    // Case 2: Dir pre-exists
+    // Mkdir on existing Dir path will not lead to failure
+    fs.mkdirs(dirPath);
+
+    // One request to server
+    mkdirRequestCount++;
+
+    assertAbfsStatistics(
+        CONNECTIONS_MADE,
+        totalConnectionMadeBeforeTest + mkdirRequestCount,
+        fs.getInstrumentationMap());
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
index 0d904c85523e7..7a7992d9bb475 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
@@ -283,8 +283,7 @@ public static AbfsClient createTestClientFromCurrentContext(
   }

   public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
-      AbfsConfiguration abfsConfig)
-      throws IOException, NoSuchFieldException, IllegalAccessException {
+      AbfsConfiguration abfsConfig) throws Exception {

     AuthType currentAuthType = abfsConfig.getAuthType(
         abfsConfig.getAccountName());
@@ -310,47 +309,46 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
     when(client.createDefaultHeaders()).thenCallRealMethod();

     // override baseurl
-    Field baseUrlField = AbfsClient.class.getDeclaredField("baseUrl");
-    baseUrlField.setAccessible(true);
-    Field modifiersField = Field.class.getDeclaredField("modifiers");
-    modifiersField.setAccessible(true);
-    modifiersField.setInt(baseUrlField, baseUrlField.getModifiers() & ~java.lang.reflect.Modifier.FINAL);
-    baseUrlField.set(client, baseAbfsClientInstance.getBaseUrl());
+    client = TestAbfsClient.setAbfsClientField(client, "abfsConfiguration",
+        abfsConfig);
+
+    // override baseurl
+    client = TestAbfsClient.setAbfsClientField(client, "baseUrl",
+        baseAbfsClientInstance.getBaseUrl());

     // override auth provider
     if (currentAuthType == AuthType.SharedKey) {
-      Field sharedKeyCredsField = AbfsClient.class.getDeclaredField(
-          "sharedKeyCredentials");
-
sharedKeyCredsField.setAccessible(true); - modifiersField.setInt(sharedKeyCredsField, - sharedKeyCredsField.getModifiers() - & ~java.lang.reflect.Modifier.FINAL); - sharedKeyCredsField.set(client, new SharedKeyCredentials( - abfsConfig.getAccountName().substring(0, - abfsConfig.getAccountName().indexOf(DOT)), - abfsConfig.getStorageAccountKey())); + client = TestAbfsClient.setAbfsClientField(client, "sharedKeyCredentials", + new SharedKeyCredentials( + abfsConfig.getAccountName().substring(0, + abfsConfig.getAccountName().indexOf(DOT)), + abfsConfig.getStorageAccountKey())); } else { - Field tokenProviderField = AbfsClient.class.getDeclaredField( - "tokenProvider"); - tokenProviderField.setAccessible(true); - modifiersField.setInt(tokenProviderField, - tokenProviderField.getModifiers() - & ~java.lang.reflect.Modifier.FINAL); - tokenProviderField.set(client, abfsConfig.getTokenProvider()); + client = TestAbfsClient.setAbfsClientField(client, "tokenProvider", + abfsConfig.getTokenProvider()); } // override user agent String userAgent = "APN/1.0 Azure Blob FS/3.4.0-SNAPSHOT (PrivateBuild " + "JavaJRE 1.8.0_252; Linux 5.3.0-59-generic/amd64; openssl-1.0; " + "UNKNOWN/UNKNOWN) MSFT"; - Field userAgentField = AbfsClient.class.getDeclaredField( - "userAgent"); - userAgentField.setAccessible(true); - modifiersField.setInt(userAgentField, - userAgentField.getModifiers() - & ~java.lang.reflect.Modifier.FINAL); - userAgentField.set(client, userAgent); + client = TestAbfsClient.setAbfsClientField(client, "userAgent", userAgent); return client; } + + private static AbfsClient setAbfsClientField( + final AbfsClient client, + final String fieldName, + Object fieldObject) throws Exception { + + Field field = AbfsClient.class.getDeclaredField(fieldName); + field.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(field, + field.getModifiers() & ~java.lang.reflect.Modifier.FINAL); + field.set(client, fieldObject); + return client; + } }
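
Reviewer note: the new setting is on by default (DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE = true). Below is a minimal, hypothetical sketch of how a client could toggle the flag and exercise the overwrite path; the account URI, class name and file path are placeholders and are not part of this patch.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ConditionalCreateOverwriteSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Enabled by default; set to false to fall back to the previous behaviour
    // of a single unconditional create call with overwrite=true.
    conf.setBoolean("fs.azure.enable.conditional.create.overwrite", true);

    // Placeholder URI; substitute a real ABFS container.
    FileSystem fs = FileSystem.newInstance(
        new URI("abfs://container@account.dfs.core.windows.net/"), conf);

    Path path = new Path("/tmp/conditional-create-demo.txt");
    // With the flag enabled, overwriting an existing file issues:
    // create(overwrite=false) -> 409 -> GetFileStatus (fetch eTag)
    //   -> create(overwrite=true, If-Match: eTag).
    try (FSDataOutputStream out = fs.create(path, true)) {
      out.writeBytes("first version");
    }
    try (FSDataOutputStream out = fs.create(path, true)) {
      out.writeBytes("second version, written via the conditional overwrite path");
    }
    fs.close();
  }
}

If two writers race on the same path, the losing create fails with ConcurrentWriteOperationDetectedException instead of silently clobbering the other writer's file, which is the single-writer semantics this patch is after.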