diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java index fe4991c9582d5..ae5f629bf1939 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java @@ -165,6 +165,12 @@ public final class AbfsHttpConstants { // The HTTP 100 Continue informational status response code indicates that everything so far // is OK and that the client should continue with the request or ignore it if it is already finished. public static final String HUNDRED_CONTINUE = "100-continue"; + /** + * HTTP status code indicating that the server has received too many requests; the client may + * retry the operation, as described in the Microsoft Azure documentation: + * <a href="https://learn.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#error-handling">Azure managed identities: token error handling</a>.
+ */ + public static final int HTTP_TOO_MANY_REQUESTS = 429; public static final char CHAR_FORWARD_SLASH = '/'; public static final char CHAR_EXCLAMATION_POINT = '!'; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 83f636bf1d131..52dfc2360041d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -68,7 +68,7 @@ public final class FileSystemConfigurations { public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS = 5; public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF_INTERVAL = 0; public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF_INTERVAL = SIXTY_SECONDS; - public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF = 2; + public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF = 2_000; public static final int ONE_KB = 1024; public static final int ONE_MB = ONE_KB * ONE_KB; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java index dab4d79658451..a3f5eda441c7a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java @@ -29,6 +29,7 @@ import java.util.Hashtable; import java.util.Map; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.util.Preconditions; import com.fasterxml.jackson.core.JsonFactory; @@ -73,6 +74,11 @@ public 
static void init(AbfsConfiguration abfsConfiguration) { tokenFetchRetryPolicy = abfsConfiguration.getOauthTokenFetchRetryPolicy(); } + @VisibleForTesting + public static void setTokenFetchRetryPolicy(ExponentialRetryPolicy retryPolicy) { + tokenFetchRetryPolicy = retryPolicy; + } + /** * gets Azure Active Directory token using the user ID and password of * a service principal (that is, Web App in Azure Active Directory). @@ -255,7 +261,19 @@ public String getRequestId() { return this.requestId; } - protected HttpException( + /** + * Constructs an instance of HttpException with detailed information about an HTTP error response. + * This exception is designed to encapsulate details of an HTTP error response, providing context about the error + * encountered during an HTTP operation. It includes the HTTP error code, the associated request ID, an error message, + * the URL that triggered the error, the content type of the response, and the response body. + * @param httpErrorCode The HTTP error code indicating the nature of the encountered error. + * @param requestId The unique identifier associated with the corresponding HTTP request. + * @param message A descriptive error message providing additional information about the encountered error. + * @param url The URL that resulted in the HTTP error response. + * @param contentType The content type of the HTTP response. + * @param body The body of the HTTP response, containing more details about the error. + */ + public HttpException( final int httpErrorCode, final String requestId, final String message, @@ -383,7 +401,20 @@ private static boolean isRecoverableFailure(IOException e) { || e instanceof FileNotFoundException); } - private static AzureADToken getTokenSingleCall(String authEndpoint, + /** + * Retrieves an Azure OAuth token for authentication through a single API call. + * This method facilitates the acquisition of an OAuth token from Azure Active Directory + * to enable secure authentication for various services.
It supports both Managed Service Identity (MSI) + * tokens and non-MSI tokens based on the provided parameters. + * @param authEndpoint The URL endpoint for OAuth token retrieval. + * @param payload The payload of the token request; typically the grant type and any other required parameters. + * @param headers A Hashtable containing additional HTTP headers to be included in the token request. + * @param httpMethod The HTTP method to be used for the token request (e.g., GET, POST). + * @param isMsi A boolean flag indicating whether to request a Managed Service Identity (MSI) token or not. + * @return An AzureADToken object containing the acquired OAuth token and associated metadata. + * @throws IOException if the token cannot be obtained. + */ + public static AzureADToken getTokenSingleCall(String authEndpoint, String payload, Hashtable headers, String httpMethod, boolean isMsi) throws IOException { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRetryPolicy.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRetryPolicy.java index ffddd341ac21f..f3e1e582f9dab 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRetryPolicy.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRetryPolicy.java @@ -20,7 +20,10 @@ import java.net.HttpURLConnection; +import org.apache.hadoop.classification.VisibleForTesting; + import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_TOO_MANY_REQUESTS; /** * Abstract Class for Retry policy to be used by {@link AbfsClient} @@ -57,6 +60,8 @@ public boolean shouldRetry(final int retryCount, final int statusCode) { return retryCount < maxRetryCount && (statusCode < HTTP_CONTINUE || statusCode == HttpURLConnection.HTTP_CLIENT_TIMEOUT + || statusCode == HttpURLConnection.HTTP_GONE + || statusCode
== HTTP_TOO_MANY_REQUESTS || (statusCode >= HttpURLConnection.HTTP_INTERNAL_ERROR && statusCode != HttpURLConnection.HTTP_NOT_IMPLEMENTED && statusCode != HttpURLConnection.HTTP_VERSION)); @@ -84,7 +89,8 @@ public String getAbbreviation() { * Returns maximum number of retries allowed in this retry policy * @return max retry count */ - protected int getMaxRetryCount() { + @VisibleForTesting + public int getMaxRetryCount() { return maxRetryCount; } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsMsiTokenProvider.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsMsiTokenProvider.java index 493e2dd06b942..3f628ddac11d4 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsMsiTokenProvider.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsMsiTokenProvider.java @@ -20,20 +20,27 @@ import java.io.IOException; import java.util.Date; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; +import org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator; import org.apache.hadoop.fs.azurebfs.oauth2.AzureADToken; import org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider; +import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_TOO_MANY_REQUESTS; import static org.apache.hadoop.fs.azurebfs.constants.AuthConfigurations.DEFAULT_FS_AZURE_ACCOUNT_OAUTH_MSI_AUTHORITY; import static org.apache.hadoop.fs.azurebfs.constants.AuthConfigurations.DEFAULT_FS_AZURE_ACCOUNT_OAUTH_MSI_ENDPOINT; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID; import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_MSI_AUTHORITY; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_MSI_ENDPOINT; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assumptions.assumeThat; @@ -86,4 +93,66 @@ private String getTrimmedPasswordString(AbfsConfiguration conf, String key, return value.trim(); } + /** + * Verifies that MsiTokenProvider retries on HTTP 429 responses. + * Ensures shouldRetry returns true for 429 until the maximum retries are reached. + */ + @Test + public void testShouldRetryFor429() throws Exception { + ExponentialRetryPolicy retryPolicy = new ExponentialRetryPolicy( + DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS); + AzureADAuthenticator.setTokenFetchRetryPolicy(retryPolicy); + AtomicInteger attemptCounter = new AtomicInteger(0); + + // Inner class to simulate MsiTokenProvider retry logic + class TestMsiTokenProvider extends MsiTokenProvider { + TestMsiTokenProvider(String endpoint, String tenant, String clientId, String authority) { + super(endpoint, tenant, clientId, authority); + } + + @Override + public AzureADToken getToken() throws IOException { + int attempt = 0; + while (true) { + attempt++; + attemptCounter.incrementAndGet(); + + boolean retry = retryPolicy.shouldRetry(attempt - 1, + HTTP_TOO_MANY_REQUESTS); + + // Validate shouldRetry returns true until the final attempt + if (attempt < retryPolicy.getMaxRetryCount()) { + Assertions.assertThat(retry) + .describedAs("Attempt %d: shouldRetry must be true for 429", attempt) + .isTrue(); + // Simulate retry by continuing + } else { + // Final attempt: retryCount (attempt - 1) is still below the max retry count, so shouldRetry must still be true
+ Assertions.assertThat(retry) + .describedAs("Final attempt %d: shouldRetry must still be true because max retries are not exceeded", attempt) + .isTrue(); // Still true because maxRetries not exceeded yet + + // Return a valid fake token + AzureADToken token = new AzureADToken(); + token.setAccessToken("fake-token"); + token.setExpiry(new Date(System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1))); + return token; + } + } + } + } + AccessTokenProvider tokenProvider = new TestMsiTokenProvider( + "https://fake-endpoint", "tenant", "clientId", "authority" + ); + // Trigger token acquisition + AzureADToken token = tokenProvider.getToken(); + // Assertions + assertThat(token.getAccessToken()).isEqualTo("fake-token"); + // If the status code doesn't qualify for retry shouldRetry returns false and the loop ends. + // It being called multiple times verifies that the retry was done for the throttling status code 429. + Assertions.assertThat(attemptCounter.get()) + .describedAs("Number of attempts should be equal to " + + "max attempts for token fetch.") + .isEqualTo(DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS); + } }