Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 59 additions & 39 deletions splitio/client/client.py

Large diffs are not rendered by default.

35 changes: 23 additions & 12 deletions splitio/client/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ def __init__( # pylint: disable=too-many-arguments
telemetry_producer=None,
telemetry_init_producer=None,
telemetry_submitter=None,
preforked_initialization=False
preforked_initialization=False,
fallback_treatments_configuration=None
):
"""
Class constructor.
Expand Down Expand Up @@ -201,6 +202,7 @@ def __init__( # pylint: disable=too-many-arguments
self._ready_time = get_current_epoch_time_ms()
_LOGGER.debug("Running in threading mode")
self._sdk_internal_ready_flag = sdk_ready_flag
self._fallback_treatments_configuration = fallback_treatments_configuration
self._start_status_updater()

def _start_status_updater(self):
Expand Down Expand Up @@ -242,7 +244,7 @@ def client(self):
This client is only a set of references to structures hold by the factory.
Creating one a fast operation and safe to be used anywhere.
"""
return Client(self, self._recorder, self._labels_enabled)
return Client(self, self._recorder, self._labels_enabled, self._fallback_treatments_configuration)

def manager(self):
"""
Expand Down Expand Up @@ -338,7 +340,8 @@ def __init__( # pylint: disable=too-many-arguments
telemetry_init_producer=None,
telemetry_submitter=None,
manager_start_task=None,
api_client=None
api_client=None,
fallback_treatments_configuration=None
):
"""
Class constructor.
Expand Down Expand Up @@ -372,6 +375,7 @@ def __init__( # pylint: disable=too-many-arguments
self._sdk_ready_flag = asyncio.Event()
self._ready_task = asyncio.get_running_loop().create_task(self._update_status_when_ready_async())
self._api_client = api_client
self._fallback_treatments_configuration = fallback_treatments_configuration

async def _update_status_when_ready_async(self):
"""Wait until the sdk is ready and update the status for async mode."""
Expand Down Expand Up @@ -460,7 +464,7 @@ def client(self):
This client is only a set of references to structures hold by the factory.
Creating one a fast operation and safe to be used anywhere.
"""
return ClientAsync(self, self._recorder, self._labels_enabled)
return ClientAsync(self, self._recorder, self._labels_enabled, self._fallback_treatments_configuration)

def _wrap_impression_listener(listener, metadata):
"""
Expand Down Expand Up @@ -623,15 +627,16 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
synchronizer._split_synchronizers._segment_sync.shutdown()

return SplitFactory(api_key, storages, cfg['labelsEnabled'],
recorder, manager, None, telemetry_producer, telemetry_init_producer, telemetry_submitter, preforked_initialization=preforked_initialization)
recorder, manager, None, telemetry_producer, telemetry_init_producer, telemetry_submitter, preforked_initialization=preforked_initialization,
fallback_treatments_configuration=cfg['fallbackTreatments'])

initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer", daemon=True)
initialization_thread.start()

return SplitFactory(api_key, storages, cfg['labelsEnabled'],
recorder, manager, sdk_ready_flag,
telemetry_producer, telemetry_init_producer,
telemetry_submitter)
telemetry_submitter, fallback_treatments_configuration=cfg['fallbackTreatments'])

async def _build_in_memory_factory_async(api_key, cfg, sdk_url=None, events_url=None, # pylint:disable=too-many-arguments,too-many-localsa
auth_api_base_url=None, streaming_api_base_url=None, telemetry_api_base_url=None,
Expand Down Expand Up @@ -750,7 +755,7 @@ async def _build_in_memory_factory_async(api_key, cfg, sdk_url=None, events_url=
recorder, manager,
telemetry_producer, telemetry_init_producer,
telemetry_submitter, manager_start_task=manager_start_task,
api_client=http_client)
api_client=http_client, fallback_treatments_configuration=cfg['fallbackTreatments'])

def _build_redis_factory(api_key, cfg):
"""Build and return a split factory with redis-based storage."""
Expand Down Expand Up @@ -828,7 +833,8 @@ def _build_redis_factory(api_key, cfg):
manager,
sdk_ready_flag=None,
telemetry_producer=telemetry_producer,
telemetry_init_producer=telemetry_init_producer
telemetry_init_producer=telemetry_init_producer,
fallback_treatments_configuration=cfg['fallbackTreatments']
)
redundant_factory_count, active_factory_count = _get_active_and_redundant_count()
storages['telemetry'].record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
Expand Down Expand Up @@ -910,7 +916,8 @@ async def _build_redis_factory_async(api_key, cfg):
manager,
telemetry_producer=telemetry_producer,
telemetry_init_producer=telemetry_init_producer,
telemetry_submitter=telemetry_submitter
telemetry_submitter=telemetry_submitter,
fallback_treatments_configuration=cfg['fallbackTreatments']
)
redundant_factory_count, active_factory_count = _get_active_and_redundant_count()
await storages['telemetry'].record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
Expand Down Expand Up @@ -992,7 +999,8 @@ def _build_pluggable_factory(api_key, cfg):
manager,
sdk_ready_flag=None,
telemetry_producer=telemetry_producer,
telemetry_init_producer=telemetry_init_producer
telemetry_init_producer=telemetry_init_producer,
fallback_treatments_configuration=cfg['fallbackTreatments']
)
redundant_factory_count, active_factory_count = _get_active_and_redundant_count()
storages['telemetry'].record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
Expand Down Expand Up @@ -1072,7 +1080,8 @@ async def _build_pluggable_factory_async(api_key, cfg):
manager,
telemetry_producer=telemetry_producer,
telemetry_init_producer=telemetry_init_producer,
telemetry_submitter=telemetry_submitter
telemetry_submitter=telemetry_submitter,
fallback_treatments_configuration=cfg['fallbackTreatments']
)
redundant_factory_count, active_factory_count = _get_active_and_redundant_count()
await storages['telemetry'].record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
Expand Down Expand Up @@ -1150,6 +1159,7 @@ def _build_localhost_factory(cfg):
telemetry_producer=telemetry_producer,
telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(),
telemetry_submitter=LocalhostTelemetrySubmitter(),
fallback_treatments_configuration=cfg['fallbackTreatments']
)

async def _build_localhost_factory_async(cfg):
Expand Down Expand Up @@ -1220,7 +1230,8 @@ async def _build_localhost_factory_async(cfg):
telemetry_producer=telemetry_producer,
telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(),
telemetry_submitter=LocalhostTelemetrySubmitterAsync(),
manager_start_task=manager_start_task
manager_start_task=manager_start_task,
fallback_treatments_configuration=cfg['fallbackTreatments']
)

def get_factory(api_key, **kwargs):
Expand Down
17 changes: 14 additions & 3 deletions splitio/client/input_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from splitio.client.key import Key
from splitio.client import client
from splitio.client.util import get_fallback_treatment_and_label
from splitio.engine.evaluator import CONTROL
from splitio.models.fallback_treatment import FallbackTreatment

Expand All @@ -16,7 +17,7 @@
EVENT_TYPE_PATTERN = r'^[a-zA-Z0-9][-_.:a-zA-Z0-9]{0,79}$'
MAX_PROPERTIES_LENGTH_BYTES = 32768
_FLAG_SETS_REGEX = '^[a-z0-9][_a-z0-9]{0,49}$'
_FALLBACK_TREATMENT_REGEX = '^[a-zA-Z][a-zA-Z0-9-_;]+$'
_FALLBACK_TREATMENT_REGEX = '^[0-9]+[.a-zA-Z0-9_-]*$|^[a-zA-Z]+[a-zA-Z0-9_-]*$'
_FALLBACK_TREATMENT_SIZE = 100

def _check_not_null(value, name, operation):
Expand Down Expand Up @@ -502,7 +503,7 @@ def validate_feature_flags_get_treatments( # pylint: disable=invalid-name
valid_feature_flags.append(ff)
return valid_feature_flags

def generate_control_treatments(feature_flags):
def generate_control_treatments(feature_flags, fallback_treatments_configuration):
"""
Generate valid feature flags to control.

Expand All @@ -517,7 +518,13 @@ def generate_control_treatments(feature_flags):
to_return = {}
for feature_flag in feature_flags:
if isinstance(feature_flag, str) and len(feature_flag.strip())> 0:
to_return[feature_flag] = (CONTROL, None)
treatment = CONTROL
config = None
label = ""
label, treatment, config = get_fallback_treatment_and_label(fallback_treatments_configuration,
feature_flag, treatment, label, _LOGGER)

to_return[feature_flag] = (treatment, config)
return to_return


Expand Down Expand Up @@ -719,6 +726,10 @@ def validate_fallback_treatment(fallback_treatment):
if not isinstance(fallback_treatment, FallbackTreatment):
_LOGGER.warning("Config: Fallback treatment instance should be FallbackTreatment, input is discarded")
return False

if not isinstance(fallback_treatment.treatment, str):
_LOGGER.warning("Config: Fallback treatment value should be str type, input is discarded")
return False

if not validate_regex_name(fallback_treatment.treatment):
_LOGGER.warning("Config: Fallback treatment should match regex %s", _FALLBACK_TREATMENT_REGEX)
Expand Down
19 changes: 19 additions & 0 deletions splitio/client/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,22 @@ def get_metadata(config):
version = 'python-%s' % __version__
ip_address, hostname = _get_hostname_and_ip(config)
return SdkMetadata(version, hostname, ip_address)

def get_fallback_treatment_and_label(fallback_treatments_configuration, feature_name, treatment, label, _logger):
if fallback_treatments_configuration == None:
return label, treatment, None

if fallback_treatments_configuration.by_flag_fallback_treatment != None and \
fallback_treatments_configuration.by_flag_fallback_treatment.get(feature_name) != None:
_logger.debug('Using Fallback Treatment for feature: %s', feature_name)
return fallback_treatments_configuration.by_flag_fallback_treatment.get(feature_name).label_prefix + label, \
fallback_treatments_configuration.by_flag_fallback_treatment.get(feature_name).treatment, \
fallback_treatments_configuration.by_flag_fallback_treatment.get(feature_name).config

if fallback_treatments_configuration.global_fallback_treatment != None:
_logger.debug('Using Global Fallback Treatment.')
return fallback_treatments_configuration.global_fallback_treatment.label_prefix + label, \
fallback_treatments_configuration.global_fallback_treatment.treatment, \
fallback_treatments_configuration.global_fallback_treatment.config

return label, treatment, None
23 changes: 3 additions & 20 deletions splitio/engine/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
from collections import namedtuple

from splitio.client.util import get_fallback_treatment_and_label
from splitio.models.impressions import Label
from splitio.models.grammar.condition import ConditionType
from splitio.models.grammar.matchers.misc import DependencyMatcher
Expand Down Expand Up @@ -52,7 +53,8 @@ def eval_with_context(self, key, bucketing, feature_name, attrs, ctx):
if not feature:
_LOGGER.warning('Unknown or invalid feature: %s', feature)
label = Label.SPLIT_NOT_FOUND
label, _treatment, config = self._get_fallback_treatment_and_label(feature_name, _treatment, label)
label, _treatment, config = get_fallback_treatment_and_label(self._fallback_treatments_configuration,
feature_name, _treatment, label, _LOGGER)
else:
_change_number = feature.change_number
if feature.killed:
Expand All @@ -72,25 +74,6 @@ def eval_with_context(self, key, bucketing, feature_name, attrs, ctx):
},
'impressions_disabled': feature.impressions_disabled if feature else None
}

def _get_fallback_treatment_and_label(self, feature_name, treatment, label):
if self._fallback_treatments_configuration == None:
return label, treatment, None

if self._fallback_treatments_configuration.by_flag_fallback_treatment != None and \
self._fallback_treatments_configuration.by_flag_fallback_treatment.get(feature_name) != None:
_LOGGER.debug('Using Fallback Treatment for feature: %s', feature_name)
return self._fallback_treatments_configuration.by_flag_fallback_treatment.get(feature_name).label_prefix + label, \
self._fallback_treatments_configuration.by_flag_fallback_treatment.get(feature_name).treatment, \
self._fallback_treatments_configuration.by_flag_fallback_treatment.get(feature_name).config

if self._fallback_treatments_configuration.global_fallback_treatment != None:
_LOGGER.debug('Using Global Fallback Treatment.')
return self._fallback_treatments_configuration.global_fallback_treatment.label_prefix + label, \
self._fallback_treatments_configuration.global_fallback_treatment.treatment, \
self._fallback_treatments_configuration.global_fallback_treatment.config

return label, treatment, None

def _get_treatment(self, feature, bucketing, key, attrs, ctx, label, _treatment):
if _treatment == CONTROL:
Expand Down
4 changes: 2 additions & 2 deletions splitio/storage/adapters/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ def _build_default_client(config): # pylint: disable=too-many-locals
unix_socket_path = config.get('redisUnixSocketPath', None)
encoding = config.get('redisEncoding', 'utf-8')
encoding_errors = config.get('redisEncodingErrors', 'strict')
errors = config.get('redisErrors', None)
# errors = config.get('redisErrors', None)
decode_responses = config.get('redisDecodeResponses', True)
retry_on_timeout = config.get('redisRetryOnTimeout', False)
ssl = config.get('redisSsl', False)
Expand All @@ -740,7 +740,7 @@ def _build_default_client(config): # pylint: disable=too-many-locals
unix_socket_path=unix_socket_path,
encoding=encoding,
encoding_errors=encoding_errors,
errors=errors,
# errors=errors, Starting from redis 6.0.0 errors argument is removed
decode_responses=decode_responses,
retry_on_timeout=retry_on_timeout,
ssl=ssl,
Expand Down
Loading