Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@
<suppressions>
<suppress checks="ParameterNumber|MagicNumber"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]AzureBlobFileSystemStore.java"/>
<suppress checks="ParameterNumber"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]AbfsBlobClient.java"/>
<suppress checks="ParameterNumber"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]AbfsClient.java"/>
<suppress checks="ParameterNumber"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]contracts[\\/]services[\\/]AppendRequestParameters.java"/>
<suppress checks="ParameterNumber|MagicNumber"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]utils[\\/]Base64.java"/>
<suppress checks="ParameterNumber|VisibilityModifier"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1870,7 +1870,7 @@ private FSDataOutputStream create(Path f, FsPermission permission,
* @return the output stream used to write data into the newly created file .
* @throws IOException if an IO error occurs while attempting to delete the
* path.
*
* @throws FileAlreadyExistsException if file already exists at path.
*/
protected FSDataOutputStream createInternal(Path f, FsPermission permission,
boolean overwrite,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.constants.AuthConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerWithOutlierConfigurationValidatorAnnotation;
Expand Down Expand Up @@ -85,13 +86,18 @@ public class AbfsConfiguration{

private final Configuration rawConfig;
private final String accountName;
private final AbfsServiceType fsConfiguredServiceType;
private final boolean isSecure;
private static final Logger LOG = LoggerFactory.getLogger(AbfsConfiguration.class);

@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_IS_HNS_ENABLED,
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED)
private String isNamespaceEnabledAccount;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK,
DefaultValue = DEFAULT_FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK)
private boolean isDfsToBlobFallbackEnabled;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_MAX_CONCURRENT_REQUESTS,
DefaultValue = -1)
private int writeMaxConcurrentRequestCount;
Expand Down Expand Up @@ -392,11 +398,14 @@ public class AbfsConfiguration{
private String clientProvidedEncryptionKey;
private String clientProvidedEncryptionKeySHA;

public AbfsConfiguration(final Configuration rawConfig, String accountName)
throws IllegalAccessException, InvalidConfigurationValueException, IOException {
public AbfsConfiguration(final Configuration rawConfig,
String accountName,
AbfsServiceType fsConfiguredServiceType)
throws IllegalAccessException, IOException {
this.rawConfig = ProviderUtils.excludeIncompatibleCredentialProviders(
rawConfig, AzureBlobFileSystem.class);
this.accountName = accountName;
this.fsConfiguredServiceType = fsConfiguredServiceType;
this.isSecure = getBoolean(FS_AZURE_SECURE_MODE, false);

Field[] fields = this.getClass().getDeclaredFields();
Expand All @@ -418,10 +427,42 @@ public AbfsConfiguration(final Configuration rawConfig, String accountName)
}
}

public AbfsConfiguration(final Configuration rawConfig, String accountName)
throws IllegalAccessException, IOException {
this(rawConfig, accountName, AbfsServiceType.DFS);
}

public Trilean getIsNamespaceEnabledAccount() {
return Trilean.getTrilean(isNamespaceEnabledAccount);
}

public AbfsServiceType getFsConfiguredServiceType() {
return getEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, fsConfiguredServiceType);
}

/**
* Get the service type to be used for Ingress Operations.
* Default value is the same as the service type configured for the file system.
* @return the service type.
*/
public AbfsServiceType getIngressServiceType() {
return getEnum(FS_AZURE_INGRESS_SERVICE_TYPE, getFsConfiguredServiceType());
}

public boolean isDfsToBlobFallbackEnabled() {
return isDfsToBlobFallbackEnabled;
}

/**
* Returns whether the Blob client initialization is required based on the configurations.
* @return true if blob client initialization is required, false otherwise
*/
public boolean isBlobClientInitRequired() {
return getFsConfiguredServiceType() == AbfsServiceType.BLOB
|| getIngressServiceType() == AbfsServiceType.BLOB
|| isDfsToBlobFallbackEnabled();
}

/**
* Gets the Azure Storage account name corresponding to this instance of configuration.
* @return the Azure Storage account name
Expand Down Expand Up @@ -462,6 +503,7 @@ public String get(String key) {
* Returns the account-specific value if it exists, then looks for an
* account-agnostic value.
* @param key Account-agnostic configuration key
* @param defaultValue Value returned if not configured
* @return value if one exists, else the default value
*/
public String getString(String key, String defaultValue) {
Expand Down Expand Up @@ -495,7 +537,7 @@ public long getLong(String key, long defaultValue) {
* looks for an account-agnostic value.
* @param key Account-agnostic configuration key
* @return value in String form if one exists, else null
* @throws IOException
* @throws IOException if getPassword fails
*/
public String getPasswordString(String key) throws IOException {
char[] passchars = rawConfig.getPassword(accountConf(key));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ public void initialize(URI uri, Configuration configuration)
}
}

abfsStore.validateConfiguredServiceType(getFsInitTracingContext());

LOG.trace("Initiate check for delegation token manager");
if (UserGroupInformation.isSecurityEnabled()) {
this.delegationTokenEnabled = abfsConfiguration.isDelegationTokenManagerEnabled();
Expand Down Expand Up @@ -1442,6 +1444,11 @@ private boolean isAbfsScheme(final String scheme) {
return false;
}

private TracingContext getFsInitTracingContext() {
return new TracingContext(clientCorrelationId, fileSystemId,
FSOperationType.INIT, tracingHeaderFormat, listener);
}

@VisibleForTesting
<T> FileSystemOperation<T> execute(
final String scopeDescription,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,15 @@
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
import org.apache.hadoop.fs.azurebfs.security.ContextProviderEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.security.NoContextEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler;
import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient;
import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
import org.apache.hadoop.fs.azurebfs.utils.NamespaceUtil;
import org.apache.hadoop.fs.impl.BackReference;
Expand Down Expand Up @@ -142,6 +147,7 @@
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.http.client.utils.URIBuilder;

import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.METADATA_INCOMPLETE_RENAME_FAILURES;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_RECOVERY;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_EQUALS;
Expand All @@ -158,6 +164,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_ABFS_ENDPOINT;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_IDENTITY_TRANSFORM_CLASS;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT;

Expand All @@ -170,6 +177,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
private static final Logger LOG = LoggerFactory.getLogger(AzureBlobFileSystemStore.class);

private AbfsClient client;
private AbfsClientHandler clientHandler;
private URI uri;
private String userName;
private String primaryUserGroup;
Expand Down Expand Up @@ -222,7 +230,8 @@ public AzureBlobFileSystemStore(
leaseRefs = Collections.synchronizedMap(new WeakHashMap<>());

try {
this.abfsConfiguration = new AbfsConfiguration(abfsStoreBuilder.configuration, accountName);
this.abfsConfiguration = new AbfsConfiguration(
abfsStoreBuilder.configuration, accountName, identifyAbfsServiceTypeFromUrl());
} catch (IllegalAccessException exception) {
throw new FileSystemOperationUnhandledException(exception);
}
Expand Down Expand Up @@ -284,9 +293,26 @@ public AzureBlobFileSystemStore(
"abfs-bounded");
}

public void validateConfiguredServiceType(TracingContext tracingContext)
throws AzureBlobFileSystemException {
// Todo: [FnsOverBlob] - Remove this check, Failing FS Init with Blob Endpoint Until FNS over Blob is ready.
if (getConfiguredServiceType() == AbfsServiceType.BLOB) {
throw new InvalidConfigurationValueException(FS_DEFAULT_NAME_KEY);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should also inform what is wrong with the config.

}
if (getIsNamespaceEnabled(tracingContext) && getConfiguredServiceType() == AbfsServiceType.BLOB) {
// This could be because of either wrongly configured url or wrongly configured fns service type.
if (identifyAbfsServiceTypeFromUrl() == AbfsServiceType.BLOB) {
throw new InvalidConfigurationValueException(FS_DEFAULT_NAME_KEY, "Wrong Domain Suffix for HNS Account");
}
throw new InvalidConfigurationValueException(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, "Wrong Service Type for HNS Accounts");
}
}

/**
* Checks if the given key in Azure Storage should be stored as a page
* blob instead of block blob.
* @param key The key to check.
* @return True if the key should be stored as a page blob, false otherwise.
*/
public boolean isAppendBlobKey(String key) {
return isKeyForDirectorySet(key, appendBlobDirSet);
Expand Down Expand Up @@ -1760,19 +1786,46 @@ private void initializeClient(URI uri, String fileSystemName,
}

LOG.trace("Initializing AbfsClient for {}", baseUrl);
AbfsDfsClient dfsClient = null;
AbfsBlobClient blobClient = null;
if (tokenProvider != null) {
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
dfsClient = new AbfsDfsClient(baseUrl, creds, abfsConfiguration,
tokenProvider, encryptionContextProvider,
populateAbfsClientContext());
blobClient = abfsConfiguration.isBlobClientInitRequired()
? new AbfsBlobClient(baseUrl, creds, abfsConfiguration, tokenProvider,
encryptionContextProvider, populateAbfsClientContext())
: null;
} else {
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
dfsClient = new AbfsDfsClient(baseUrl, creds, abfsConfiguration,
sasTokenProvider, encryptionContextProvider,
populateAbfsClientContext());
blobClient = abfsConfiguration.isBlobClientInitRequired()
? new AbfsBlobClient(baseUrl, creds, abfsConfiguration, sasTokenProvider,
encryptionContextProvider, populateAbfsClientContext())
: null;
}

this.clientHandler = new AbfsClientHandler(getConfiguredServiceType(),
dfsClient, blobClient);
this.client = clientHandler.getClient();

LOG.trace("AbfsClient init complete");
}

private AbfsServiceType identifyAbfsServiceTypeFromUrl() {
if (uri.toString().contains(FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME)) {
return AbfsServiceType.BLOB;
}
// In case of DFS Domain name or any other custom endpoint, the service
// type is to be identified as default DFS.
return AbfsServiceType.DFS;
}

private AbfsServiceType getConfiguredServiceType() {
return abfsConfiguration.getFsConfiguredServiceType();
}

/**
* Populate a new AbfsClientContext instance with the desired properties.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ public final class AbfsHttpConstants {
public static final String APPEND_BLOB_TYPE = "appendblob";
public static final String TOKEN_VERSION = "2";

public static final String CONTAINER = "container";
public static final String METADATA = "metadata";
public static final String LIST = "list";
public static final String BLOCK = "block";
public static final String BLOCKLIST = "blocklist";
public static final String LEASE = "lease";
public static final String BLOCK_BLOB_TYPE = "BlockBlob";
public static final String BLOCK_TYPE_COMMITTED = "committed";

public static final String JAVA_VENDOR = "java.vendor";
public static final String JAVA_VERSION = "java.version";
public static final String OS_NAME = "os.name";
Expand Down Expand Up @@ -88,6 +97,7 @@ public final class AbfsHttpConstants {
public static final String HTTP_HEADER_PREFIX = "x-ms-";
public static final String HASH = "#";
public static final String TRUE = "true";
public static final String ZERO = "0";

public static final String PLUS_ENCODE = "%20";
public static final String FORWARD_SLASH_ENCODE = "%2F";
Expand All @@ -97,6 +107,7 @@ public final class AbfsHttpConstants {
public static final String GMT_TIMEZONE = "GMT";
public static final String APPLICATION_JSON = "application/json";
public static final String APPLICATION_OCTET_STREAM = "application/octet-stream";
public static final String APPLICATION_XML = "application/xml";

public static final String ROOT_PATH = "/";
public static final String ACCESS_MASK = "mask:";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.constants;

/**
* Azure Storage Offers two sets of Rest APIs for interacting with the storage account.
* <ol>
* <li>Blob Rest API: <a href = https://learn.microsoft.com/en-us/rest/api/storageservices/blob-service-rest-api></a></li>
* <li>Data Lake Rest API: <a href = https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/operation-groups></a></li>
* </ol>
*/
public enum AbfsServiceType {
DFS,
BLOB;
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,28 @@ public final class ConfigurationKeys {
* path to determine HNS status.
*/
public static final String FS_AZURE_ACCOUNT_IS_HNS_ENABLED = "fs.azure.account.hns.enabled";

/**
* Config to specify which {@link AbfsServiceType} to use with HNS-Disabled Account type.
* Recommendation is to always use Blob Endpoint with HNS-Disabled Account type.
* This will override service endpoint configured in "fs.defaultFS".
* Value {@value} case-insensitive "DFS" or "BLOB".
*/
public static final String FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE = "fs.azure.fns.account.service.type";

/**
* Config to specify which {@link AbfsServiceType} to use only for Ingress Operations.
* Other operations will continue to move to the configured service endpoint.
* Value {@value} case-insensitive "DFS" or "BLOB".
*/
public static final String FS_AZURE_INGRESS_SERVICE_TYPE = "fs.azure.ingress.service.type";

/**
* Config to be set only for cases where traffic over dfs endpoint is experiencing compatibility issues.
* Value {@value} case-insensitive "True" or "False".
*/
public static final String FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK = "fs.azure.enable.dfstoblob.fallback";

/**
* Enable or disable expect hundred continue header.
* Value: {@value}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public enum FSOperationType {
SET_OWNER("SO"),
SET_ACL("SA"),
TEST_OP("TS"),
WRITE("WR");
WRITE("WR"),
INIT("IN");

private final String opCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
public final class FileSystemConfigurations {

public static final String DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED = "";
public static final boolean DEFAULT_FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK = false;
public static final boolean DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = true;
public static final String USER_HOME_DIRECTORY_PREFIX = "/user";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,8 @@ public final class FileSystemUriSchemes {
public static final String WASB_SECURE_SCHEME = "wasbs";
public static final String WASB_DNS_PREFIX = "blob";

public static final String ABFS_DFS_DOMAIN_NAME = "dfs.core.windows.net";
public static final String ABFS_BLOB_DOMAIN_NAME = "blob.core.windows.net";

private FileSystemUriSchemes() {}
}
}
Loading