Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/validate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ jobs:
- name: Test with pytest
run: |
pytest tests --doctest-modules --cov-report=xml --cov-report=html
- name: cspell-action
uses: streetsidesoftware/[email protected]
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -411,3 +411,5 @@ docs/_static
docs/_templates
docs/doctrees
docs/html
package-lock.json
package.json
20 changes: 20 additions & 0 deletions cspell.config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
$schema: https://raw.githubusercontent.com/streetsidesoftware/cspell/main/cspell.schema.json
version: '0.2'
dictionaryDefinitions:
- name: project-words
path: './project-words.txt'
addWords: true
dictionaries:
- project-words
ignorePaths:
- 'env'
- '.*'
- 'build'
- 'docs'
- 'dev_requirements.txt'
- '*.egg-info'
- '*.ini'
- '*.toml'
- 'SECURITY.md'
- 'SUPPORT.md'
22 changes: 13 additions & 9 deletions featuremanagement/_defaultfilters.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
# -------------------------------------------------------------------------
import logging
import hashlib

from datetime import datetime, timezone
from email.utils import parsedate_to_datetime

from typing import cast, List, Mapping, Optional, Dict, Any
from ._featurefilters import FeatureFilter

FEATURE_FLAG_NAME_KEY = "feature_name"
Expand Down Expand Up @@ -45,7 +44,7 @@ class TimeWindowFilter(FeatureFilter):
Feature Filter that determines if the current time is within the time window.
"""

def evaluate(self, context, **kwargs):
def evaluate(self, context: Mapping, **kwargs: Dict[str, Any]) -> bool:
"""
Determine if the feature flag is enabled for the given context.

Expand Down Expand Up @@ -75,7 +74,7 @@ class TargetingFilter(FeatureFilter):
"""

@staticmethod
def _is_targeted(context_id, rollout_percentage):
def _is_targeted(context_id: str, rollout_percentage: int) -> bool:
"""Determine if the user is targeted for the given context"""
# Always return true if rollout percentage is 100
if rollout_percentage == 100:
Expand All @@ -87,24 +86,29 @@ def _is_targeted(context_id, rollout_percentage):
percentage = (context_marker / (2**32 - 1)) * 100
return percentage < rollout_percentage

def _target_group(self, target_user, target_group, group, feature_flag_name):
def _target_group(
self, target_user: Optional[str], target_group: str, group: Mapping, feature_flag_name: str
) -> bool:
group_rollout_percentage = group.get(ROLLOUT_PERCENTAGE_KEY, 0)
if not target_user:
target_user = ""
audience_context_id = target_user + "\n" + feature_flag_name + "\n" + target_group

return self._is_targeted(audience_context_id, group_rollout_percentage)

def evaluate(self, context, **kwargs):
def evaluate(self, context: Mapping, **kwargs: Dict[str, Any]) -> bool:
"""
Determine if the feature flag is enabled for the given context.

:keyword Mapping context: Context for evaluating the user/group.
:return: True if the user is targeted for the feature flag.
:rtype: bool
"""
target_user = kwargs.get(TARGETED_USER_KEY, None)
target_groups = kwargs.get(TARGETED_GROUPS_KEY, [])
target_user: Optional[str] = cast(
str,
kwargs.get(TARGETED_USER_KEY, None),
)
target_groups: List[str] = cast(List[str], kwargs.get(TARGETED_GROUPS_KEY, []))

if not target_user and not (target_groups and len(target_groups) > 0):
logging.warning("%s: Name or Groups are required parameters", TargetingFilter.__name__)
Expand Down Expand Up @@ -152,7 +156,7 @@ def evaluate(self, context, **kwargs):
return self._is_targeted(context_id, default_rollout_percentage)

@staticmethod
def _validate(groups, default_rollout_percentage):
def _validate(groups: List, default_rollout_percentage: int) -> None:
# Validate the audience settings
if default_rollout_percentage < 0 or default_rollout_percentage > 100:
raise TargetingException("DefaultRolloutPercentage must be between 0 and 100")
Expand Down
13 changes: 7 additions & 6 deletions featuremanagement/_featurefilters.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# license information.
# -------------------------------------------------------------------------
from abc import ABC, abstractmethod
from typing import Mapping, Callable, Dict, Any


class FeatureFilter(ABC):
Expand All @@ -12,36 +13,36 @@ class FeatureFilter(ABC):
"""

@abstractmethod
def evaluate(self, context, **kwargs):
def evaluate(self, context: Mapping, **kwargs: Dict[str, Any]) -> bool:
"""
Determine if the feature flag is enabled for the given context.

:param Mapping context: Context for the feature flag.
"""

@property
def name(self):
def name(self) -> str:
"""
Get the name of the filter.

:return: Name of the filter, or alias if it exists.
:rtype: str
"""
if hasattr(self, "_alias"):
return self._alias
return self._alias # type: ignore
return self.__class__.__name__

@staticmethod
def alias(alias):
def alias(alias: str) -> Callable:
"""
Decorator to set the alias for the filter.

:param str alias: Alias for the filter.
:return: Decorator.
:rtype: callable
:rtype: Callable
"""

def wrapper(cls):
def wrapper(cls) -> Any: # type: ignore
cls._alias = alias # pylint: disable=protected-access
return cls

Expand Down
50 changes: 34 additions & 16 deletions featuremanagement/_featuremanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
# license information.
# -------------------------------------------------------------------------
import logging
from typing import overload
from typing import cast, overload, Any, Optional, Dict, Mapping, List
from ._defaultfilters import TimeWindowFilter, TargetingFilter
from ._featurefilters import FeatureFilter
from ._models import EvaluationEvent
from ._models import EvaluationEvent, Variant, TargetingContext
from ._featuremanagerbase import (
_get_feature_flag,
FeatureManagerBase,
Expand All @@ -28,17 +28,17 @@ class FeatureManager(FeatureManagerBase):
evaluated.
"""

def __init__(self, configuration, **kwargs):
def __init__(self, configuration: Mapping, **kwargs: Dict[str, Any]):
super().__init__(configuration, **kwargs)
filters = [TimeWindowFilter(), TargetingFilter()] + kwargs.pop(PROVIDED_FEATURE_FILTERS, [])
filters = [TimeWindowFilter(), TargetingFilter()] + cast(List, kwargs.pop(PROVIDED_FEATURE_FILTERS, []))

for feature_filter in filters:
if not isinstance(feature_filter, FeatureFilter):
raise ValueError("Custom filter must be a subclass of FeatureFilter")
self._filters[feature_filter.name] = feature_filter

@overload
def is_enabled(self, feature_flag_id, user_id, **kwargs):
@overload # type: ignore
def is_enabled(self, feature_flag_id: str, user_id: str, **kwargs: Dict[str, Any]) -> bool:
"""
Determine if the feature flag is enabled for the given context.

Expand All @@ -48,7 +48,7 @@ def is_enabled(self, feature_flag_id, user_id, **kwargs):
:rtype: bool
"""

def is_enabled(self, feature_flag_id, *args, **kwargs):
def is_enabled(self, feature_flag_id: str, *args: Any, **kwargs: Dict[str, Any]) -> bool: # type: ignore
"""
Determine if the feature flag is enabled for the given context.

Expand All @@ -59,13 +59,18 @@ def is_enabled(self, feature_flag_id, *args, **kwargs):
targeting_context = self._build_targeting_context(args)

result = self._check_feature(feature_flag_id, targeting_context, **kwargs)
if self._on_feature_evaluated and result.feature.telemetry.enabled:
if (
self._on_feature_evaluated
and result.feature
and result.feature.telemetry.enabled
and callable(self._on_feature_evaluated)
):
result.user = targeting_context.user_id
self._on_feature_evaluated(result)
return result.enabled

@overload
def get_variant(self, feature_flag_id, user_id, **kwargs):
@overload # type: ignore
def get_variant(self, feature_flag_id: str, user_id: str, **kwargs: Dict[str, Any]) -> Optional[Variant]:
"""
Determine the variant for the given context.

Expand All @@ -75,7 +80,9 @@ def get_variant(self, feature_flag_id, user_id, **kwargs):
:rtype: Variant
"""

def get_variant(self, feature_flag_id, *args, **kwargs):
def get_variant( # type: ignore
self, feature_flag_id: str, *args: Any, **kwargs: Dict[str, Any]
) -> Optional[Variant]:
"""
Determine the variant for the given context.

Expand All @@ -87,13 +94,22 @@ def get_variant(self, feature_flag_id, *args, **kwargs):
targeting_context = self._build_targeting_context(args)

result = self._check_feature(feature_flag_id, targeting_context, **kwargs)
if self._on_feature_evaluated and result.feature.telemetry.enabled:
if (
self._on_feature_evaluated
and result.feature
and result.feature.telemetry.enabled
and callable(self._on_feature_evaluated)
):
result.user = targeting_context.user_id
self._on_feature_evaluated(result)
return result.variant

def _check_feature_filters(self, evaluation_event, targeting_context, **kwargs):
def _check_feature_filters(
self, evaluation_event: EvaluationEvent, targeting_context: TargetingContext, **kwargs: Dict
) -> None:
feature_flag = evaluation_event.feature
if not feature_flag:
return
feature_conditions = feature_flag.conditions
feature_filters = feature_conditions.client_filters

Expand All @@ -107,8 +123,8 @@ def _check_feature_filters(self, evaluation_event, targeting_context, **kwargs):

for feature_filter in feature_filters:
filter_name = feature_filter[FEATURE_FILTER_NAME]
kwargs["user"] = targeting_context.user_id
kwargs["groups"] = targeting_context.groups
kwargs["user"] = targeting_context.user_id # type: ignore
kwargs["groups"] = targeting_context.groups # type: ignore
if filter_name not in self._filters:
raise ValueError(f"Feature flag {feature_flag.name} has unknown filter {filter_name}")
if feature_conditions.requirement_type == REQUIREMENT_TYPE_ALL:
Expand All @@ -119,7 +135,9 @@ def _check_feature_filters(self, evaluation_event, targeting_context, **kwargs):
evaluation_event.enabled = True
break

def _check_feature(self, feature_flag_id, targeting_context, **kwargs):
def _check_feature(
self, feature_flag_id: str, targeting_context: TargetingContext, **kwargs: Dict[str, Any]
) -> EvaluationEvent:
"""
Determine if the feature flag is enabled for the given context.

Expand Down
Loading