diff --git a/.readthedocs.yaml b/.readthedocs.yaml index d3ff610..65105e4 100644 --- a/.readthedocs.yaml +++ b/.readthedocs.yaml @@ -13,6 +13,8 @@ sphinx: # Fail on all warnings to avoid broken references fail_on_warning: true +formats: all + python: install: - method: pip diff --git a/DEVELOPING.md b/DEVELOPING.md index 7741ca3..7005fc2 100644 --- a/DEVELOPING.md +++ b/DEVELOPING.md @@ -4,7 +4,7 @@ This repository contains the Python SDK for INTERSECT. ## Documentation -Documentation for the INTERSECT Python SDK can be viewed at http://10.64.193.144:30002. The documentation is generated with [Sphinx](https://www.sphinx-doc.org) from the `docs` directory. See the documentation for more information about installation, usage, and examples of the Python SDK for INTERSECT. +Documentation for the INTERSECT Python SDK can be viewed at https://intersect-python-sdk.readthedocs.io/ . The documentation is generated with [Sphinx](https://www.sphinx-doc.org) from the `docs` directory. See the documentation for more information about installation, usage, and examples of the Python SDK for INTERSECT. ## Quickstart (developers) diff --git a/src/intersect_sdk/client.py b/src/intersect_sdk/client.py index b8f5d59..c8de147 100644 --- a/src/intersect_sdk/client.py +++ b/src/intersect_sdk/client.py @@ -13,7 +13,9 @@ from __future__ import annotations import time -from typing import TYPE_CHECKING +from collections import defaultdict +from threading import Event, Thread +from typing import TYPE_CHECKING, Any from uuid import uuid4 from pydantic import ValidationError @@ -46,6 +48,7 @@ from .client_callback_definitions import ( INTERSECT_CLIENT_EVENT_CALLBACK_TYPE, INTERSECT_CLIENT_RESPONSE_CALLBACK_TYPE, + INTERSECT_CLIENT_TIMEOUT_CALLBACK_TYPE, ) from .shared_callback_definitions import IntersectDirectMessageParams @@ -70,6 +73,7 @@ def __init__( config: IntersectClientConfig, user_callback: INTERSECT_CLIENT_RESPONSE_CALLBACK_TYPE | None = None, event_callback: INTERSECT_CLIENT_EVENT_CALLBACK_TYPE | None = None, + timeout_callback: INTERSECT_CLIENT_TIMEOUT_CALLBACK_TYPE | None = None, ) -> None: """The constructor performs almost all validation checks necessary to function in the INTERSECT ecosystem, with the exception of checking connections/credentials to any backing services. @@ -78,6 +82,7 @@ def __init__( user_callback: The callback function you can use to handle response messages from Services. If this is left empty, you can only send a single message event_callback: The callback function you can use to handle events from any Service. + timeout_callback: The callback function you can use to handle request timeouts. """ # this is called here in case a user created the object using "IntersectClientConfig.model_construct()" to skip validation config = IntersectClientConfig.model_validate(config) @@ -86,6 +91,8 @@ def __init__( die('user_callback function should be a callable function if defined') if event_callback is not None and not callable(event_callback): die('event_callback function should be a callable function if defined') + if timeout_callback is not None and not callable(timeout_callback): + die('timeout_callback function should be a callable function if defined') if not user_callback and not event_callback: die('must define at least one of user_callback or event_callback') if not user_callback: @@ -145,6 +152,10 @@ def __init__( ) self._user_callback = user_callback self._event_callback = event_callback + self._timeout_callback = timeout_callback + self._pending_requests: defaultdict[str, list[dict[str, Any]]] = defaultdict(list) + self._stop_timeout_thread = Event() + self._timeout_thread = Thread(target=self._check_timeouts, daemon=True) @final def startup(self) -> Self: @@ -171,6 +182,8 @@ def startup(self) -> Self: # and has nothing to do with the Service at all. time.sleep(1.0) + self._timeout_thread.start() + if self._resend_initial_messages or not self._sent_initial_messages: for message in self._initial_messages: self._send_userspace_message(message) @@ -199,11 +212,31 @@ def shutdown(self, reason: str | None = None) -> Self: """ logger.info(f'Client is shutting down (reason: {reason})') + self._stop_timeout_thread.set() + self._timeout_thread.join() self._control_plane_manager.disconnect() logger.info('Client shutdown complete') return self + def _check_timeouts(self) -> None: + """Periodically check for timed out requests.""" + while not self._stop_timeout_thread.is_set(): + now = time.time() + for operation_id, requests in list(self._pending_requests.items()): + for request in requests: + if now > request['timeout']: + try: + request['on_timeout'](operation_id) + except Exception as e: # noqa: BLE001 + logger.warning( + f'Exception from timeout callback for operation {operation_id}:\n{e}' + ) + requests.remove(request) + if not requests: + del self._pending_requests[operation_id] + time.sleep(0.1) # Sleep for a short duration + @final def is_connected(self) -> bool: """Check if we're currently connected to the INTERSECT brokers. @@ -257,6 +290,15 @@ def _handle_userspace_message( send_os_signal() return + # If not in pending requests, it already timed out, so ignore this response + if headers.operation_id in self._pending_requests: + del self._pending_requests[headers.operation_id] + else: + logger.debug( + f'Received response for operation {headers.operation_id} that already timed out, ignoring' + ) + return + # TWO: GET DATA FROM APPROPRIATE DATA STORE AND DESERIALIZE IT try: request_params = self._data_plane_manager.incoming_message_data_handler( @@ -442,3 +484,11 @@ def _send_userspace_message(self, params: IntersectDirectMessageParams) -> None: self._control_plane_manager.publish_message( channel, payload, params.content_type, headers, persist=False ) + + if params.timeout is not None and params.on_timeout is not None: + self._pending_requests[params.operation].append( + { + 'timeout': time.time() + params.timeout, + 'on_timeout': params.on_timeout, + } + ) diff --git a/src/intersect_sdk/client_callback_definitions.py b/src/intersect_sdk/client_callback_definitions.py index 78e250e..f647f39 100644 --- a/src/intersect_sdk/client_callback_definitions.py +++ b/src/intersect_sdk/client_callback_definitions.py @@ -15,6 +15,15 @@ IntersectEventMessageParams, ) +INTERSECT_CLIENT_TIMEOUT_CALLBACK_TYPE = Callable[[str], None] +""" +This is a callable function type which should be defined by the user. + +Params + The SDK will send the function one argument: + 1) The operation ID of the request that timed out. +""" + @final class IntersectClientCallback(BaseModel): diff --git a/src/intersect_sdk/shared_callback_definitions.py b/src/intersect_sdk/shared_callback_definitions.py index 4172cc9..853a5bc 100644 --- a/src/intersect_sdk/shared_callback_definitions.py +++ b/src/intersect_sdk/shared_callback_definitions.py @@ -1,5 +1,8 @@ """Callback definitions shared between Services, Capabilities, and Clients.""" +from __future__ import annotations + +from collections.abc import Callable from typing import Annotated, Any, TypeAlias from pydantic import BaseModel, ConfigDict, Field @@ -73,6 +76,16 @@ class IntersectDirectMessageParams(BaseModel): # pydantic config model_config = ConfigDict(revalidate_instances='always') + timeout: float | None = None + """ + The timeout in seconds for the request. If the request is not fulfilled within this time, the on_timeout callback will be called. + """ + + on_timeout: Callable[[], None] | None = None + """ + The callback to call if the request times out. + """ + class IntersectEventMessageParams(BaseModel): """Public facing properties of events the Client/Service wants to listen to.""" diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py new file mode 100644 index 0000000..b98cced --- /dev/null +++ b/tests/unit/test_client.py @@ -0,0 +1,236 @@ +from __future__ import annotations + +import time +from datetime import datetime, timezone +from threading import Event, Thread +from unittest.mock import MagicMock +from uuid import uuid4 + +from intersect_sdk.client import IntersectClient +from intersect_sdk.client_callback_definitions import IntersectClientCallback +from intersect_sdk.config.client import IntersectClientConfig +from intersect_sdk.shared_callback_definitions import IntersectDirectMessageParams + + +def test_timeout_callback_is_called() -> None: + """Tests that the timeout callback is called when a request times out.""" + config = IntersectClientConfig( + system='test', + facility='test', + organization='test', + brokers=[ + { + 'host': 'localhost', + 'port': 1883, + 'protocol': 'mqtt5.0', + 'username': 'test', + 'password': 'test', + } + ], + initial_message_event_config=IntersectClientCallback(), + terminate_after_initial_messages=True, + ) + + timeout_called = [] + + def on_timeout(operation_id): + timeout_called.append(operation_id) + + def user_callback(source, operation_id, has_error, payload): + pass + + client = IntersectClient(config, user_callback=user_callback) + + # Mock the control plane and data plane managers + client._control_plane_manager = MagicMock() + client._control_plane_manager.is_connected.return_value = True + client._control_plane_manager.considered_unrecoverable.return_value = False + client._data_plane_manager = MagicMock() + client._data_plane_manager.outgoing_message_data_handler.return_value = b'test' + + # Manually start the timeout thread (since startup() would try to connect) + client._stop_timeout_thread = Event() + client._timeout_thread = Thread(target=client._check_timeouts, daemon=True) + client._timeout_thread.start() + + message = IntersectDirectMessageParams( + destination='test.test.test.test.test', + operation='test_op', + payload='test', + timeout=0.1, + on_timeout=on_timeout, + ) + + client._send_userspace_message(message) + + # Wait for timeout to trigger + time.sleep(0.3) + + assert len(timeout_called) == 1 + assert timeout_called[0] == 'test_op' + + # Clean up + client._stop_timeout_thread.set() + client._timeout_thread.join() + + +def test_timeout_callback_is_not_called() -> None: + """Tests that the timeout callback is not called when a request is fulfilled.""" + config = IntersectClientConfig( + system='test', + facility='test', + organization='test', + brokers=[ + { + 'host': 'localhost', + 'port': 1883, + 'protocol': 'mqtt5.0', + 'username': 'test', + 'password': 'test', + } + ], + initial_message_event_config=IntersectClientCallback(), + terminate_after_initial_messages=True, + ) + + timeout_called = [] + + def on_timeout(operation_id): + timeout_called.append(operation_id) + + user_callback_called = [] + + def user_callback(source, operation_id, has_error, payload): + user_callback_called.append(True) + + client = IntersectClient(config, user_callback=user_callback) + + # Mock the control plane and data plane managers + client._control_plane_manager = MagicMock() + client._control_plane_manager.is_connected.return_value = True + client._control_plane_manager.considered_unrecoverable.return_value = False + client._data_plane_manager = MagicMock() + client._data_plane_manager.outgoing_message_data_handler.return_value = b'test' + client._data_plane_manager.incoming_message_data_handler.return_value = b'"test"' + + # Manually start the timeout thread + client._stop_timeout_thread = Event() + client._timeout_thread = Thread(target=client._check_timeouts, daemon=True) + client._timeout_thread.start() + + message = IntersectDirectMessageParams( + destination='test.test.test.test.test', + operation='test_op', + payload='test', + timeout=0.5, + on_timeout=on_timeout, + ) + + client._send_userspace_message(message) + + # Simulate receiving a response before the timeout + response_headers = { + 'message_id': str(uuid4()), + 'source': 'test.test.test.test.test', + 'destination': client._hierarchy.hierarchy_string('.'), + 'has_error': 'false', + 'sdk_version': '0.8.0', + 'created_at': datetime.now(timezone.utc).isoformat(), + 'data_handler': '0', # IntersectDataHandler.MESSAGE + 'operation_id': 'test_op', + } + + client._handle_userspace_message(b'"test"', 'application/json', response_headers) + + # Wait to make sure timeout doesn't fire + time.sleep(0.7) + + assert len(timeout_called) == 0 + assert len(user_callback_called) == 1 + + # Clean up + client._stop_timeout_thread.set() + client._timeout_thread.join() + + +def test_response_after_timeout_is_ignored() -> None: + """Tests that responses arriving after timeout are ignored and user_callback is not called.""" + config = IntersectClientConfig( + system='test', + facility='test', + organization='test', + brokers=[ + { + 'host': 'localhost', + 'port': 1883, + 'protocol': 'mqtt5.0', + 'username': 'test', + 'password': 'test', + } + ], + initial_message_event_config=IntersectClientCallback(), + terminate_after_initial_messages=True, + ) + + timeout_called = [] + + def on_timeout(operation_id): + timeout_called.append(operation_id) + + user_callback_called = [] + + def user_callback(source, operation_id, has_error, payload): + user_callback_called.append(True) + + client = IntersectClient(config, user_callback=user_callback) + + # Mock the control plane and data plane managers + client._control_plane_manager = MagicMock() + client._control_plane_manager.is_connected.return_value = True + client._control_plane_manager.considered_unrecoverable.return_value = False + client._data_plane_manager = MagicMock() + client._data_plane_manager.outgoing_message_data_handler.return_value = b'test' + client._data_plane_manager.incoming_message_data_handler.return_value = b'"test"' + + # Manually start the timeout thread + client._stop_timeout_thread = Event() + client._timeout_thread = Thread(target=client._check_timeouts, daemon=True) + client._timeout_thread.start() + + message = IntersectDirectMessageParams( + destination='test.test.test.test.test', + operation='test_op', + payload='test', + timeout=0.1, + on_timeout=on_timeout, + ) + + client._send_userspace_message(message) + + # Wait for timeout to trigger + time.sleep(0.3) + + # Timeout should have been called + assert len(timeout_called) == 1 + assert timeout_called[0] == 'test_op' + + # Now simulate receiving a late response after timeout + response_headers = { + 'message_id': str(uuid4()), + 'source': 'test.test.test.test.test', + 'destination': client._hierarchy.hierarchy_string('.'), + 'has_error': 'false', + 'sdk_version': '0.8.0', + 'created_at': datetime.now(timezone.utc).isoformat(), + 'data_handler': '0', # IntersectDataHandler.MESSAGE + 'operation_id': 'test_op', + } + + client._handle_userspace_message(b'"test"', 'application/json', response_headers) + + # User callback should NOT have been called since the request already timed out + assert len(user_callback_called) == 0 + + # Clean up + client._stop_timeout_thread.set() + client._timeout_thread.join()