diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 29138c9..1d0d688 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -14,7 +14,7 @@ repos: args: [ --fix ] - id: ruff-format - repo: https://github.com/pdm-project/pdm - rev: 2.11.2 + rev: 2.17.2 hooks: - id: pdm-export args: ['--without-hashes'] diff --git a/docs/api/callback_definitions.rst b/docs/api/callback_definitions.rst new file mode 100644 index 0000000..125459a --- /dev/null +++ b/docs/api/callback_definitions.rst @@ -0,0 +1,27 @@ +==================== +Callback Definitions +==================== + +Shared +====== + +.. automodule:: intersect_sdk.shared_callback_definitions + :members: + :undoc-members: + :exclude-members: model_computed_fields, model_config, model_fields + +Client +====== + +.. automodule:: intersect_sdk.client_callback_definitions + :members: + :undoc-members: + :exclude-members: model_computed_fields, model_config, model_fields + +Service +======= + +.. automodule:: intersect_sdk.service_callback_definitions + :members: + :undoc-members: + :exclude-members: model_computed_fields, model_config, model_fields diff --git a/docs/api/client_callback_definitions.rst b/docs/api/client_callback_definitions.rst deleted file mode 100644 index 6099d80..0000000 --- a/docs/api/client_callback_definitions.rst +++ /dev/null @@ -1,7 +0,0 @@ -Client Callback Definitions -=========================== - -.. automodule:: intersect_sdk.client_callback_definitions - :members: - :undoc-members: - :exclude-members: model_computed_fields, model_config, model_fields diff --git a/docs/api/config.rst b/docs/api/config.rst new file mode 100644 index 0000000..3db052d --- /dev/null +++ b/docs/api/config.rst @@ -0,0 +1,27 @@ +====== +Config +====== + +Config - Shared +=============== + +.. automodule:: intersect_sdk.config.shared + :members: + :undoc-members: + :exclude-members: model_computed_fields, model_config, model_fields + +Config - Service +================ + +.. automodule:: intersect_sdk.config.service + :members: + :undoc-members: + :exclude-members: model_computed_fields, model_config, model_fields + +Config - Client +=============== + +.. automodule:: intersect_sdk.config.client + :members: + :undoc-members: + :exclude-members: model_computed_fields, model_config, model_fields diff --git a/docs/api/config_client.rst b/docs/api/config_client.rst deleted file mode 100644 index 3a51e53..0000000 --- a/docs/api/config_client.rst +++ /dev/null @@ -1,7 +0,0 @@ -Config - Client -=============== - -.. automodule:: intersect_sdk.config.client - :members: - :undoc-members: - :exclude-members: model_computed_fields, model_config, model_fields diff --git a/docs/api/config_service.rst b/docs/api/config_service.rst deleted file mode 100644 index b41cc35..0000000 --- a/docs/api/config_service.rst +++ /dev/null @@ -1,7 +0,0 @@ -Config - Service -================ - -.. automodule:: intersect_sdk.config.service - :members: - :undoc-members: - :exclude-members: model_computed_fields, model_config, model_fields diff --git a/docs/api/config_shared.rst b/docs/api/config_shared.rst deleted file mode 100644 index 7315af1..0000000 --- a/docs/api/config_shared.rst +++ /dev/null @@ -1,7 +0,0 @@ -Config - Shared -=============== - -.. automodule:: intersect_sdk.config.shared - :members: - :undoc-members: - :exclude-members: model_computed_fields, model_config, model_fields diff --git a/docs/index.rst b/docs/index.rst index ed8406e..7bb6945 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -31,16 +31,14 @@ This site provides documentation for INTERSECT's Python SDK. See the getting sta :maxdepth: 2 :caption: API documentation - api/config_service - api/config_client - api/config_shared + api/config api/core_definitions api/schema api/capability - api/service_definitions api/service - api/client_callback_definitions + api/service_definitions api/client + api/callback_definitions api/app_lifecycle api/constants api/version diff --git a/examples/1_hello_world/hello_client.py b/examples/1_hello_world/hello_client.py index 5d52bdb..315791a 100644 --- a/examples/1_hello_world/hello_client.py +++ b/examples/1_hello_world/hello_client.py @@ -5,7 +5,7 @@ IntersectClient, IntersectClientCallback, IntersectClientConfig, - IntersectClientMessageParams, + IntersectDirectMessageParams, default_intersect_lifecycle_loop, ) @@ -74,9 +74,9 @@ def simple_client_callback( you'll get a message back. """ initial_messages = [ - IntersectClientMessageParams( + IntersectDirectMessageParams( destination='hello-organization.hello-facility.hello-system.hello-subsystem.hello-service', - operation='say_hello_to_name', + operation='HelloExample.say_hello_to_name', payload='hello_client', ) ] diff --git a/examples/1_hello_world/hello_service.py b/examples/1_hello_world/hello_service.py index 80c2c8c..e0be5aa 100644 --- a/examples/1_hello_world/hello_service.py +++ b/examples/1_hello_world/hello_service.py @@ -79,11 +79,12 @@ def say_hello_to_name(self, name: str) -> str: @intersect_message and @intersect_status, and that these functions are appropriately type-annotated. """ capability = HelloServiceCapabilityImplementation() + capability.capability_name = 'HelloExample' """ step three - create service from both the configuration and your own capability """ - service = IntersectService(capability, config) + service = IntersectService([capability], config) """ step four - start lifecycle loop. The only necessary parameter is your service. diff --git a/examples/1_hello_world_amqp/hello_client.py b/examples/1_hello_world_amqp/hello_client.py index be3078e..6fb6e58 100644 --- a/examples/1_hello_world_amqp/hello_client.py +++ b/examples/1_hello_world_amqp/hello_client.py @@ -5,7 +5,7 @@ IntersectClient, IntersectClientCallback, IntersectClientConfig, - IntersectClientMessageParams, + IntersectDirectMessageParams, default_intersect_lifecycle_loop, ) @@ -76,9 +76,9 @@ def simple_client_callback( you'll get a message back. """ initial_messages = [ - IntersectClientMessageParams( + IntersectDirectMessageParams( destination='hello-organization.hello-facility.hello-system.hello-subsystem.hello-service', - operation='say_hello_to_name', + operation='HelloExample.say_hello_to_name', payload='hello_client', ) ] diff --git a/examples/1_hello_world_amqp/hello_service.py b/examples/1_hello_world_amqp/hello_service.py index 0593c06..192ee63 100644 --- a/examples/1_hello_world_amqp/hello_service.py +++ b/examples/1_hello_world_amqp/hello_service.py @@ -80,11 +80,12 @@ def say_hello_to_name(self, name: str) -> str: @intersect_message and @intersect_status, and that these functions are appropriately type-annotated. """ capability = HelloServiceCapabilityImplementation() + capability.capability_name = 'HelloExample' """ step three - create service from both the configuration and your own capability """ - service = IntersectService(capability, config) + service = IntersectService([capability], config) """ step four - start lifecycle loop. The only necessary parameter is your service. diff --git a/examples/1_hello_world_events/hello_client.py b/examples/1_hello_world_events/hello_client.py index eee163c..3e689a8 100644 --- a/examples/1_hello_world_events/hello_client.py +++ b/examples/1_hello_world_events/hello_client.py @@ -5,7 +5,7 @@ IntersectClient, IntersectClientCallback, IntersectClientConfig, - IntersectClientMessageParams, + IntersectDirectMessageParams, default_intersect_lifecycle_loop, ) @@ -95,9 +95,9 @@ def simple_event_callback( you'll get a message back. """ initial_messages = [ - IntersectClientMessageParams( + IntersectDirectMessageParams( destination='hello-organization.hello-facility.hello-system.hello-subsystem.hello-service', - operation='say_hello_to_name', + operation='HelloExample.say_hello_to_name', payload='hello_client', ) ] diff --git a/examples/1_hello_world_events/hello_service.py b/examples/1_hello_world_events/hello_service.py index 1eae705..97b9cb1 100644 --- a/examples/1_hello_world_events/hello_service.py +++ b/examples/1_hello_world_events/hello_service.py @@ -83,11 +83,12 @@ def say_hello_to_name(self, name: str) -> str: @intersect_message and @intersect_status, and that these functions are appropriately type-annotated. """ capability = HelloServiceCapabilityImplementation() + capability.capability_name = 'HelloExample' """ step three - create service from both the configuration and your own capability """ - service = IntersectService(capability, config) + service = IntersectService([capability], config) """ step four - start lifecycle loop. The only necessary parameter is your service. diff --git a/examples/2_counting/counting_client.py b/examples/2_counting/counting_client.py index 47fb484..4a35606 100644 --- a/examples/2_counting/counting_client.py +++ b/examples/2_counting/counting_client.py @@ -7,7 +7,7 @@ IntersectClient, IntersectClientCallback, IntersectClientConfig, - IntersectClientMessageParams, + IntersectDirectMessageParams, default_intersect_lifecycle_loop, ) @@ -38,54 +38,54 @@ def __init__(self) -> None: self.message_stack = [ # wait 5 seconds before stopping the counter. "Count" in response will be approx. 6 ( - IntersectClientMessageParams( + IntersectDirectMessageParams( destination='counting-organization.counting-facility.counting-system.counting-subsystem.counting-service', - operation='stop_count', + operation='CountingExample.stop_count', payload=None, ), 5.0, ), # start the counter up again - it will not be 0 at this point! "Count" in response will be approx. 7 ( - IntersectClientMessageParams( + IntersectDirectMessageParams( destination='counting-organization.counting-facility.counting-system.counting-subsystem.counting-service', - operation='start_count', + operation='CountingExample.start_count', payload=None, ), 1.0, ), # reset the counter, but have it immediately start running again. "Count" in response will be approx. 10 ( - IntersectClientMessageParams( + IntersectDirectMessageParams( destination='counting-organization.counting-facility.counting-system.counting-subsystem.counting-service', - operation='reset_count', + operation='CountingExample.reset_count', payload=True, ), 3.0, ), # reset the counter, but don't have it run again. "Count" in response will be approx. 6 ( - IntersectClientMessageParams( + IntersectDirectMessageParams( destination='counting-organization.counting-facility.counting-system.counting-subsystem.counting-service', - operation='reset_count', + operation='CountingExample.reset_count', payload=False, ), 5.0, ), # start the counter back up. "Count" in response will be approx. 1 ( - IntersectClientMessageParams( + IntersectDirectMessageParams( destination='counting-organization.counting-facility.counting-system.counting-subsystem.counting-service', - operation='start_count', + operation='CountingExample.start_count', payload=None, ), 3.0, ), # finally, stop the counter one last time. "Count" in response will be approx. 4 ( - IntersectClientMessageParams( + IntersectDirectMessageParams( destination='counting-organization.counting-facility.counting-system.counting-subsystem.counting-service', - operation='stop_count', + operation='CountingExample.stop_count', payload=None, ), 3.0, @@ -158,9 +158,9 @@ def client_callback( # The counter will start after the initial message. # If the service is already active and counting, this may do nothing. initial_messages = [ - IntersectClientMessageParams( + IntersectDirectMessageParams( destination='counting-organization.counting-facility.counting-system.counting-subsystem.counting-service', - operation='start_count', + operation='CountingExample.start_count', payload=None, ) ] diff --git a/examples/2_counting/counting_service.py b/examples/2_counting/counting_service.py index 767817c..9a32176 100644 --- a/examples/2_counting/counting_service.py +++ b/examples/2_counting/counting_service.py @@ -191,7 +191,8 @@ def _run_count(self) -> None: **from_config_file, ) capability = CountingServiceCapabilityImplementation() - service = IntersectService(capability, config) + capability.capability_name = 'CountingExample' + service = IntersectService([capability], config) logger.info('Starting counting_service, use Ctrl+C to exit.') default_intersect_lifecycle_loop( service, diff --git a/examples/2_counting_events/counting_service.py b/examples/2_counting_events/counting_service.py index 13a70d1..93974a9 100644 --- a/examples/2_counting_events/counting_service.py +++ b/examples/2_counting_events/counting_service.py @@ -83,7 +83,8 @@ def increment_counter_function(self) -> None: **from_config_file, ) capability = CountingServiceCapabilityImplementation() - service = IntersectService(capability, config) + capability.capability_name = 'CountingExample' + service = IntersectService([capability], config) logger.info('Starting counting_service, use Ctrl+C to exit.') """ diff --git a/examples/3_ping_pong_events/ping_service.py b/examples/3_ping_pong_events/ping_service.py index b117e35..e291024 100644 --- a/examples/3_ping_pong_events/ping_service.py +++ b/examples/3_ping_pong_events/ping_service.py @@ -11,7 +11,7 @@ logging.basicConfig(level=logging.INFO) -class PingCapabiilityImplementation(P_ngBaseCapabilityImplementation): +class PingCapabilityImplementation(P_ngBaseCapabilityImplementation): """Basic capability definition, very similar to the other capability except for the type of event it emits.""" def after_service_startup(self) -> None: @@ -32,4 +32,4 @@ def ping_event(self) -> None: if __name__ == '__main__': - run_service(PingCapabiilityImplementation()) + run_service(PingCapabilityImplementation()) diff --git a/examples/3_ping_pong_events/pong_service.py b/examples/3_ping_pong_events/pong_service.py index f8a13fd..b4f9cb5 100644 --- a/examples/3_ping_pong_events/pong_service.py +++ b/examples/3_ping_pong_events/pong_service.py @@ -11,7 +11,7 @@ logging.basicConfig(level=logging.INFO) -class PongCapabiilityImplementation(P_ngBaseCapabilityImplementation): +class PongCapabilityImplementation(P_ngBaseCapabilityImplementation): """Basic capability definition, very similar to the other capability except for the type of event it emits.""" def after_service_startup(self) -> None: @@ -32,4 +32,4 @@ def pong_event(self) -> None: if __name__ == '__main__': - run_service(PongCapabiilityImplementation()) + run_service(PongCapabilityImplementation()) diff --git a/examples/3_ping_pong_events/service_runner.py b/examples/3_ping_pong_events/service_runner.py index 14d3e81..8bbb93b 100644 --- a/examples/3_ping_pong_events/service_runner.py +++ b/examples/3_ping_pong_events/service_runner.py @@ -58,7 +58,8 @@ def run_service(capability: P_ngBaseCapabilityImplementation) -> None: status_interval=30.0, **from_config_file, ) - service = IntersectService(capability, config) + capability.capability_name = service_name + service = IntersectService([capability], config) logger.info('Starting %s_service, use Ctrl+C to exit.', service_name) """ diff --git a/examples/3_ping_pong_events_amqp/ping_service.py b/examples/3_ping_pong_events_amqp/ping_service.py index d64f8ec..68298b7 100644 --- a/examples/3_ping_pong_events_amqp/ping_service.py +++ b/examples/3_ping_pong_events_amqp/ping_service.py @@ -12,7 +12,7 @@ logging.getLogger('pika').setLevel(logging.WARNING) -class PingCapabiilityImplementation(P_ngBaseCapabilityImplementation): +class PingCapabilityImplementation(P_ngBaseCapabilityImplementation): """Basic capability definition, very similar to the other capability except for the type of event it emits.""" def after_service_startup(self) -> None: @@ -33,4 +33,4 @@ def ping_event(self) -> None: if __name__ == '__main__': - run_service(PingCapabiilityImplementation()) + run_service(PingCapabilityImplementation()) diff --git a/examples/3_ping_pong_events_amqp/pong_service.py b/examples/3_ping_pong_events_amqp/pong_service.py index ac9c15a..ae48410 100644 --- a/examples/3_ping_pong_events_amqp/pong_service.py +++ b/examples/3_ping_pong_events_amqp/pong_service.py @@ -12,7 +12,7 @@ logging.getLogger('pika').setLevel(logging.WARNING) -class PongCapabiilityImplementation(P_ngBaseCapabilityImplementation): +class PongCapabilityImplementation(P_ngBaseCapabilityImplementation): """Basic capability definition, very similar to the other capability except for the type of event it emits.""" def after_service_startup(self) -> None: @@ -33,4 +33,4 @@ def pong_event(self) -> None: if __name__ == '__main__': - run_service(PongCapabiilityImplementation()) + run_service(PongCapabilityImplementation()) diff --git a/examples/3_ping_pong_events_amqp/service_runner.py b/examples/3_ping_pong_events_amqp/service_runner.py index bb71d13..05dbb9f 100644 --- a/examples/3_ping_pong_events_amqp/service_runner.py +++ b/examples/3_ping_pong_events_amqp/service_runner.py @@ -58,7 +58,8 @@ def run_service(capability: P_ngBaseCapabilityImplementation) -> None: status_interval=30.0, **from_config_file, ) - service = IntersectService(capability, config) + capability.capability_name = service_name + service = IntersectService([capability], config) logger.info('Starting %s_service, use Ctrl+C to exit.', service_name) """ diff --git a/examples/4_service_to_service/__init__.py b/examples/4_service_to_service/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/4_service_to_service/example_1_service.py b/examples/4_service_to_service/example_1_service.py new file mode 100644 index 0000000..0619e83 --- /dev/null +++ b/examples/4_service_to_service/example_1_service.py @@ -0,0 +1,76 @@ +"""First Service for example. Sends a message to service two and emits an event for the client.""" + +import logging + +from intersect_sdk import ( + HierarchyConfig, + IntersectBaseCapabilityImplementation, + IntersectDirectMessageParams, + IntersectEventDefinition, + IntersectService, + IntersectServiceConfig, + default_intersect_lifecycle_loop, + intersect_event, + intersect_message, + intersect_status, +) + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class ExampleServiceOneCapabilityImplementation(IntersectBaseCapabilityImplementation): + """Service 1 Capability.""" + + @intersect_status() + def status(self) -> str: + """Basic status function which returns a hard-coded string.""" + return 'Up' + + @intersect_message() + def pass_text_to_service_2(self, text: str) -> None: + """Takes in a string parameter and sends it to service 2.""" + msg_to_send = IntersectDirectMessageParams( + destination='example-organization.example-facility.example-system.example-subsystem.service-two', + operation='ServiceTwo.test_service', + payload=text, + ) + + # Send intersect message to another service + self.intersect_sdk_call_service(msg_to_send, self.service_2_handler) + + @intersect_event(events={'response_event': IntersectEventDefinition(event_type=str)}) + def service_2_handler(self, msg: str) -> None: + """Handles response from service 2 and emits the response as an event for the client.""" + self.intersect_sdk_emit_event('response_event', f'Received Response from Service 2: {msg}') + + +if __name__ == '__main__': + from_config_file = { + 'brokers': [ + { + 'username': 'intersect_username', + 'password': 'intersect_password', + 'port': 1883, + 'protocol': 'mqtt3.1.1', + }, + ], + } + config = IntersectServiceConfig( + hierarchy=HierarchyConfig( + organization='example-organization', + facility='example-facility', + system='example-system', + subsystem='example-subsystem', + service='service-one', + ), + status_interval=30.0, + **from_config_file, + ) + capability = ExampleServiceOneCapabilityImplementation() + capability.capability_name = 'ServiceOne' + service = IntersectService([capability], config) + logger.info('Starting Service 1, use Ctrl+C to exit.') + default_intersect_lifecycle_loop( + service, + ) diff --git a/examples/4_service_to_service/example_1_service_schema.json b/examples/4_service_to_service/example_1_service_schema.json new file mode 100644 index 0000000..2cbb1d4 --- /dev/null +++ b/examples/4_service_to_service/example_1_service_schema.json @@ -0,0 +1,174 @@ +{ + "asyncapi": "2.6.0", + "x-intersect-version": "0.6.4", + "info": { + "title": "example-organization.example-facility.example-system.example-subsystem.service-one", + "version": "0.0.0", + "description": "Service One Capability.\n" + }, + "defaultContentType": "application/json", + "channels": { + "pass_text_to_service_2": { + "publish": { + "message": { + "schemaFormat": "application/vnd.aai.asyncapi+json;version=2.6.0", + "contentType": "application/json", + "traits": { + "$ref": "#/components/messageTraits/commonHeaders" + }, + "payload": { + "type": "null" + } + }, + "description": "Takes in a string parameter and sends it to service 2." + }, + "subscribe": { + "message": { + "schemaFormat": "application/vnd.aai.asyncapi+json;version=2.6.0", + "contentType": "application/json", + "traits": { + "$ref": "#/components/messageTraits/commonHeaders" + }, + "payload": { + "type": "string" + } + }, + "description": "Takes in a string parameter and sends it to service 2." + }, + "events": [] + } + }, + "events": { + "response_event": { + "type": "string" + } + }, + "components": { + "schemas": {}, + "messageTraits": { + "commonHeaders": { + "messageHeaders": { + "$defs": { + "IntersectDataHandler": { + "description": "What data transfer type do you want to use for handling the request/response?\n\nDefault: MESSAGE", + "enum": [ + 0, + 1 + ], + "title": "IntersectDataHandler", + "type": "integer" + } + }, + "description": "Matches the current header definition for INTERSECT messages.\n\nALL messages should contain this header.", + "properties": { + "source": { + "description": "source of the message", + "pattern": "([-a-z0-9]+\\.)*[-a-z0-9]", + "title": "Source", + "type": "string" + }, + "destination": { + "description": "destination of the message", + "pattern": "([-a-z0-9]+\\.)*[-a-z0-9]", + "title": "Destination", + "type": "string" + }, + "created_at": { + "description": "the UTC timestamp of message creation", + "format": "date-time", + "title": "Created At", + "type": "string" + }, + "sdk_version": { + "description": "SemVer string of SDK's version, used to check for compatibility", + "pattern": "^\\d+\\.\\d+\\.\\d+$", + "title": "Sdk Version", + "type": "string" + }, + "data_handler": { + "allOf": [ + { + "$ref": "#/components/messageTraits/commonHeaders/userspaceHeaders/$defs/IntersectDataHandler" + } + ], + "default": 0, + "description": "Code signifying where data is stored." + }, + "has_error": { + "default": false, + "description": "If this value is True, the payload will contain the error message (a string)", + "title": "Has Error", + "type": "boolean" + } + }, + "required": [ + "source", + "destination", + "created_at", + "sdk_version" + ], + "title": "UserspaceMessageHeader", + "type": "object" + }, + "eventHeaders": { + "$defs": { + "IntersectDataHandler": { + "description": "What data transfer type do you want to use for handling the request/response?\n\nDefault: MESSAGE", + "enum": [ + 0, + 1 + ], + "title": "IntersectDataHandler", + "type": "integer" + } + }, + "description": "Matches the current header definition for INTERSECT messages.\n\nALL messages should contain this header.", + "properties": { + "source": { + "description": "source of the message", + "pattern": "([-a-z0-9]+\\.)*[-a-z0-9]", + "title": "Source", + "type": "string" + }, + "created_at": { + "description": "the UTC timestamp of message creation", + "format": "date-time", + "title": "Created At", + "type": "string" + }, + "sdk_version": { + "description": "SemVer string of SDK's version, used to check for compatibility", + "pattern": "^\\d+\\.\\d+\\.\\d+$", + "title": "Sdk Version", + "type": "string" + }, + "data_handler": { + "allOf": [ + { + "$ref": "#/components/messageTraits/commonHeaders/eventHeaders/$defs/IntersectDataHandler" + } + ], + "default": 0, + "description": "Code signifying where data is stored." + }, + "event_name": { + "title": "Event Name", + "type": "string" + } + }, + "required": [ + "source", + "created_at", + "sdk_version", + "event_name" + ], + "title": "EventMessageHeaders", + "type": "object" + } + } + } + }, + "status": { + "type": "string" + } + } diff --git a/examples/4_service_to_service/example_2_service.py b/examples/4_service_to_service/example_2_service.py new file mode 100644 index 0000000..ef5670f --- /dev/null +++ b/examples/4_service_to_service/example_2_service.py @@ -0,0 +1,61 @@ +"""Second Service for example.""" + +import logging + +from intersect_sdk import ( + HierarchyConfig, + IntersectBaseCapabilityImplementation, + IntersectService, + IntersectServiceConfig, + default_intersect_lifecycle_loop, + intersect_message, + intersect_status, +) + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class ExampleServiceTwoCapabilityImplementation(IntersectBaseCapabilityImplementation): + """Service 2 Capability.""" + + @intersect_status() + def status(self) -> str: + """Basic status function which returns a hard-coded string.""" + return 'Up' + + @intersect_message + def test_service(self, text: str) -> str: + """Returns the text given along with acknowledgement.""" + return f'Acknowledging service one text -> {text}' + + +if __name__ == '__main__': + from_config_file = { + 'brokers': [ + { + 'username': 'intersect_username', + 'password': 'intersect_password', + 'port': 1883, + 'protocol': 'mqtt3.1.1', + }, + ], + } + config = IntersectServiceConfig( + hierarchy=HierarchyConfig( + organization='example-organization', + facility='example-facility', + system='example-system', + subsystem='example-subsystem', + service='service-two', + ), + status_interval=30.0, + **from_config_file, + ) + capability = ExampleServiceTwoCapabilityImplementation() + capability.capability_name = 'ServiceTwo' + service = IntersectService([capability], config) + logger.info('Starting Service 2, use Ctrl+C to exit.') + default_intersect_lifecycle_loop( + service, + ) diff --git a/examples/4_service_to_service/example_2_service_schema.json b/examples/4_service_to_service/example_2_service_schema.json new file mode 100644 index 0000000..56d4741 --- /dev/null +++ b/examples/4_service_to_service/example_2_service_schema.json @@ -0,0 +1,170 @@ +{ + "asyncapi": "2.6.0", + "x-intersect-version": "0.6.4", + "info": { + "title": "example-organization.example-facility.example-system.example-subsystem.service-two", + "version": "0.0.0", + "description": "Service Two Capability.\n" + }, + "defaultContentType": "application/json", + "channels": { + "test_service": { + "publish": { + "message": { + "schemaFormat": "application/vnd.aai.asyncapi+json;version=2.6.0", + "contentType": "application/json", + "traits": { + "$ref": "#/components/messageTraits/commonHeaders" + }, + "payload": { + "type": "string" + } + }, + "description": "Returns the text given along with acknowledgement." + }, + "subscribe": { + "message": { + "schemaFormat": "application/vnd.aai.asyncapi+json;version=2.6.0", + "contentType": "application/json", + "traits": { + "$ref": "#/components/messageTraits/commonHeaders" + }, + "payload": { + "type": "string" + } + }, + "description": "Returns the text given along with acknowledgement." + }, + "events": [] + } + }, + "events": {}, + "components": { + "schemas": {}, + "messageTraits": { + "commonHeaders": { + "messageHeaders": { + "$defs": { + "IntersectDataHandler": { + "description": "What data transfer type do you want to use for handling the request/response?\n\nDefault: MESSAGE", + "enum": [ + 0, + 1 + ], + "title": "IntersectDataHandler", + "type": "integer" + } + }, + "description": "Matches the current header definition for INTERSECT messages.\n\nALL messages should contain this header.", + "properties": { + "source": { + "description": "source of the message", + "pattern": "([-a-z0-9]+\\.)*[-a-z0-9]", + "title": "Source", + "type": "string" + }, + "destination": { + "description": "destination of the message", + "pattern": "([-a-z0-9]+\\.)*[-a-z0-9]", + "title": "Destination", + "type": "string" + }, + "created_at": { + "description": "the UTC timestamp of message creation", + "format": "date-time", + "title": "Created At", + "type": "string" + }, + "sdk_version": { + "description": "SemVer string of SDK's version, used to check for compatibility", + "pattern": "^\\d+\\.\\d+\\.\\d+$", + "title": "Sdk Version", + "type": "string" + }, + "data_handler": { + "allOf": [ + { + "$ref": "#/components/messageTraits/commonHeaders/userspaceHeaders/$defs/IntersectDataHandler" + } + ], + "default": 0, + "description": "Code signifying where data is stored." + }, + "has_error": { + "default": false, + "description": "If this value is True, the payload will contain the error message (a string)", + "title": "Has Error", + "type": "boolean" + } + }, + "required": [ + "source", + "destination", + "created_at", + "sdk_version" + ], + "title": "UserspaceMessageHeader", + "type": "object" + }, + "eventHeaders": { + "$defs": { + "IntersectDataHandler": { + "description": "What data transfer type do you want to use for handling the request/response?\n\nDefault: MESSAGE", + "enum": [ + 0, + 1 + ], + "title": "IntersectDataHandler", + "type": "integer" + } + }, + "description": "Matches the current header definition for INTERSECT messages.\n\nALL messages should contain this header.", + "properties": { + "source": { + "description": "source of the message", + "pattern": "([-a-z0-9]+\\.)*[-a-z0-9]", + "title": "Source", + "type": "string" + }, + "created_at": { + "description": "the UTC timestamp of message creation", + "format": "date-time", + "title": "Created At", + "type": "string" + }, + "sdk_version": { + "description": "SemVer string of SDK's version, used to check for compatibility", + "pattern": "^\\d+\\.\\d+\\.\\d+$", + "title": "Sdk Version", + "type": "string" + }, + "data_handler": { + "allOf": [ + { + "$ref": "#/components/messageTraits/commonHeaders/eventHeaders/$defs/IntersectDataHandler" + } + ], + "default": 0, + "description": "Code signifying where data is stored." + }, + "event_name": { + "title": "Event Name", + "type": "string" + } + }, + "required": [ + "source", + "created_at", + "sdk_version", + "event_name" + ], + "title": "EventMessageHeaders", + "type": "object" + } + } + } + }, + "status": { + "type": "string" + } + } diff --git a/examples/4_service_to_service/example_client.py b/examples/4_service_to_service/example_client.py new file mode 100644 index 0000000..25e2573 --- /dev/null +++ b/examples/4_service_to_service/example_client.py @@ -0,0 +1,79 @@ +"""Client for service to service example. + +Kicks off exmaple by sending message to service one, and then +waits for an event from service one to confirm the messages were passed between the two services properly. + +""" + +import logging + +from intersect_sdk import ( + INTERSECT_JSON_VALUE, + IntersectClient, + IntersectClientCallback, + IntersectClientConfig, + IntersectDirectMessageParams, + default_intersect_lifecycle_loop, +) + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class SampleOrchestrator: + """Simply contains an event callback for events from Service 1.""" + + def event_callback( + self, _source: str, _operation: str, _event_name: str, payload: INTERSECT_JSON_VALUE + ) -> None: + """This simply prints the event from Service 1 to your console. + + Params: + source: the source of the response message. + operation: the name of the function we called in the original message. + _has_error: Boolean value which represents an error. + payload: Value of the response from the Service. + """ + print(payload) + # break out of pubsub loop + raise Exception + + +if __name__ == '__main__': + from_config_file = { + 'brokers': [ + { + 'username': 'intersect_username', + 'password': 'intersect_password', + 'port': 1883, + 'protocol': 'mqtt3.1.1', + }, + ], + } + + # The counter will start after the initial message. + # If the service is already active and counting, this may do nothing. + initial_messages = [ + IntersectDirectMessageParams( + destination='example-organization.example-facility.example-system.example-subsystem.service-one', + operation='ServiceOne.pass_text_to_service_2', + payload='Kicking off the example!', + ) + ] + config = IntersectClientConfig( + initial_message_event_config=IntersectClientCallback( + messages_to_send=initial_messages, + services_to_start_listening_for_events=[ + 'example-organization.example-facility.example-system.example-subsystem.service-one' + ], + ), + **from_config_file, + ) + orchestrator = SampleOrchestrator() + client = IntersectClient( + config=config, + event_callback=orchestrator.event_callback, + ) + default_intersect_lifecycle_loop( + client, + ) diff --git a/examples/SUBMISSION_RULES.md b/examples/SUBMISSION_RULES.md index 389b2c9..7ad282b 100644 --- a/examples/SUBMISSION_RULES.md +++ b/examples/SUBMISSION_RULES.md @@ -77,7 +77,7 @@ import json if __name__ == '__main__': # everything before service creation omitted - service = IntersectService(capability, config) + service = IntersectService([capability], config) print(json.dumps(service._schema, indent=2)) ``` diff --git a/pyproject.toml b/pyproject.toml index db87f96..a73451d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,6 +63,7 @@ test = [ test-all = "pytest tests/ --cov=src/intersect_sdk/ --cov-fail-under=80 --cov-report=html:reports/htmlcov/ --cov-report=xml:reports/coverage_report.xml --junitxml=reports/junit.xml" test-all-debug = "pytest tests/ --cov=src/intersect_sdk/ --cov-fail-under=80 --cov-report=html:reports/htmlcov/ --cov-report=xml:reports/coverage_report.xml --junitxml=reports/junit.xml -s" test-unit = "pytest tests/unit --cov=src/intersect_sdk/" +test-e2e = "pytest tests/e2e --cov=src/intersect_sdk/" lint = {composite = ["lint-format", "lint-ruff", "lint-mypy"]} lint-format = "ruff format" lint-ruff = "ruff check --fix" @@ -84,7 +85,7 @@ isort = { known-first-party = ['src'] } pydocstyle = { convention = 'google'} flake8-quotes = {inline-quotes = 'single', multiline-quotes = 'double'} mccabe = { max-complexity = 20 } -pylint = { max-args = 10, max-branches = 20, max-returns = 10 } +pylint = { max-args = 10, max-branches = 20, max-returns = 10, max-statements = 75 } # pyflakes and the relevant pycodestyle rules are already configured extend-select = [ 'C90', # mccabe complexity diff --git a/src/intersect_sdk/__init__.py b/src/intersect_sdk/__init__.py index fd43448..52417b8 100644 --- a/src/intersect_sdk/__init__.py +++ b/src/intersect_sdk/__init__.py @@ -13,9 +13,7 @@ from .client_callback_definitions import ( INTERSECT_CLIENT_EVENT_CALLBACK_TYPE, INTERSECT_CLIENT_RESPONSE_CALLBACK_TYPE, - INTERSECT_JSON_VALUE, IntersectClientCallback, - IntersectClientMessageParams, ) from .config.client import IntersectClientConfig from .config.service import IntersectServiceConfig @@ -28,12 +26,19 @@ from .core_definitions import IntersectDataHandler, IntersectMimeType from .schema import get_schema_from_capability_implementation from .service import IntersectService +from .service_callback_definitions import ( + INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE, +) from .service_definitions import ( IntersectEventDefinition, intersect_event, intersect_message, intersect_status, ) +from .shared_callback_definitions import ( + INTERSECT_JSON_VALUE, + IntersectDirectMessageParams, +) from .version import __version__, version_info, version_string __all__ = [ @@ -47,10 +52,11 @@ 'IntersectService', 'IntersectClient', 'IntersectClientCallback', - 'IntersectClientMessageParams', + 'IntersectDirectMessageParams', 'INTERSECT_CLIENT_RESPONSE_CALLBACK_TYPE', 'INTERSECT_CLIENT_EVENT_CALLBACK_TYPE', 'INTERSECT_JSON_VALUE', + 'INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE', 'IntersectBaseCapabilityImplementation', 'default_intersect_lifecycle_loop', 'IntersectClientConfig', diff --git a/src/intersect_sdk/_internal/function_metadata.py b/src/intersect_sdk/_internal/function_metadata.py index ed49c27..43d6399 100644 --- a/src/intersect_sdk/_internal/function_metadata.py +++ b/src/intersect_sdk/_internal/function_metadata.py @@ -12,6 +12,10 @@ class FunctionMetadata(NamedTuple): NOTE: both this class and all properties in it should remain immutable after creation """ + capability: type + """ + The type of the class that implements the target method. + """ method: Callable[[Any], Any] """ The raw method of the function. The function itself is useless and should not be called, diff --git a/src/intersect_sdk/_internal/interfaces.py b/src/intersect_sdk/_internal/interfaces.py index b4308b0..ee18fea 100644 --- a/src/intersect_sdk/_internal/interfaces.py +++ b/src/intersect_sdk/_internal/interfaces.py @@ -1,5 +1,17 @@ +from __future__ import annotations + from abc import ABC, abstractmethod -from typing import Any +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from uuid import UUID + + from ..service_callback_definitions import ( + INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE, + ) + from ..shared_callback_definitions import ( + IntersectDirectMessageParams, + ) class IntersectEventObserver(ABC): @@ -18,3 +30,20 @@ def _on_observe_event(self, event_name: str, event_value: Any, operation: str) - operation: The source of the event (generally the function name, not directly invoked by application devs) """ ... + + @abstractmethod + def create_external_request( + self, + request: IntersectDirectMessageParams, + response_handler: INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE | None = None, + ) -> UUID: + """Observed entity (capabilitiy) tells observer (i.e. service) to send an external request. + + Params: + - request: the request we want to send out, encapsulated as an IntersectClientMessageParams object + - response_handler: optional callback for how we want to handle the response from this request. + + Returns: + - generated RequestID associated with your request + """ + ... diff --git a/src/intersect_sdk/_internal/messages/userspace.py b/src/intersect_sdk/_internal/messages/userspace.py index e87f6a6..22bc75e 100644 --- a/src/intersect_sdk/_internal/messages/userspace.py +++ b/src/intersect_sdk/_internal/messages/userspace.py @@ -2,6 +2,13 @@ This module is internal-facing and should not be used directly by users. +Services have two associated channels which handle userspace messages: their request channel +and their response channel. Services always CONSUME messages from these channels, but never PRODUCE messages +on these channels. (A message is always sent in the receiver's namespace). + +The response channel is how the service handles external requests, the request channel is used when this service itself +needs to make external requests through INTERSECT. + Services should ALWAYS be CONSUMING from their userspace channel. They should NEVER be PRODUCING messages on their userspace channel. @@ -9,6 +16,8 @@ from services they explicitly messaged. """ +from __future__ import annotations + import datetime import uuid from typing import Any, Union @@ -16,10 +25,13 @@ from pydantic import AwareDatetime, Field, TypeAdapter from typing_extensions import Annotated, TypedDict -from ...constants import SYSTEM_OF_SYSTEM_REGEX -from ...core_definitions import IntersectDataHandler, IntersectMimeType +from ...constants import SYSTEM_OF_SYSTEM_REGEX # noqa: TCH001 (this is runtime checked) +from ...core_definitions import ( # noqa: TCH001 (this is runtime checked) + IntersectDataHandler, + IntersectMimeType, +) from ...version import version_string -from ..data_plane.minio_utils import MinioPayload +from ..data_plane.minio_utils import MinioPayload # noqa: TCH001 (this is runtime checked) class UserspaceMessageHeader(TypedDict): @@ -115,7 +127,7 @@ class UserspaceMessage(TypedDict): the headers of the message """ - payload: Union[bytes, MinioPayload] # noqa: FA100 (Pydantic uses runtime annotations) + payload: Union[bytes, MinioPayload] # noqa: UP007 (Pydantic uses runtime annotations) """ main payload of the message. Needs to match the schema format, including the content type. @@ -141,11 +153,13 @@ def create_userspace_message( content_type: IntersectMimeType, data_handler: IntersectDataHandler, payload: Any, + message_id: uuid.UUID | None = None, has_error: bool = False, ) -> UserspaceMessage: """Payloads depend on the data_handler and has_error.""" + msg_id = message_id if message_id else uuid.uuid4() return UserspaceMessage( - messageId=uuid.uuid4(), + messageId=msg_id, operationId=operation_id, contentType=content_type, payload=payload, diff --git a/src/intersect_sdk/_internal/schema.py b/src/intersect_sdk/_internal/schema.py index 63369c4..daf235e 100644 --- a/src/intersect_sdk/_internal/schema.py +++ b/src/intersect_sdk/_internal/schema.py @@ -59,7 +59,7 @@ class _FunctionAnalysisResult(NamedTuple): """private class generated from static analysis of function.""" - capability_name: str + class_name: str method_name: str method: Callable[[Any], Any] """raw method is for inspecting attributes""" @@ -218,18 +218,18 @@ def _status_fn_schema( - The status function's schema - The TypeAdapter to use for serializing outgoing responses """ - capability_name, status_fn_name, status_fn, min_params = status_info + class_name, status_fn_name, status_fn, min_params = status_info status_signature = inspect.signature(status_fn) method_params = tuple(status_signature.parameters.values()) if len(method_params) != min_params or any( p.kind not in (p.POSITIONAL_OR_KEYWORD, p.POSITIONAL_ONLY) for p in method_params ): die( - f"On capability '{capability_name}', capability status function '{status_fn_name}' should have no parameters other than 'self' (unless a staticmethod), and should not use keyword or variable length arguments (i.e. '*', *args, **kwargs)." + f"On capability '{class_name}', capability status function '{status_fn_name}' should have no parameters other than 'self' (unless a staticmethod), and should not use keyword or variable length arguments (i.e. '*', *args, **kwargs)." ) if status_signature.return_annotation is inspect.Signature.empty: die( - f"On capability '{capability_name}', capability status function '{status_fn_name}' should have a valid return annotation." + f"On capability '{class_name}', capability status function '{status_fn_name}' should have a valid return annotation." ) try: status_adapter = TypeAdapter(status_signature.return_annotation) @@ -246,12 +246,12 @@ def _status_fn_schema( ) except PydanticUserError as e: die( - f"On capability '{capability_name}', return annotation '{status_signature.return_annotation}' on function '{status_fn_name}' is invalid.\n{e}" + f"On capability '{class_name}', return annotation '{status_signature.return_annotation}' on function '{status_fn_name}' is invalid.\n{e}" ) def _add_events( - capability_name: str, + class_name: str, function_name: str, schemas: dict[str, Any], event_schemas: dict[str, Any], @@ -272,13 +272,13 @@ def _add_events( for d in differences_from_cache ) die( - f"On capability '{capability_name}', event key '{event_key}' on function '{function_name}' was previously defined differently. \n{diff_str}\n" + f"On capability '{class_name}', event key '{event_key}' on function '{function_name}' was previously defined differently. \n{diff_str}\n" ) metadata_value.operations.add(function_name) else: if event_definition.data_handler in excluded_data_handlers: die( - f"On capability '{capability_name}', function '{function_name}' should not set data_handler as {event_definition.data_handler} unless an instance is configured in IntersectConfig.data_stores ." + f"On capability '{class_name}', function '{function_name}' should not set data_handler as {event_definition.data_handler} unless an instance is configured in IntersectConfig.data_stores ." ) try: event_adapter: TypeAdapter[Any] = TypeAdapter(event_definition.event_type) @@ -297,12 +297,12 @@ def _add_events( ) except PydanticUserError as e: die( - f"On capability '{capability_name}', event key '{event_key}' on function '{function_name}' has an invalid value in the events mapping.\n{e}" + f"On capability '{class_name}', event key '{event_key}' on function '{function_name}' has an invalid value in the events mapping.\n{e}" ) def _introspection_baseline( - capability: type[IntersectBaseCapabilityImplementation], + capability: IntersectBaseCapabilityImplementation, excluded_data_handlers: set[IntersectDataHandler], ) -> tuple[ dict[Any, Any], # $defs for schemas (common) @@ -332,10 +332,13 @@ def _introspection_baseline( function_map = {} event_metadatas: dict[str, EventMetadata] = {} - status_func, response_funcs, event_funcs = _get_functions(capability) + cap_name = capability.capability_name + status_func, response_funcs, event_funcs = _get_functions(type(capability)) # parse functions - for capability_name, name, method, min_params in response_funcs: + for class_name, name, method, min_params in response_funcs: + public_name = f'{cap_name}.{name}' + # TODO - I'm placing this here for now because we'll eventually want to capture data plane and broker configs in the schema. # (It's possible we may want to separate the backing service schema from the application logic, but it's unlikely.) # At the moment, we're just validating that users can support their response_data_handler property. @@ -343,7 +346,7 @@ def _introspection_baseline( data_handler = getattr(method, RESPONSE_DATA) if data_handler in excluded_data_handlers: die( - f"On capability '{capability_name}', function '{name}' should not set response_data_type as {data_handler} unless an instance is configured in IntersectConfig.data_stores ." + f"On capability '{class_name}', function '{name}' should not set response_data_type as {data_handler} unless an instance is configured in IntersectConfig.data_stores ." ) docstring = inspect.cleandoc(method.__doc__) if method.__doc__ else None @@ -358,7 +361,7 @@ def _introspection_baseline( ) ): die( - f"On capability '{capability_name}', function '{name}' should have 'self' (unless a staticmethod) and zero or one additional parameters, and should not use keyword or variable length arguments (i.e. '*', *args, **kwargs)." + f"On capability '{class_name}', function '{name}' should have 'self' (unless a staticmethod) and zero or one additional parameters, and should not use keyword or variable length arguments (i.e. '*', *args, **kwargs)." ) # The schema format should be hard-coded and determined based on how Pydantic parses the schema. @@ -390,7 +393,7 @@ def _introspection_baseline( # Pydantic BaseModels require annotations even if using a default value, so we'll remain consistent. if annotation is inspect.Parameter.empty: die( - f"On capability '{capability_name}', parameter '{parameter.name}' type annotation on function '{name}' missing. {SCHEMA_HELP_MSG}" + f"On capability '{class_name}', parameter '{parameter.name}' type annotation on function '{name}' missing. {SCHEMA_HELP_MSG}" ) # rationale for disallowing function default values: # https://docs.pydantic.dev/latest/concepts/validation_decorator/#using-field-to-describe-function-arguments @@ -398,7 +401,7 @@ def _introspection_baseline( # also makes TypeAdapters considerably easier to construct if parameter.default is not inspect.Parameter.empty: die( - f"On capability '{capability_name}', parameter '{parameter.name}' should not use a default value in the function parameter (use 'typing_extensions.Annotated[TYPE, pydantic.Field(default=)]' instead - 'default_factory' is an acceptable, mutually exclusive argument to 'Field')." + f"On capability '{class_name}', parameter '{parameter.name}' should not use a default value in the function parameter (use 'typing_extensions.Annotated[TYPE, pydantic.Field(default=)]' instead - 'default_factory' is an acceptable, mutually exclusive argument to 'Field')." ) try: function_cache_request_adapter = TypeAdapter(annotation) @@ -410,7 +413,7 @@ def _introspection_baseline( ) except PydanticUserError as e: die( - f"On capability '{capability_name}', parameter '{parameter.name}' type annotation '{annotation}' on function '{name}' is invalid\n{e}" + f"On capability '{class_name}', parameter '{parameter.name}' type annotation '{annotation}' on function '{name}' is invalid\n{e}" ) else: @@ -419,7 +422,7 @@ def _introspection_baseline( # this block handles response parameters if return_annotation is inspect.Signature.empty: die( - f"On capability '{capability_name}', return type annotation on function '{name}' missing. {SCHEMA_HELP_MSG}" + f"On capability '{class_name}', return type annotation on function '{name}' missing. {SCHEMA_HELP_MSG}" ) try: function_cache_response_adapter = TypeAdapter(return_annotation) @@ -431,11 +434,12 @@ def _introspection_baseline( ) except PydanticUserError as e: die( - f"On capability '{capability_name}', return annotation '{return_annotation}' on function '{name}' is invalid.\n{e}" + f"On capability '{class_name}', return annotation '{return_annotation}' on function '{name}' is invalid.\n{e}" ) # final function mapping - function_map[name] = FunctionMetadata( + function_map[public_name] = FunctionMetadata( + type(capability), method, function_cache_request_adapter, function_cache_response_adapter, @@ -444,7 +448,7 @@ def _introspection_baseline( # this block handles events associated with intersect_messages (implies command pattern) function_events: dict[str, IntersectEventDefinition] = getattr(method, EVENT_ATTR_KEY) _add_events( - capability_name, + class_name, name, schemas, event_schemas, @@ -455,9 +459,9 @@ def _introspection_baseline( channels[name]['events'] = list(function_events.keys()) # parse global schemas - for capability_name, name, method, _ in event_funcs: + for class_name, name, method, _ in event_funcs: _add_events( - capability_name, + class_name, name, schemas, event_schemas, @@ -471,7 +475,9 @@ def _introspection_baseline( ) # this conditional allows for the status function to also be called like a message if status_fn_type_adapter and status_fn and status_fn_name: - function_map[status_fn_name] = FunctionMetadata( + public_status_name = f'{cap_name}.{status_fn_name}' + function_map[public_status_name] = FunctionMetadata( + type(capability), status_fn, None, status_fn_type_adapter, @@ -487,14 +493,15 @@ def _introspection_baseline( ) -def get_schema_and_functions_from_capability_implementation( - capability_type: type[IntersectBaseCapabilityImplementation], - capability_name: HierarchyConfig, +def get_schema_and_functions_from_capability_implementations( + capabilities: list[IntersectBaseCapabilityImplementation], + service_name: HierarchyConfig, excluded_data_handlers: set[IntersectDataHandler], ) -> tuple[ dict[str, Any], dict[str, FunctionMetadata], dict[str, EventMetadata], + IntersectBaseCapabilityImplementation | None, str | None, TypeAdapter[Any] | None, ]: @@ -502,20 +509,46 @@ def get_schema_and_functions_from_capability_implementation( In-depth introspection is handled later on. """ - ( - schemas, - (status_fn_name, status_schema, status_type_adapter), - channels, - function_map, - events, - event_map, - ) = _introspection_baseline(capability_type, excluded_data_handlers) + capability_type_docs: str = '' + status_function_cap: IntersectBaseCapabilityImplementation | None = None + status_function_name: str | None = None + status_function_schema: dict[str, Any] | None = None + status_function_adapter: TypeAdapter[Any] | None = None + schemas: dict[Any, Any] = {} + channels: dict[str, dict[str, dict[str, Any]]] = {} # endpoint schemas + function_map: dict[str, FunctionMetadata] = {} # endpoint functionality + events: dict[str, Any] = {} # event schemas + event_map: dict[str, EventMetadata] = {} # event functionality + for capability in capabilities: + capability_type: type[IntersectBaseCapabilityImplementation] = type(capability) + if capability_type.__doc__: + capability_type_docs += inspect.cleandoc(capability_type.__doc__) + '\n' + ( + cap_schemas, + (cap_status_fn_name, cap_status_schema, cap_status_type_adapter), + cap_channels, + cap_function_map, + cap_events, + cap_event_map, + ) = _introspection_baseline(capability, excluded_data_handlers) + + if cap_status_fn_name and cap_status_schema and cap_status_type_adapter: + status_function_cap = capability + status_function_name = cap_status_fn_name + status_function_schema = cap_status_schema + status_function_adapter = cap_status_type_adapter + + schemas.update(cap_schemas) + channels.update(cap_channels) + function_map.update(cap_function_map) + events.update(cap_events) + event_map.update(cap_event_map) asyncapi_spec = { 'asyncapi': ASYNCAPI_VERSION, 'x-intersect-version': version_string, 'info': { - 'title': capability_name.hierarchy_string('.'), + 'title': service_name.hierarchy_string('.'), 'version': '0.0.0', # NOTE: this will be modified by INTERSECT CORE, users do not manage their schema versions }, # applies to how an incoming message payload will be parsed. @@ -540,11 +573,12 @@ def get_schema_and_functions_from_capability_implementation( }, }, } - if capability_type.__doc__: - asyncapi_spec['info']['description'] = inspect.cleandoc(capability_type.__doc__) # type: ignore[index] - if status_schema: - asyncapi_spec['status'] = status_schema + if capability_type_docs != '': + asyncapi_spec['info']['description'] = capability_type_docs # type: ignore[index] + + if status_function_schema: + asyncapi_spec['status'] = status_function_schema """ TODO - might want to include these fields @@ -555,4 +589,11 @@ def get_schema_and_functions_from_capability_implementation( }, """ - return asyncapi_spec, function_map, event_map, status_fn_name, status_type_adapter + return ( + asyncapi_spec, + function_map, + event_map, + status_function_cap, + status_function_name, + status_function_adapter, + ) diff --git a/src/intersect_sdk/capability/base.py b/src/intersect_sdk/capability/base.py index a3c5143..5731115 100644 --- a/src/intersect_sdk/capability/base.py +++ b/src/intersect_sdk/capability/base.py @@ -11,21 +11,34 @@ from .._internal.logger import logger if TYPE_CHECKING: + from uuid import UUID + from .._internal.interfaces import IntersectEventObserver + from ..service_callback_definitions import ( + INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE, + ) + from ..shared_callback_definitions import ( + IntersectDirectMessageParams, + ) class IntersectBaseCapabilityImplementation: """Base class for all capabilities. EVERY capability implementation will need to extend this class. Additionally, if you redefine the constructor, - you MUST call super.__init__() . + you MUST call `super.__init__()` . """ def __init__(self) -> None: """This constructor just sets up observers. - NOTE: If you write your own constructor, you MUST call super.__init__() inside of it. The Service will throw an error if you don't. + NOTE: If you write your own constructor, you MUST call `super.__init__()` inside of it. The Service will throw an error if you don't. + """ + self._capability_name: str = 'InvalidCapability' + """ + The advertised name for the capability, as opposed to the implementation class name """ + self.__intersect_sdk_observers__: list[IntersectEventObserver] = [] """ INTERNAL USE ONLY. @@ -34,16 +47,31 @@ def __init__(self) -> None: """ def __init_subclass__(cls) -> None: - """This prevents users from overriding a few key functions.""" + """This prevents users from overriding a few key functions. + + General rule of thumb is that any function which starts with `intersect_sdk_` is a protected namespace for defining + the INTERSECT-SDK public API between a capability and its observers. + """ if ( cls._intersect_sdk_register_observer is not IntersectBaseCapabilityImplementation._intersect_sdk_register_observer or cls.intersect_sdk_emit_event is not IntersectBaseCapabilityImplementation.intersect_sdk_emit_event + or cls.intersect_sdk_call_service + is not IntersectBaseCapabilityImplementation.intersect_sdk_call_service ): - msg = f"{cls.__name__}: Cannot override functions '_intersect_sdk_register_observer' or 'intersect_sdk_emit_event'" + msg = f"{cls.__name__}: Attempted to override a reserved INTERSECT-SDK function (don't start your function names with '_intersect_sdk_' or 'intersect_sdk_')" raise RuntimeError(msg) + @property + def capability_name(self) -> str: + """The advertised name for the capability provided by this implementation.""" + return self._capability_name + + @capability_name.setter + def capability_name(self, cname: str) -> None: + self._capability_name = cname + @final def _intersect_sdk_register_observer(self, observer: IntersectEventObserver) -> None: """INTERNAL USE ONLY.""" @@ -99,3 +127,27 @@ def intersect_sdk_emit_event(self, event_name: str, event_value: Any) -> None: return for observer in self.__intersect_sdk_observers__: observer._on_observe_event(event_name, event_value, annotated_operation) # noqa: SLF001 (private for application devs, NOT for base implementation) + + @final + def intersect_sdk_call_service( + self, + request: IntersectDirectMessageParams, + response_handler: INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE | None = None, + ) -> list[UUID]: + """Create an external request that we'll send to a different Service. + + Params: + - request: the request we want to send out, encapsulated as an IntersectClientMessageParams object + - response_handler: optional callback for how we want to handle the response from this request. + + Returns: + - list of generated RequestIDs associated with your request. Note that for almost all use cases, + this list will have only one associated RequestID. + + Raises: + - pydantic.ValidationError - if the request parameter isn't valid + """ + return [ + observer.create_external_request(request, response_handler) + for observer in self.__intersect_sdk_observers__ + ] diff --git a/src/intersect_sdk/client.py b/src/intersect_sdk/client.py index 332fdd5..0219098 100644 --- a/src/intersect_sdk/client.py +++ b/src/intersect_sdk/client.py @@ -40,7 +40,6 @@ from ._internal.version_resolver import resolve_user_version from .client_callback_definitions import ( IntersectClientCallback, - IntersectClientMessageParams, ) from .config.client import IntersectClientConfig from .config.shared import HierarchyConfig @@ -50,6 +49,7 @@ INTERSECT_CLIENT_EVENT_CALLBACK_TYPE, INTERSECT_CLIENT_RESPONSE_CALLBACK_TYPE, ) + from .shared_callback_definitions import IntersectDirectMessageParams @final @@ -77,7 +77,7 @@ def __init__( Parameters: config: The IntersectClientConfig class - user_callback: The callback function you can use to handle response messages from the other Service. + user_callback: The callback function you can use to handle response messages from Services. If this is left empty, you can only send a single message event_callback: The callback function you can use to handle events from any Service. """ @@ -133,7 +133,7 @@ def __init__( if user_callback: # Do not persist, as we use the temporary client information to build this. self._control_plane_manager.add_subscription_channel( - f"{self._hierarchy.hierarchy_string('/')}/userspace", + f"{self._hierarchy.hierarchy_string('/')}/response", {self._handle_userspace_message_raw}, persist=False, ) @@ -425,16 +425,16 @@ def _handle_client_callback(self, user_value: IntersectClientCallback | None) -> for message in validated_result.messages_to_send: self._send_userspace_message(message) - def _send_userspace_message(self, params: IntersectClientMessageParams) -> None: + def _send_userspace_message(self, params: IntersectDirectMessageParams) -> None: """Send a userspace message, be it an initial message from the user or from the user's callback function.""" # ONE: SERIALIZE FUNCTION RESULTS # (function input should already be validated at this point) - response = GENERIC_MESSAGE_SERIALIZER.dump_json(params.payload, warnings=False) + msg_payload = GENERIC_MESSAGE_SERIALIZER.dump_json(params.payload, warnings=False) # TWO: SEND DATA TO APPROPRIATE DATA STORE try: - response_payload = self._data_plane_manager.outgoing_message_data_handler( - response, params.response_content_type, params.response_data_handler + out_payload = self._data_plane_manager.outgoing_message_data_handler( + msg_payload, params.content_type, params.data_handler ) except IntersectError: # NOTE @@ -447,17 +447,17 @@ def _send_userspace_message(self, params: IntersectClientMessageParams) -> None: msg = create_userspace_message( source=self._hierarchy.hierarchy_string('.'), destination=params.destination, - content_type=params.response_content_type, - data_handler=params.response_data_handler, + content_type=params.content_type, + data_handler=params.data_handler, operation_id=params.operation, - payload=response_payload, + payload=out_payload, ) logger.debug(f'Send userspace message:\n{msg}') - response_channel = f"{params.destination.replace('.', '/')}/userspace" + channel = f"{params.destination.replace('.', '/')}/request" # WARNING: If both the Service and the Client drop, the Service will execute the command # but cannot communicate the response to the Client. # in experiment controllers or production, you'll want to set persist to True - self._control_plane_manager.publish_message(response_channel, msg, persist=False) + self._control_plane_manager.publish_message(channel, msg, persist=False) # TODO - consider removing this entire concept def _heartbeat_ticker(self) -> None: diff --git a/src/intersect_sdk/client_callback_definitions.py b/src/intersect_sdk/client_callback_definitions.py index 6b3ee2b..9bad1b9 100644 --- a/src/intersect_sdk/client_callback_definitions.py +++ b/src/intersect_sdk/client_callback_definitions.py @@ -1,51 +1,15 @@ -"""Data types used in regard to client callbacks. Only relevant for Client authors.""" +"""Data types used in regard to client callbacks. Only relevant for Client authors. -from dataclasses import dataclass -from typing import Any, Callable, Dict, List, Optional, Union +See shared_callback_definitions for additional typings which are also shared by service authors. +""" + +from typing import Callable, Dict, List, Optional, Union from pydantic import BaseModel, ConfigDict, Field from typing_extensions import Annotated, TypeAlias, final from .constants import SYSTEM_OF_SYSTEM_REGEX -from .core_definitions import IntersectDataHandler, IntersectMimeType - - -@dataclass -class IntersectClientMessageParams: - """The user implementing the IntersectClient class will need to return this object in order to send a message to another Service.""" - - destination: Annotated[str, Field(pattern=SYSTEM_OF_SYSTEM_REGEX)] - """ - The destination string. You'll need to know the system-of-system representation of the Service. - - Note that this should match what you would see in the schema. - """ - - operation: str - """ - The name of the operation you want to call from the Service - this should be represented as it is in the Service's schema. - """ - - payload: Any - """ - The raw Python object you want to have serialized as the payload. - - If you want to just use the service's default value for a request (assuming it has a default value for a request), you may set this as None. - """ - - response_content_type: IntersectMimeType = IntersectMimeType.JSON - """ - The IntersectMimeType of your response. You'll want this to match with the ContentType of the function from the schema. - - default: IntersectMimeType.JSON - """ - - response_data_handler: IntersectDataHandler = IntersectDataHandler.MESSAGE - """ - The IntersectDataHandler you want to use (most people can just use IntersectDataHandler.MESSAGE here, unless your data is very large) - - default: IntersectDataHandler.MESSAGE - """ +from .shared_callback_definitions import IntersectDirectMessageParams @final @@ -55,7 +19,7 @@ class IntersectClientCallback(BaseModel): If you do not return a value of this type (or None), this will be treated as an Exception and will break the pub-sub loop. """ - messages_to_send: List[IntersectClientMessageParams] = [] # noqa: FA100 (runtime annotation) + messages_to_send: List[IntersectDirectMessageParams] = [] # noqa: FA100 (runtime annotation) """ Messages to send as a result of an event or a response from a Service. """ diff --git a/src/intersect_sdk/schema.py b/src/intersect_sdk/schema.py index 77cd583..313a318 100644 --- a/src/intersect_sdk/schema.py +++ b/src/intersect_sdk/schema.py @@ -33,7 +33,7 @@ Any, ) -from ._internal.schema import get_schema_and_functions_from_capability_implementation +from ._internal.schema import get_schema_and_functions_from_capability_implementations if TYPE_CHECKING: from .capability.base import IntersectBaseCapabilityImplementation @@ -42,13 +42,13 @@ def get_schema_from_capability_implementation( capability_type: type[IntersectBaseCapabilityImplementation], - capability_name: HierarchyConfig, + service_name: HierarchyConfig, ) -> dict[str, Any]: """The goal of this function is to be able to generate a complete schema matching the AsyncAPI spec 2.6.0 from a BaseModel class. Params: - capability_type - the SDK user will provide the class of their capability handler, which generates the schema - - capability_name - ideally, this could be scanned by the package name. Meant to be descriptive, i.e. "nionswift" + - service_name - ideally, this could be scanned by the package name. Meant to be descriptive, i.e. "nionswift" SOME NOTES ABOUT THE SCHEMA @@ -69,9 +69,10 @@ def get_schema_from_capability_implementation( - Channel names just mimic the function names for now """ - schemas, _, _, _, _ = get_schema_and_functions_from_capability_implementation( - capability_type, - capability_name, + cap_instance = capability_type() + schemas, _, _, _, _, _ = get_schema_and_functions_from_capability_implementations( + [cap_instance], + service_name, set(), # assume all data handlers are configured if user is just checking their schema ) return schemas diff --git a/src/intersect_sdk/service.py b/src/intersect_sdk/service.py index ec67099..5bdd9ae 100644 --- a/src/intersect_sdk/service.py +++ b/src/intersect_sdk/service.py @@ -16,11 +16,13 @@ from __future__ import annotations +from datetime import datetime, timezone +from threading import Condition, Lock from types import MappingProxyType -from typing import TYPE_CHECKING, Any, Callable -from uuid import uuid4 +from typing import TYPE_CHECKING, Any, Callable, Union +from uuid import UUID, uuid1, uuid3 -from pydantic import ValidationError +from pydantic import ConfigDict, ValidationError, validate_call from pydantic_core import PydanticSerializationError from typing_extensions import Self, final @@ -30,7 +32,10 @@ SHUTDOWN_KEYS, STRICT_VALIDATION, ) -from ._internal.control_plane.control_plane_manager import ControlPlaneManager +from ._internal.control_plane.control_plane_manager import ( + GENERIC_MESSAGE_SERIALIZER, + ControlPlaneManager, +) from ._internal.data_plane.data_plane_manager import DataPlaneManager from ._internal.exceptions import IntersectApplicationError, IntersectError from ._internal.interfaces import IntersectEventObserver @@ -42,13 +47,20 @@ create_userspace_message, deserialize_and_validate_userspace_message, ) -from ._internal.schema import get_schema_and_functions_from_capability_implementation +from ._internal.schema import get_schema_and_functions_from_capability_implementations from ._internal.stoppable_thread import StoppableThread from ._internal.utils import die from ._internal.version_resolver import resolve_user_version from .capability.base import IntersectBaseCapabilityImplementation from .config.service import IntersectServiceConfig from .core_definitions import IntersectDataHandler, IntersectMimeType +from .service_callback_definitions import ( + INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE, # noqa: TCH001 (runtime-checked annotation) +) +from .shared_callback_definitions import ( + INTERSECT_JSON_VALUE, # noqa: TCH001 (runtime-checked annotation) + IntersectDirectMessageParams, # noqa: TCH001 (runtime-checked annotation) +) from .version import version_string if TYPE_CHECKING: @@ -64,67 +76,104 @@ class IntersectService(IntersectEventObserver): you return your output. The service automatically integrates all of the following components: - - The user-defined capability + - The user-defined capabilities - Any message brokers - Any core INTERSECT data layers What it does NOT do: - deal with any custom messaging logic - i.e. Pyro logic, an internal ZeroMQ system, etc. ... these should be defined on the capability level. - - deal with any application logic - that should be handled by the user's capability + - deal with any application logic - that should be handled by the capabilities - Users should generally not need to interact with objects of this class outside of the constructor and the startup() and shutdown() functions. It's advisable - not to mutate the service object yourself, though you can freely log out properties for debugging purposes. + Users should generally not need to interact with objects of this class outside of the constructor and the following functions: startup(), add_startup_messages(), shutdown(), add_shutdown_messages(), create_external_request(). + It's advisable not to mutate the service object yourself, though you can freely log out properties for debugging purposes. Note: The ONLY stable function methods are: - the constructor - startup() + - add_startup_messages() - shutdown() + - add_shutdown_messages() - is_connected() + - considered_unrecoverable() - forbid_keys() - allow_keys() - allow_all_functions() - get_blocked_keys() + - create_external_request() No other functions or parameters are guaranteed to remain stable. """ + class _ExternalRequest: + """Class representative of an ongoing request to another service.""" + + def __init__( + self, + req_id: UUID, + req_name: str, + request: IntersectDirectMessageParams, + response_handler: INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE | None = None, + ) -> None: + """Create an external request.""" + self.cv = Condition() + self.request_id = req_id + self.request_name = req_name + self.processed: bool = False + self.error: str | None = None + self.request = request + self.response: INTERSECT_JSON_VALUE = None + self.got_valid_response: bool = False + self.response_fn = response_handler + self.waiting: bool = False + self.cleanup_req = False + """When this flag is set to True, mark this request for GC deletion.""" + def __init__( self, - capability: IntersectBaseCapabilityImplementation, + capabilities: list[IntersectBaseCapabilityImplementation], config: IntersectServiceConfig, ) -> None: """The constructor performs almost all validation checks necessary to function in the INTERSECT ecosystem, with the exception of checking connections/credentials to any backing services. Parameters: - capability: Your capability implementation class + capabilities: Your list of capability implementation classes config: The IntersectConfig class """ - if not isinstance(capability, IntersectBaseCapabilityImplementation): - die( - f'IntersectService parameter must inherit from intersect_sdk.IntersectBaseCapabilityImplementation instead of "{capability.__class__.__name__}" .' - ) - if not hasattr(capability, '__intersect_sdk_observers__'): - die( - f'{capability.__class__.__name__} needs to call "super().__init__()" in the constructor.' - ) + for cap in capabilities: + if not isinstance(cap, IntersectBaseCapabilityImplementation): + die( + f'IntersectService parameter must inherit from intersect_sdk.IntersectBaseCapabilityImplementation instead of "{cap.__class__.__name__}" .' + ) + if not hasattr(cap, '__intersect_sdk_observers__'): + die( + f'{cap.__class__.__name__} needs to call "super().__init__()" in the constructor.' + ) + + # we generally start observing and don't stop, doesn't really matter if we startup or shutdown + cap._intersect_sdk_register_observer(self) # noqa: SLF001 (we don't want users calling or overriding it, but this is fine.) + + self.capabilities = capabilities + # this is called here in case a user created the object using "IntersectServiceConfig.model_construct()" to skip validation config = IntersectServiceConfig.model_validate(config) - self.capability = capability + ( schema, function_map, event_map, + status_fn_capability, status_fn_name, status_type_adapter, - ) = get_schema_and_functions_from_capability_implementation( - capability.__class__, - capability_name=config.hierarchy, + ) = get_schema_and_functions_from_capability_implementations( + self.capabilities, + service_name=config.hierarchy, excluded_data_handlers=config.data_stores.get_missing_data_store_types(), ) self._schema = schema """ Stringified schema of the user's application. Gets sent in several status message requests. """ + self._function_map = MappingProxyType(function_map) """ INTERNAL USE ONLY @@ -134,6 +183,7 @@ def __init__( You can get user-defined properties from the method via getattr(_function_map.method, KEY), the keys get set in the intersect_message decorator function (annotations.py). """ + self._event_map = MappingProxyType(event_map) """ INTERNAL USE ONLY @@ -153,13 +203,14 @@ def __init__( """ self._hierarchy = config.hierarchy + self._uuid = uuid3(uuid1(), config.hierarchy.hierarchy_string('.')) self._status_thread: StoppableThread | None = None self._status_ticker_interval = config.status_interval self._status_retrieval_fn: Callable[[], bytes] = ( ( lambda: status_type_adapter.dump_json( - getattr(self.capability, status_fn_name)(), by_alias=True, warnings='error' + getattr(status_fn_capability, status_fn_name)(), by_alias=True, warnings='error' ) ) if status_type_adapter and status_fn_name @@ -168,25 +219,47 @@ def __init__( self._status_memo = self._status_retrieval_fn() + self._external_request_thread: StoppableThread | None = None + self._external_requests_lock = Lock() + self._external_requests: dict[str, IntersectService._ExternalRequest] = {} + self._external_request_ctr = 0 + + self._startup_messages: list[ + tuple[IntersectDirectMessageParams, INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE | None] + ] = [] + self._resend_startup_messages = True + self._sent_startup_messages = False + + self._shutdown_messages: list[ + tuple[IntersectDirectMessageParams, INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE | None] + ] = [] + self._data_plane_manager = DataPlaneManager(self._hierarchy, config.data_stores) # we PUBLISH messages on this channel self._lifecycle_channel_name = f"{config.hierarchy.hierarchy_string('/')}/lifecycle" # we PUBLISH event messages on this channel self._events_channel_name = f"{config.hierarchy.hierarchy_string('/')}/events" - # we SUBSCRIBE to messages on this channel - self._userspace_channel_name = f"{config.hierarchy.hierarchy_string('/')}/userspace" + # we SUBSCRIBE to messages on this channel to receive requests + self._service_channel_name = f"{config.hierarchy.hierarchy_string('/')}/request" + # we SUBSCRIBE to messages on this channel to receive responses + self._client_channel_name = f"{config.hierarchy.hierarchy_string('/')}/response" + self._control_plane_manager = ControlPlaneManager( control_configs=config.brokers, ) # our userspace queue should be able to survive shutdown self._control_plane_manager.add_subscription_channel( - self._userspace_channel_name, - {self._handle_userspace_message_raw}, - persist=True, + self._service_channel_name, {self._handle_service_message_raw}, persist=True + ) + self._control_plane_manager.add_subscription_channel( + self._client_channel_name, {self._handle_client_message_raw}, persist=True ) - # we generally start observing and don't stop, doesn't really matter if we startup or shutdown - self.capability._intersect_sdk_register_observer(self) # noqa: SLF001 (we don't want users calling or overriding it, but this is fine.) + def _get_capability(self, target: str) -> Any | None: + for cap in self.capabilities: + if cap.capability_name == target: + return cap + return None @final def startup(self) -> Self: @@ -219,10 +292,27 @@ def startup(self) -> Self: # Start the status thread if it doesn't already exist if self._status_thread is None: self._status_thread = StoppableThread( - target=self._status_ticker, name=f'IntersectService_{uuid4()!s}_status_thread' + target=self._status_ticker, name=f'IntersectService_{self._uuid}_status_thread' ) self._status_thread.start() + # Process pending startup messages + if self._resend_startup_messages or not self._sent_startup_messages: + logger.info('Sending startup messages') + for tup in self._startup_messages: + message, fn = tup + self.create_external_request(request=message, response_handler=fn) + self._process_external_requests() + self._sent_startup_messages = True + + # Start the external request thread if it doesn't already exist + if self._external_request_thread is None: + self._external_request_thread = StoppableThread( + target=self._send_external_requests, + name=f'IntersectService_{self._uuid}_ext_req_thread', + ) + self._external_request_thread.start() + logger.info('Service startup complete') return self @@ -243,6 +333,17 @@ def shutdown(self, reason: str | None = None) -> Self: """ logger.info(f'Service is shutting down (reason: {reason})') + if self._external_request_thread is not None: + self._external_request_thread.stop() + self._external_request_thread.join() + self._external_request_thread = None + + logger.info('Sending shutdown messages') + for tup in self._shutdown_messages: + message, fn = tup + self.create_external_request(request=message, response_handler=fn) + self._process_external_requests() + # Stop polling if self._status_thread is not None: self._status_thread.stop() @@ -348,7 +449,133 @@ def get_blocked_keys(self) -> set[str]: """ return self._function_keys.copy() - def _handle_userspace_message_raw(self, raw: bytes) -> None: + def add_startup_messages( + self, + messages: list[ + tuple[IntersectDirectMessageParams, INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE | None] + ], + ) -> None: + """Add request messages to send out to various microservices when this service starts. + + Params: + - messages: list of tuples - first value in tuple contains the messages you want to send out on startup, + second value in tuple contains the callback function for handling the response from the other Service. + """ + self._startup_messages.extend(messages) + + def add_shutdown_messages( + self, + messages: list[ + tuple[IntersectDirectMessageParams, INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE | None] + ], + ) -> None: + """Add request messages to send out to various microservices on shutdown. + + Params: + - messages: list of tuples - first value in tuple contains the messages you want to send out on startup, + second value in tuple contains the callback function for handling the response from the other Service. + """ + self._shutdown_messages.extend(messages) + + @validate_call(config=ConfigDict(revalidate_instances='always')) + def create_external_request( + self, + request: IntersectDirectMessageParams, + response_handler: Union[INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE, None] = None, # noqa: UP007 (runtime checked annotation) + ) -> UUID: + """Create an external request that we'll send to a different Service. + + Params: + - request: the request we want to send out, encapsulated as an IntersectClientMessageParams object + - response_handler: optional callback for how we want to handle the response from this request. + + Returns: + - generated RequestID associated with your request + + Raises: + - pydantic.ValidationError - if the request parameter isn't valid + """ + self._external_request_ctr += 1 + request_name = f'ext-req-{self._external_request_ctr}' + request_uuid = uuid3(self._uuid, request_name) + extreq = IntersectService._ExternalRequest( + req_id=request_uuid, + req_name=request_name, + request=request, + response_handler=response_handler, + ) + self._external_requests_lock.acquire_lock(blocking=True) + self._external_requests[str(request_uuid)] = extreq + self._external_requests_lock.release_lock() + return request_uuid + + def _get_external_request(self, req_id: UUID) -> IntersectService._ExternalRequest | None: + req_id_str = str(req_id) + if req_id_str in self._external_requests: + req: IntersectService._ExternalRequest = self._external_requests[req_id_str] + return req + return None + + def _process_external_requests(self) -> None: + self._external_requests_lock.acquire_lock(blocking=True) + + # process requests + for extreq in self._external_requests.values(): + if not extreq.processed: + self._process_external_request(extreq) + # delete requests + cleanup_list = [ + str(extreq.request_id) + for extreq in self._external_requests.values() + if extreq.cleanup_req + ] + for extreq_id in cleanup_list: + extreq = self._external_requests.pop(extreq_id) + del extreq + + self._external_requests_lock.release_lock() + + def _process_external_request(self, extreq: IntersectService._ExternalRequest) -> None: + response = None + + now = datetime.now(timezone.utc) + logger.debug(f'Processing external request {extreq.request_id} @ {now}') + + with extreq.cv: + # execute the request + extreq.processed = True + if self._send_client_message(request_id=extreq.request_id, params=extreq.request): + # MJB NOTE: currently it is impossible to get a response for the + # external request when this function is called while + # handling an incoming request, so we are just ignoring + # any wait timeouts below. + + # wait on the response condition and get the response + extreq.waiting = True + if extreq.cv.wait(timeout=1.0): + if extreq.error is None: + response = extreq.response + else: + error_msg = extreq.error + logger.warning( + f'External service request encountered an error: {error_msg}' + ) + extreq.cleanup_req = True + else: + logger.debug('Request wait timed-out!') + extreq.waiting = False + else: + logger.warning('Failed to send request!') + + # process the response + if ( + extreq.got_valid_response + and extreq.response_fn is not None + and extreq.error is None + ): + extreq.response_fn(response) + + def _handle_service_message_raw(self, raw: bytes) -> None: """Main broker callback function. Deserializes and validates a userspace message from a broker. @@ -358,14 +585,14 @@ def _handle_userspace_message_raw(self, raw: bytes) -> None: try: message = deserialize_and_validate_userspace_message(raw) logger.debug(f'Received userspace message:\n{message}') - response_msg = self._handle_userspace_message(message) + response_msg = self._handle_service_message(message) if response_msg: logger.debug( 'Send %s message:\n%s', 'error' if response_msg['headers']['has_error'] else 'userspace', response_msg, ) - response_channel = f"{message['headers']['source'].replace('.', '/')}/userspace" + response_channel = f"{message['headers']['source'].replace('.', '/')}/response" # Persistent userspace messages may be useful for orchestration. # Persistence will not hurt anything. self._control_plane_manager.publish_message( @@ -376,7 +603,7 @@ def _handle_userspace_message_raw(self, raw: bytes) -> None: f'Invalid message received on userspace message channel, ignoring. Full message:\n{e}' ) - def _handle_userspace_message(self, message: UserspaceMessage) -> UserspaceMessage | None: + def _handle_service_message(self, message: UserspaceMessage) -> UserspaceMessage | None: """Main logic for handling a userspace message, minus all broker logic. Params @@ -406,6 +633,13 @@ def _handle_userspace_message(self, message: UserspaceMessage) -> UserspaceMessa logger.error(err_msg) return self._make_error_message(err_msg, message) + operation_capability, operation_method = operation.split('.') + target_capability = self._get_capability(operation_capability) + if target_capability is None: + err_msg = f"Could not locate service capability providing '{operation_capability}' for operation {operation}." + logger.error(err_msg) + return self._make_error_message(err_msg, message) + # THREE: GET DATA FROM APPROPRIATE DATA STORE try: request_params = self._data_plane_manager.incoming_message_data_handler(message) @@ -416,7 +650,9 @@ def _handle_userspace_message(self, message: UserspaceMessage) -> UserspaceMessa try: # FOUR: CALL USER FUNCTION AND GET MESSAGE - response = self._call_user_function(operation, operation_meta, request_params) + response = self._call_user_function( + target_capability, operation_method, operation_meta, request_params + ) # FIVE: SEND DATA TO APPROPRIATE DATA STORE response_data_handler = getattr(operation_meta.method, RESPONSE_DATA) response_content_type = getattr(operation_meta.method, RESPONSE_CONTENT) @@ -443,10 +679,79 @@ def _handle_userspace_message(self, message: UserspaceMessage) -> UserspaceMessa data_handler=response_data_handler, operation_id=message['operationId'], payload=response_payload, + message_id=message['messageId'], # associate response with request + ) + + def _handle_client_message_raw(self, raw: bytes) -> None: + """Broker callback, deserialize and validate a userspace message from a broker.""" + try: + message = deserialize_and_validate_userspace_message(raw) + logger.debug(f'Received userspace message:\n{message}') + self._handle_client_message(message) + except ValidationError as e: + logger.warning( + f'Invalid message received on client message channel, ignoring. Full message:\n{e}' + ) + + def _handle_client_message(self, message: UserspaceMessage) -> None: + """Handle a deserialized userspace message.""" + extreq = self._get_external_request(message['messageId']) + if extreq is not None: + error_msg: str | None = None + try: + msg_payload = GENERIC_MESSAGE_SERIALIZER.validate_json( + self._data_plane_manager.incoming_message_data_handler(message) + ) + except ValidationError as e: + error_msg = f'Service sent back invalid response:\n{e}' + logger.warning(error_msg) + except IntersectError: + error_msg = 'INTERNAL ERROR: failed to get message payload from data handler' + logger.error(error_msg) + + with extreq.cv: + if error_msg is not None: + extreq.error = error_msg + else: + extreq.response = msg_payload + extreq.got_valid_response = True + if extreq.waiting: + extreq.cv.notify() + else: + error_msg = f'No external request found for message:\n{message}' + logger.warning(error_msg) + + def _send_client_message(self, request_id: UUID, params: IntersectDirectMessageParams) -> bool: + """Send a userspace message.""" + # "params" should already be validated at this stage. + request = GENERIC_MESSAGE_SERIALIZER.dump_json(params.payload, warnings=False) + + # TWO: SEND DATA TO APPROPRIATE DATA STORE + try: + request_payload = self._data_plane_manager.outgoing_message_data_handler( + request, params.content_type, params.data_handler + ) + except IntersectError: + return False + + # THREE: SEND MESSAGE + msg = create_userspace_message( + source=self._hierarchy.hierarchy_string('.'), + destination=params.destination, + content_type=params.content_type, + data_handler=params.data_handler, + operation_id=params.operation, + payload=request_payload, + message_id=request_id, ) + logger.debug(f'Sending client message:\n{msg}') + request_channel = f"{params.destination.replace('.', '/')}/request" + self._control_plane_manager.publish_message(request_channel, msg, persist=True) + return True def _call_user_function( self, + fn_cap: IntersectBaseCapabilityImplementation, fn_name: str, fn_meta: FunctionMetadata, fn_params: bytes, @@ -456,6 +761,7 @@ def _call_user_function( Basic validations defined from a user's type definitions will also occur here. Params + fn_cap = capability implementing the user function fn_name = operation. These get represented in the schema as "channels". fn_meta = all information stored about the user's operation. This includes user-defined params and the request/response (de)serializers. fn_params = the request argument. @@ -501,7 +807,7 @@ def _call_user_function( logger.warning(err_msg) raise try: - response = getattr(self.capability, fn_name)(request_obj) + response = getattr(fn_cap, fn_name)(request_obj) except ( Exception ) as e: # (need to catch all possible exceptions to gracefully handle the thread) @@ -509,7 +815,7 @@ def _call_user_function( raise IntersectApplicationError from e else: try: - response = getattr(self.capability, fn_name)() + response = getattr(fn_cap, fn_name)() except ( Exception ) as e: # (need to catch all possible exceptions to gracefully handle the thread) @@ -587,6 +893,7 @@ def _make_error_message( data_handler=IntersectDataHandler.MESSAGE, operation_id=original_message['operationId'], payload=error_string, + message_id=original_message['messageId'], # associate error reply with original has_error=True, ) @@ -638,3 +945,12 @@ def _status_ticker(self) -> None: payload={'schema': self._schema, 'status': self._status_memo}, ) self._status_thread.wait(self._status_ticker_interval) + + def _send_external_requests(self) -> None: + """Periodically sends messages added to self._external_messages. Runs in a separate thread.""" + # initial wait should guarantee that first request message does not beat initial startup message + if self._external_request_thread: + self._external_request_thread.wait(10.0) + while not self._external_request_thread.stopped(): + self._process_external_requests() + self._external_request_thread.wait(0.5) diff --git a/src/intersect_sdk/service_callback_definitions.py b/src/intersect_sdk/service_callback_definitions.py new file mode 100644 index 0000000..4eb0c08 --- /dev/null +++ b/src/intersect_sdk/service_callback_definitions.py @@ -0,0 +1,16 @@ +"""Callback definitions used by Services and Capabilities. + +Please see shared_callback_definitions for definitions which are also used by Clients. +""" + +from typing import Callable + +from .client_callback_definitions import INTERSECT_JSON_VALUE + +INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE = Callable[[INTERSECT_JSON_VALUE], None] +"""Callback typing for the function which handles another Service's response. + +The function accepts one argument - the direct response from the other Service. Message metadata will not be included. + +This callback type should only be used on Capabilities - for client callback functions, use INTERSECT_CLIENT_RESPONSE_CALLBACK_TYPE . +""" diff --git a/src/intersect_sdk/shared_callback_definitions.py b/src/intersect_sdk/shared_callback_definitions.py new file mode 100644 index 0000000..fd1ff04 --- /dev/null +++ b/src/intersect_sdk/shared_callback_definitions.py @@ -0,0 +1,67 @@ +"""Callback definitions shared between Services, Capabilities, and Clients.""" + +from typing import Any, Dict, List, Union + +from pydantic import BaseModel, ConfigDict, Field +from typing_extensions import Annotated, TypeAlias + +from .constants import SYSTEM_OF_SYSTEM_REGEX +from .core_definitions import IntersectDataHandler, IntersectMimeType + +INTERSECT_JSON_VALUE: TypeAlias = Union[ + List['INTERSECT_JSON_VALUE'], + Dict[str, 'INTERSECT_JSON_VALUE'], + str, + bool, + int, + float, + None, +] +""" +This is a simple type representation of JSON as a Python object. INTERSECT will automatically deserialize service payloads into one of these types. + +(Pydantic has a similar type, "JsonValue", which should be used if you desire functionality beyond type hinting. This is strictly a type hint.) +""" + + +class IntersectDirectMessageParams(BaseModel): + """These are the public-facing properties of a message which can be sent to another Service. + + This object can be used by Clients, and by Services if initiating a service-to-service request. + """ + + destination: Annotated[str, Field(pattern=SYSTEM_OF_SYSTEM_REGEX)] + """ + The destination string. You'll need to know the system-of-system representation of the Service. + + Note that this should match what you would see in the schema. + """ + + operation: str + """ + The name of the operation you want to call from the Service - this should be represented as it is in the Service's schema. + """ + + payload: Any + """ + The raw Python object you want to have serialized as the payload. + + If you want to just use the service's default value for a request (assuming it has a default value for a request), you may set this as None. + """ + + content_type: IntersectMimeType = IntersectMimeType.JSON + """ + The IntersectMimeType of your message. You'll want this to match with the ContentType of the function from the schema. + + default: IntersectMimeType.JSON + """ + + data_handler: IntersectDataHandler = IntersectDataHandler.MESSAGE + """ + The IntersectDataHandler you want to use (most people can just use IntersectDataHandler.MESSAGE here, unless your data is very large) + + default: IntersectDataHandler.MESSAGE + """ + + # pydantic config + model_config = ConfigDict(revalidate_instances='always') diff --git a/tests/e2e/test_examples.py b/tests/e2e/test_examples.py index 7d9d679..f1990df 100644 --- a/tests/e2e/test_examples.py +++ b/tests/e2e/test_examples.py @@ -82,13 +82,13 @@ def test_example_2_counter(): == 'Source: "counting-organization.counting-facility.counting-system.counting-subsystem.counting-service"' ) # test operation - assert lines[1] == 'Operation: "start_count"' - assert lines[5] == 'Operation: "stop_count"' - assert lines[9] == 'Operation: "start_count"' - assert lines[13] == 'Operation: "reset_count"' - assert lines[17] == 'Operation: "reset_count"' - assert lines[21] == 'Operation: "start_count"' - assert lines[25] == 'Operation: "stop_count"' + assert lines[1] == 'Operation: "CountingExample.start_count"' + assert lines[5] == 'Operation: "CountingExample.stop_count"' + assert lines[9] == 'Operation: "CountingExample.start_count"' + assert lines[13] == 'Operation: "CountingExample.reset_count"' + assert lines[17] == 'Operation: "CountingExample.reset_count"' + assert lines[21] == 'Operation: "CountingExample.start_count"' + assert lines[25] == 'Operation: "CountingExample.stop_count"' # test payloads # if 'count' is within 3 steps of the subtrahend, just pass the test @@ -132,3 +132,10 @@ def test_example_3_ping_pong_events(): def test_example_3_ping_pong_events_amqp(): assert run_example_test('3_ping_pong_events_amqp') == 'ping\npong\nping\npong\n' + + +def test_example_4_service_to_service(): + assert ( + run_example_test('4_service_to_service') + == 'Received Response from Service 2: Acknowledging service one text -> Kicking off the example!\n' + ) diff --git a/tests/fixtures/example_schema.json b/tests/fixtures/example_schema.json index 99e4b7d..d923449 100644 --- a/tests/fixtures/example_schema.json +++ b/tests/fixtures/example_schema.json @@ -4,7 +4,7 @@ "info": { "title": "test.test.test.test.test", "version": "0.0.0", - "description": "This is an example of the overarching capability class a user creates that we want to inject into the service.\n\nWhen defining entrypoints to your capability, use the @intersect_message() annotation. Your class will need\nat least one function with this annotation. These functions REQUIRE type annotations to function properly.\nSee the @intersect_message() annotation for more information.\n\nYou can potentially extend from multiple preexisting Capabilities in this class - each Capability may have\nseveral abstract functions which would need to be implemented by the user.\n\nBeyond this, you may define your capability class however you like, including through its constructor." + "description": "This is an example of the overarching capability class a user creates that we want to inject into the service.\n\nWhen defining entrypoints to your capability, use the @intersect_message() annotation. Your class will need\nat least one function with this annotation. These functions REQUIRE type annotations to function properly.\nSee the @intersect_message() annotation for more information.\n\nYou can potentially extend from multiple preexisting Capabilities in this class - each Capability may have\nseveral abstract functions which would need to be implemented by the user.\n\nBeyond this, you may define your capability class however you like, including through its constructor.\n" }, "defaultContentType": "application/json", "channels": { diff --git a/tests/fixtures/example_schema.py b/tests/fixtures/example_schema.py index 37b2df8..b36edd2 100644 --- a/tests/fixtures/example_schema.py +++ b/tests/fixtures/example_schema.py @@ -235,6 +235,7 @@ def __init__(self) -> None: which handles talking to the various INTERSECT-related backing services. """ super().__init__() + self.capability_name = 'DummyCapability' self._status_example = DummyStatus( functions_called=0, last_function_called='', diff --git a/tests/integration/test_return_type_mismatch.py b/tests/integration/test_return_type_mismatch.py index 5d99a69..c5c7954 100644 --- a/tests/integration/test_return_type_mismatch.py +++ b/tests/integration/test_return_type_mismatch.py @@ -44,8 +44,10 @@ def wrong_return_annotation(self, param: int) -> int: def make_intersect_service() -> IntersectService: + capability = ReturnTypeMismatchCapabilityImplementation() + capability.capability_name = 'ReturnTypeMismatchCapability' return IntersectService( - ReturnTypeMismatchCapabilityImplementation(), + [capability], IntersectServiceConfig( hierarchy=FAKE_HIERARCHY_CONFIG, data_stores=DataStoreConfigMap( @@ -96,19 +98,19 @@ def userspace_msg_callback(payload: bytes) -> None: msg[0] = deserialize_and_validate_userspace_message(payload) message_interceptor.add_subscription_channel( - 'msg/msg/msg/msg/msg/userspace', {userspace_msg_callback}, False + 'msg/msg/msg/msg/msg/response', {userspace_msg_callback}, False ) message_interceptor.connect() intersect_service.startup() time.sleep(1.0) message_interceptor.publish_message( - intersect_service._userspace_channel_name, + intersect_service._service_channel_name, create_userspace_message( source='msg.msg.msg.msg.msg', destination='test.test.test.test.test', content_type=IntersectMimeType.JSON, data_handler=IntersectDataHandler.MESSAGE, - operation_id='wrong_return_annotation', + operation_id='ReturnTypeMismatchCapability.wrong_return_annotation', # calculate_fibonacci takes in a tuple of two integers but we'll just send it one payload=b'2', ), diff --git a/tests/integration/test_service.py b/tests/integration/test_service.py index 3cf7a52..e8a8ecd 100644 --- a/tests/integration/test_service.py +++ b/tests/integration/test_service.py @@ -39,7 +39,7 @@ def make_intersect_service() -> IntersectService: return IntersectService( - DummyCapabilityImplementation(), + [DummyCapabilityImplementation()], IntersectServiceConfig( hierarchy=FAKE_HIERARCHY_CONFIG, data_stores=DataStoreConfigMap( @@ -92,13 +92,16 @@ def test_control_plane_connections(): assert intersect_service.is_connected() is False channels = intersect_service._control_plane_manager.get_subscription_channels() - # we have one channel (even if we're disconnected) ... - assert len(channels) == 1 - # ... and one callback function for this channel - channel_key = next(iter(channels)) - assert len(channels[channel_key].callbacks) == 1 - - intersect_service._control_plane_manager.remove_subscription_channel(channel_key) + # we have two channels (even if we're disconnected) ... + assert len(channels) == 2 + # ... and one callback function for each channel + channel_keys = [] + for channel_key in iter(channels): + channel_keys.append(channel_key) + assert len(channels[channel_key].callbacks) == 1 + + for channel_key in channel_keys: + intersect_service._control_plane_manager.remove_subscription_channel(channel_key) assert len(intersect_service._control_plane_manager.get_subscription_channels()) == 0 @@ -112,19 +115,19 @@ def userspace_msg_callback(payload: bytes) -> None: msg[0] = deserialize_and_validate_userspace_message(payload) message_interceptor.add_subscription_channel( - 'msg/msg/msg/msg/msg/userspace', {userspace_msg_callback}, False + 'msg/msg/msg/msg/msg/response', {userspace_msg_callback}, False ) message_interceptor.connect() intersect_service.startup() time.sleep(1.0) message_interceptor.publish_message( - intersect_service._userspace_channel_name, + intersect_service._service_channel_name, create_userspace_message( source='msg.msg.msg.msg.msg', destination='test.test.test.test.test', content_type=IntersectMimeType.JSON, data_handler=IntersectDataHandler.MESSAGE, - operation_id='calculate_fibonacci', + operation_id='DummyCapability.calculate_fibonacci', payload=b'[4,6]', ), True, @@ -147,19 +150,19 @@ def userspace_msg_callback(payload: bytes) -> None: msg[0] = deserialize_and_validate_userspace_message(payload) message_interceptor.add_subscription_channel( - 'msg/msg/msg/msg/msg/userspace', {userspace_msg_callback}, False + 'msg/msg/msg/msg/msg/response', {userspace_msg_callback}, False ) message_interceptor.connect() intersect_service.startup() time.sleep(1.0) message_interceptor.publish_message( - intersect_service._userspace_channel_name, + intersect_service._service_channel_name, create_userspace_message( source='msg.msg.msg.msg.msg', destination='test.test.test.test.test', content_type=IntersectMimeType.JSON, data_handler=IntersectDataHandler.MESSAGE, - operation_id='test_generator', + operation_id='DummyCapability.test_generator', payload=b'"res"', ), True, @@ -181,19 +184,19 @@ def userspace_msg_callback(payload: bytes) -> None: msg[0] = deserialize_and_validate_userspace_message(payload) message_interceptor.add_subscription_channel( - 'msg/msg/msg/msg/msg/userspace', {userspace_msg_callback}, False + 'msg/msg/msg/msg/msg/response', {userspace_msg_callback}, False ) message_interceptor.connect() intersect_service.startup() time.sleep(1.0) message_interceptor.publish_message( - intersect_service._userspace_channel_name, + intersect_service._service_channel_name, create_userspace_message( source='msg.msg.msg.msg.msg', destination='test.test.test.test.test', content_type=IntersectMimeType.JSON, data_handler=IntersectDataHandler.MESSAGE, - operation_id='valid_default_argument', + operation_id='DummyCapability.valid_default_argument', payload=b'null', # if sending null as the payload, the SDK will call the function's default value ), True, @@ -216,19 +219,19 @@ def userspace_msg_callback(payload: bytes) -> None: msg[0] = deserialize_and_validate_userspace_message(payload) message_interceptor.add_subscription_channel( - 'msg/msg/msg/msg/msg/userspace', {userspace_msg_callback}, False + 'msg/msg/msg/msg/msg/response', {userspace_msg_callback}, False ) message_interceptor.connect() intersect_service.startup() time.sleep(1.0) message_interceptor.publish_message( - intersect_service._userspace_channel_name, + intersect_service._service_channel_name, create_userspace_message( source='msg.msg.msg.msg.msg', destination='test.test.test.test.test', content_type=IntersectMimeType.JSON, data_handler=IntersectDataHandler.MESSAGE, - operation_id='calculate_fibonacci', + operation_id='DummyCapability.calculate_fibonacci', # calculate_fibonacci takes in a tuple of two integers but we'll just send it one payload=b'[2]', ), @@ -255,20 +258,19 @@ def userspace_msg_callback(payload: bytes) -> None: msg[0] = deserialize_and_validate_userspace_message(payload) message_interceptor.add_subscription_channel( - 'msg/msg/msg/msg/msg/userspace', {userspace_msg_callback}, False + 'msg/msg/msg/msg/msg/response', {userspace_msg_callback}, False ) message_interceptor.connect() intersect_service.startup() time.sleep(1.0) message_interceptor.publish_message( - intersect_service._userspace_channel_name, + intersect_service._service_channel_name, create_userspace_message( source='msg.msg.msg.msg.msg', destination='test.test.test.test.test', content_type=IntersectMimeType.JSON, data_handler=IntersectDataHandler.MESSAGE, - operation_id='THIS_FUNCTION_DOES_NOT_EXIST', - # calculate_fibonacci takes in a tuple of two integers but we'll just send it one + operation_id='DummyCapability.THIS_FUNCTION_DOES_NOT_EXIST', payload=b'null', ), True, @@ -292,19 +294,19 @@ def userspace_msg_callback(payload: bytes) -> None: msg[0] = deserialize_and_validate_userspace_message(payload) message_interceptor.add_subscription_channel( - 'msg/msg/msg/msg/msg/userspace', {userspace_msg_callback}, False + 'msg/msg/msg/msg/msg/response', {userspace_msg_callback}, False ) message_interceptor.connect() intersect_service.startup() time.sleep(1.0) message_interceptor.publish_message( - intersect_service._userspace_channel_name, + intersect_service._service_channel_name, create_userspace_message( source='msg.msg.msg.msg.msg', destination='test.test.test.test.test', content_type=IntersectMimeType.JSON, data_handler=IntersectDataHandler.MESSAGE, - operation_id='test_datetime', + operation_id='DummyCapability.test_datetime', payload=b'"1970-01-01T00:00:00Z"', ), True, @@ -339,7 +341,7 @@ def lifecycle_msg_callback(payload: bytes) -> None: 'test/test/test/test/test/lifecycle', {lifecycle_msg_callback}, False ) # we do not really care about the userspace message response, but we'll listen to it to consume it - message_interceptor.add_subscription_channel('msg/msg/msg/msg/msg/userspace', set(), False) + message_interceptor.add_subscription_channel('msg/msg/msg/msg/msg/response', set(), False) message_interceptor.connect() # sleep a moment to make sure message_interceptor catches the startup message time.sleep(1.0) @@ -349,13 +351,13 @@ def lifecycle_msg_callback(payload: bytes) -> None: # send a message to trigger a status update (just the way the example service's domain works, not intrinsic) message_interceptor.publish_message( - intersect_service._userspace_channel_name, + intersect_service._service_channel_name, create_userspace_message( source='msg.msg.msg.msg.msg', destination='test.test.test.test.test', content_type=IntersectMimeType.JSON, data_handler=IntersectDataHandler.MESSAGE, - operation_id='verify_float_dict', + operation_id='DummyCapability.verify_float_dict', # note that the dict key MUST be a string, even though the input wants a float key payload=b'{"1.2":"one point two"}', ), diff --git a/tests/unit/test_base_capability_implementation.py b/tests/unit/test_base_capability_implementation.py index 38147fa..c7277b0 100644 --- a/tests/unit/test_base_capability_implementation.py +++ b/tests/unit/test_base_capability_implementation.py @@ -1,10 +1,13 @@ from __future__ import annotations from typing import Any +from uuid import UUID, uuid4 import pytest from intersect_sdk import ( + INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE, IntersectBaseCapabilityImplementation, + IntersectDirectMessageParams, IntersectEventDefinition, intersect_event, intersect_message, @@ -18,10 +21,23 @@ class MockObserver(IntersectEventObserver): def __init__(self) -> None: self.tracked_events: list[tuple[str, Any, str]] = [] + self.registered_requests: dict[ + UUID, + tuple[IntersectDirectMessageParams, INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE | None], + ] = {} def _on_observe_event(self, event_name: str, event_value: Any, operation: str) -> None: self.tracked_events.append((event_name, event_value, operation)) + def create_external_request( + self, + request: IntersectDirectMessageParams, + response_handler: INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE | None = None, + ) -> UUID: + request_id = uuid4() + self.registered_requests[request_id] = (request, response_handler) + return request_id + # TESTS #################### @@ -33,7 +49,7 @@ class BadClass1(IntersectBaseCapabilityImplementation): def _intersect_sdk_register_observer(self, observer: IntersectEventObserver) -> None: return super()._intersect_sdk_register_observer(observer) - assert 'BadClass1: Cannot override functions' in str(ex) + assert 'BadClass1: Attempted to override a reserved INTERSECT-SDK function' in str(ex) with pytest.raises(RuntimeError) as ex: @@ -41,7 +57,19 @@ class BadClass2(IntersectBaseCapabilityImplementation): def intersect_sdk_emit_event(self, event_name: str, event_value: Any) -> None: return super().intersect_sdk_emit_event(event_name, event_value) - assert 'BadClass2: Cannot override functions' in str(ex) + assert 'BadClass2: Attempted to override a reserved INTERSECT-SDK function' in str(ex) + + with pytest.raises(RuntimeError) as ex: + + class BadClass3(IntersectBaseCapabilityImplementation): + def intersect_sdk_call_service( + self, + request: IntersectDirectMessageParams, + response_handler: INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE | None = None, + ) -> None: + return super().intersect_sdk_call_service(request, response_handler) + + assert 'BadClass3: Attempted to override a reserved INTERSECT-SDK function' in str(ex) # Note that the ONLY thing the capability itself checks for are annotated functions. @@ -115,3 +143,41 @@ def _inner_function(self) -> None: assert len(observer.tracked_events) == 2 capability.outer_function() assert len(observer.tracked_events) == 3 + + +def test_functions_handle_requests(): + class Inner(IntersectBaseCapabilityImplementation): + def __init__(self) -> None: + super().__init__() + self.tracked_responses = [] + + @intersect_message() + def mock_request_flow(self, fake_request_value: str) -> UUID: + return self.intersect_sdk_call_service( + IntersectDirectMessageParams( + destination='fake.fake.fake.fake.fake', + operation='Fake.fake', + payload=fake_request_value, + ), + self._mock_other_service_callback, + )[0] + + def _mock_other_service_callback(self, param: INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE): + self.tracked_responses.append(param) + + # setup + observer = MockObserver() + capability = Inner() + capability._intersect_sdk_register_observer(observer) + + # mock making the request and setting up the response handler + body = 'ping' + reqid = capability.mock_request_flow(body) + assert len(observer.registered_requests) == 1 + req, res = observer.registered_requests[reqid] + assert req.payload == body + + # mock calling the response handler + res('pong') + assert len(capability.tracked_responses) == 1 + assert capability.tracked_responses[0] == 'pong' diff --git a/tests/unit/test_invalid_schema_runtime.py b/tests/unit/test_invalid_schema_runtime.py index a6fd00c..8d5430f 100644 --- a/tests/unit/test_invalid_schema_runtime.py +++ b/tests/unit/test_invalid_schema_runtime.py @@ -38,5 +38,5 @@ def test_minio_not_allowed_without_config(caplog: pytest.LogCaptureFixture): ], ) with pytest.raises(SystemExit): - IntersectService(cap, conf) + IntersectService([cap], conf) assert "function 'arbitrary_function' should not set response_data_type as 1" in caplog.text diff --git a/tests/unit/test_schema_valid.py b/tests/unit/test_schema_valid.py index 44bffc4..2d10100 100644 --- a/tests/unit/test_schema_valid.py +++ b/tests/unit/test_schema_valid.py @@ -8,7 +8,7 @@ RESPONSE_DATA, STRICT_VALIDATION, ) -from intersect_sdk._internal.schema import get_schema_and_functions_from_capability_implementation +from intersect_sdk._internal.schema import get_schema_and_functions_from_capability_implementations from intersect_sdk.schema import get_schema_from_capability_implementation from tests.fixtures.example_schema import ( @@ -37,43 +37,57 @@ def test_schema_comparison(): def test_verify_status_fn(): - ( - schema, - function_map, - _, - status_fn_name, - status_type_adapter, - ) = get_schema_and_functions_from_capability_implementation( - DummyCapabilityImplementation, FAKE_HIERARCHY_CONFIG, set() + dummy_cap = DummyCapabilityImplementation() + (schema, function_map, _, status_fn_capability, status_fn_name, status_type_adapter) = ( + get_schema_and_functions_from_capability_implementations( + [dummy_cap], FAKE_HIERARCHY_CONFIG, set() + ) ) + assert status_fn_capability is dummy_cap assert status_fn_name == 'get_status' - - assert status_fn_name in function_map assert status_fn_name not in schema['channels'] - assert status_type_adapter == function_map[status_fn_name].response_adapter - assert function_map[status_fn_name].request_adapter is None + + scoped_name = f'{status_fn_capability.capability_name}.{status_fn_name}' + assert scoped_name in function_map + assert status_type_adapter == function_map[scoped_name].response_adapter + assert function_map[scoped_name].request_adapter is None assert status_type_adapter.json_schema() == schema['components']['schemas']['DummyStatus'] def test_verify_attributes(): - _, function_map, _, _, _ = get_schema_and_functions_from_capability_implementation( - DummyCapabilityImplementation, - FAKE_HIERARCHY_CONFIG, - set(), + dummy_cap = DummyCapabilityImplementation() + _, function_map, _, _, _, _ = get_schema_and_functions_from_capability_implementations( + [dummy_cap], FAKE_HIERARCHY_CONFIG, set() ) # test defaults assert ( - getattr(function_map['verify_float_dict'].method, RESPONSE_DATA) + getattr(function_map['DummyCapability.verify_float_dict'].method, RESPONSE_DATA) == IntersectDataHandler.MESSAGE ) - assert getattr(function_map['verify_nested'].method, REQUEST_CONTENT) == IntersectMimeType.JSON - assert getattr(function_map['verify_nested'].method, RESPONSE_CONTENT) == IntersectMimeType.JSON - assert getattr(function_map['verify_nested'].method, STRICT_VALIDATION) is False + assert ( + getattr(function_map['DummyCapability.verify_nested'].method, REQUEST_CONTENT) + == IntersectMimeType.JSON + ) + assert ( + getattr(function_map['DummyCapability.verify_nested'].method, RESPONSE_CONTENT) + == IntersectMimeType.JSON + ) + assert getattr(function_map['DummyCapability.verify_nested'].method, STRICT_VALIDATION) is False # test non-defaults assert ( - getattr(function_map['verify_nested'].method, RESPONSE_DATA) == IntersectDataHandler.MINIO + getattr(function_map['DummyCapability.verify_nested'].method, RESPONSE_DATA) + == IntersectDataHandler.MINIO + ) + assert ( + getattr(function_map['DummyCapability.ip4_to_ip6'].method, RESPONSE_CONTENT) + == IntersectMimeType.STRING + ) + assert ( + getattr(function_map['DummyCapability.test_path'].method, REQUEST_CONTENT) + == IntersectMimeType.STRING + ) + assert ( + getattr(function_map['DummyCapability.calculate_weird_algorithm'].method, STRICT_VALIDATION) + is True ) - assert getattr(function_map['ip4_to_ip6'].method, RESPONSE_CONTENT) == IntersectMimeType.STRING - assert getattr(function_map['test_path'].method, REQUEST_CONTENT) == IntersectMimeType.STRING - assert getattr(function_map['calculate_weird_algorithm'].method, STRICT_VALIDATION) is True