From 3ac31b2da803605133d13060aec1e3286a14a298 Mon Sep 17 00:00:00 2001 From: mhh Date: Mon, 13 Nov 2023 17:04:58 +0100 Subject: [PATCH 1/4] Add allow_amend, internet and aleph_api parameters to create_program --- src/aleph/sdk/client/abstract.py | 6 ++++++ src/aleph/sdk/client/authenticated_http.py | 9 ++++++--- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/aleph/sdk/client/abstract.py b/src/aleph/sdk/client/abstract.py index 26a51221..4b5e4ba9 100644 --- a/src/aleph/sdk/client/abstract.py +++ b/src/aleph/sdk/client/abstract.py @@ -315,6 +315,9 @@ async def create_program( vcpus: Optional[int] = None, timeout_seconds: Optional[float] = None, persistent: bool = False, + allow_amend: bool = False, + internet: bool = True, + aleph_api: bool = True, encoding: Encoding = Encoding.zip, volumes: Optional[List[Mapping]] = None, subscriptions: Optional[List[Mapping]] = None, @@ -335,6 +338,9 @@ async def create_program( :param vcpus: Number of vCPUs to allocate (Default: 1) :param timeout_seconds: Timeout in seconds (Default: 30.0) :param persistent: Whether the program should be persistent or not (Default: False) + :param allow_amend: Whether the deployed VM image may be changed (Default: False) + :param internet: Whether the VM should have internet connectivity. (Default: True) + :param aleph_api: Whether the VM needs access to Aleph messages API (Default: True) :param encoding: Encoding to use (Default: Encoding.zip) :param volumes: Volumes to mount :param subscriptions: Patterns of aleph.im messages to forward to the program's event receiver diff --git a/src/aleph/sdk/client/authenticated_http.py b/src/aleph/sdk/client/authenticated_http.py index b4a7cc6a..58613cfa 100644 --- a/src/aleph/sdk/client/authenticated_http.py +++ b/src/aleph/sdk/client/authenticated_http.py @@ -401,6 +401,9 @@ async def create_program( vcpus: Optional[int] = None, timeout_seconds: Optional[float] = None, persistent: bool = False, + allow_amend: bool = False, + internet: bool = True, + aleph_api: bool = True, encoding: Encoding = Encoding.zip, volumes: Optional[List[Mapping]] = None, subscriptions: Optional[List[Mapping]] = None, @@ -434,7 +437,7 @@ async def create_program( content = ProgramContent( type="vm-function", address=address, - allow_amend=False, + allow_amend=allow_amend, code=CodeContent( encoding=encoding, entrypoint=entrypoint, @@ -444,8 +447,8 @@ async def create_program( on=triggers, environment=FunctionEnvironment( reproducible=False, - internet=True, - aleph_api=True, + internet=internet, + aleph_api=aleph_api, ), variables=environment_variables, resources=MachineResources( From 0693f703c8ed08290d564b9739088bef303fe9cf Mon Sep 17 00:00:00 2001 From: mhh Date: Tue, 14 Nov 2023 17:30:49 +0100 Subject: [PATCH 2/4] Add method to create VM instances --- src/aleph/sdk/client/abstract.py | 47 ++++++++++++ src/aleph/sdk/client/authenticated_http.py | 84 ++++++++++++++++++++-- tests/unit/test_asynchronous.py | 17 ++++- 3 files changed, 143 insertions(+), 5 deletions(-) diff --git a/src/aleph/sdk/client/abstract.py b/src/aleph/sdk/client/abstract.py index 4b5e4ba9..e07e8dcc 100644 --- a/src/aleph/sdk/client/abstract.py +++ b/src/aleph/sdk/client/abstract.py @@ -26,6 +26,7 @@ from aleph_message.status import MessageStatus from ..query.filters import MessageFilter, PostFilter +from ..query.params import VmParams from ..query.responses import PostsResponse from ..types import GenericMessage, StorageEnum from ..utils import Writable @@ -348,6 +349,52 @@ async def create_program( """ pass + @abstractmethod + async def create_instance( + self, + rootfs: str, + rootfs_size: int, + rootfs_name: str, + environment_variables: Optional[Mapping[str, str]] = None, + storage_engine: StorageEnum = StorageEnum.storage, + channel: Optional[str] = None, + address: Optional[str] = None, + sync: bool = False, + memory: Optional[int] = None, + vcpus: Optional[int] = None, + timeout_seconds: Optional[float] = None, + allow_amend: bool = False, + internet: bool = True, + aleph_api: bool = True, + encoding: Encoding = Encoding.zip, + volumes: Optional[List[Mapping]] = None, + ssh_keys: Optional[List[str]] = None, + metadata: Optional[Mapping[str, Any]] = None, + ) -> Tuple[AlephMessage, MessageStatus]: + """ + Post a (create) PROGRAM message. + + :param rootfs: Root filesystem to use + :param rootfs_size: Size of root filesystem + :param rootfs_name: Name of root filesystem + :param environment_variables: Environment variables to pass to the program + :param storage_engine: Storage engine to use (Default: "storage") + :param channel: Channel to use (Default: "TEST") + :param address: Address to use (Default: account.get_address()) + :param sync: If true, waits for the message to be processed by the API server + :param memory: Memory in MB for the VM to be allocated (Default: 128) + :param vcpus: Number of vCPUs to allocate (Default: 1) + :param timeout_seconds: Timeout in seconds (Default: 30.0) + :param allow_amend: Whether the deployed VM image may be changed (Default: False) + :param internet: Whether the VM should have internet connectivity. (Default: True) + :param aleph_api: Whether the VM needs access to Aleph messages API (Default: True) + :param encoding: Encoding to use (Default: Encoding.zip) + :param volumes: Volumes to mount + :param ssh_keys: SSH keys to authorize access to the VM + :param metadata: Metadata to attach to the message + """ + pass + @abstractmethod async def forget( self, diff --git a/src/aleph/sdk/client/authenticated_http.py b/src/aleph/sdk/client/authenticated_http.py index 58613cfa..d095cec2 100644 --- a/src/aleph/sdk/client/authenticated_http.py +++ b/src/aleph/sdk/client/authenticated_http.py @@ -20,20 +20,22 @@ ProgramContent, ProgramMessage, StoreContent, - StoreMessage, + StoreMessage, InstanceMessage, InstanceContent, ) from aleph_message.models.execution.base import Encoding from aleph_message.models.execution.environment import ( FunctionEnvironment, MachineResources, ) +from aleph_message.models.execution.instance import RootfsVolume from aleph_message.models.execution.program import CodeContent, FunctionRuntime -from aleph_message.models.execution.volume import MachineVolume +from aleph_message.models.execution.volume import MachineVolume, ParentVolume from aleph_message.status import MessageStatus from pydantic.json import pydantic_encoder from ..conf import settings from ..exceptions import BroadcastError, InvalidMessageError +from ..query.params import VmParams from ..types import Account, StorageEnum from .abstract import AuthenticatedAlephClient from .http import AlephHttpClient @@ -463,7 +465,9 @@ async def create_program( if runtime == settings.DEFAULT_RUNTIME_ID else "", ), - volumes=volumes, + volumes=[ + MachineVolume.parse_obj(volume) for volume in volumes + ], time=time.time(), metadata=metadata, ) @@ -473,7 +477,79 @@ async def create_program( return await self.submit( content=content.dict(exclude_none=True), - message_type=MessageType.program, + message_type=MessageType.instance, + channel=channel, + storage_engine=storage_engine, + sync=sync, + ) + + async def create_instance( + self, + rootfs: str, + rootfs_size: int, + rootfs_name: str, + environment_variables: Optional[Mapping[str, str]] = None, + storage_engine: StorageEnum = StorageEnum.storage, + channel: Optional[str] = None, + address: Optional[str] = None, + sync: bool = False, + memory: Optional[int] = None, + vcpus: Optional[int] = None, + timeout_seconds: Optional[float] = None, + allow_amend: bool = False, + internet: bool = True, + aleph_api: bool = True, + encoding: Encoding = Encoding.zip, + volumes: Optional[List[Mapping]] = None, + volume_persistence: str = "host", + ssh_keys: Optional[List[str]] = None, + metadata: Optional[Mapping[str, Any]] = None, + ) -> Tuple[InstanceMessage, MessageStatus]: + address = address or settings.ADDRESS_TO_USE or self.account.get_address() + + volumes = volumes if volumes is not None else [] + memory = memory or settings.DEFAULT_VM_MEMORY + vcpus = vcpus or settings.DEFAULT_VM_VCPUS + timeout_seconds = timeout_seconds or settings.DEFAULT_VM_TIMEOUT + + content = InstanceContent( + address=address, + allow_amend=allow_amend, + environment=FunctionEnvironment( + reproducible=False, + internet=internet, + aleph_api=aleph_api, + ), + variables=environment_variables, + resources=MachineResources( + vcpus=vcpus, + memory=memory, + seconds=timeout_seconds, + ), + rootfs=RootfsVolume( + parent=ParentVolume( + ref=rootfs, + use_latest=True, + ), + name=rootfs_name, + size_mib=rootfs_size, + persistence="host", + use_latest=True, + comment="Official Aleph Debian root filesystem" + if rootfs == settings.DEFAULT_RUNTIME_ID + else "", + ), + volumes=[ + MachineVolume.parse_obj(volume) for volume in volumes + ], + time=time.time(), + authorized_keys=ssh_keys, + metadata=metadata, + ) + + return await self.submit( + content=content.dict(exclude_none=True), + message_type=MessageType.instance, channel=channel, storage_engine=storage_engine, sync=sync, diff --git a/tests/unit/test_asynchronous.py b/tests/unit/test_asynchronous.py index 21c92feb..5fcbd8a2 100644 --- a/tests/unit/test_asynchronous.py +++ b/tests/unit/test_asynchronous.py @@ -7,7 +7,7 @@ ForgetMessage, PostMessage, ProgramMessage, - StoreMessage, + StoreMessage, InstanceMessage, ) from aleph_message.status import MessageStatus @@ -141,6 +141,21 @@ async def test_create_program(mock_session_with_post_success): assert isinstance(program_message, ProgramMessage) +@pytest.mark.asyncio +async def test_create_instance(mock_session_with_post_success): + async with mock_session_with_post_success as session: + instance_message, message_status = await session.create_instance( + rootfs="cafecafecafecafecafecafecafecafecafecafecafecafecafecafecafecafe", + rootfs_size=1, + rootfs_name="rootfs", + channel="TEST", + metadata={"tags": ["test"]}, + ) + + assert mock_session_with_post_success.http_session.post.called_once + assert isinstance(instance_message, InstanceMessage) + + @pytest.mark.asyncio async def test_forget(mock_session_with_post_success): async with mock_session_with_post_success as session: From db52fe7c2f8f89e4260b35f472785870eee013bb Mon Sep 17 00:00:00 2001 From: mhh Date: Tue, 14 Nov 2023 17:36:31 +0100 Subject: [PATCH 3/4] Fix formatting and interface issues --- src/aleph/sdk/client/abstract.py | 3 ++- src/aleph/sdk/client/authenticated_http.py | 13 +++++-------- tests/unit/test_asynchronous.py | 3 ++- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/src/aleph/sdk/client/abstract.py b/src/aleph/sdk/client/abstract.py index e07e8dcc..bff6a227 100644 --- a/src/aleph/sdk/client/abstract.py +++ b/src/aleph/sdk/client/abstract.py @@ -26,7 +26,6 @@ from aleph_message.status import MessageStatus from ..query.filters import MessageFilter, PostFilter -from ..query.params import VmParams from ..query.responses import PostsResponse from ..types import GenericMessage, StorageEnum from ..utils import Writable @@ -368,6 +367,7 @@ async def create_instance( aleph_api: bool = True, encoding: Encoding = Encoding.zip, volumes: Optional[List[Mapping]] = None, + volume_persistence: str = "host", ssh_keys: Optional[List[str]] = None, metadata: Optional[Mapping[str, Any]] = None, ) -> Tuple[AlephMessage, MessageStatus]: @@ -390,6 +390,7 @@ async def create_instance( :param aleph_api: Whether the VM needs access to Aleph messages API (Default: True) :param encoding: Encoding to use (Default: Encoding.zip) :param volumes: Volumes to mount + :param volume_persistence: Where volumes are persisted, can be "host" or "store", meaning distributed across Aleph.im (Default: "host") :param ssh_keys: SSH keys to authorize access to the VM :param metadata: Metadata to attach to the message """ diff --git a/src/aleph/sdk/client/authenticated_http.py b/src/aleph/sdk/client/authenticated_http.py index d095cec2..4a274a05 100644 --- a/src/aleph/sdk/client/authenticated_http.py +++ b/src/aleph/sdk/client/authenticated_http.py @@ -13,6 +13,8 @@ AlephMessage, ForgetContent, ForgetMessage, + InstanceContent, + InstanceMessage, ItemType, MessageType, PostContent, @@ -20,7 +22,7 @@ ProgramContent, ProgramMessage, StoreContent, - StoreMessage, InstanceMessage, InstanceContent, + StoreMessage, ) from aleph_message.models.execution.base import Encoding from aleph_message.models.execution.environment import ( @@ -35,7 +37,6 @@ from ..conf import settings from ..exceptions import BroadcastError, InvalidMessageError -from ..query.params import VmParams from ..types import Account, StorageEnum from .abstract import AuthenticatedAlephClient from .http import AlephHttpClient @@ -465,9 +466,7 @@ async def create_program( if runtime == settings.DEFAULT_RUNTIME_ID else "", ), - volumes=[ - MachineVolume.parse_obj(volume) for volume in volumes - ], + volumes=[MachineVolume.parse_obj(volume) for volume in volumes], time=time.time(), metadata=metadata, ) @@ -539,9 +538,7 @@ async def create_instance( if rootfs == settings.DEFAULT_RUNTIME_ID else "", ), - volumes=[ - MachineVolume.parse_obj(volume) for volume in volumes - ], + volumes=[MachineVolume.parse_obj(volume) for volume in volumes], time=time.time(), authorized_keys=ssh_keys, metadata=metadata, diff --git a/tests/unit/test_asynchronous.py b/tests/unit/test_asynchronous.py index 5fcbd8a2..104482da 100644 --- a/tests/unit/test_asynchronous.py +++ b/tests/unit/test_asynchronous.py @@ -5,9 +5,10 @@ from aleph_message.models import ( AggregateMessage, ForgetMessage, + InstanceMessage, PostMessage, ProgramMessage, - StoreMessage, InstanceMessage, + StoreMessage, ) from aleph_message.status import MessageStatus From 460501291f81c996c3ab2969120b4377389a7b9c Mon Sep 17 00:00:00 2001 From: mhh Date: Tue, 14 Nov 2023 17:42:10 +0100 Subject: [PATCH 4/4] Fix create_program --- src/aleph/sdk/client/authenticated_http.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/aleph/sdk/client/authenticated_http.py b/src/aleph/sdk/client/authenticated_http.py index 4a274a05..3635919d 100644 --- a/src/aleph/sdk/client/authenticated_http.py +++ b/src/aleph/sdk/client/authenticated_http.py @@ -476,7 +476,7 @@ async def create_program( return await self.submit( content=content.dict(exclude_none=True), - message_type=MessageType.instance, + message_type=MessageType.program, channel=channel, storage_engine=storage_engine, sync=sync,