Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES)
private String azureAtomicDirs;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE,
DefaultValue = DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE)
private boolean enableConditionalCreateOverwrite;

@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_APPEND_BLOB_KEY,
DefaultValue = DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES)
private String azureAppendBlobDirs;
Expand Down Expand Up @@ -573,6 +577,10 @@ public String getAzureAtomicRenameDirs() {
return this.azureAtomicDirs;
}

public boolean isConditionalCreateOverwriteEnabled() {
return this.enableConditionalCreateOverwrite;
}

public String getAppendBlobDirs() {
return this.azureAppendBlobDirs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnhandledException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException;
Expand Down Expand Up @@ -464,10 +465,32 @@ public OutputStream createFile(final Path path,
isAppendBlob = true;
}

final AbfsRestOperation op = client.createPath(relativePath, true, overwrite,
isNamespaceEnabled ? getOctalNotation(permission) : null,
isNamespaceEnabled ? getOctalNotation(umask) : null,
isAppendBlob);
// if "fs.azure.enable.conditional.create.overwrite" is enabled and
// is a create request with overwrite=true, create will follow different
// flow.
boolean triggerConditionalCreateOverwrite = false;
if (overwrite
&& abfsConfiguration.isConditionalCreateOverwriteEnabled()) {
triggerConditionalCreateOverwrite = true;
}

AbfsRestOperation op;
if (triggerConditionalCreateOverwrite) {
op = conditionalCreateOverwriteFile(relativePath,
statistics,
isNamespaceEnabled ? getOctalNotation(permission) : null,
isNamespaceEnabled ? getOctalNotation(umask) : null,
isAppendBlob
);

} else {
op = client.createPath(relativePath, true,
overwrite,
isNamespaceEnabled ? getOctalNotation(permission) : null,
isNamespaceEnabled ? getOctalNotation(umask) : null,
isAppendBlob,
null);
}
perfInfo.registerResult(op.getResult()).registerSuccess(true);

return new AbfsOutputStream(
Expand All @@ -479,6 +502,74 @@ public OutputStream createFile(final Path path,
}
}

/**
* Conditional create overwrite flow ensures that create overwrites is done
* only if there is match for eTag of existing file.
* @param relativePath
* @param statistics
* @param permission
* @param umask
* @param isAppendBlob
* @return
* @throws AzureBlobFileSystemException
*/
private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePath,
final FileSystem.Statistics statistics,
final String permission,
final String umask,
final boolean isAppendBlob) throws AzureBlobFileSystemException {
AbfsRestOperation op;

try {
// Trigger a create with overwrite=false first so that eTag fetch can be
// avoided for cases when no pre-existing file is present (major portion
// of create file traffic falls into the case of no pre-existing file).
op = client.createPath(relativePath, true,
false, permission, umask, isAppendBlob, null);
} catch (AbfsRestOperationException e) {
if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
// File pre-exists, fetch eTag
try {
op = client.getPathStatus(relativePath, false);
} catch (AbfsRestOperationException ex) {
if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
// Is a parallel access case, as file which was found to be
// present went missing by this request.
throw new ConcurrentWriteOperationDetectedException(
"Parallel access to the create path detected. Failing request "
+ "to honor single writer semantics");
} else {
throw ex;
}
}

String eTag = op.getResult()
.getResponseHeader(HttpHeaderConfigurations.ETAG);

try {
// overwrite only if eTag matches with the file properties fetched befpre
op = client.createPath(relativePath, true,
true, permission, umask, isAppendBlob, eTag);
} catch (AbfsRestOperationException ex) {
if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
// Is a parallel access case, as file with eTag was just queried
// and precondition failure can happen only when another file with
// different etag got created.
throw new ConcurrentWriteOperationDetectedException(
"Parallel access to the create path detected. Failing request "
+ "to honor single writer semantics");
} else {
throw ex;
}
}
} else {
throw e;
}
}

return op;
}

private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob) {
int bufferSize = abfsConfiguration.getWriteBufferSize();
if (isAppendBlob && bufferSize > FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE) {
Expand Down Expand Up @@ -508,7 +599,7 @@ public void createDirectory(final Path path, final FsPermission permission, fina

final AbfsRestOperation op = client.createPath(getRelativePath(path), false, true,
isNamespaceEnabled ? getOctalNotation(permission) : null,
isNamespaceEnabled ? getOctalNotation(umask) : null, false);
isNamespaceEnabled ? getOctalNotation(umask) : null, false, null);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https";
public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
/** This config ensures that during create overwrite an existing file will be
* overwritten only if there is a match on the eTag of existing file.
*/
public static final String FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE = "fs.azure.enable.conditional.create.overwrite";
/** Provides a config to provide comma separated path prefixes on which Appendblob based files are created
* Default is empty. **/
public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.directories";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = false;

public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase";
public static final boolean DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE = true;
public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = "";

public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.contracts.exceptions;

/**
* Thrown when a concurrent write operation is detected.
*/
@org.apache.hadoop.classification.InterfaceAudience.Public
@org.apache.hadoop.classification.InterfaceStability.Evolving
public class ConcurrentWriteOperationDetectedException
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HDFS would simply acquire a lease and overwrite the file or fail to acquire the lease and throw an IOException, so what we're really pointing out here is the need for Azure Blob Storage to support lease atomically on file creation and for ABFS to use leases when writing to files so that it can uphold the single writer semantics. We knew this was needed from the beginning but the work has not been done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we should be doing something like https://issues.apache.org/jira/browse/HADOOP-16948 instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will check on what the scope for lease support is on server and how it can be extended to support this PR scenario when we plan for adopting the new lease related changes in server.

extends AzureBlobFileSystemException {

public ConcurrentWriteOperationDetectedException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ public AbfsRestOperation deleteFilesystem() throws AzureBlobFileSystemException

public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite,
final String permission, final String umask,
final boolean isAppendBlob) throws AzureBlobFileSystemException {
final boolean isAppendBlob, final String eTag) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
if (!overwrite) {
requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR));
Expand All @@ -279,6 +279,10 @@ public AbfsRestOperation createPath(final String path, final boolean isFile, fin
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_UMASK, umask));
}

if (eTag != null && !eTag.isEmpty()) {
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag));
}

final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY);
if (isAppendBlob) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,18 @@ public void testAbfsHttpSendStatistics() throws IOException {
connectionsMade++;
requestsSent++;


try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
sendRequestPath)) {

// Is a file overwrite case
long createRequestCalls = 1;
long createTriggeredGFSForETag = 0;
if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) {
createRequestCalls += 1;
createTriggeredGFSForETag = 1;
}

for (int i = 0; i < LARGE_OPERATIONS; i++) {
out.write(testNetworkStatsString.getBytes());

Expand Down Expand Up @@ -141,17 +150,20 @@ public void testAbfsHttpSendStatistics() throws IOException {
* wrote each time).
*
*/

connectionsMade += createRequestCalls + createTriggeredGFSForETag;
requestsSent += createRequestCalls;
if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) {
// no network calls are made for hflush in case of appendblob
assertAbfsStatistics(CONNECTIONS_MADE,
connectionsMade + 1 + LARGE_OPERATIONS, metricMap);
connectionsMade + LARGE_OPERATIONS, metricMap);
assertAbfsStatistics(SEND_REQUESTS,
requestsSent + 1 + LARGE_OPERATIONS, metricMap);
requestsSent + LARGE_OPERATIONS, metricMap);
} else {
assertAbfsStatistics(CONNECTIONS_MADE,
connectionsMade + 1 + LARGE_OPERATIONS * 2, metricMap);
connectionsMade + LARGE_OPERATIONS * 2, metricMap);
assertAbfsStatistics(SEND_REQUESTS,
requestsSent + 1 + LARGE_OPERATIONS * 2, metricMap);
requestsSent + LARGE_OPERATIONS * 2, metricMap);
}
assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
bytesSent + LARGE_OPERATIONS * (testNetworkStatsString.getBytes().length),
Expand Down Expand Up @@ -237,13 +249,21 @@ public void testAbfsHttpResponseStatistics() throws IOException {
try {

/*
* Creating a file and writing buffer into it. Also recording the
* buffer for future read() call.
* Creating a file and writing buffer into it.
* This is a file recreate, so it will trigger
* 2 extra calls if create overwrite is off by default.
* Also recording the buffer for future read() call.
* This creating outputStream and writing requires 2 *
* (LARGE_OPERATIONS) get requests.
*/
StringBuilder largeBuffer = new StringBuilder();
out = fs.create(getResponsePath);

long createRequestCalls = 1;
if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) {
createRequestCalls += 2;
}

for (int i = 0; i < LARGE_OPERATIONS; i++) {
out.write(testResponseString.getBytes());
out.hflush();
Expand All @@ -268,7 +288,8 @@ public void testAbfsHttpResponseStatistics() throws IOException {
*
* get_response : get_responses(Last assertion) + 1
* (OutputStream) + 2 * LARGE_OPERATIONS(Writing and flushing
* LARGE_OPERATIONS times) + 1(open()) + 1(read()).
* LARGE_OPERATIONS times) + 1(open()) + 1(read()) +
* 1 (createOverwriteTriggeredGetForeTag).
*
* bytes_received : bytes_received(Last assertion) + LARGE_OPERATIONS *
* bytes wrote each time (bytes_received is equal to bytes wrote in the
Expand All @@ -284,7 +305,8 @@ public void testAbfsHttpResponseStatistics() throws IOException {
getResponses + 3 + LARGE_OPERATIONS, metricMap);
} else {
assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
getResponses + 3 + 2 * LARGE_OPERATIONS, metricMap);
getResponses + 2 + createRequestCalls + 2 * LARGE_OPERATIONS,
metricMap);
}

} finally {
Expand Down Expand Up @@ -319,4 +341,4 @@ public void testAbfsHttpResponseFailure() throws IOException {
IOUtils.cleanupWithLogger(LOG, out);
}
}
}
}
Loading