Skip to content
Merged
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,4 @@ changelog:
docker run -v "${PWD}":/workdir quay.io/git-chglog/git-chglog $$(git describe --abbrev=0 --tag).. > TMP_CHANGELOG.md

mypy:
poetry run mypy aws_lambda_powertools
poetry run mypy --pretty aws_lambda_powertools
4 changes: 2 additions & 2 deletions aws_lambda_powertools/utilities/feature_toggles/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
"""
from .appconfig_fetcher import AppConfigFetcher
from .configuration_store import ConfigurationStore
from .exceptions import ConfigurationException
from .exceptions import ConfigurationError
from .schema import ACTION, SchemaValidator
from .schema_fetcher import SchemaFetcher

__all__ = [
"ConfigurationException",
"ConfigurationError",
"ConfigurationStore",
"ACTION",
"SchemaValidator",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from aws_lambda_powertools.utilities.parameters import AppConfigProvider, GetParameterError, TransformParameterError

from .exceptions import ConfigurationException
from .exceptions import ConfigurationError
from .schema_fetcher import SchemaFetcher

logger = logging.getLogger(__name__)
Expand All @@ -25,12 +25,18 @@ def __init__(
):
"""This class fetches JSON schemas from AWS AppConfig

Args:
environment (str): what appconfig environment to use 'dev/test' etc.
service (str): what service name to use from the supplied environment
configuration_name (str): what configuration to take from the environment & service combination
cache_seconds (int): cache expiration time, how often to call AppConfig to fetch latest configuration
config (Optional[Config]): boto3 client configuration
Parameters
----------
environment: str
what appconfig environment to use 'dev/test' etc.
service: str
what service name to use from the supplied environment
configuration_name: str
what configuration to take from the environment & service combination
cache_seconds: int
cache expiration time, how often to call AppConfig to fetch latest configuration
config: Optional[Config]
boto3 client configuration
"""
super().__init__(configuration_name, cache_seconds)
self._logger = logger
Expand All @@ -39,11 +45,15 @@ def __init__(
def get_json_configuration(self) -> Dict[str, Any]:
"""Get configuration string from AWs AppConfig and return the parsed JSON dictionary

Raises:
ConfigurationException: Any validation error or appconfig error that can occur
Raises
------
ConfigurationError
Any validation error or appconfig error that can occur

Returns:
Dict[str, Any]: parsed JSON dictionary
Returns
-------
Dict[str, Any]
parsed JSON dictionary
"""
try:
return self._conf_store.get(
Expand All @@ -54,4 +64,4 @@ def get_json_configuration(self) -> Dict[str, Any]:
except (GetParameterError, TransformParameterError) as exc:
error_str = f"unable to get AWS AppConfig configuration file, exception={str(exc)}"
self._logger.error(error_str)
raise ConfigurationException(error_str)
raise ConfigurationError(error_str)
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import logging
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, cast

from . import schema
from .exceptions import ConfigurationException
from .exceptions import ConfigurationError
from .schema_fetcher import SchemaFetcher

logger = logging.getLogger(__name__)
Expand All @@ -12,8 +12,10 @@ class ConfigurationStore:
def __init__(self, schema_fetcher: SchemaFetcher):
"""constructor

Args:
schema_fetcher (SchemaFetcher): A schema JSON fetcher, can be AWS AppConfig, Hashicorp Consul etc.
Parameters
----------
schema_fetcher: SchemaFetcher
A schema JSON fetcher, can be AWS AppConfig, Hashicorp Consul etc.
"""
self._logger = logger
self._schema_fetcher = schema_fetcher
Expand All @@ -39,25 +41,28 @@ def _match_by_action(self, action: str, condition_value: Any, context_value: Any
def _is_rule_matched(self, feature_name: str, rule: Dict[str, Any], rules_context: Dict[str, Any]) -> bool:
rule_name = rule.get(schema.RULE_NAME_KEY, "")
rule_default_value = rule.get(schema.RULE_DEFAULT_VALUE)
conditions: Dict[str, str] = rule.get(schema.CONDITIONS_KEY)
conditions = cast(List[Dict], rule.get(schema.CONDITIONS_KEY))

for condition in conditions:
context_value = rules_context.get(condition.get(schema.CONDITION_KEY))
context_value = rules_context.get(str(condition.get(schema.CONDITION_KEY)))
if not self._match_by_action(
condition.get(schema.CONDITION_ACTION),
condition.get(schema.CONDITION_ACTION, ""),
condition.get(schema.CONDITION_VALUE),
context_value,
):
logger.debug(
f"rule did not match action, rule_name={rule_name}, rule_default_value={rule_default_value}, feature_name={feature_name}, context_value={str(context_value)}" # noqa: E501
f"rule did not match action, rule_name={rule_name}, rule_default_value={rule_default_value}, "
f"feature_name={feature_name}, context_value={str(context_value)} "
)
# context doesn't match condition
return False
# if we got here, all conditions match
logger.debug(
f"rule matched, rule_name={rule_name}, rule_default_value={rule_default_value}, feature_name={feature_name}" # noqa: E501
f"rule matched, rule_name={rule_name}, rule_default_value={rule_default_value}, "
f"feature_name={feature_name}"
)
return True
return False

def _handle_rules(
self,
Expand All @@ -70,66 +75,77 @@ def _handle_rules(
for rule in rules:
rule_default_value = rule.get(schema.RULE_DEFAULT_VALUE)
if self._is_rule_matched(feature_name, rule, rules_context):
return rule_default_value
return bool(rule_default_value)
# no rule matched, return default value of feature
logger.debug(
f"no rule matched, returning default value of feature, feature_default_value={feature_default_value}, feature_name={feature_name}" # noqa: E501
f"no rule matched, returning default value of feature, feature_default_value={feature_default_value}, "
f"feature_name={feature_name}"
)
return feature_default_value
return False

def get_configuration(self) -> Dict[str, Any]:
"""Get configuration string from AWs AppConfig and returned the parsed JSON dictionary

Raises:
ConfigurationException: Any validation error or appconfig error that can occur
Raises
------
ConfigurationError
Any validation error or appconfig error that can occur

Returns:
Dict[str, Any]: parsed JSON dictionary
Returns
------
Dict[str, Any]
parsed JSON dictionary
"""
schema: Dict[
str, Any
] = (
self._schema_fetcher.get_json_configuration()
) # parse result conf as JSON, keep in cache for self.max_age seconds
# parse result conf as JSON, keep in cache for self.max_age seconds
config = self._schema_fetcher.get_json_configuration()
# validate schema
self._schema_validator.validate_json_schema(schema)
return schema
self._schema_validator.validate_json_schema(config)
return config

def get_feature_toggle(
self, *, feature_name: str, rules_context: Optional[Dict[str, Any]] = None, value_if_missing: bool
) -> bool:
"""get a feature toggle boolean value. Value is calculated according to a set of rules and conditions.
see below for explanation.

Args:
feature_name (str): feature name that you wish to fetch
rules_context (Optional[Dict[str, Any]]): dict of attributes that you would like to match the rules
against, can be {'tenant_id: 'X', 'username':' 'Y', 'region': 'Z'} etc.
value_if_missing (bool): this will be the returned value in case the feature toggle doesn't exist in
the schema or there has been an error while fetching the
configuration from appconfig

Returns:
bool: calculated feature toggle value. several possibilities:
1. if the feature doesn't appear in the schema or there has been an error fetching the
configuration -> error/warning log would appear and value_if_missing is returned
2. feature exists and has no rules or no rules have matched -> return feature_default_value of
the defined feature
3. feature exists and a rule matches -> rule_default_value of rule is returned
"""Get a feature toggle boolean value. Value is calculated according to a set of rules and conditions.

See below for explanation.

Parameters
----------
feature_name: str
feature name that you wish to fetch
rules_context: Optional[Dict[str, Any]]
dict of attributes that you would like to match the rules
against, can be {'tenant_id: 'X', 'username':' 'Y', 'region': 'Z'} etc.
value_if_missing: bool
this will be the returned value in case the feature toggle doesn't exist in
the schema or there has been an error while fetching the
configuration from appconfig

Returns
------
bool
calculated feature toggle value. several possibilities:
1. if the feature doesn't appear in the schema or there has been an error fetching the
configuration -> error/warning log would appear and value_if_missing is returned
2. feature exists and has no rules or no rules have matched -> return feature_default_value of
the defined feature
3. feature exists and a rule matches -> rule_default_value of rule is returned
"""
if rules_context is None:
rules_context = {}

try:
toggles_dict: Dict[str, Any] = self.get_configuration()
except ConfigurationException:
logger.error("unable to get feature toggles JSON, returning provided value_if_missing value") # noqa: E501
except ConfigurationError:
logger.error("unable to get feature toggles JSON, returning provided value_if_missing value")
return value_if_missing

feature: Dict[str, Dict] = toggles_dict.get(schema.FEATURES_KEY, {}).get(feature_name, None)
if feature is None:
logger.warning(
f"feature does not appear in configuration, using provided value_if_missing, feature_name={feature_name}, value_if_missing={value_if_missing}" # noqa: E501
f"feature does not appear in configuration, using provided value_if_missing, "
f"feature_name={feature_name}, value_if_missing={value_if_missing}"
)
return value_if_missing

Expand All @@ -138,38 +154,46 @@ def get_feature_toggle(
if not rules_list:
# not rules but has a value
logger.debug(
f"no rules found, returning feature default value, feature_name={feature_name}, default_value={feature_default_value}" # noqa: E501
f"no rules found, returning feature default value, feature_name={feature_name}, "
f"default_value={feature_default_value}"
)
return feature_default_value
return bool(feature_default_value)
# look for first rule match
logger.debug(
f"looking for rule match, feature_name={feature_name}, feature_default_value={feature_default_value}"
) # noqa: E501
)
return self._handle_rules(
feature_name=feature_name,
rules_context=rules_context,
feature_default_value=feature_default_value,
rules=rules_list,
feature_default_value=bool(feature_default_value),
rules=cast(List, rules_list),
)

def get_all_enabled_feature_toggles(self, *, rules_context: Optional[Dict[str, Any]] = None) -> List[str]:
"""Get all enabled feature toggles while also taking into account rule_context (when a feature has defined rules)

Args:
rules_context (Optional[Dict[str, Any]]): dict of attributes that you would like to match the rules
against, can be {'tenant_id: 'X', 'username':' 'Y', 'region': 'Z'} etc.

Returns:
List[str]: a list of all features name that are enabled by also taking into account
rule_context (when a feature has defined rules)
"""Get all enabled feature toggles while also taking into account rule_context
(when a feature has defined rules)

Parameters
----------
rules_context: Optional[Dict[str, Any]]
dict of attributes that you would like to match the rules
against, can be `{'tenant_id: 'X', 'username':' 'Y', 'region': 'Z'}` etc.

Returns
----------
List[str]
a list of all features name that are enabled by also taking into account
rule_context (when a feature has defined rules)
"""
if rules_context is None:
rules_context = {}

try:
toggles_dict: Dict[str, Any] = self.get_configuration()
except ConfigurationException:
logger.error("unable to get feature toggles JSON") # noqa: E501
except ConfigurationError:
logger.error("unable to get feature toggles JSON")
return []

ret_list = []
features: Dict[str, Any] = toggles_dict.get(schema.FEATURES_KEY, {})
for feature_name, feature_dict_def in features.items():
Expand All @@ -188,4 +212,5 @@ def get_all_enabled_feature_toggles(self, *, rules_context: Optional[Dict[str, A
):
self._logger.debug(f"feature's calculated value is True, feature_name={feature_name}")
ret_list.append(feature_name)

return ret_list
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
class ConfigurationException(Exception):
class ConfigurationError(Exception):
"""When a a configuration store raises an exception on config retrieval or parsing"""
13 changes: 7 additions & 6 deletions aws_lambda_powertools/utilities/feature_toggles/schema.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from enum import Enum
from logging import Logger
from typing import Any, Dict

from .exceptions import ConfigurationException
from .exceptions import ConfigurationError

FEATURES_KEY = "features"
RULES_KEY = "rules"
Expand All @@ -22,12 +23,12 @@ class ACTION(str, Enum):


class SchemaValidator:
def __init__(self, logger: object):
def __init__(self, logger: Logger):
self._logger = logger

def _raise_conf_exc(self, error_str: str) -> None:
self._logger.error(error_str)
raise ConfigurationException(error_str)
raise ConfigurationError(error_str)

def _validate_condition(self, rule_name: str, condition: Dict[str, str]) -> None:
if not condition or not isinstance(condition, dict):
Expand All @@ -47,7 +48,7 @@ def _validate_rule(self, feature_name: str, rule: Dict[str, Any]) -> None:
self._raise_conf_exc(f"feature rule is not a dictionary, feature_name={feature_name}")
rule_name = rule.get(RULE_NAME_KEY)
if not rule_name or rule_name is None or not isinstance(rule_name, str):
self._raise_conf_exc(f"invalid rule_name, feature_name={feature_name}")
return self._raise_conf_exc(f"invalid rule_name, feature_name={feature_name}")
rule_default_value = rule.get(RULE_DEFAULT_VALUE)
if rule_default_value is None or not isinstance(rule_default_value, bool):
self._raise_conf_exc(f"invalid rule_default_value, rule_name={rule_name}")
Expand Down Expand Up @@ -76,8 +77,8 @@ def _validate_feature(self, feature_name: str, feature_dict_def: Dict[str, Any])
def validate_json_schema(self, schema: Dict[str, Any]) -> None:
if not isinstance(schema, dict):
self._raise_conf_exc("invalid AWS AppConfig JSON schema detected, root schema is not a dictionary")
features_dict: Dict = schema.get(FEATURES_KEY)
features_dict = schema.get(FEATURES_KEY)
if not isinstance(features_dict, dict):
self._raise_conf_exc("invalid AWS AppConfig JSON schema detected, missing features dictionary")
return self._raise_conf_exc("invalid AWS AppConfig JSON schema detected, missing features dictionary")
for feature_name, feature_dict_def in features_dict.items():
self._validate_feature(feature_name, feature_dict_def)
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from abc import ABC, abstractclassmethod
from abc import ABC, abstractmethod
from typing import Any, Dict


Expand All @@ -7,14 +7,18 @@ def __init__(self, configuration_name: str, cache_seconds: int):
self.configuration_name = configuration_name
self._cache_seconds = cache_seconds

@abstractclassmethod
@abstractmethod
def get_json_configuration(self) -> Dict[str, Any]:
"""Get configuration string from any configuration storing service and return the parsed JSON dictionary

Raises:
ConfigurationException: Any error that can occur during schema fetch or JSON parse
Raises
------
ConfigurationError
Any error that can occur during schema fetch or JSON parse

Returns:
Dict[str, Any]: parsed JSON dictionary
Returns
-------
Dict[str, Any]
parsed JSON dictionary
"""
return None
return NotImplemented # pragma: no cover
Loading