Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
89597f8
Reduce log level for asyncio.CancelledError exceptions
chillaq Aug 20, 2025
cc56af8
Merge pull request #593 from splitio/FME-8450-lower-log-sse-renew
chillaq Aug 20, 2025
caacb33
Updated Config and input validator
chillaq Aug 25, 2025
d9bbce4
Added FallbackTreatmentsConfiguration class
chillaq Aug 26, 2025
5da06df
added label prefix
chillaq Aug 26, 2025
5811d9c
Updated evaluator
chillaq Aug 26, 2025
34f66ae
clean up client
chillaq Aug 26, 2025
50f5b76
Update client class
chillaq Aug 28, 2025
b7391b4
Update factory and tests
chillaq Aug 29, 2025
0cfb0ef
updated regex
chillaq Sep 2, 2025
52a2967
Removed FallbackConfig object and updated config parameter name
chillaq Sep 2, 2025
ba443e5
Merge pull request #594 from splitio/FME-9614-fallback-config-validator
chillaq Sep 2, 2025
714b4ba
updated evaluator
chillaq Sep 2, 2025
65b97e8
Merge pull request #595 from splitio/FME-9615-fallback-evaluator
chillaq Sep 2, 2025
44110b0
polishing
chillaq Sep 2, 2025
c1fdc5f
Merge branch 'feature/fallback-treatment' into FME-9616-fallback-client
chillaq Sep 2, 2025
48e2aa9
Merge pull request #596 from splitio/FME-9616-fallback-client
chillaq Sep 2, 2025
48c3084
Added fallback calculator
chillaq Sep 10, 2025
02484d9
Merge pull request #597 from splitio/fallback-calculator
chillaq Sep 10, 2025
f3d1065
deprecate redis errors param
chillaq Sep 10, 2025
5e17009
polish
chillaq Sep 10, 2025
6b02164
polish
chillaq Sep 10, 2025
4fe9854
Restrict redis lib to below 7.0.0
chillaq Sep 10, 2025
210405b
Merge pull request #598 from splitio/deprecate-redis-errors
chillaq Sep 11, 2025
9685012
Updated version and changes
chillaq Sep 12, 2025
7c577f5
Merge pull request #599 from splitio/FME-9762-changes
chillaq Sep 12, 2025
392fa24
Merge branch 'development' into feature/fallback-treatment
chillaq Sep 12, 2025
0451cb1
polish tests
chillaq Sep 12, 2025
bfe6ee7
update tests
chillaq Sep 12, 2025
13edab5
update test
chillaq Sep 12, 2025
bcd67de
fix tests
chillaq Sep 15, 2025
6f2d224
update test
chillaq Sep 15, 2025
c838083
update test
chillaq Sep 15, 2025
0e2f13c
updated changes
chillaq Sep 15, 2025
1226d2b
polishing
chillaq Sep 15, 2025
3c99066
Merge pull request #600 from splitio/feature/fallback-treatment
chillaq Sep 15, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
10.5.0 (Sep 15, 2025)
- Changed the log level from error to debug when renewing the token for Streaming service in asyncio mode.
- Added new configuration for Fallback Treatments, which allows setting a treatment value and optional config to be returned in place of "control", either globally or by flag. Read more in our docs.
- Deprecated config parameter `redisErrors` as it is removed in redis lib since 6.0.0 version (https://github.com/redis/redis-py/releases/tag/v6.0.0).

10.4.0 (Aug 4, 2025)
- Added a new optional argument to the client `getTreatment` methods to allow passing additional evaluation options, such as a map of properties to append to the generated impressions sent to Split backend. Read more in our docs.

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
tests_require=TESTS_REQUIRES,
extras_require={
'test': TESTS_REQUIRES,
'redis': ['redis>=2.10.5'],
'redis': ['redis>=2.10.5,<7.0.0'],
'uwsgi': ['uwsgi>=2.0.0'],
'cpphash': ['mmh3cffi==0.2.1'],
'asyncio': ['aiohttp>=3.8.4', 'aiofiles>=23.1.0'],
Expand Down
95 changes: 57 additions & 38 deletions splitio/client/client.py

Large diffs are not rendered by default.

40 changes: 38 additions & 2 deletions splitio/client/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from enum import Enum

from splitio.engine.impressions import ImpressionsMode
from splitio.client.input_validator import validate_flag_sets
from splitio.client.input_validator import validate_flag_sets, validate_fallback_treatment, validate_regex_name
from splitio.models.fallback_config import FallbackTreatmentsConfiguration

_LOGGER = logging.getLogger(__name__)
DEFAULT_DATA_SAMPLING = 1
Expand Down Expand Up @@ -69,7 +70,8 @@ class AuthenticateScheme(Enum):
'flagSetsFilter': None,
'httpAuthenticateScheme': AuthenticateScheme.NONE,
'kerberosPrincipalUser': None,
'kerberosPrincipalPassword': None
'kerberosPrincipalPassword': None,
'fallbackTreatments': FallbackTreatmentsConfiguration(None)
}

def _parse_operation_mode(sdk_key, config):
Expand Down Expand Up @@ -168,4 +170,38 @@ def sanitize(sdk_key, config):
' Defaulting to `none` mode.')
processed["httpAuthenticateScheme"] = authenticate_scheme

processed = _sanitize_fallback_config(config, processed)

if config.get("redisErrors") is not None:
_LOGGER.warning('Parameter `redisErrors` is deprecated as it is no longer supported in redis lib.' \
' Will ignore this value.')

processed["redisErrors"] = None
return processed

def _sanitize_fallback_config(config, processed):
if config.get('fallbackTreatments') is None:
return processed

if not isinstance(config['fallbackTreatments'], FallbackTreatmentsConfiguration):
_LOGGER.warning('Config: fallbackTreatments parameter should be of `FallbackTreatmentsConfiguration` class.')
processed['fallbackTreatments'] = None
return processed

sanitized_global_fallback_treatment = config['fallbackTreatments'].global_fallback_treatment
if config['fallbackTreatments'].global_fallback_treatment is not None and not validate_fallback_treatment(config['fallbackTreatments'].global_fallback_treatment):
_LOGGER.warning('Config: global fallbacktreatment parameter is discarded.')
sanitized_global_fallback_treatment = None

sanitized_flag_fallback_treatments = {}
if config['fallbackTreatments'].by_flag_fallback_treatment is not None:
for feature_name in config['fallbackTreatments'].by_flag_fallback_treatment.keys():
if not validate_regex_name(feature_name) or not validate_fallback_treatment(config['fallbackTreatments'].by_flag_fallback_treatment[feature_name]):
_LOGGER.warning('Config: fallback treatment parameter for feature flag %s is discarded.', feature_name)
continue

sanitized_flag_fallback_treatments[feature_name] = config['fallbackTreatments'].by_flag_fallback_treatment[feature_name]

processed['fallbackTreatments'] = FallbackTreatmentsConfiguration(sanitized_global_fallback_treatment, sanitized_flag_fallback_treatments)

return processed
37 changes: 24 additions & 13 deletions splitio/client/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
TelemetryStorageProducerAsync, TelemetryStorageConsumerAsync
from splitio.engine.impressions.manager import Counter as ImpressionsCounter
from splitio.engine.impressions.unique_keys_tracker import UniqueKeysTracker, UniqueKeysTrackerAsync

from splitio.models.fallback_config import FallbackTreatmentCalculator
# Storage
from splitio.storage.inmemmory import InMemorySplitStorage, InMemorySegmentStorage, \
InMemoryImpressionStorage, InMemoryEventStorage, InMemoryTelemetryStorage, LocalhostTelemetryStorage, \
Expand Down Expand Up @@ -170,7 +170,8 @@ def __init__( # pylint: disable=too-many-arguments
telemetry_producer=None,
telemetry_init_producer=None,
telemetry_submitter=None,
preforked_initialization=False
preforked_initialization=False,
fallback_treatment_calculator=None
):
"""
Class constructor.
Expand Down Expand Up @@ -201,6 +202,7 @@ def __init__( # pylint: disable=too-many-arguments
self._ready_time = get_current_epoch_time_ms()
_LOGGER.debug("Running in threading mode")
self._sdk_internal_ready_flag = sdk_ready_flag
self._fallback_treatment_calculator = fallback_treatment_calculator
self._start_status_updater()

def _start_status_updater(self):
Expand Down Expand Up @@ -242,7 +244,7 @@ def client(self):
This client is only a set of references to structures hold by the factory.
Creating one a fast operation and safe to be used anywhere.
"""
return Client(self, self._recorder, self._labels_enabled)
return Client(self, self._recorder, self._labels_enabled, self._fallback_treatment_calculator)

def manager(self):
"""
Expand Down Expand Up @@ -338,7 +340,8 @@ def __init__( # pylint: disable=too-many-arguments
telemetry_init_producer=None,
telemetry_submitter=None,
manager_start_task=None,
api_client=None
api_client=None,
fallback_treatment_calculator=None
):
"""
Class constructor.
Expand Down Expand Up @@ -372,6 +375,7 @@ def __init__( # pylint: disable=too-many-arguments
self._sdk_ready_flag = asyncio.Event()
self._ready_task = asyncio.get_running_loop().create_task(self._update_status_when_ready_async())
self._api_client = api_client
self._fallback_treatment_calculator = fallback_treatment_calculator

async def _update_status_when_ready_async(self):
"""Wait until the sdk is ready and update the status for async mode."""
Expand Down Expand Up @@ -460,7 +464,7 @@ def client(self):
This client is only a set of references to structures hold by the factory.
Creating one a fast operation and safe to be used anywhere.
"""
return ClientAsync(self, self._recorder, self._labels_enabled)
return ClientAsync(self, self._recorder, self._labels_enabled, self._fallback_treatment_calculator)

def _wrap_impression_listener(listener, metadata):
"""
Expand Down Expand Up @@ -623,15 +627,16 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
synchronizer._split_synchronizers._segment_sync.shutdown()

return SplitFactory(api_key, storages, cfg['labelsEnabled'],
recorder, manager, None, telemetry_producer, telemetry_init_producer, telemetry_submitter, preforked_initialization=preforked_initialization)
recorder, manager, None, telemetry_producer, telemetry_init_producer, telemetry_submitter, preforked_initialization=preforked_initialization,
fallback_treatment_calculator=FallbackTreatmentCalculator(cfg['fallbackTreatments']))

initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer", daemon=True)
initialization_thread.start()

return SplitFactory(api_key, storages, cfg['labelsEnabled'],
recorder, manager, sdk_ready_flag,
telemetry_producer, telemetry_init_producer,
telemetry_submitter)
telemetry_submitter, fallback_treatment_calculator = FallbackTreatmentCalculator(cfg['fallbackTreatments']))

async def _build_in_memory_factory_async(api_key, cfg, sdk_url=None, events_url=None, # pylint:disable=too-many-arguments,too-many-localsa
auth_api_base_url=None, streaming_api_base_url=None, telemetry_api_base_url=None,
Expand Down Expand Up @@ -750,7 +755,7 @@ async def _build_in_memory_factory_async(api_key, cfg, sdk_url=None, events_url=
recorder, manager,
telemetry_producer, telemetry_init_producer,
telemetry_submitter, manager_start_task=manager_start_task,
api_client=http_client)
api_client=http_client, fallback_treatment_calculator=FallbackTreatmentCalculator(cfg['fallbackTreatments']))

def _build_redis_factory(api_key, cfg):
"""Build and return a split factory with redis-based storage."""
Expand Down Expand Up @@ -828,7 +833,8 @@ def _build_redis_factory(api_key, cfg):
manager,
sdk_ready_flag=None,
telemetry_producer=telemetry_producer,
telemetry_init_producer=telemetry_init_producer
telemetry_init_producer=telemetry_init_producer,
fallback_treatment_calculator=FallbackTreatmentCalculator(cfg['fallbackTreatments'])
)
redundant_factory_count, active_factory_count = _get_active_and_redundant_count()
storages['telemetry'].record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
Expand Down Expand Up @@ -910,7 +916,8 @@ async def _build_redis_factory_async(api_key, cfg):
manager,
telemetry_producer=telemetry_producer,
telemetry_init_producer=telemetry_init_producer,
telemetry_submitter=telemetry_submitter
telemetry_submitter=telemetry_submitter,
fallback_treatment_calculator=FallbackTreatmentCalculator(cfg['fallbackTreatments'])
)
redundant_factory_count, active_factory_count = _get_active_and_redundant_count()
await storages['telemetry'].record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
Expand Down Expand Up @@ -992,7 +999,8 @@ def _build_pluggable_factory(api_key, cfg):
manager,
sdk_ready_flag=None,
telemetry_producer=telemetry_producer,
telemetry_init_producer=telemetry_init_producer
telemetry_init_producer=telemetry_init_producer,
fallback_treatment_calculator=FallbackTreatmentCalculator(cfg['fallbackTreatments'])
)
redundant_factory_count, active_factory_count = _get_active_and_redundant_count()
storages['telemetry'].record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
Expand Down Expand Up @@ -1072,7 +1080,8 @@ async def _build_pluggable_factory_async(api_key, cfg):
manager,
telemetry_producer=telemetry_producer,
telemetry_init_producer=telemetry_init_producer,
telemetry_submitter=telemetry_submitter
telemetry_submitter=telemetry_submitter,
fallback_treatment_calculator=FallbackTreatmentCalculator(cfg['fallbackTreatments'])
)
redundant_factory_count, active_factory_count = _get_active_and_redundant_count()
await storages['telemetry'].record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
Expand Down Expand Up @@ -1150,6 +1159,7 @@ def _build_localhost_factory(cfg):
telemetry_producer=telemetry_producer,
telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(),
telemetry_submitter=LocalhostTelemetrySubmitter(),
fallback_treatment_calculator=FallbackTreatmentCalculator(cfg['fallbackTreatments'])
)

async def _build_localhost_factory_async(cfg):
Expand Down Expand Up @@ -1220,7 +1230,8 @@ async def _build_localhost_factory_async(cfg):
telemetry_producer=telemetry_producer,
telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(),
telemetry_submitter=LocalhostTelemetrySubmitterAsync(),
manager_start_task=manager_start_task
manager_start_task=manager_start_task,
fallback_treatment_calculator=FallbackTreatmentCalculator(cfg['fallbackTreatments'])
)

def get_factory(api_key, **kwargs):
Expand Down
37 changes: 34 additions & 3 deletions splitio/client/input_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@
from splitio.client.key import Key
from splitio.client import client
from splitio.engine.evaluator import CONTROL
from splitio.models.fallback_treatment import FallbackTreatment


_LOGGER = logging.getLogger(__name__)
MAX_LENGTH = 250
EVENT_TYPE_PATTERN = r'^[a-zA-Z0-9][-_.:a-zA-Z0-9]{0,79}$'
MAX_PROPERTIES_LENGTH_BYTES = 32768
_FLAG_SETS_REGEX = '^[a-z0-9][_a-z0-9]{0,49}$'

_FALLBACK_TREATMENT_REGEX = '^[0-9]+[.a-zA-Z0-9_-]*$|^[a-zA-Z]+[a-zA-Z0-9_-]*$'
_FALLBACK_TREATMENT_SIZE = 100

def _check_not_null(value, name, operation):
"""
Expand Down Expand Up @@ -500,7 +502,7 @@ def validate_feature_flags_get_treatments( # pylint: disable=invalid-name
valid_feature_flags.append(ff)
return valid_feature_flags

def generate_control_treatments(feature_flags):
def generate_control_treatments(feature_flags, fallback_treatment_calculator):
"""
Generate valid feature flags to control.

Expand All @@ -515,7 +517,11 @@ def generate_control_treatments(feature_flags):
to_return = {}
for feature_flag in feature_flags:
if isinstance(feature_flag, str) and len(feature_flag.strip())> 0:
to_return[feature_flag] = (CONTROL, None)
fallback_treatment = fallback_treatment_calculator.resolve(feature_flag, "")
treatment = fallback_treatment.treatment
config = fallback_treatment.config

to_return[feature_flag] = (treatment, config)
return to_return


Expand Down Expand Up @@ -712,3 +718,28 @@ def validate_flag_sets(flag_sets, method_name):
sanitized_flag_sets.add(flag_set)

return list(sanitized_flag_sets)

def validate_fallback_treatment(fallback_treatment):
if not isinstance(fallback_treatment, FallbackTreatment):
_LOGGER.warning("Config: Fallback treatment instance should be FallbackTreatment, input is discarded")
return False

if not isinstance(fallback_treatment.treatment, str):
_LOGGER.warning("Config: Fallback treatment value should be str type, input is discarded")
return False

if not validate_regex_name(fallback_treatment.treatment):
_LOGGER.warning("Config: Fallback treatment should match regex %s", _FALLBACK_TREATMENT_REGEX)
return False

if len(fallback_treatment.treatment) > _FALLBACK_TREATMENT_SIZE:
_LOGGER.warning("Config: Fallback treatment size should not exceed %s characters", _FALLBACK_TREATMENT_SIZE)
return False

return True

def validate_regex_name(name):
if re.match(_FALLBACK_TREATMENT_REGEX, name) == None:
return False

return True
2 changes: 1 addition & 1 deletion splitio/client/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ def get_metadata(config):
"""
version = 'python-%s' % __version__
ip_address, hostname = _get_hostname_and_ip(config)
return SdkMetadata(version, hostname, ip_address)
return SdkMetadata(version, hostname, ip_address)
12 changes: 9 additions & 3 deletions splitio/engine/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
class Evaluator(object): # pylint: disable=too-few-public-methods
"""Split Evaluator class."""

def __init__(self, splitter):
def __init__(self, splitter, fallback_treatment_calculator=None):
"""
Construct a Evaluator instance.

:param splitter: partition object.
:type splitter: splitio.engine.splitters.Splitters
"""
self._splitter = splitter
self._fallback_treatment_calculator = fallback_treatment_calculator

def eval_many_with_context(self, key, bucketing, features, attrs, ctx):
"""
Expand All @@ -51,6 +52,10 @@ def eval_with_context(self, key, bucketing, feature_name, attrs, ctx):
if not feature:
_LOGGER.warning('Unknown or invalid feature: %s', feature)
label = Label.SPLIT_NOT_FOUND
fallback_treatment = self._fallback_treatment_calculator.resolve(feature_name, label)
label = fallback_treatment.label
_treatment = fallback_treatment.treatment
config = fallback_treatment.config
else:
_change_number = feature.change_number
if feature.killed:
Expand All @@ -59,17 +64,18 @@ def eval_with_context(self, key, bucketing, feature_name, attrs, ctx):
else:
label, _treatment = self._check_prerequisites(feature, bucketing, key, attrs, ctx, label, _treatment)
label, _treatment = self._get_treatment(feature, bucketing, key, attrs, ctx, label, _treatment)
config = feature.get_configurations_for(_treatment)

return {
'treatment': _treatment,
'configurations': feature.get_configurations_for(_treatment) if feature else None,
'configurations': config,
'impression': {
'label': label,
'change_number': _change_number
},
'impressions_disabled': feature.impressions_disabled if feature else None
}

def _get_treatment(self, feature, bucketing, key, attrs, ctx, label, _treatment):
if _treatment == CONTROL:
treatment, label = self._treatment_for_flag(feature, key, bucketing, attrs, ctx)
Expand Down
Loading