Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/intersect_sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
DataStoreConfigMap,
HierarchyConfig,
)
from .core_definitions import IntersectDataHandler, IntersectMimeType
from .core_definitions import IntersectDataHandler, IntersectEncryptionScheme, IntersectMimeType
from .exceptions import IntersectCapabilityError
from .schema import get_schema_from_capability_implementations
from .service import IntersectService
Expand Down Expand Up @@ -67,6 +67,7 @@
'IntersectClientConfig',
'IntersectDataHandler',
'IntersectDirectMessageParams',
'IntersectEncryptionScheme',
'IntersectEventDefinition',
'IntersectEventMessageParams',
'IntersectMimeType',
Expand Down Expand Up @@ -101,6 +102,7 @@
'IntersectClientConfig': '.config.client',
'IntersectDataHandler': '.core_definitions',
'IntersectDirectMessageParams': '.shared_callback_definitions',
'IntersectEncryptionScheme': '.core_definitions',
'IntersectEventDefinition': '.service_definitions',
'IntersectEventMessageParams': '.shared_callback_definitions',
'IntersectMimeType': '.core_definitions',
Expand Down
1 change: 1 addition & 0 deletions src/intersect_sdk/_internal/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
RESPONSE_DATA = '__response_data_transfer_handler__'
STRICT_VALIDATION = '__strict_validation__'
SHUTDOWN_KEYS = '__ignore_message__'
ENCRYPTION_SCHEMES = '__intersect_encryption_schemes__'
40 changes: 39 additions & 1 deletion src/intersect_sdk/_internal/data_plane/data_plane_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@
from ...core_definitions import IntersectDataHandler, IntersectMimeType
from ..exceptions import IntersectError
from ..logger import logger
from .minio_utils import MinioPayload, create_minio_store, get_minio_object, send_minio_object
from .minio_utils import (
MinioPayload,
create_minio_store,
delete_minio_object,
get_minio_object,
send_minio_object,
)

if TYPE_CHECKING:
from ...config.shared import DataStoreConfigMap, HierarchyConfig
Expand Down Expand Up @@ -111,3 +117,35 @@ def outgoing_message_data_handler(
f'No support implemented for code {data_handler}, please upgrade your intersect-sdk version.'
)
raise IntersectError

def remove_remote_data(
self, message: bytes, request_data_handler: IntersectDataHandler
) -> None:
"""Removes data from the request data provider.

This does not raise an exception if unable to remove the data, just logs the problem.
In general, this should only be called if you can verify an issue in the headers

Params:
message: the message sent externally to this location
Returns:
the actual data we want to submit to the user function
"""
if request_data_handler == IntersectDataHandler.MINIO:
# TODO - we may want to send additional provider information in the payload
try:
payload: MinioPayload = MINIO_ADAPTER.validate_json(message)
except ValidationError as e:
logger.warning('remove_remote - invalid params', e)
return
provider = None
for store in self._minio_providers:
if store._base_url._url.geturl() == payload['minio_url']: # noqa: SLF001 (only way to get URL from MINIO API)
provider = store
break
if not provider:
logger.error(
f"You did not configure listening to MINIO instance '{payload['minio_url']}'. You must fix this to handle this data."
)
return
delete_minio_object(provider, payload)
24 changes: 24 additions & 0 deletions src/intersect_sdk/_internal/data_plane/minio_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,27 @@ def get_minio_object(provider: Minio, payload: MinioPayload) -> bytes:
raise IntersectError from e
else:
return response.data


def delete_minio_object(provider: Minio, payload: MinioPayload) -> None:
"""Delete an object from the bucket, without returning it.

Params:
provider: a pre-cached MinIO provider from the data provider store
payload: the payload from the message (at this point, the minio_url should exist)

Raises:
IntersectException - if any non-fatal MinIO error is caught
"""
try:
provider.remove_object(
bucket_name=payload['minio_bucket'], object_name=payload['minio_object_id']
)
except MaxRetryError as e:
logger.warning(
f'Non-fatal MinIO error when retrieving object, the server may be under stress but you should double-check your configuration. Details: \n{e}'
)
except MinioException as e:
logger.error(
f'Important MinIO error when retrieving object, this usually indicates a problem with your configuration. Details: \n{e}'
)
10 changes: 9 additions & 1 deletion src/intersect_sdk/_internal/function_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
if TYPE_CHECKING:
from pydantic import TypeAdapter

from ..core_definitions import IntersectDataHandler, IntersectMimeType
from ..core_definitions import (
IntersectDataHandler,
IntersectEncryptionScheme,
IntersectMimeType,
)


class FunctionMetadata(NamedTuple):
Expand Down Expand Up @@ -42,6 +46,10 @@ class FunctionMetadata(NamedTuple):
"""
How we intend on handling the response value
"""
encryption_schemes: set[IntersectEncryptionScheme]
"""
Supported encryption schemes
"""
strict_validation: bool
"""
Whether or not we're using lenient Pydantic validation (default, False) or strict
Expand Down
11 changes: 8 additions & 3 deletions src/intersect_sdk/_internal/messages/userspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
from pydantic import AwareDatetime, BaseModel, Field, field_serializer

from ...constants import SYSTEM_OF_SYSTEM_REGEX
from ...core_definitions import (
IntersectDataHandler,
)
from ...core_definitions import IntersectDataHandler, IntersectEncryptionScheme
from ...version import version_string


Expand Down Expand Up @@ -116,6 +114,11 @@ class UserspaceMessageHeaders(BaseModel):
This should only be set to "True" on return messages sent by services - NEVER clients.
"""

encryption_scheme: IntersectEncryptionScheme = 'NONE'
"""
The encryption scheme of the message itself. This determines the requested payload.
"""

# make sure all non-string fields are serialized into strings, even in Python code

@field_serializer('message_id', mode='plain')
Expand All @@ -140,6 +143,7 @@ def create_userspace_message_headers(
destination: str,
operation_id: str,
data_handler: IntersectDataHandler,
encryption_scheme: IntersectEncryptionScheme,
message_id: uuid.UUID | None = None,
has_error: bool = False,
) -> dict[str, str]:
Expand All @@ -153,6 +157,7 @@ def create_userspace_message_headers(
created_at=datetime.datetime.now(tz=datetime.timezone.utc),
operation_id=operation_id,
data_handler=data_handler,
encryption_scheme=encryption_scheme,
has_error=has_error,
).model_dump(by_alias=True)

Expand Down
6 changes: 6 additions & 0 deletions src/intersect_sdk/_internal/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from .constants import (
BASE_RESPONSE_ATTR,
BASE_STATUS_ATTR,
ENCRYPTION_SCHEMES,
REQUEST_CONTENT,
RESPONSE_CONTENT,
RESPONSE_DATA,
Expand Down Expand Up @@ -411,6 +412,7 @@ def _introspection_baseline(

request_content = getattr(method, REQUEST_CONTENT)
response_content = getattr(method, RESPONSE_CONTENT)
encryption_schemes = getattr(method, ENCRYPTION_SCHEMES)

docstring = inspect.cleandoc(method.__doc__) if method.__doc__ else None
signature = inspect.signature(method)
Expand All @@ -434,13 +436,15 @@ def _introspection_baseline(
'message': {
'schemaFormat': f'application/vnd.aai.asyncapi+json;version={ASYNCAPI_VERSION}',
'contentType': request_content,
'encryption_schemes': sorted(encryption_schemes),
'traits': {'$ref': '#/components/messageTraits/commonHeaders'},
}
},
'subscribe': {
'message': {
'schemaFormat': f'application/vnd.aai.asyncapi+json;version={ASYNCAPI_VERSION}',
'contentType': response_content,
'encryption_schemes': sorted(encryption_schemes),
'traits': {'$ref': '#/components/messageTraits/commonHeaders'},
}
},
Expand Down Expand Up @@ -532,6 +536,7 @@ def _introspection_baseline(
request_content,
response_content,
data_handler,
encryption_schemes,
getattr(method, STRICT_VALIDATION),
getattr(method, SHUTDOWN_KEYS),
)
Expand Down Expand Up @@ -561,6 +566,7 @@ def _introspection_baseline(
getattr(status_fn, REQUEST_CONTENT),
getattr(status_fn, RESPONSE_CONTENT),
getattr(status_fn, RESPONSE_DATA),
{'NONE'}, # status functions should always be assumed to send out unencrypted messages.
getattr(status_fn, STRICT_VALIDATION),
getattr(status_fn, SHUTDOWN_KEYS),
)
Expand Down
21 changes: 19 additions & 2 deletions src/intersect_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class IntersectClient:
- startup()
- shutdown()
- is_connected()
- considered_unrecoverable()

No other functions or parameters are guaranteed to remain stable.

Expand Down Expand Up @@ -262,6 +263,13 @@ def _handle_userspace_message(
request_params = self._data_plane_manager.incoming_message_data_handler(
payload, headers.data_handler
)
if not headers.has_error:
match headers.encryption_scheme:
case 'RSA':
# TODO - decrypt and reassign request_params here
pass
case _:
pass
if content_type == 'application/json':
request_params = GENERIC_MESSAGE_SERIALIZER.validate_json(request_params)
except ValidationError as e:
Expand Down Expand Up @@ -415,7 +423,15 @@ def _send_userspace_message(self, params: IntersectDirectMessageParams) -> None:
return
serialized_msg = params.payload

# TWO: SEND DATA TO APPROPRIATE DATA STORE
# TWO: encrypt message
match params.encryption_scheme:
case 'RSA':
# TODO reassign serialized_msg here to encrypted value
pass
case _:
pass

# THREE: SEND DATA TO APPROPRIATE DATA STORE
try:
payload = self._data_plane_manager.outgoing_message_data_handler(
serialized_msg, params.content_type, params.data_handler
Expand All @@ -427,12 +443,13 @@ def _send_userspace_message(self, params: IntersectDirectMessageParams) -> None:
send_os_signal()
return

# THREE: SEND MESSAGE
# FOUR: SEND MESSAGE
headers = create_userspace_message_headers(
source=self._hierarchy.hierarchy_string('.'),
destination=params.destination,
data_handler=params.data_handler,
operation_id=params.operation,
encryption_scheme=params.encryption_scheme,
)
logger.debug(f'Send userspace message:\n{headers}')
channel = f'{params.destination.replace(".", "/")}/request'
Expand Down
5 changes: 4 additions & 1 deletion src/intersect_sdk/core_definitions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Core enumerations and structures used throughout INTERSECT, for both client and service."""

from enum import Enum
from typing import Annotated
from typing import Annotated, Literal

from pydantic import Field

Expand Down Expand Up @@ -45,3 +45,6 @@ class IntersectDataHandler(Enum):

- If your Content-Type value is ANYTHING ELSE, you MUST mark it as "bytes" . In this instance, INTERSECT will not base64-encode or base64-decode the value.
"""

IntersectEncryptionScheme = Literal['NONE', 'RSA']
"""Supported encryption schemes throughout INTERSECT. 'NONE' implies no encryption scheme."""
Loading