Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion featuremanagement/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
from ._featuremanager import FeatureManager
from ._featurefilters import FeatureFilter
from ._defaultfilters import TimeWindowFilter, TargetingFilter
from ._models._variant import Variant

from ._version import VERSION

__version__ = VERSION
__all__ = ["FeatureManager", "TimeWindowFilter", "TargetingFilter", "FeatureFilter"]
__all__ = ["FeatureManager", "TimeWindowFilter", "TargetingFilter", "FeatureFilter", "Variant"]
153 changes: 140 additions & 13 deletions featuremanagement/aio/_featuremanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# -------------------------------------------------------------------------
from collections.abc import Mapping
import logging
import hashlib
from ._defaultfilters import TimeWindowFilter, TargetingFilter
from ._featurefilters import FeatureFilter
from .._featuremanager import (
Expand All @@ -15,6 +16,7 @@
_get_feature_flag,
_list_feature_flag_names,
)
from .._models._variant import Variant


class FeatureManager:
Expand Down Expand Up @@ -42,6 +44,80 @@ def __init__(self, configuration, **kwargs):
raise ValueError("Custom filter must be a subclass of FeatureFilter")
self._filters[feature_filter.name] = feature_filter

@staticmethod
def _check_default_disabled_variant(feature_flag):
if not feature_flag.allocation:
return False
return FeatureManager._check_variant_override(
feature_flag.variants, feature_flag.allocation.default_when_disabled, False
)

@staticmethod
def _check_default_enabled_variant(feature_flag):
if not feature_flag.allocation:
return True
return FeatureManager._check_variant_override(
feature_flag.variants, feature_flag.allocation.default_when_enabled, True
)

@staticmethod
def _check_variant_override(variants, default_variant_name, status):
if not variants or not default_variant_name:
return status
for variant in variants:
if variant.name == default_variant_name:
if variant.status_override == "Enabled":
return True
if variant.status_override == "Disabled":
return False
return status

@staticmethod
def _is_targeted(context_id):
"""Determine if the user is targeted for the given context"""
hashed_context_id = hashlib.sha256(context_id.encode()).digest()
context_marker = int.from_bytes(hashed_context_id[:4], byteorder="little", signed=False)

return (context_marker / (2**32 - 1)) * 100

def _assign_variant(self, feature_flag, **kwargs):
if not feature_flag.variants or not feature_flag.allocation:
return None
if feature_flag.allocation.user:
user = kwargs.get("user")
if user:
for user_allocation in feature_flag.allocation.user:
if user in user_allocation.users:
return user_allocation.variant
if feature_flag.allocation.group:
groups = kwargs.get("groups")
if groups:
for group_allocation in feature_flag.allocation.group:
for group in groups:
if group in group_allocation.groups:
return group_allocation.variant
if feature_flag.allocation.percentile:
user = kwargs.get("user", "")
context_id = user + "\n" + feature_flag.allocation.seed
box = self._is_targeted(context_id)
for percentile_allocation in feature_flag.allocation.percentile:
if box == 100 and percentile_allocation.percentile_to == 100:
return percentile_allocation.variant
if percentile_allocation.percentile_from <= box < percentile_allocation.percentile_to:
return percentile_allocation.variant
return None

def _variant_name_to_variant(self, feature_flag, variant_name):
if not feature_flag.variants:
return None
for variant_reference in feature_flag.variants:
if variant_reference.name == variant_name:
configuration = variant_reference.configuration_value
if not configuration:
configuration = self._configuration.get(variant_reference.configuration_reference)
return Variant(variant_reference.name, configuration)
return None

async def is_enabled(self, feature_flag_id, **kwargs):
"""
Determine if the feature flag is enabled for the given context
Expand All @@ -51,6 +127,29 @@ async def is_enabled(self, feature_flag_id, **kwargs):
:return: True if the feature flag is enabled for the given context
:rtype: bool
"""
return (await self._check_feature(feature_flag_id, **kwargs))["enabled"]

async def get_variant(self, feature_flag_id, **kwargs):
"""
Determine the variant for the given context

:param str feature_flag_id: Name of the feature flag
:paramtype feature_flag_id: str
:return: Name of the variant
:rtype: str
"""
return (await self._check_feature(feature_flag_id, **kwargs))["variant"]

async def _check_feature(self, feature_flag_id, **kwargs):
"""
Determine if the feature flag is enabled for the given context

:param str feature_flag_id: Name of the feature flag
:paramtype feature_flag_id: str
:return: True if the feature flag is enabled for the given context
:rtype: bool
"""
result = {"enabled": None, "variant": None}
if self._copy is not self._configuration.get(FEATURE_MANAGEMENT_KEY):
self._cache = {}
self._copy = self._configuration.get(FEATURE_MANAGEMENT_KEY)
Expand All @@ -64,32 +163,60 @@ async def is_enabled(self, feature_flag_id, **kwargs):
if not feature_flag:
logging.warning("Feature flag %s not found", feature_flag_id)
# Unknown feature flags are disabled by default
return False
return result

if not feature_flag.enabled:
# Feature flags that are disabled are always disabled
return False
result["enabled"] = FeatureManager._check_default_disabled_variant(feature_flag)
if feature_flag.allocation:
variant_name = feature_flag.allocation.default_when_disabled
result["variant"] = self._variant_name_to_variant(feature_flag, variant_name)
return result

feature_conditions = feature_flag.conditions
feature_filters = feature_conditions.client_filters

if len(feature_filters) == 0:
# Feature flags without any filters return evaluate
return True
result["enabled"] = True
else:
# The assumed value is no filters is based on the requirement type.
# Requirement type Any assumes false until proven true, All assumes true until proven false
result["enabled"] = feature_conditions.requirement_type == REQUIREMENT_TYPE_ALL

for feature_filter in feature_filters:
filter_name = feature_filter[FEATURE_FILTER_NAME]
if filter_name in self._filters:
if feature_conditions.requirement_type == REQUIREMENT_TYPE_ALL:
if not await self._filters[filter_name].evaluate(feature_filter, **kwargs):
return False
else:
if await self._filters[filter_name].evaluate(feature_filter, **kwargs):
return True
else:
if filter_name not in self._filters:
raise ValueError(f"Feature flag {feature_flag_id} has unknown filter {filter_name}")
# If this is reached, and true, default return value is true, else false
return feature_conditions.requirement_type == REQUIREMENT_TYPE_ALL
if feature_conditions.requirement_type == REQUIREMENT_TYPE_ALL:
if not await self._filters[filter_name].evaluate(feature_filter, **kwargs):
result["enabled"] = False
break
else:
if await self._filters[filter_name].evaluate(feature_filter, **kwargs):
result["enabled"] = True
break

if feature_flag.allocation and feature_flag.variants:
variant_name = self._assign_variant(feature_flag, **kwargs)
if variant_name:
result["enabled"] = FeatureManager._check_variant_override(
feature_flag.variants, variant_name, result["enabled"]
)
result["variant"] = self._variant_name_to_variant(feature_flag, variant_name)
return result

variant_name = None
if result["enabled"]:
result["enabled"] = FeatureManager._check_default_enabled_variant(feature_flag)
if feature_flag.allocation:
variant_name = feature_flag.allocation.default_when_enabled
else:
result["enabled"] = FeatureManager._check_default_disabled_variant(feature_flag)
if feature_flag.allocation:
variant_name = feature_flag.allocation.default_when_disabled
result["variant"] = self._variant_name_to_variant(feature_flag, variant_name)
return result

def list_feature_flag_names(self):
"""
Expand Down
Loading