-
Notifications
You must be signed in to change notification settings - Fork 22
Add standard mode retries #545
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: standard-retries
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -1,5 +1,6 @@ | ||||||
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||||||
# SPDX-License-Identifier: Apache-2.0 | ||||||
import asyncio | ||||||
import random | ||||||
from collections.abc import Callable | ||||||
from dataclasses import dataclass | ||||||
|
@@ -204,7 +205,7 @@ def __init__( | |||||
self.backoff_strategy = backoff_strategy or ExponentialRetryBackoffStrategy() | ||||||
self.max_attempts = max_attempts | ||||||
|
||||||
def acquire_initial_retry_token( | ||||||
async def acquire_initial_retry_token( | ||||||
self, *, token_scope: str | None = None | ||||||
) -> SimpleRetryToken: | ||||||
"""Called before any retries (for the first attempt at the operation). | ||||||
|
@@ -214,7 +215,7 @@ def acquire_initial_retry_token( | |||||
retry_delay = self.backoff_strategy.compute_next_backoff_delay(0) | ||||||
return SimpleRetryToken(retry_count=0, retry_delay=retry_delay) | ||||||
|
||||||
def refresh_retry_token_for_retry( | ||||||
async def refresh_retry_token_for_retry( | ||||||
self, | ||||||
*, | ||||||
token_to_renew: retries_interface.RetryToken, | ||||||
|
@@ -240,5 +241,158 @@ def refresh_retry_token_for_retry( | |||||
else: | ||||||
raise RetryError(f"Error is not retryable: {error}") from error | ||||||
|
||||||
def record_success(self, *, token: retries_interface.RetryToken) -> None: | ||||||
async def record_success(self, *, token: retries_interface.RetryToken) -> None: | ||||||
"""Not used by this retry strategy.""" | ||||||
|
||||||
|
||||||
@dataclass(kw_only=True) | ||||||
class StandardRetryToken: | ||||||
retry_count: int | ||||||
"""Retry count is the total number of attempts minus the initial attempt.""" | ||||||
|
||||||
retry_delay: float | ||||||
"""Delay in seconds to wait before the retry attempt.""" | ||||||
|
||||||
quota_consumed: int = 0 | ||||||
"""The total amount of quota consumed.""" | ||||||
|
||||||
last_quota_acquired: int = 0 | ||||||
"""The amount of last quota acquired.""" | ||||||
|
||||||
|
||||||
class StandardRetryStrategy(retries_interface.RetryStrategy): | ||||||
def __init__(self, *, max_attempts: int = 3): | ||||||
"""Standard retry strategy using truncated binary exponential backoff with full | ||||||
jitter. | ||||||
|
||||||
:param max_attempts: Upper limit on total number of attempts made, including | ||||||
initial attempt and retries. | ||||||
""" | ||||||
self.backoff_strategy = ExponentialRetryBackoffStrategy( | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know the |
||||||
backoff_scale_value=1, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit - Not really relevant to this PR, but we should consider adding some simple validation for these values on initialization (e.g., ensure they're positive integers). |
||||||
jitter_type=ExponentialBackoffJitterType.FULL, | ||||||
) | ||||||
self.max_attempts = max_attempts | ||||||
self._retry_quota = StandardRetryQuota() | ||||||
|
||||||
async def acquire_initial_retry_token( | ||||||
self, *, token_scope: str | None = None | ||||||
) -> StandardRetryToken: | ||||||
"""Called before any retries (for the first attempt at the operation). | ||||||
|
||||||
:param token_scope: This argument is ignored by this retry strategy. | ||||||
""" | ||||||
retry_delay = self.backoff_strategy.compute_next_backoff_delay(0) | ||||||
return StandardRetryToken(retry_count=0, retry_delay=retry_delay) | ||||||
|
||||||
async def refresh_retry_token_for_retry( | ||||||
self, | ||||||
*, | ||||||
token_to_renew: StandardRetryToken, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is going to result in the following typechecker errors:
I think we need to be using
Suggested change
Making this change alone will result in even more typing errors because
|
||||||
error: Exception, | ||||||
) -> StandardRetryToken: | ||||||
"""Replace an existing retry token from a failed attempt with a new token. | ||||||
|
||||||
This retry strategy always returns a token until the attempt count stored in | ||||||
the new token exceeds the ``max_attempts`` value. | ||||||
|
||||||
:param token_to_renew: The token used for the previous failed attempt. | ||||||
:param error: The error that triggered the need for a retry. | ||||||
:raises RetryError: If no further retry attempts are allowed. | ||||||
""" | ||||||
if isinstance(error, retries_interface.ErrorRetryInfo) and error.is_retry_safe: | ||||||
retry_count = token_to_renew.retry_count + 1 | ||||||
if retry_count >= self.max_attempts: | ||||||
raise RetryError( | ||||||
f"Reached maximum number of allowed attempts: {self.max_attempts}" | ||||||
) from error | ||||||
|
||||||
# Acquire additional quota for this retry attempt | ||||||
# (may raise a RetryError if none is available) | ||||||
quota_acquired = await self._retry_quota.acquire(error=error) | ||||||
total_quota = token_to_renew.quota_consumed + quota_acquired | ||||||
|
||||||
if error.retry_after is not None: | ||||||
retry_delay = error.retry_after | ||||||
else: | ||||||
retry_delay = self.backoff_strategy.compute_next_backoff_delay( | ||||||
retry_count | ||||||
) | ||||||
|
||||||
return StandardRetryToken( | ||||||
retry_count=retry_count, | ||||||
retry_delay=retry_delay, | ||||||
quota_consumed=total_quota, | ||||||
last_quota_acquired=quota_acquired, | ||||||
) | ||||||
else: | ||||||
raise RetryError(f"Error is not retryable: {error}") from error | ||||||
|
||||||
async def record_success(self, *, token: StandardRetryToken) -> None: | ||||||
"""Return token after successful completion of an operation. | ||||||
|
||||||
Releases retry tokens back to the retry quota based on the previous amount | ||||||
consumed. | ||||||
|
||||||
:param token: The token used for the previous successful attempt. | ||||||
""" | ||||||
await self._retry_quota.release(release_amount=token.last_quota_acquired) | ||||||
|
||||||
|
||||||
class StandardRetryQuota: | ||||||
"""Retry quota used by :py:class:`StandardRetryStrategy`.""" | ||||||
|
||||||
INITIAL_RETRY_TOKENS = 500 | ||||||
RETRY_COST = 5 | ||||||
NO_RETRY_INCREMENT = 1 | ||||||
TIMEOUT_RETRY_COST = 10 | ||||||
|
||||||
def __init__(self): | ||||||
self._max_capacity = self.INITIAL_RETRY_TOKENS | ||||||
self._available_capacity = self.INITIAL_RETRY_TOKENS | ||||||
self._lock = asyncio.Lock() | ||||||
|
||||||
async def acquire(self, *, error: Exception) -> int: | ||||||
"""Attempt to acquire a certain amount of capacity. | ||||||
|
||||||
If there's no sufficient amount of capacity available, raise an exception. | ||||||
Otherwise, we return the amount of capacity successfully allocated. | ||||||
""" | ||||||
# TODO: update `is_timeout` when `is_timeout_error` is implemented | ||||||
is_timeout = False | ||||||
capacity_amount = self.TIMEOUT_RETRY_COST if is_timeout else self.RETRY_COST | ||||||
|
||||||
async with self._lock: | ||||||
if capacity_amount > self._available_capacity: | ||||||
raise RetryError("Retry quota exceeded") | ||||||
self._available_capacity -= capacity_amount | ||||||
return capacity_amount | ||||||
|
||||||
async def release(self, *, release_amount: int) -> None: | ||||||
"""Release capacity back to the retry quota. | ||||||
|
||||||
The capacity being released will be truncated if necessary to ensure the max | ||||||
capacity is never exceeded. | ||||||
""" | ||||||
increment = self.NO_RETRY_INCREMENT if release_amount == 0 else release_amount | ||||||
|
||||||
if self._available_capacity == self._max_capacity: | ||||||
return | ||||||
|
||||||
async with self._lock: | ||||||
self._available_capacity = min( | ||||||
self._available_capacity + increment, self._max_capacity | ||||||
) | ||||||
|
||||||
|
||||||
class RetryStrategyMode(Enum): | ||||||
"""Enumeration of available retry strategies.""" | ||||||
|
||||||
SIMPLE = "simple" | ||||||
STANDARD = "standard" | ||||||
|
||||||
|
||||||
RETRY_MODE_MAP = { | ||||||
RetryStrategyMode.SIMPLE: SimpleRetryStrategy, | ||||||
RetryStrategyMode.STANDARD: StandardRetryStrategy, | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this considered a feature or breaking change since we're updating a default?