From 93789051425d7a3c21acbd97351f68e33af6596e Mon Sep 17 00:00:00 2001 From: Lance-Drane <35977511+Lance-Drane@users.noreply.github.com> Date: Fri, 12 Sep 2025 09:33:47 -0400 Subject: [PATCH 1/6] Update DEVELOPING.md --- DEVELOPING.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DEVELOPING.md b/DEVELOPING.md index 7741ca3..7005fc2 100644 --- a/DEVELOPING.md +++ b/DEVELOPING.md @@ -4,7 +4,7 @@ This repository contains the Python SDK for INTERSECT. ## Documentation -Documentation for the INTERSECT Python SDK can be viewed at http://10.64.193.144:30002. The documentation is generated with [Sphinx](https://www.sphinx-doc.org) from the `docs` directory. See the documentation for more information about installation, usage, and examples of the Python SDK for INTERSECT. +Documentation for the INTERSECT Python SDK can be viewed at https://intersect-python-sdk.readthedocs.io/ . The documentation is generated with [Sphinx](https://www.sphinx-doc.org) from the `docs` directory. See the documentation for more information about installation, usage, and examples of the Python SDK for INTERSECT. ## Quickstart (developers) From 6d0789ef7a21290cb6e3b87bc5f4251492c56e65 Mon Sep 17 00:00:00 2001 From: Lance-Drane <35977511+Lance-Drane@users.noreply.github.com> Date: Mon, 22 Sep 2025 10:41:50 -0400 Subject: [PATCH 2/6] #35 - ReadTheDocs - enable PDF/etc. download options Closes #35 --- .readthedocs.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.readthedocs.yaml b/.readthedocs.yaml index d3ff610..65105e4 100644 --- a/.readthedocs.yaml +++ b/.readthedocs.yaml @@ -13,6 +13,8 @@ sphinx: # Fail on all warnings to avoid broken references fail_on_warning: true +formats: all + python: install: - method: pip From 441bafb722c4aad15aea2fadc41ee43702a798d5 Mon Sep 17 00:00:00 2001 From: andrewfayres Date: Tue, 30 Sep 2025 13:04:36 -0500 Subject: [PATCH 3/6] Add timeout functionality to IntersectClient. --- src/intersect_sdk/client.py | 46 ++++ .../client_callback_definitions.py | 10 + .../shared_callback_definitions.py | 12 +- tests/unit/test_client.py | 228 ++++++++++++++++++ 4 files changed, 295 insertions(+), 1 deletion(-) create mode 100644 tests/unit/test_client.py diff --git a/src/intersect_sdk/client.py b/src/intersect_sdk/client.py index 7a2389c..2714693 100644 --- a/src/intersect_sdk/client.py +++ b/src/intersect_sdk/client.py @@ -13,6 +13,8 @@ from __future__ import annotations import time +from collections import defaultdict +from threading import Event, Thread from typing import TYPE_CHECKING from uuid import uuid4 @@ -47,6 +49,7 @@ from .client_callback_definitions import ( INTERSECT_CLIENT_EVENT_CALLBACK_TYPE, INTERSECT_CLIENT_RESPONSE_CALLBACK_TYPE, + INTERSECT_CLIENT_TIMEOUT_CALLBACK_TYPE, ) from .shared_callback_definitions import IntersectDirectMessageParams @@ -71,6 +74,7 @@ def __init__( config: IntersectClientConfig, user_callback: INTERSECT_CLIENT_RESPONSE_CALLBACK_TYPE | None = None, event_callback: INTERSECT_CLIENT_EVENT_CALLBACK_TYPE | None = None, + timeout_callback: INTERSECT_CLIENT_TIMEOUT_CALLBACK_TYPE | None = None, ) -> None: """The constructor performs almost all validation checks necessary to function in the INTERSECT ecosystem, with the exception of checking connections/credentials to any backing services. @@ -79,6 +83,7 @@ def __init__( user_callback: The callback function you can use to handle response messages from Services. If this is left empty, you can only send a single message event_callback: The callback function you can use to handle events from any Service. + timeout_callback: The callback function you can use to handle request timeouts. """ # this is called here in case a user created the object using "IntersectClientConfig.model_construct()" to skip validation config = IntersectClientConfig.model_validate(config) @@ -87,6 +92,8 @@ def __init__( die('user_callback function should be a callable function if defined') if event_callback is not None and not callable(event_callback): die('event_callback function should be a callable function if defined') + if timeout_callback is not None and not callable(timeout_callback): + die('timeout_callback function should be a callable function if defined') if not user_callback and not event_callback: die('must define at least one of user_callback or event_callback') if not user_callback: @@ -146,6 +153,10 @@ def __init__( ) self._user_callback = user_callback self._event_callback = event_callback + self._timeout_callback = timeout_callback + self._pending_requests: defaultdict[str, list] = defaultdict(list) + self._stop_timeout_thread = Event() + self._timeout_thread = Thread(target=self._check_timeouts, daemon=True) @final def startup(self) -> Self: @@ -172,6 +183,8 @@ def startup(self) -> Self: # and has nothing to do with the Service at all. time.sleep(1.0) + self._timeout_thread.start() + if self._resend_initial_messages or not self._sent_initial_messages: for message in self._initial_messages: self._send_userspace_message(message) @@ -200,11 +213,29 @@ def shutdown(self, reason: str | None = None) -> Self: """ logger.info(f'Client is shutting down (reason: {reason})') + self._stop_timeout_thread.set() + self._timeout_thread.join() self._control_plane_manager.disconnect() logger.info('Client shutdown complete') return self + def _check_timeouts(self) -> None: + """Periodically check for timed out requests.""" + while not self._stop_timeout_thread.is_set(): + now = time.time() + for operation_id, requests in list(self._pending_requests.items()): + for request in requests: + if now > request['timeout']: + try: + request['on_timeout'](operation_id) + except Exception as e: + logger.warning(f'Exception from timeout callback for operation {operation_id}:\n{e}') + requests.remove(request) + if not requests: + del self._pending_requests[operation_id] + time.sleep(0.1) # Sleep for a short duration + @final def is_connected(self) -> bool: """Check if we're currently connected to the INTERSECT brokers. @@ -258,6 +289,13 @@ def _handle_userspace_message(self, message: UserspaceMessage) -> None: send_os_signal() return + # If not in pending requests, it already timed out, so ignore this response + if message['operationId'] in self._pending_requests: + del self._pending_requests[message['operationId']] + else: + logger.debug(f'Received response for operation {message["operationId"]} that already timed out, ignoring') + return + # TWO: GET DATA FROM APPROPRIATE DATA STORE AND DESERIALIZE IT try: request_params = GENERIC_MESSAGE_SERIALIZER.validate_json( @@ -436,3 +474,11 @@ def _send_userspace_message(self, params: IntersectDirectMessageParams) -> None: # but cannot communicate the response to the Client. # in experiment controllers or production, you'll want to set persist to True self._control_plane_manager.publish_message(channel, msg, persist=False) + + if params.timeout is not None and params.on_timeout is not None: + self._pending_requests[params.operation].append( + { + 'timeout': time.time() + params.timeout, + 'on_timeout': params.on_timeout, + } + ) diff --git a/src/intersect_sdk/client_callback_definitions.py b/src/intersect_sdk/client_callback_definitions.py index 39a4019..640dea4 100644 --- a/src/intersect_sdk/client_callback_definitions.py +++ b/src/intersect_sdk/client_callback_definitions.py @@ -12,6 +12,16 @@ from .shared_callback_definitions import INTERSECT_JSON_VALUE, IntersectDirectMessageParams +INTERSECT_CLIENT_TIMEOUT_CALLBACK_TYPE = Callable[[str], None] +""" +This is a callable function type which should be defined by the user. + +Params + The SDK will send the function one argument: + 1) The operation ID of the request that timed out. +""" + + @final class IntersectClientCallback(BaseModel): """The value a user should return from ALL client callback functions. diff --git a/src/intersect_sdk/shared_callback_definitions.py b/src/intersect_sdk/shared_callback_definitions.py index fd1ff04..17cc724 100644 --- a/src/intersect_sdk/shared_callback_definitions.py +++ b/src/intersect_sdk/shared_callback_definitions.py @@ -1,6 +1,6 @@ """Callback definitions shared between Services, Capabilities, and Clients.""" -from typing import Any, Dict, List, Union +from typing import Any, Callable, Dict, List, Optional, Union from pydantic import BaseModel, ConfigDict, Field from typing_extensions import Annotated, TypeAlias @@ -65,3 +65,13 @@ class IntersectDirectMessageParams(BaseModel): # pydantic config model_config = ConfigDict(revalidate_instances='always') + + timeout: Optional[float] = None + """ + The timeout in seconds for the request. If the request is not fulfilled within this time, the on_timeout callback will be called. + """ + + on_timeout: Optional[Callable[[], None]] = None + """ + The callback to call if the request times out. + """ diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py new file mode 100644 index 0000000..d749935 --- /dev/null +++ b/tests/unit/test_client.py @@ -0,0 +1,228 @@ +from __future__ import annotations + +import time +from threading import Event, Thread +from unittest.mock import MagicMock, patch + +import pytest + +from intersect_sdk.client import IntersectClient +from intersect_sdk.client_callback_definitions import IntersectClientCallback +from intersect_sdk.config.client import IntersectClientConfig +from intersect_sdk.shared_callback_definitions import IntersectDirectMessageParams + + +def test_timeout_callback_is_called(): + """Tests that the timeout callback is called when a request times out.""" + config = IntersectClientConfig( + system='test', + facility='test', + organization='test', + brokers=[{'host': 'localhost', 'port': 1883, 'protocol': 'mqtt3.1.1', 'username': 'test', 'password': 'test'}], + initial_message_event_config=IntersectClientCallback(), + terminate_after_initial_messages=True, + ) + + timeout_called = [] + + def on_timeout(operation_id): + timeout_called.append(operation_id) + + def user_callback(source, operation_id, has_error, payload): + pass + + client = IntersectClient(config, user_callback=user_callback) + + # Mock the control plane and data plane managers + client._control_plane_manager = MagicMock() + client._control_plane_manager.is_connected.return_value = True + client._control_plane_manager.considered_unrecoverable.return_value = False + client._data_plane_manager = MagicMock() + client._data_plane_manager.outgoing_message_data_handler.return_value = b'test' + + # Manually start the timeout thread (since startup() would try to connect) + client._stop_timeout_thread = Event() + client._timeout_thread = Thread(target=client._check_timeouts, daemon=True) + client._timeout_thread.start() + + message = IntersectDirectMessageParams( + destination='test.test.test.test.test', + operation='test_op', + payload='test', + timeout=0.1, + on_timeout=on_timeout, + ) + + client._send_userspace_message(message) + + # Wait for timeout to trigger + time.sleep(0.3) + + assert len(timeout_called) == 1 + assert timeout_called[0] == 'test_op' + + # Clean up + client._stop_timeout_thread.set() + client._timeout_thread.join() + + +def test_timeout_callback_is_not_called(): + """Tests that the timeout callback is not called when a request is fulfilled.""" + config = IntersectClientConfig( + system='test', + facility='test', + organization='test', + brokers=[{'host': 'localhost', 'port': 1883, 'protocol': 'mqtt3.1.1', 'username': 'test', 'password': 'test'}], + initial_message_event_config=IntersectClientCallback(), + terminate_after_initial_messages=True, + ) + + timeout_called = [] + + def on_timeout(operation_id): + timeout_called.append(operation_id) + + user_callback_called = [] + + def user_callback(source, operation_id, has_error, payload): + user_callback_called.append(True) + return None + + client = IntersectClient(config, user_callback=user_callback) + + # Mock the control plane and data plane managers + client._control_plane_manager = MagicMock() + client._control_plane_manager.is_connected.return_value = True + client._control_plane_manager.considered_unrecoverable.return_value = False + client._data_plane_manager = MagicMock() + client._data_plane_manager.outgoing_message_data_handler.return_value = b'test' + client._data_plane_manager.incoming_message_data_handler.return_value = b'"test"' + + # Manually start the timeout thread + client._stop_timeout_thread = Event() + client._timeout_thread = Thread(target=client._check_timeouts, daemon=True) + client._timeout_thread.start() + + message = IntersectDirectMessageParams( + destination='test.test.test.test.test', + operation='test_op', + payload='test', + timeout=0.5, + on_timeout=on_timeout, + ) + + client._send_userspace_message(message) + + # Simulate receiving a response before the timeout + from datetime import datetime, timezone + from uuid import uuid4 + + response_message = { + 'messageId': str(uuid4()), + 'headers': { + 'source': 'test.test.test.test.test', + 'destination': client._hierarchy.hierarchy_string('.'), + 'has_error': False, + 'sdk_version': '0.8.0', + 'created_at': datetime.now(timezone.utc), + 'data_handler': 0, # IntersectDataHandler.MESSAGE + }, + 'operationId': 'test_op', + 'payload': 'test', + 'contentType': 'application/json', + } + + client._handle_userspace_message(response_message) + + # Wait to make sure timeout doesn't fire + time.sleep(0.7) + + assert len(timeout_called) == 0 + assert len(user_callback_called) == 1 + + # Clean up + client._stop_timeout_thread.set() + client._timeout_thread.join() + + +def test_response_after_timeout_is_ignored(): + """Tests that responses arriving after timeout are ignored and user_callback is not called.""" + config = IntersectClientConfig( + system='test', + facility='test', + organization='test', + brokers=[{'host': 'localhost', 'port': 1883, 'protocol': 'mqtt3.1.1', 'username': 'test', 'password': 'test'}], + initial_message_event_config=IntersectClientCallback(), + terminate_after_initial_messages=True, + ) + + timeout_called = [] + + def on_timeout(operation_id): + timeout_called.append(operation_id) + + user_callback_called = [] + + def user_callback(source, operation_id, has_error, payload): + user_callback_called.append(True) + return None + + client = IntersectClient(config, user_callback=user_callback) + + # Mock the control plane and data plane managers + client._control_plane_manager = MagicMock() + client._control_plane_manager.is_connected.return_value = True + client._control_plane_manager.considered_unrecoverable.return_value = False + client._data_plane_manager = MagicMock() + client._data_plane_manager.outgoing_message_data_handler.return_value = b'test' + client._data_plane_manager.incoming_message_data_handler.return_value = b'"test"' + + # Manually start the timeout thread + client._stop_timeout_thread = Event() + client._timeout_thread = Thread(target=client._check_timeouts, daemon=True) + client._timeout_thread.start() + + message = IntersectDirectMessageParams( + destination='test.test.test.test.test', + operation='test_op', + payload='test', + timeout=0.1, + on_timeout=on_timeout, + ) + + client._send_userspace_message(message) + + # Wait for timeout to trigger + time.sleep(0.3) + + # Timeout should have been called + assert len(timeout_called) == 1 + assert timeout_called[0] == 'test_op' + + # Now simulate receiving a late response after timeout + from datetime import datetime, timezone + from uuid import uuid4 + + response_message = { + 'messageId': str(uuid4()), + 'headers': { + 'source': 'test.test.test.test.test', + 'destination': client._hierarchy.hierarchy_string('.'), + 'has_error': False, + 'sdk_version': '0.8.0', + 'created_at': datetime.now(timezone.utc), + 'data_handler': 0, # IntersectDataHandler.MESSAGE + }, + 'operationId': 'test_op', + 'payload': 'test', + 'contentType': 'application/json', + } + + client._handle_userspace_message(response_message) + + # User callback should NOT have been called since the request already timed out + assert len(user_callback_called) == 0 + + # Clean up + client._stop_timeout_thread.set() + client._timeout_thread.join() From 1e9a75a7e8663b5128de57054dc0f716f2c75655 Mon Sep 17 00:00:00 2001 From: andrewfayres Date: Tue, 30 Sep 2025 13:40:13 -0500 Subject: [PATCH 4/6] Lint fixes --- src/intersect_sdk/client.py | 14 +++++--- .../client_callback_definitions.py | 1 - .../shared_callback_definitions.py | 8 +++-- tests/unit/test_client.py | 36 ++++++++++++++----- 4 files changed, 42 insertions(+), 17 deletions(-) diff --git a/src/intersect_sdk/client.py b/src/intersect_sdk/client.py index 2714693..3c00eb6 100644 --- a/src/intersect_sdk/client.py +++ b/src/intersect_sdk/client.py @@ -15,7 +15,7 @@ import time from collections import defaultdict from threading import Event, Thread -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from uuid import uuid4 from pydantic import ValidationError @@ -154,7 +154,7 @@ def __init__( self._user_callback = user_callback self._event_callback = event_callback self._timeout_callback = timeout_callback - self._pending_requests: defaultdict[str, list] = defaultdict(list) + self._pending_requests: defaultdict[str, list[dict[str, Any]]] = defaultdict(list) self._stop_timeout_thread = Event() self._timeout_thread = Thread(target=self._check_timeouts, daemon=True) @@ -229,8 +229,10 @@ def _check_timeouts(self) -> None: if now > request['timeout']: try: request['on_timeout'](operation_id) - except Exception as e: - logger.warning(f'Exception from timeout callback for operation {operation_id}:\n{e}') + except Exception as e: # noqa: BLE001 + logger.warning( + f'Exception from timeout callback for operation {operation_id}:\n{e}' + ) requests.remove(request) if not requests: del self._pending_requests[operation_id] @@ -293,7 +295,9 @@ def _handle_userspace_message(self, message: UserspaceMessage) -> None: if message['operationId'] in self._pending_requests: del self._pending_requests[message['operationId']] else: - logger.debug(f'Received response for operation {message["operationId"]} that already timed out, ignoring') + logger.debug( + f'Received response for operation {message["operationId"]} that already timed out, ignoring' + ) return # TWO: GET DATA FROM APPROPRIATE DATA STORE AND DESERIALIZE IT diff --git a/src/intersect_sdk/client_callback_definitions.py b/src/intersect_sdk/client_callback_definitions.py index 640dea4..db90880 100644 --- a/src/intersect_sdk/client_callback_definitions.py +++ b/src/intersect_sdk/client_callback_definitions.py @@ -11,7 +11,6 @@ from .constants import SYSTEM_OF_SYSTEM_REGEX from .shared_callback_definitions import INTERSECT_JSON_VALUE, IntersectDirectMessageParams - INTERSECT_CLIENT_TIMEOUT_CALLBACK_TYPE = Callable[[str], None] """ This is a callable function type which should be defined by the user. diff --git a/src/intersect_sdk/shared_callback_definitions.py b/src/intersect_sdk/shared_callback_definitions.py index 17cc724..7b5071d 100644 --- a/src/intersect_sdk/shared_callback_definitions.py +++ b/src/intersect_sdk/shared_callback_definitions.py @@ -1,6 +1,8 @@ """Callback definitions shared between Services, Capabilities, and Clients.""" -from typing import Any, Callable, Dict, List, Optional, Union +from __future__ import annotations + +from typing import Any, Callable, Dict, List, Union from pydantic import BaseModel, ConfigDict, Field from typing_extensions import Annotated, TypeAlias @@ -66,12 +68,12 @@ class IntersectDirectMessageParams(BaseModel): # pydantic config model_config = ConfigDict(revalidate_instances='always') - timeout: Optional[float] = None + timeout: float | None = None """ The timeout in seconds for the request. If the request is not fulfilled within this time, the on_timeout callback will be called. """ - on_timeout: Optional[Callable[[], None]] = None + on_timeout: Callable[[], None] | None = None """ The callback to call if the request times out. """ diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index d749935..3cbfbf8 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -2,9 +2,7 @@ import time from threading import Event, Thread -from unittest.mock import MagicMock, patch - -import pytest +from unittest.mock import MagicMock from intersect_sdk.client import IntersectClient from intersect_sdk.client_callback_definitions import IntersectClientCallback @@ -18,7 +16,15 @@ def test_timeout_callback_is_called(): system='test', facility='test', organization='test', - brokers=[{'host': 'localhost', 'port': 1883, 'protocol': 'mqtt3.1.1', 'username': 'test', 'password': 'test'}], + brokers=[ + { + 'host': 'localhost', + 'port': 1883, + 'protocol': 'mqtt3.1.1', + 'username': 'test', + 'password': 'test', + } + ], initial_message_event_config=IntersectClientCallback(), terminate_after_initial_messages=True, ) @@ -72,7 +78,15 @@ def test_timeout_callback_is_not_called(): system='test', facility='test', organization='test', - brokers=[{'host': 'localhost', 'port': 1883, 'protocol': 'mqtt3.1.1', 'username': 'test', 'password': 'test'}], + brokers=[ + { + 'host': 'localhost', + 'port': 1883, + 'protocol': 'mqtt3.1.1', + 'username': 'test', + 'password': 'test', + } + ], initial_message_event_config=IntersectClientCallback(), terminate_after_initial_messages=True, ) @@ -86,7 +100,6 @@ def on_timeout(operation_id): def user_callback(source, operation_id, has_error, payload): user_callback_called.append(True) - return None client = IntersectClient(config, user_callback=user_callback) @@ -151,7 +164,15 @@ def test_response_after_timeout_is_ignored(): system='test', facility='test', organization='test', - brokers=[{'host': 'localhost', 'port': 1883, 'protocol': 'mqtt3.1.1', 'username': 'test', 'password': 'test'}], + brokers=[ + { + 'host': 'localhost', + 'port': 1883, + 'protocol': 'mqtt3.1.1', + 'username': 'test', + 'password': 'test', + } + ], initial_message_event_config=IntersectClientCallback(), terminate_after_initial_messages=True, ) @@ -165,7 +186,6 @@ def on_timeout(operation_id): def user_callback(source, operation_id, has_error, payload): user_callback_called.append(True) - return None client = IntersectClient(config, user_callback=user_callback) From 1ab0e1fa21d762bf8706f2d62a0b38b75e2910dd Mon Sep 17 00:00:00 2001 From: Lance-Drane Date: Wed, 15 Oct 2025 12:17:16 -0400 Subject: [PATCH 5/6] update client timeout code to use the new internal message structure Signed-off-by: Lance-Drane --- src/intersect_sdk/client.py | 8 +-- .../shared_callback_definitions.py | 4 +- tests/unit/test_client.py | 60 ++++++++----------- 3 files changed, 31 insertions(+), 41 deletions(-) diff --git a/src/intersect_sdk/client.py b/src/intersect_sdk/client.py index aba2649..c8de147 100644 --- a/src/intersect_sdk/client.py +++ b/src/intersect_sdk/client.py @@ -291,11 +291,11 @@ def _handle_userspace_message( return # If not in pending requests, it already timed out, so ignore this response - if message['operationId'] in self._pending_requests: - del self._pending_requests[message['operationId']] + if headers.operation_id in self._pending_requests: + del self._pending_requests[headers.operation_id] else: logger.debug( - f'Received response for operation {message["operationId"]} that already timed out, ignoring' + f'Received response for operation {headers.operation_id} that already timed out, ignoring' ) return @@ -484,7 +484,7 @@ def _send_userspace_message(self, params: IntersectDirectMessageParams) -> None: self._control_plane_manager.publish_message( channel, payload, params.content_type, headers, persist=False ) - + if params.timeout is not None and params.on_timeout is not None: self._pending_requests[params.operation].append( { diff --git a/src/intersect_sdk/shared_callback_definitions.py b/src/intersect_sdk/shared_callback_definitions.py index 27d1018..853a5bc 100644 --- a/src/intersect_sdk/shared_callback_definitions.py +++ b/src/intersect_sdk/shared_callback_definitions.py @@ -2,7 +2,8 @@ from __future__ import annotations -from typing import Annotated, Any, Callable, Dict, List, TypeAlias, Union +from collections.abc import Callable +from typing import Annotated, Any, TypeAlias from pydantic import BaseModel, ConfigDict, Field @@ -85,6 +86,7 @@ class IntersectDirectMessageParams(BaseModel): The callback to call if the request times out. """ + class IntersectEventMessageParams(BaseModel): """Public facing properties of events the Client/Service wants to listen to.""" diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 3cbfbf8..54f3378 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -1,8 +1,10 @@ from __future__ import annotations import time +from datetime import datetime, timezone from threading import Event, Thread from unittest.mock import MagicMock +from uuid import uuid4 from intersect_sdk.client import IntersectClient from intersect_sdk.client_callback_definitions import IntersectClientCallback @@ -10,7 +12,7 @@ from intersect_sdk.shared_callback_definitions import IntersectDirectMessageParams -def test_timeout_callback_is_called(): +def test_timeout_callback_is_called() -> None: """Tests that the timeout callback is called when a request times out.""" config = IntersectClientConfig( system='test', @@ -20,7 +22,7 @@ def test_timeout_callback_is_called(): { 'host': 'localhost', 'port': 1883, - 'protocol': 'mqtt3.1.1', + 'protocol': 'mqtt5.0', 'username': 'test', 'password': 'test', } @@ -72,7 +74,7 @@ def user_callback(source, operation_id, has_error, payload): client._timeout_thread.join() -def test_timeout_callback_is_not_called(): +def test_timeout_callback_is_not_called() -> None: """Tests that the timeout callback is not called when a request is fulfilled.""" config = IntersectClientConfig( system='test', @@ -82,7 +84,7 @@ def test_timeout_callback_is_not_called(): { 'host': 'localhost', 'port': 1883, - 'protocol': 'mqtt3.1.1', + 'protocol': 'mqtt5.0', 'username': 'test', 'password': 'test', } @@ -127,25 +129,18 @@ def user_callback(source, operation_id, has_error, payload): client._send_userspace_message(message) # Simulate receiving a response before the timeout - from datetime import datetime, timezone - from uuid import uuid4 - - response_message = { + response_headers = { 'messageId': str(uuid4()), - 'headers': { - 'source': 'test.test.test.test.test', - 'destination': client._hierarchy.hierarchy_string('.'), - 'has_error': False, - 'sdk_version': '0.8.0', - 'created_at': datetime.now(timezone.utc), - 'data_handler': 0, # IntersectDataHandler.MESSAGE - }, + 'source': 'test.test.test.test.test', + 'destination': client._hierarchy.hierarchy_string('.'), + 'has_error': 'false', + 'sdk_version': '0.8.0', + 'created_at': datetime.now(timezone.utc).isoformat(), + 'data_handler': '0', # IntersectDataHandler.MESSAGE 'operationId': 'test_op', - 'payload': 'test', - 'contentType': 'application/json', } - client._handle_userspace_message(response_message) + client._handle_userspace_message(b'"test"', 'application/json', response_headers) # Wait to make sure timeout doesn't fire time.sleep(0.7) @@ -158,7 +153,7 @@ def user_callback(source, operation_id, has_error, payload): client._timeout_thread.join() -def test_response_after_timeout_is_ignored(): +def test_response_after_timeout_is_ignored() -> None: """Tests that responses arriving after timeout are ignored and user_callback is not called.""" config = IntersectClientConfig( system='test', @@ -168,7 +163,7 @@ def test_response_after_timeout_is_ignored(): { 'host': 'localhost', 'port': 1883, - 'protocol': 'mqtt3.1.1', + 'protocol': 'mqtt5.0', 'username': 'test', 'password': 'test', } @@ -220,25 +215,18 @@ def user_callback(source, operation_id, has_error, payload): assert timeout_called[0] == 'test_op' # Now simulate receiving a late response after timeout - from datetime import datetime, timezone - from uuid import uuid4 - - response_message = { + response_headers = { 'messageId': str(uuid4()), - 'headers': { - 'source': 'test.test.test.test.test', - 'destination': client._hierarchy.hierarchy_string('.'), - 'has_error': False, - 'sdk_version': '0.8.0', - 'created_at': datetime.now(timezone.utc), - 'data_handler': 0, # IntersectDataHandler.MESSAGE - }, + 'source': 'test.test.test.test.test', + 'destination': client._hierarchy.hierarchy_string('.'), + 'has_error': 'false', + 'sdk_version': '0.8.0', + 'created_at': datetime.now(timezone.utc).isoformat(), + 'data_handler': '0', # IntersectDataHandler.MESSAGE 'operationId': 'test_op', - 'payload': 'test', - 'contentType': 'application/json', } - client._handle_userspace_message(response_message) + client._handle_userspace_message(b'"test"', 'application/json', response_headers) # User callback should NOT have been called since the request already timed out assert len(user_callback_called) == 0 From aa2d353bf9b1eb29701d9dd1d9acabc4a4dd55da Mon Sep 17 00:00:00 2001 From: Lance-Drane Date: Wed, 15 Oct 2025 12:29:34 -0400 Subject: [PATCH 6/6] fix a possible issue with the client test Signed-off-by: Lance-Drane --- tests/unit/test_client.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 54f3378..b98cced 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -130,14 +130,14 @@ def user_callback(source, operation_id, has_error, payload): # Simulate receiving a response before the timeout response_headers = { - 'messageId': str(uuid4()), + 'message_id': str(uuid4()), 'source': 'test.test.test.test.test', 'destination': client._hierarchy.hierarchy_string('.'), 'has_error': 'false', 'sdk_version': '0.8.0', 'created_at': datetime.now(timezone.utc).isoformat(), 'data_handler': '0', # IntersectDataHandler.MESSAGE - 'operationId': 'test_op', + 'operation_id': 'test_op', } client._handle_userspace_message(b'"test"', 'application/json', response_headers) @@ -216,14 +216,14 @@ def user_callback(source, operation_id, has_error, payload): # Now simulate receiving a late response after timeout response_headers = { - 'messageId': str(uuid4()), + 'message_id': str(uuid4()), 'source': 'test.test.test.test.test', 'destination': client._hierarchy.hierarchy_string('.'), 'has_error': 'false', 'sdk_version': '0.8.0', 'created_at': datetime.now(timezone.utc).isoformat(), 'data_handler': '0', # IntersectDataHandler.MESSAGE - 'operationId': 'test_op', + 'operation_id': 'test_op', } client._handle_userspace_message(b'"test"', 'application/json', response_headers)