diff --git a/ld_openfeature/provider.py b/ld_openfeature/provider.py index 55fe421..d2acca1 100644 --- a/ld_openfeature/provider.py +++ b/ld_openfeature/provider.py @@ -1,12 +1,15 @@ +import threading from typing import Any, List, Optional, Union from ldclient import LDClient, Config +from ldclient.interfaces import DataSourceStatus, FlagChange, DataSourceState from openfeature.evaluation_context import EvaluationContext -from openfeature.exception import ErrorCode +from openfeature.exception import ErrorCode, ProviderFatalError from openfeature.flag_evaluation import FlagResolutionDetails, FlagType, Reason from openfeature.hook import Hook from openfeature.provider.metadata import Metadata -from openfeature.provider.provider import AbstractProvider +from openfeature.provider import AbstractProvider +from openfeature.event import ProviderEventDetails from ld_openfeature.impl.context_converter import EvaluationContextConverter from ld_openfeature.impl.details_converter import ResolutionDetailsConverter @@ -19,7 +22,60 @@ def __init__(self, config: Config): self.__context_converter = EvaluationContextConverter() self.__details_converter = ResolutionDetailsConverter() + def __handle_data_source_status(self, status: DataSourceStatus): + state = status.state + if state == DataSourceState.INITIALIZING: + return + elif state == DataSourceState.VALID: + self.emit_provider_ready(ProviderEventDetails()) + elif state == DataSourceState.OFF: + error_message = self.__get_message(status, + "the provider has encountered a permanent error or has been shutdown") + self.emit_provider_error(ProviderEventDetails(error_code=ErrorCode.PROVIDER_FATAL, + message=error_message)) + elif state == DataSourceState.INTERRUPTED: + error_message = self.__get_message(status, "encountered an unknown error") + self.emit_provider_stale(ProviderEventDetails(message=error_message)) + + # For now treat an unknown state as no change. + + def __handle_flag_change(self, change: FlagChange): + self.emit_provider_configuration_changed(ProviderEventDetails(flags_changed=[change.key])) + pass + + def initialize(self, evaluation_context: EvaluationContext): + ready_event = threading.Event() + + def ready_handler(status: DataSourceStatus): + if status.state == DataSourceState.VALID: + ready_event.set() + elif status.state == DataSourceState.OFF: + ready_event.set() + + # We listen just to handle the ready event. We do not emit events because the client emits them for us. + self.__client.data_source_status_provider.add_listener(ready_handler) + + # Check for conditions that may have happened before we added the listener. + if self.__client.data_source_status_provider.status.state == DataSourceState.OFF: + ready_event.set() + + if self.__client.is_initialized(): + ready_event.set() + + ready_event.wait() + + self.__client.data_source_status_provider.remove_listener(ready_handler) + + if not self.__client.is_initialized(): + raise ProviderFatalError(error_message="launchdarkly client initialization failed") + + # Listen to new status events and emit them. + self.__client.data_source_status_provider.add_listener(self.__handle_data_source_status) + self.__client.flag_tracker.add_listener(self.__handle_flag_change) + def shutdown(self): + self.__client.data_source_status_provider.remove_listener(self.__handle_data_source_status) + self.__client.flag_tracker.remove_listener(self.__handle_flag_change) self.__client.close() def get_metadata(self) -> Metadata: @@ -73,7 +129,8 @@ def resolve_object_details( """Resolves the flag value for the provided flag key as a list or dictionary""" return self.__resolve_value(FlagType(FlagType.OBJECT), flag_key, default_value, evaluation_context) - def __resolve_value(self, flag_type: FlagType, flag_key: str, default_value: Any, evaluation_context: Optional[EvaluationContext] = None) -> FlagResolutionDetails: + def __resolve_value(self, flag_type: FlagType, flag_key: str, default_value: Any, + evaluation_context: Optional[EvaluationContext] = None) -> FlagResolutionDetails: if evaluation_context is None: return FlagResolutionDetails( value=default_value, @@ -103,9 +160,16 @@ def __resolve_value(self, flag_type: FlagType, flag_key: str, default_value: Any return self.__details_converter.to_resolution_details(result) - def __mismatched_type_details(self, default_value: Any) -> FlagResolutionDetails: + @staticmethod + def __mismatched_type_details(default_value: Any) -> FlagResolutionDetails: return FlagResolutionDetails( value=default_value, reason=Reason(Reason.ERROR), error_code=ErrorCode.TYPE_MISMATCH ) + + @staticmethod + def __get_message(status: DataSourceStatus, default: str): + if status.error and status.error.message: + return status.error.message + return default diff --git a/pyproject.toml b/pyproject.toml index 85709cc..008194b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,7 @@ packages = [ [tool.poetry.dependencies] python = "^3.8" -openfeature-sdk = ">=0.4.2,<1" +openfeature-sdk = ">=0.7.0,<1" launchdarkly-server-sdk = "<10" diff --git a/tests/test_data_sources.py b/tests/test_data_sources.py new file mode 100644 index 0000000..2b24cea --- /dev/null +++ b/tests/test_data_sources.py @@ -0,0 +1,134 @@ +import threading +import time +from typing import Optional + +from ldclient import Config +from ldclient.integrations.test_data import TestData +from ldclient.interfaces import UpdateProcessor, DataSourceUpdateSink, DataSourceState, DataSourceErrorInfo, \ + DataSourceErrorKind +from ldclient.versioned_data_kind import FEATURES + + +class FailingDataSource(UpdateProcessor): + def __init__(self, config: Config, store, ready: threading.Event): + self._data_source_update_sink: Optional[DataSourceUpdateSink] = config.data_source_update_sink + self._ready = ready + + def start(self): + if self._data_source_update_sink is None: + return + + self._ready.set() + + self._data_source_update_sink.update_status( + DataSourceState.OFF, + DataSourceErrorInfo( + DataSourceErrorKind.ERROR_RESPONSE, + 401, + time.time(), + str("Bad things") + ) + ) + + def stop(self): + pass + + def is_alive(self): + return False + + def initialized(self): + return False + + +class DelayedFailingDataSource(UpdateProcessor): + def __init__(self, config: Config, store, ready: threading.Event): + self._data_source_update_sink: Optional[DataSourceUpdateSink] = config.data_source_update_sink + self._ready = ready + + def start(self): + if self._data_source_update_sink is None: + return + + self._ready.set() + + def data_source_failure(): + self._data_source_update_sink.update_status( + DataSourceState.OFF, + DataSourceErrorInfo( + DataSourceErrorKind.ERROR_RESPONSE, + 401, + time.time(), + str("Bad things") + ) + ) + + threading.Timer(0.1, data_source_failure).start() + + def stop(self): + pass + + def is_alive(self): + return False + + def initialized(self): + return False + + +class StaleDataSource(UpdateProcessor): + def __init__(self, config: Config, store, ready: threading.Event): + self._data_source_update_sink: Optional[DataSourceUpdateSink] = config.data_source_update_sink + self._ready = ready + + def start(self): + self._ready.set() + self._data_source_update_sink.update_status(DataSourceState.VALID, None) + + def data_source_interrupted(): + self._data_source_update_sink.update_status( + DataSourceState.INTERRUPTED, + DataSourceErrorInfo( + DataSourceErrorKind.ERROR_RESPONSE, + 408, + time.time(), + str("Less bad things") + ) + ) + + threading.Timer(0.1, data_source_interrupted).start() + + def stop(self): + pass + + def is_alive(self): + return False + + def initialized(self): + return True + + +class UpdatingDataSource(UpdateProcessor): + def __init__(self, config: Config, store, ready: threading.Event): + self._data_source_update_sink: Optional[DataSourceUpdateSink] = config.data_source_update_sink + self._ready = ready + + def start(self): + self._ready.set() + self._data_source_update_sink.init({}) + self._data_source_update_sink.update_status(DataSourceState.VALID, None) + + def update_data(): + # The test_data_source is only used to access the flag builder. + # We call _build here, once TestData supports change handlers we should remove this. + self._data_source_update_sink.upsert(FEATURES, + TestData().data_source().flag("potato").on(True)._build(1)) + + threading.Timer(0.1, update_data).start() + + def stop(self): + pass + + def is_alive(self): + return False + + def initialized(self): + return True diff --git a/tests/test_provider.py b/tests/test_provider.py index 3c6bce6..851fb62 100644 --- a/tests/test_provider.py +++ b/tests/test_provider.py @@ -1,3 +1,4 @@ +import threading from typing import List, Union from unittest.mock import patch @@ -6,10 +7,13 @@ from ldclient.evaluation import EvaluationDetail from ldclient.integrations.test_data import TestData from openfeature.evaluation_context import EvaluationContext +from openfeature.event import ProviderEvent, EventDetails from openfeature.exception import ErrorCode from openfeature.flag_evaluation import Reason +from openfeature import api from ld_openfeature import LaunchDarklyProvider +from tests.test_data_sources import FailingDataSource, StaleDataSource, UpdatingDataSource, DelayedFailingDataSource @pytest.fixture @@ -47,7 +51,8 @@ def test_not_providing_context_returns_error(provider: LaunchDarklyProvider): assert resolution_details.error_code == ErrorCode.TARGETING_KEY_MISSING -def test_evaluation_results_are_converted_to_details(provider: LaunchDarklyProvider, evaluation_context: EvaluationContext): +def test_evaluation_results_are_converted_to_details(provider: LaunchDarklyProvider, + evaluation_context: EvaluationContext): resolution_details = provider.resolve_boolean_details("fallthrough-boolean", True, evaluation_context) assert resolution_details.value is True @@ -56,7 +61,8 @@ def test_evaluation_results_are_converted_to_details(provider: LaunchDarklyProvi assert resolution_details.error_code is None -def test_evaluation_error_results_are_converted_correctly(provider: LaunchDarklyProvider, evaluation_context: EvaluationContext): +def test_evaluation_error_results_are_converted_correctly(provider: LaunchDarklyProvider, + evaluation_context: EvaluationContext): detail = EvaluationDetail(True, None, {'kind': 'ERROR', 'errorKind': 'CLIENT_NOT_READY'}) with patch.object(LDClient, 'variation_detail', lambda self, _key, _context, _default: detail): resolution_details = provider.resolve_boolean_details("flag-key", True, evaluation_context) @@ -67,7 +73,8 @@ def test_evaluation_error_results_are_converted_correctly(provider: LaunchDarkly assert resolution_details.error_code == ErrorCode.PROVIDER_NOT_READY -def test_invalid_types_generate_type_mismatch_results(provider: LaunchDarklyProvider, evaluation_context: EvaluationContext): +def test_invalid_types_generate_type_mismatch_results(provider: LaunchDarklyProvider, + evaluation_context: EvaluationContext): resolution_details = provider.resolve_string_details("fallthrough-boolean", "default-value", evaluation_context) assert resolution_details.value == "default-value" @@ -128,3 +135,111 @@ def test_logger_changes_should_cascade_to_evaluation_converter(provider: LaunchD assert len(caplog.records) == 1 assert caplog.records[0].message == "'kind' was set to a non-string value; defaulting to user" + + +def test_provider_emits_ready_event_when_immediately_ready(): + ld_provider_ready_count = 0 + lock = threading.Lock() + + def handle_status(details: EventDetails): + if details.provider_name == 'launchdarkly-openfeature-server': + nonlocal lock + nonlocal ld_provider_ready_count + with lock: + ld_provider_ready_count = ld_provider_ready_count + 1 + + # At the time of implementation this handler runs synchronously on the same + # thread as initialization. The lock is in case this behavior changes. + api.add_handler(ProviderEvent.PROVIDER_READY, handle_status) + + openfeature_provider = LaunchDarklyProvider(Config("", offline=True)) + api.set_provider(openfeature_provider) + + with lock: + assert ld_provider_ready_count == 1 + + api.shutdown() + + +def test_provider_emits_error_event_immediately_failed(): + ld_provider_error_count = 0 + lock = threading.Lock() + + def handle_status(details: EventDetails): + if details.provider_name == 'launchdarkly-openfeature-server': + nonlocal lock + nonlocal ld_provider_error_count + with lock: + ld_provider_error_count = ld_provider_error_count + 1 + + api.add_handler(ProviderEvent.PROVIDER_ERROR, handle_status) + + openfeature_provider = LaunchDarklyProvider( + Config("", update_processor_class=FailingDataSource, send_events=False)) + + api.set_provider(openfeature_provider) + + with lock: + assert ld_provider_error_count == 1 + + api.shutdown() + + +def test_provider_emits_error_event_delayed_failure(): + ld_provider_error_count = 0 + lock = threading.Lock() + + def handle_status(details: EventDetails): + if details.provider_name == 'launchdarkly-openfeature-server': + nonlocal lock + nonlocal ld_provider_error_count + with lock: + ld_provider_error_count = ld_provider_error_count + 1 + + api.add_handler(ProviderEvent.PROVIDER_ERROR, handle_status) + + openfeature_provider = LaunchDarklyProvider( + Config("", update_processor_class=DelayedFailingDataSource, send_events=False)) + + api.set_provider(openfeature_provider) + + with lock: + assert ld_provider_error_count == 1 + + api.shutdown() + + +def test_provider_emits_stale_event(): + thread_event = threading.Event() + + def handle_status(details: EventDetails): + if details.provider_name == 'launchdarkly-openfeature-server': + thread_event.set() + + api.add_handler(ProviderEvent.PROVIDER_STALE, handle_status) + + openfeature_provider = LaunchDarklyProvider(Config("", update_processor_class=StaleDataSource, send_events=False)) + api.set_provider(openfeature_provider) + + assert thread_event.wait(timeout=5) + + api.shutdown() + + +def test_provider_emits_configuration_event(): + thread_event = threading.Event() + + provider = LaunchDarklyProvider(Config("", update_processor_class=UpdatingDataSource, send_events=False)) + + def handle_change(details: EventDetails): + assert details.flags_changed is not None + assert len(details.flags_changed) == 1 + assert details.flags_changed[0] == "potato" + thread_event.set() + + api.add_handler(ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, handle_change) + api.set_provider(provider) + + assert thread_event.wait(timeout=5) + + api.shutdown()