From fa60a8ac05bc69e84a92858a1d20e2b144faf92a Mon Sep 17 00:00:00 2001 From: philogicae <38438271+philogicae@users.noreply.github.com> Date: Fri, 31 Jan 2025 19:23:02 +0200 Subject: [PATCH 01/16] Add manage_flow to superfluid.py --- src/aleph/sdk/chains/ethereum.py | 36 +++++++++++++--- src/aleph/sdk/chains/evm.py | 9 ++++ src/aleph/sdk/connectors/superfluid.py | 60 ++++++++++++++++++++++++-- src/aleph/sdk/evm_utils.py | 12 +++++- tests/unit/test_superfluid.py | 13 ++++++ 5 files changed, 119 insertions(+), 11 deletions(-) diff --git a/src/aleph/sdk/chains/ethereum.py b/src/aleph/sdk/chains/ethereum.py index ab93df56..7f0dcc6e 100644 --- a/src/aleph/sdk/chains/ethereum.py +++ b/src/aleph/sdk/chains/ethereum.py @@ -22,12 +22,13 @@ BALANCEOF_ABI, MIN_ETH_BALANCE, MIN_ETH_BALANCE_WEI, + FlowUpdate, + from_wei_token, get_chain_id, get_chains_with_super_token, get_rpc, get_super_token_address, get_token_address, - to_human_readable_token, ) from ..exceptions import BadSignatureError from ..utils import bytes_from_hex @@ -107,7 +108,7 @@ def can_transact(self, block=True) -> bool: if not valid and block: raise InsufficientFundsError( required_funds=MIN_ETH_BALANCE, - available_funds=to_human_readable_token(balance), + available_funds=float(from_wei_token(balance)), ) return valid @@ -162,15 +163,25 @@ def get_super_token_balance(self) -> Decimal: return Decimal(contract.functions.balanceOf(self.get_address()).call()) return Decimal(0) + @property + def has_superfluid_connector(self) -> bool: + return self.superfluid_connector is not None + + def can_start_flow(self, flow: Decimal) -> Awaitable[bool]: + """Check if the account has enough funds to start a Superfluid flow of the given size.""" + if not self.has_superfluid_connector: + raise ValueError("Superfluid connector is required to check a flow") + return self.superfluid_connector.can_start_flow(flow) + def create_flow(self, receiver: str, flow: Decimal) -> Awaitable[str]: """Creat a Superfluid flow between this account and the receiver address.""" - if not self.superfluid_connector: + if not self.has_superfluid_connector: raise ValueError("Superfluid connector is required to create a flow") return self.superfluid_connector.create_flow(receiver=receiver, flow=flow) def get_flow(self, receiver: str) -> Awaitable[Web3FlowInfo]: """Get the Superfluid flow between this account and the receiver address.""" - if not self.superfluid_connector: + if not self.has_superfluid_connector: raise ValueError("Superfluid connector is required to get a flow") return self.superfluid_connector.get_flow( sender=self.get_address(), receiver=receiver @@ -178,16 +189,29 @@ def get_flow(self, receiver: str) -> Awaitable[Web3FlowInfo]: def update_flow(self, receiver: str, flow: Decimal) -> Awaitable[str]: """Update the Superfluid flow between this account and the receiver address.""" - if not self.superfluid_connector: + if not self.has_superfluid_connector: raise ValueError("Superfluid connector is required to update a flow") return self.superfluid_connector.update_flow(receiver=receiver, flow=flow) def delete_flow(self, receiver: str) -> Awaitable[str]: """Delete the Superfluid flow between this account and the receiver address.""" - if not self.superfluid_connector: + if not self.has_superfluid_connector: raise ValueError("Superfluid connector is required to delete a flow") return self.superfluid_connector.delete_flow(receiver=receiver) + def manage_flow( + self, + receiver: str, + flow: Decimal, + update_type: FlowUpdate, + ) -> Awaitable[Optional[str]]: + """Manage the Superfluid flow between this account and the receiver address.""" + if not self.has_superfluid_connector: + raise ValueError("Superfluid connector is required to manage a flow") + return self.superfluid_connector.manage_flow( + receiver=receiver, flow=flow, update_type=update_type + ) + def get_fallback_account( path: Optional[Path] = None, chain: Optional[Chain] = None diff --git a/src/aleph/sdk/chains/evm.py b/src/aleph/sdk/chains/evm.py index 5bf66ef1..e946c920 100644 --- a/src/aleph/sdk/chains/evm.py +++ b/src/aleph/sdk/chains/evm.py @@ -5,6 +5,7 @@ from aleph_message.models import Chain from eth_account import Account # type: ignore +from ..evm_utils import FlowUpdate from .common import get_fallback_private_key from .ethereum import ETHAccount @@ -29,6 +30,9 @@ def get_token_balance(self) -> Decimal: def get_super_token_balance(self) -> Decimal: raise ValueError(f"Super token not implemented for this chain {self.CHAIN}") + def can_start_flow(self, flow: Decimal) -> Awaitable[bool]: + raise ValueError(f"Flow checking not implemented for this chain {self.CHAIN}") + def create_flow(self, receiver: str, flow: Decimal) -> Awaitable[str]: raise ValueError(f"Flow creation not implemented for this chain {self.CHAIN}") @@ -41,6 +45,11 @@ def update_flow(self, receiver: str, flow: Decimal) -> Awaitable[str]: def delete_flow(self, receiver: str) -> Awaitable[str]: raise ValueError(f"Flow deletion not implemented for this chain {self.CHAIN}") + def manage_flow( + self, receiver: str, flow: Decimal, update_type: FlowUpdate + ) -> Awaitable[Optional[str]]: + raise ValueError(f"Flow management not implemented for this chain {self.CHAIN}") + def get_fallback_account( path: Optional[Path] = None, chain: Optional[Chain] = None diff --git a/src/aleph/sdk/connectors/superfluid.py b/src/aleph/sdk/connectors/superfluid.py index 4b7274f8..3c559781 100644 --- a/src/aleph/sdk/connectors/superfluid.py +++ b/src/aleph/sdk/connectors/superfluid.py @@ -1,14 +1,19 @@ from __future__ import annotations from decimal import Decimal -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional from eth_utils import to_normalized_address from superfluid import CFA_V1, Operation, Web3FlowInfo from aleph.sdk.exceptions import InsufficientFundsError -from ..evm_utils import get_super_token_address, to_human_readable_token, to_wei_token +from ..evm_utils import ( + FlowUpdate, + from_wei_token, + get_super_token_address, + to_wei_token, +) if TYPE_CHECKING: from aleph.sdk.chains.ethereum import ETHAccount @@ -44,6 +49,7 @@ async def _execute_operation_with_account(self, operation: Operation) -> str: return await self.account._sign_and_send_transaction(populated_transaction) def can_start_flow(self, flow: Decimal, block=True) -> bool: + """Check if the account has enough funds to start a Superfluid flow of the given size.""" valid = False if self.account.can_transact(block=block): balance = self.account.get_super_token_balance() @@ -52,7 +58,7 @@ def can_start_flow(self, flow: Decimal, block=True) -> bool: if not valid and block: raise InsufficientFundsError( required_funds=float(MIN_FLOW_4H), - available_funds=to_human_readable_token(balance), + available_funds=float(from_wei_token(balance)), ) return valid @@ -96,3 +102,51 @@ async def update_flow(self, receiver: str, flow: Decimal) -> str: flow_rate=int(to_wei_token(flow)), ), ) + + async def manage_flow( + self, + receiver: str, + flow: Decimal, + update_type: FlowUpdate, + ) -> Optional[str]: + """ + Update the flow of a Superfluid stream between a sender and receiver. + This function either increases or decreases the flow rate between the sender and receiver, + based on the update_type. If no flow exists and the update type is augmentation, it creates a new flow + with the specified rate. If the update type is reduction and the reduction amount brings the flow to zero + or below, the flow is deleted. + + :param receiver: Address of the receiver in hexadecimal format. + :param flow: The flow rate to be added or removed (in ether). + :param update_type: The type of update to perform (augmentation or reduction). + :return: The transaction hash of the executed operation (create, update, or delete flow). + """ + + # Retrieve current flow info + flow_info: Web3FlowInfo = await self.account.get_flow(receiver) + + current_flow_rate_wei: Decimal = Decimal(flow_info["flowRate"] or "0") + flow_rate_wei: int = int(to_wei_token(flow)) + + if update_type == FlowUpdate.INCREASE: + if current_flow_rate_wei > 0: + # Update existing flow by increasing the rate + new_flow_rate_wei = current_flow_rate_wei + flow_rate_wei + new_flow_rate_ether = from_wei_token(new_flow_rate_wei) + return await self.account.update_flow(receiver, new_flow_rate_ether) + else: + # Create a new flow if none exists + return await self.account.create_flow(receiver, flow) + else: + if current_flow_rate_wei > 0: + # Reduce the existing flow + new_flow_rate_wei = current_flow_rate_wei - flow_rate_wei + # Ensure to not leave infinitesimal flows + # Often, there were 1-10 wei remaining in the flow rate, which prevented the flow from being deleted + if new_flow_rate_wei > 99: + new_flow_rate_ether = from_wei_token(new_flow_rate_wei) + return await self.account.update_flow(receiver, new_flow_rate_ether) + else: + # Delete the flow if the new flow rate is zero or negative + return await self.account.delete_flow(receiver) + return None diff --git a/src/aleph/sdk/evm_utils.py b/src/aleph/sdk/evm_utils.py index 4d2026ef..0ddfe755 100644 --- a/src/aleph/sdk/evm_utils.py +++ b/src/aleph/sdk/evm_utils.py @@ -1,4 +1,5 @@ from decimal import Decimal +from enum import Enum from typing import List, Optional, Union from aleph_message.models import Chain @@ -21,11 +22,18 @@ }]""" -def to_human_readable_token(amount: Decimal) -> float: - return float(amount / (Decimal(10) ** Decimal(settings.TOKEN_DECIMALS))) +class FlowUpdate(str, Enum): + REDUCE = "reduce" + INCREASE = "increase" + + +def from_wei_token(amount: Decimal) -> Decimal: + """Converts the given wei value to ether.""" + return amount / Decimal(10) ** Decimal(settings.TOKEN_DECIMALS) def to_wei_token(amount: Decimal) -> Decimal: + """Converts the given ether value to wei.""" return amount * Decimal(10) ** Decimal(settings.TOKEN_DECIMALS) diff --git a/tests/unit/test_superfluid.py b/tests/unit/test_superfluid.py index c2f853bd..74bcc38e 100644 --- a/tests/unit/test_superfluid.py +++ b/tests/unit/test_superfluid.py @@ -7,6 +7,7 @@ from eth_utils import to_checksum_address from aleph.sdk.chains.ethereum import ETHAccount +from aleph.sdk.evm_utils import FlowUpdate def generate_fake_eth_address(): @@ -24,6 +25,7 @@ def mock_superfluid(): mock_superfluid.create_flow = AsyncMock(return_value="0xTransactionHash") mock_superfluid.delete_flow = AsyncMock(return_value="0xTransactionHash") mock_superfluid.update_flow = AsyncMock(return_value="0xTransactionHash") + mock_superfluid.manage_flow = AsyncMock(return_value="0xTransactionHash") # Mock get_flow to return a mock Web3FlowInfo mock_flow_info = {"timestamp": 0, "flowRate": 0, "deposit": 0, "owedDeposit": 0} @@ -98,3 +100,14 @@ async def test_get_flow(eth_account, mock_superfluid): assert flow_info["flowRate"] == 0 assert flow_info["deposit"] == 0 assert flow_info["owedDeposit"] == 0 + + +@pytest.mark.asyncio +async def test_manage_flow(eth_account, mock_superfluid): + receiver = generate_fake_eth_address() + flow = Decimal("0.005") + + tx_hash = await eth_account.manage_flow(receiver, flow, FlowUpdate.INCREASE) + + assert tx_hash == "0xTransactionHash" + mock_superfluid.manage_flow.assert_awaited_once() From a4016989ff9cf6d497d6473da0ee77252bb8ed5d Mon Sep 17 00:00:00 2001 From: philogicae <38438271+philogicae@users.noreply.github.com> Date: Mon, 3 Feb 2025 11:33:24 +0200 Subject: [PATCH 02/16] mypy fixes --- src/aleph/sdk/chains/ethereum.py | 18 +++++++----------- src/aleph/sdk/chains/evm.py | 2 +- src/aleph/sdk/connectors/superfluid.py | 2 +- 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/src/aleph/sdk/chains/ethereum.py b/src/aleph/sdk/chains/ethereum.py index 7f0dcc6e..3cb7bc20 100644 --- a/src/aleph/sdk/chains/ethereum.py +++ b/src/aleph/sdk/chains/ethereum.py @@ -163,25 +163,21 @@ def get_super_token_balance(self) -> Decimal: return Decimal(contract.functions.balanceOf(self.get_address()).call()) return Decimal(0) - @property - def has_superfluid_connector(self) -> bool: - return self.superfluid_connector is not None - - def can_start_flow(self, flow: Decimal) -> Awaitable[bool]: + def can_start_flow(self, flow: Decimal) -> bool: """Check if the account has enough funds to start a Superfluid flow of the given size.""" - if not self.has_superfluid_connector: + if not self.superfluid_connector: raise ValueError("Superfluid connector is required to check a flow") return self.superfluid_connector.can_start_flow(flow) def create_flow(self, receiver: str, flow: Decimal) -> Awaitable[str]: """Creat a Superfluid flow between this account and the receiver address.""" - if not self.has_superfluid_connector: + if not self.superfluid_connector: raise ValueError("Superfluid connector is required to create a flow") return self.superfluid_connector.create_flow(receiver=receiver, flow=flow) def get_flow(self, receiver: str) -> Awaitable[Web3FlowInfo]: """Get the Superfluid flow between this account and the receiver address.""" - if not self.has_superfluid_connector: + if not self.superfluid_connector: raise ValueError("Superfluid connector is required to get a flow") return self.superfluid_connector.get_flow( sender=self.get_address(), receiver=receiver @@ -189,13 +185,13 @@ def get_flow(self, receiver: str) -> Awaitable[Web3FlowInfo]: def update_flow(self, receiver: str, flow: Decimal) -> Awaitable[str]: """Update the Superfluid flow between this account and the receiver address.""" - if not self.has_superfluid_connector: + if not self.superfluid_connector: raise ValueError("Superfluid connector is required to update a flow") return self.superfluid_connector.update_flow(receiver=receiver, flow=flow) def delete_flow(self, receiver: str) -> Awaitable[str]: """Delete the Superfluid flow between this account and the receiver address.""" - if not self.has_superfluid_connector: + if not self.superfluid_connector: raise ValueError("Superfluid connector is required to delete a flow") return self.superfluid_connector.delete_flow(receiver=receiver) @@ -206,7 +202,7 @@ def manage_flow( update_type: FlowUpdate, ) -> Awaitable[Optional[str]]: """Manage the Superfluid flow between this account and the receiver address.""" - if not self.has_superfluid_connector: + if not self.superfluid_connector: raise ValueError("Superfluid connector is required to manage a flow") return self.superfluid_connector.manage_flow( receiver=receiver, flow=flow, update_type=update_type diff --git a/src/aleph/sdk/chains/evm.py b/src/aleph/sdk/chains/evm.py index e946c920..a5eeed84 100644 --- a/src/aleph/sdk/chains/evm.py +++ b/src/aleph/sdk/chains/evm.py @@ -30,7 +30,7 @@ def get_token_balance(self) -> Decimal: def get_super_token_balance(self) -> Decimal: raise ValueError(f"Super token not implemented for this chain {self.CHAIN}") - def can_start_flow(self, flow: Decimal) -> Awaitable[bool]: + def can_start_flow(self, flow: Decimal) -> bool: raise ValueError(f"Flow checking not implemented for this chain {self.CHAIN}") def create_flow(self, receiver: str, flow: Decimal) -> Awaitable[str]: diff --git a/src/aleph/sdk/connectors/superfluid.py b/src/aleph/sdk/connectors/superfluid.py index 3c559781..1ab20a9f 100644 --- a/src/aleph/sdk/connectors/superfluid.py +++ b/src/aleph/sdk/connectors/superfluid.py @@ -125,7 +125,7 @@ async def manage_flow( # Retrieve current flow info flow_info: Web3FlowInfo = await self.account.get_flow(receiver) - current_flow_rate_wei: Decimal = Decimal(flow_info["flowRate"] or "0") + current_flow_rate_wei: Decimal = Decimal(flow_info["flowRate"] or 0) flow_rate_wei: int = int(to_wei_token(flow)) if update_type == FlowUpdate.INCREASE: From 8599556d5a9acb41c5da064131613eb6433a9e8a Mon Sep 17 00:00:00 2001 From: philogicae <38438271+philogicae@users.noreply.github.com> Date: Thu, 6 Feb 2025 13:56:51 +0200 Subject: [PATCH 03/16] Add ether_rounding to evm_utils --- src/aleph/sdk/evm_utils.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/aleph/sdk/evm_utils.py b/src/aleph/sdk/evm_utils.py index 0ddfe755..37dc3843 100644 --- a/src/aleph/sdk/evm_utils.py +++ b/src/aleph/sdk/evm_utils.py @@ -1,4 +1,4 @@ -from decimal import Decimal +from decimal import ROUND_CEILING, Decimal from enum import Enum from typing import List, Optional, Union @@ -37,6 +37,11 @@ def to_wei_token(amount: Decimal) -> Decimal: return amount * Decimal(10) ** Decimal(settings.TOKEN_DECIMALS) +def ether_rounding(amount: Decimal) -> Decimal: + """Rounds the given value to 18 decimals.""" + return amount.quantize(Decimal(1) / 10**18, rounding=ROUND_CEILING) + + def get_chain_id(chain: Union[Chain, str, None]) -> Optional[int]: """Returns the CHAIN_ID of a given EVM blockchain""" if chain: From 8cf2e43b34169a5e86d791c5c8bb375e0c9356e0 Mon Sep 17 00:00:00 2001 From: philogicae <38438271+philogicae@users.noreply.github.com> Date: Thu, 6 Feb 2025 17:17:48 +0200 Subject: [PATCH 04/16] Add make_instance_content --- src/aleph/sdk/utils.py | 83 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 81 insertions(+), 2 deletions(-) diff --git a/src/aleph/sdk/utils.py b/src/aleph/sdk/utils.py index c3fc154a..ce0258ab 100644 --- a/src/aleph/sdk/utils.py +++ b/src/aleph/sdk/utils.py @@ -28,9 +28,24 @@ from uuid import UUID from zipfile import BadZipFile, ZipFile -from aleph_message.models import ItemHash, MessageType +from aleph_message.models import ( + Chain, + InstanceContent, + ItemHash, + MessageType, + ProgramContent, +) +from aleph_message.models.execution.base import Payment, PaymentType +from aleph_message.models.execution.environment import ( + HostRequirements, + HypervisorType, + InstanceEnvironment, + MachineResources, + TrustedExecutionEnvironment, +) +from aleph_message.models.execution.instance import RootfsVolume from aleph_message.models.execution.program import Encoding -from aleph_message.models.execution.volume import MachineVolume +from aleph_message.models.execution.volume import MachineVolume, ParentVolume from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from jwcrypto.jwa import JWA @@ -401,3 +416,67 @@ def safe_getattr(obj, attr, default=None): if obj is default: break return obj + + +def make_instance_content( + rootfs: str, + rootfs_size: int, + payment: Optional[Payment] = None, + environment_variables: Optional[Mapping[str, str]] = None, + address: Optional[str] = None, + memory: Optional[int] = None, + vcpus: Optional[int] = None, + timeout_seconds: Optional[float] = None, + allow_amend: bool = False, + internet: bool = True, + aleph_api: bool = True, + hypervisor: Optional[HypervisorType] = None, + trusted_execution: Optional[TrustedExecutionEnvironment] = None, + volumes: Optional[List[Mapping]] = None, + ssh_keys: Optional[List[str]] = None, + metadata: Optional[Mapping[str, Any]] = None, + requirements: Optional[HostRequirements] = None, +) -> InstanceContent: + """ + Create InstanceContent object given the provided fields. + """ + + address = address or "0x0000000000000000000000000000000000000000" + payment = payment or Payment(chain=Chain.ETH, type=PaymentType.hold) + selected_hypervisor: HypervisorType = hypervisor or HypervisorType.qemu + vcpus = vcpus or settings.DEFAULT_VM_VCPUS + memory = memory or settings.DEFAULT_VM_MEMORY + timeout_seconds = timeout_seconds or settings.DEFAULT_VM_TIMEOUT + volumes = volumes if volumes is not None else [] + + return InstanceContent( + address=address, + allow_amend=allow_amend, + environment=InstanceEnvironment( + internet=internet, + aleph_api=aleph_api, + hypervisor=selected_hypervisor, + trusted_execution=trusted_execution, + ), + variables=environment_variables, + resources=MachineResources( + vcpus=vcpus, + memory=memory, + seconds=timeout_seconds, + ), + rootfs=RootfsVolume( + parent=ParentVolume( + ref=rootfs, + use_latest=True, + ), + size_mib=rootfs_size, + persistence="host", + use_latest=True, + ), + volumes=[parse_volume(volume) for volume in volumes], + requirements=requirements, + time=datetime.now().timestamp(), + authorized_keys=ssh_keys, + metadata=metadata, + payment=payment, + ) From 435d6972db5a95931ef2b9de717b35b47f206d56 Mon Sep 17 00:00:00 2001 From: philogicae <38438271+philogicae@users.noreply.github.com> Date: Thu, 6 Feb 2025 17:18:33 +0200 Subject: [PATCH 05/16] Simplify create_instance --- src/aleph/sdk/client/authenticated_http.py | 63 +++++++--------------- 1 file changed, 19 insertions(+), 44 deletions(-) diff --git a/src/aleph/sdk/client/authenticated_http.py b/src/aleph/sdk/client/authenticated_http.py index f84b97ca..68b86d7a 100644 --- a/src/aleph/sdk/client/authenticated_http.py +++ b/src/aleph/sdk/client/authenticated_http.py @@ -12,10 +12,8 @@ AggregateContent, AggregateMessage, AlephMessage, - Chain, ForgetContent, ForgetMessage, - InstanceContent, InstanceMessage, ItemHash, MessageType, @@ -26,24 +24,22 @@ StoreContent, StoreMessage, ) -from aleph_message.models.execution.base import Encoding, Payment, PaymentType +from aleph_message.models.execution.base import Encoding, Payment from aleph_message.models.execution.environment import ( FunctionEnvironment, HostRequirements, HypervisorType, - InstanceEnvironment, MachineResources, TrustedExecutionEnvironment, ) -from aleph_message.models.execution.instance import RootfsVolume from aleph_message.models.execution.program import CodeContent, FunctionRuntime -from aleph_message.models.execution.volume import MachineVolume, ParentVolume +from aleph_message.models.execution.volume import MachineVolume from aleph_message.status import MessageStatus from ..conf import settings from ..exceptions import BroadcastError, InsufficientFundsError, InvalidMessageError from ..types import Account, StorageEnum -from ..utils import extended_json_encoder, parse_volume +from ..utils import extended_json_encoder, make_instance_content, parse_volume from .abstract import AuthenticatedAlephClient from .http import AlephHttpClient @@ -530,47 +526,26 @@ async def create_instance( ) -> Tuple[InstanceMessage, MessageStatus]: address = address or settings.ADDRESS_TO_USE or self.account.get_address() - volumes = volumes if volumes is not None else [] - memory = memory or settings.DEFAULT_VM_MEMORY - vcpus = vcpus or settings.DEFAULT_VM_VCPUS - timeout_seconds = timeout_seconds or settings.DEFAULT_VM_TIMEOUT - - payment = payment or Payment(chain=Chain.ETH, type=PaymentType.hold) - - # Default to the QEMU hypervisor for instances. - selected_hypervisor: HypervisorType = hypervisor or HypervisorType.qemu - - content = InstanceContent( + content = make_instance_content( + rootfs=rootfs, + rootfs_size=rootfs_size, + payment=payment, + environment_variables=environment_variables, address=address, + memory=memory, + vcpus=vcpus, + timeout_seconds=timeout_seconds, allow_amend=allow_amend, - environment=InstanceEnvironment( - internet=internet, - aleph_api=aleph_api, - hypervisor=selected_hypervisor, - trusted_execution=trusted_execution, - ), - variables=environment_variables, - resources=MachineResources( - vcpus=vcpus, - memory=memory, - seconds=timeout_seconds, - ), - rootfs=RootfsVolume( - parent=ParentVolume( - ref=rootfs, - use_latest=True, - ), - size_mib=rootfs_size, - persistence="host", - use_latest=True, - ), - volumes=[parse_volume(volume) for volume in volumes], - requirements=requirements, - time=time.time(), - authorized_keys=ssh_keys, + internet=internet, + aleph_api=aleph_api, + hypervisor=hypervisor, + trusted_execution=trusted_execution, + volumes=volumes, + ssh_keys=ssh_keys, metadata=metadata, - payment=payment, + requirements=requirements, ) + message, status, response = await self.submit( content=content.dict(exclude_none=True), message_type=MessageType.instance, From 84e6a804c4ed84fd082cc6649c1b3f7ba0547076 Mon Sep 17 00:00:00 2001 From: philogicae <38438271+philogicae@users.noreply.github.com> Date: Thu, 6 Feb 2025 17:18:56 +0200 Subject: [PATCH 06/16] Add get_estimated_price --- src/aleph/sdk/client/abstract.py | 14 +++++++++ src/aleph/sdk/client/http.py | 50 +++++++++++++++++++++++++++++++- 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/src/aleph/sdk/client/abstract.py b/src/aleph/sdk/client/abstract.py index 025aae6a..1e56f731 100644 --- a/src/aleph/sdk/client/abstract.py +++ b/src/aleph/sdk/client/abstract.py @@ -20,12 +20,14 @@ from aleph_message.models import ( AlephMessage, + InstanceContent, ItemHash, ItemType, MessagesResponse, MessageType, Payment, PostMessage, + ProgramContent, parse_message, ) from aleph_message.models.execution.environment import ( @@ -242,6 +244,18 @@ def watch_messages( """ raise NotImplementedError("Did you mean to import `AlephHttpClient`?") + @abstractmethod + def get_estimated_price( + self, + content: ProgramContent | InstanceContent, + ) -> Coroutine[Any, Any, PriceResponse]: + """ + Get Instance/Program content estimated price + + :param content: Instance or Program content + """ + raise NotImplementedError("Did you mean to import `AlephHttpClient`?") + @abstractmethod def get_program_price( self, diff --git a/src/aleph/sdk/client/http.py b/src/aleph/sdk/client/http.py index 4b42f08a..36748f90 100644 --- a/src/aleph/sdk/client/http.py +++ b/src/aleph/sdk/client/http.py @@ -2,6 +2,7 @@ import logging import os.path import ssl +import time from io import BytesIO from pathlib import Path from typing import ( @@ -20,7 +21,15 @@ import aiohttp from aiohttp.web import HTTPNotFound from aleph_message import parse_message -from aleph_message.models import AlephMessage, ItemHash, ItemType, MessageType +from aleph_message.models import ( + AlephMessage, + Chain, + InstanceContent, + ItemHash, + ItemType, + MessageType, + ProgramContent, +) from aleph_message.status import MessageStatus from pydantic import ValidationError @@ -37,6 +46,7 @@ from ..utils import ( Writable, check_unix_socket_valid, + compute_sha256, copy_async_readable_to_buffer, extended_json_encoder, get_message_type_value, @@ -448,6 +458,44 @@ async def watch_messages( elif msg.type == aiohttp.WSMsgType.ERROR: break + async def get_estimated_price( + self, + content: ProgramContent | InstanceContent, + ) -> PriceResponse: + item_content: str = json.dumps( + content, separators=(",", ":"), default=extended_json_encoder + ) + message = parse_message( + dict( + sender=content.address, + chain=Chain.ETH, + type=( + MessageType.program + if isinstance(content, ProgramContent) + else MessageType.instance + ), + content=content.dict(exclude_none=True), + item_content=item_content, + time=time.time(), + channel=settings.DEFAULT_CHANNEL, + item_type=ItemType.inline, + item_hash=compute_sha256(item_content), + ) + ) + + async with self.http_session.post( + "/api/v0/price/estimate", json=dict(message=message) + ) as resp: + try: + resp.raise_for_status() + response_json = await resp.json() + return PriceResponse( + required_tokens=response_json["required_tokens"], + payment_type=response_json["payment_type"], + ) + except aiohttp.ClientResponseError as e: + raise e + async def get_program_price(self, item_hash: str) -> PriceResponse: async with self.http_session.get(f"/api/v0/price/{item_hash}") as resp: try: From 30190e6e5c075d64baf70807818e4533eb4c2866 Mon Sep 17 00:00:00 2001 From: philogicae <38438271+philogicae@users.noreply.github.com> Date: Thu, 6 Feb 2025 19:25:50 +0200 Subject: [PATCH 07/16] Add token_type to InsufficientFundsError + displayable_amount() --- src/aleph/sdk/chains/ethereum.py | 2 ++ src/aleph/sdk/client/authenticated_http.py | 6 ++++-- src/aleph/sdk/connectors/superfluid.py | 7 +++--- src/aleph/sdk/evm_utils.py | 2 +- src/aleph/sdk/exceptions.py | 15 +++++++++---- src/aleph/sdk/types.py | 13 +++++++++++ src/aleph/sdk/utils.py | 25 ++++++++++++++++------ 7 files changed, 53 insertions(+), 17 deletions(-) diff --git a/src/aleph/sdk/chains/ethereum.py b/src/aleph/sdk/chains/ethereum.py index 3cb7bc20..c185d174 100644 --- a/src/aleph/sdk/chains/ethereum.py +++ b/src/aleph/sdk/chains/ethereum.py @@ -15,6 +15,7 @@ from web3.types import TxParams, TxReceipt from aleph.sdk.exceptions import InsufficientFundsError +from aleph.sdk.types import TokenType from ..conf import settings from ..connectors.superfluid import Superfluid @@ -107,6 +108,7 @@ def can_transact(self, block=True) -> bool: valid = balance > MIN_ETH_BALANCE_WEI if self.chain else False if not valid and block: raise InsufficientFundsError( + token_type=TokenType.GAS, required_funds=MIN_ETH_BALANCE, available_funds=float(from_wei_token(balance)), ) diff --git a/src/aleph/sdk/client/authenticated_http.py b/src/aleph/sdk/client/authenticated_http.py index 68b86d7a..9d81c113 100644 --- a/src/aleph/sdk/client/authenticated_http.py +++ b/src/aleph/sdk/client/authenticated_http.py @@ -38,7 +38,7 @@ from ..conf import settings from ..exceptions import BroadcastError, InsufficientFundsError, InvalidMessageError -from ..types import Account, StorageEnum +from ..types import Account, StorageEnum, TokenType from ..utils import extended_json_encoder, make_instance_content, parse_volume from .abstract import AuthenticatedAlephClient from .http import AlephHttpClient @@ -569,7 +569,9 @@ async def create_instance( account_balance = float(error["account_balance"]) required_balance = float(error["required_balance"]) raise InsufficientFundsError( - required_funds=required_balance, available_funds=account_balance + token_type=TokenType.ALEPH, + required_funds=required_balance, + available_funds=account_balance, ) else: raise ValueError(f"Unknown error code {error_code}: {rejected_message}") diff --git a/src/aleph/sdk/connectors/superfluid.py b/src/aleph/sdk/connectors/superfluid.py index 1ab20a9f..5c02badd 100644 --- a/src/aleph/sdk/connectors/superfluid.py +++ b/src/aleph/sdk/connectors/superfluid.py @@ -6,14 +6,14 @@ from eth_utils import to_normalized_address from superfluid import CFA_V1, Operation, Web3FlowInfo -from aleph.sdk.exceptions import InsufficientFundsError - -from ..evm_utils import ( +from aleph.sdk.evm_utils import ( FlowUpdate, from_wei_token, get_super_token_address, to_wei_token, ) +from aleph.sdk.exceptions import InsufficientFundsError +from aleph.sdk.types import TokenType if TYPE_CHECKING: from aleph.sdk.chains.ethereum import ETHAccount @@ -57,6 +57,7 @@ def can_start_flow(self, flow: Decimal, block=True) -> bool: valid = balance > MIN_FLOW_4H if not valid and block: raise InsufficientFundsError( + token_type=TokenType.ALEPH, required_funds=float(MIN_FLOW_4H), available_funds=float(from_wei_token(balance)), ) diff --git a/src/aleph/sdk/evm_utils.py b/src/aleph/sdk/evm_utils.py index 37dc3843..2bd1da09 100644 --- a/src/aleph/sdk/evm_utils.py +++ b/src/aleph/sdk/evm_utils.py @@ -39,7 +39,7 @@ def to_wei_token(amount: Decimal) -> Decimal: def ether_rounding(amount: Decimal) -> Decimal: """Rounds the given value to 18 decimals.""" - return amount.quantize(Decimal(1) / 10**18, rounding=ROUND_CEILING) + return amount.quantize(Decimal(1) / Decimal(10**18), rounding=ROUND_CEILING) def get_chain_id(chain: Union[Chain, str, None]) -> Optional[int]: diff --git a/src/aleph/sdk/exceptions.py b/src/aleph/sdk/exceptions.py index a538a31c..6af603b1 100644 --- a/src/aleph/sdk/exceptions.py +++ b/src/aleph/sdk/exceptions.py @@ -1,5 +1,8 @@ from abc import ABC +from .types import TokenType +from .utils import displayable_amount + class QueryError(ABC, ValueError): """The result of an API query is inconsistent.""" @@ -69,14 +72,18 @@ class ForgottenMessageError(QueryError): class InsufficientFundsError(Exception): """Raised when the account does not have enough funds to perform an action""" + token_type: TokenType required_funds: float available_funds: float - def __init__(self, required_funds: float, available_funds: float): - self.required_funds = required_funds - self.available_funds = available_funds + def __init__( + self, token_type: TokenType, required_funds: float, available_funds: float + ): + self.token_type = token_type + self.required_funds = displayable_amount(required_funds, decimals=8) + self.available_funds = displayable_amount(available_funds, decimals=8) super().__init__( - f"Insufficient funds: required {required_funds}, available {available_funds}" + f"Insufficient funds ({self.token_type.value}): required {self.required_funds}, available {self.available_funds}" ) diff --git a/src/aleph/sdk/types.py b/src/aleph/sdk/types.py index c698da5d..05fa9815 100644 --- a/src/aleph/sdk/types.py +++ b/src/aleph/sdk/types.py @@ -83,7 +83,20 @@ class ChainInfo(BaseModel): class StoredContent(BaseModel): + """ + A stored content. + """ + filename: Optional[str] hash: Optional[str] url: Optional[str] error: Optional[str] + + +class TokenType(str, Enum): + """ + A token type. + """ + + GAS = "GAS" + ALEPH = "ALEPH" diff --git a/src/aleph/sdk/utils.py b/src/aleph/sdk/utils.py index ce0258ab..37a0147e 100644 --- a/src/aleph/sdk/utils.py +++ b/src/aleph/sdk/utils.py @@ -8,6 +8,7 @@ import os import subprocess from datetime import date, datetime, time +from decimal import ROUND_CEILING, Decimal from enum import Enum from pathlib import Path from shutil import make_archive @@ -28,13 +29,7 @@ from uuid import UUID from zipfile import BadZipFile, ZipFile -from aleph_message.models import ( - Chain, - InstanceContent, - ItemHash, - MessageType, - ProgramContent, -) +from aleph_message.models import Chain, InstanceContent, ItemHash, MessageType from aleph_message.models.execution.base import Payment, PaymentType from aleph_message.models.execution.environment import ( HostRequirements, @@ -418,6 +413,22 @@ def safe_getattr(obj, attr, default=None): return obj +def displayable_amount( + amount: str | int | float | Decimal, decimals: Optional[int] = None +) -> str: + """Returns the amount as a string without unnecessary decimals.""" + + str_amount = "" + try: + dec_amount = Decimal(amount) + if decimals: + dec_amount = dec_amount.quantize(Decimal(1) / Decimal(10**decimals)) + str_amount = str(format(dec_amount.normalize(), "f")) + except ValueError as e: + raise ValueError(f"Invalid amount: {amount}") from e + return str_amount + + def make_instance_content( rootfs: str, rootfs_size: int, From aca81f01c684a72301f590978ecf9503c4bd34bd Mon Sep 17 00:00:00 2001 From: philogicae <38438271+philogicae@users.noreply.github.com> Date: Fri, 7 Feb 2025 15:07:41 +0200 Subject: [PATCH 08/16] mypy: unsupported operand type(s) for | in py3.9, rm unused import, fix wrong types --- src/aleph/sdk/client/abstract.py | 5 ++--- src/aleph/sdk/client/http.py | 4 ++-- src/aleph/sdk/exceptions.py | 6 +++--- src/aleph/sdk/utils.py | 4 ++-- 4 files changed, 9 insertions(+), 10 deletions(-) diff --git a/src/aleph/sdk/client/abstract.py b/src/aleph/sdk/client/abstract.py index 1e56f731..00636ebb 100644 --- a/src/aleph/sdk/client/abstract.py +++ b/src/aleph/sdk/client/abstract.py @@ -20,14 +20,13 @@ from aleph_message.models import ( AlephMessage, - InstanceContent, + ExecutableContent, ItemHash, ItemType, MessagesResponse, MessageType, Payment, PostMessage, - ProgramContent, parse_message, ) from aleph_message.models.execution.environment import ( @@ -247,7 +246,7 @@ def watch_messages( @abstractmethod def get_estimated_price( self, - content: ProgramContent | InstanceContent, + content: ExecutableContent, ) -> Coroutine[Any, Any, PriceResponse]: """ Get Instance/Program content estimated price diff --git a/src/aleph/sdk/client/http.py b/src/aleph/sdk/client/http.py index 36748f90..73c182fa 100644 --- a/src/aleph/sdk/client/http.py +++ b/src/aleph/sdk/client/http.py @@ -24,7 +24,7 @@ from aleph_message.models import ( AlephMessage, Chain, - InstanceContent, + ExecutableContent, ItemHash, ItemType, MessageType, @@ -460,7 +460,7 @@ async def watch_messages( async def get_estimated_price( self, - content: ProgramContent | InstanceContent, + content: ExecutableContent, ) -> PriceResponse: item_content: str = json.dumps( content, separators=(",", ":"), default=extended_json_encoder diff --git a/src/aleph/sdk/exceptions.py b/src/aleph/sdk/exceptions.py index 6af603b1..05ed755f 100644 --- a/src/aleph/sdk/exceptions.py +++ b/src/aleph/sdk/exceptions.py @@ -80,10 +80,10 @@ def __init__( self, token_type: TokenType, required_funds: float, available_funds: float ): self.token_type = token_type - self.required_funds = displayable_amount(required_funds, decimals=8) - self.available_funds = displayable_amount(available_funds, decimals=8) + self.required_funds = required_funds + self.available_funds = available_funds super().__init__( - f"Insufficient funds ({self.token_type.value}): required {self.required_funds}, available {self.available_funds}" + f"Insufficient funds ({self.token_type.value}): required {displayable_amount(self.required_funds, decimals=8)}, available {displayable_amount(self.available_funds, decimals=8)}" ) diff --git a/src/aleph/sdk/utils.py b/src/aleph/sdk/utils.py index 37a0147e..ed78858e 100644 --- a/src/aleph/sdk/utils.py +++ b/src/aleph/sdk/utils.py @@ -8,7 +8,7 @@ import os import subprocess from datetime import date, datetime, time -from decimal import ROUND_CEILING, Decimal +from decimal import Decimal from enum import Enum from pathlib import Path from shutil import make_archive @@ -414,7 +414,7 @@ def safe_getattr(obj, attr, default=None): def displayable_amount( - amount: str | int | float | Decimal, decimals: Optional[int] = None + amount: Union[str, int, float, Decimal], decimals: Optional[int] = None ) -> str: """Returns the amount as a string without unnecessary decimals.""" From 97ac50952b2e7b48b3a4a5448f4c6a16c79961bf Mon Sep 17 00:00:00 2001 From: philogicae <38438271+philogicae@users.noreply.github.com> Date: Thu, 13 Feb 2025 14:47:56 +0200 Subject: [PATCH 09/16] Fix decimal issue --- src/aleph/sdk/connectors/superfluid.py | 2 +- src/aleph/sdk/evm_utils.py | 18 ++++++++++-------- src/aleph/sdk/utils.py | 16 +++++++++++----- 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/src/aleph/sdk/connectors/superfluid.py b/src/aleph/sdk/connectors/superfluid.py index 5c02badd..76bbf907 100644 --- a/src/aleph/sdk/connectors/superfluid.py +++ b/src/aleph/sdk/connectors/superfluid.py @@ -58,7 +58,7 @@ def can_start_flow(self, flow: Decimal, block=True) -> bool: if not valid and block: raise InsufficientFundsError( token_type=TokenType.ALEPH, - required_funds=float(MIN_FLOW_4H), + required_funds=float(from_wei_token(MIN_FLOW_4H)), available_funds=float(from_wei_token(balance)), ) return valid diff --git a/src/aleph/sdk/evm_utils.py b/src/aleph/sdk/evm_utils.py index 2bd1da09..a425d580 100644 --- a/src/aleph/sdk/evm_utils.py +++ b/src/aleph/sdk/evm_utils.py @@ -1,4 +1,4 @@ -from decimal import ROUND_CEILING, Decimal +from decimal import ROUND_CEILING, Context, Decimal from enum import Enum from typing import List, Optional, Union @@ -27,19 +27,21 @@ class FlowUpdate(str, Enum): INCREASE = "increase" +def ether_rounding(amount: Decimal) -> Decimal: + """Rounds the given value to 18 decimals.""" + return amount.quantize( + Decimal(1) / Decimal(10**18), rounding=ROUND_CEILING, context=Context(prec=36) + ) + + def from_wei_token(amount: Decimal) -> Decimal: """Converts the given wei value to ether.""" - return amount / Decimal(10) ** Decimal(settings.TOKEN_DECIMALS) + return ether_rounding(amount / Decimal(10) ** Decimal(settings.TOKEN_DECIMALS)) def to_wei_token(amount: Decimal) -> Decimal: """Converts the given ether value to wei.""" - return amount * Decimal(10) ** Decimal(settings.TOKEN_DECIMALS) - - -def ether_rounding(amount: Decimal) -> Decimal: - """Rounds the given value to 18 decimals.""" - return amount.quantize(Decimal(1) / Decimal(10**18), rounding=ROUND_CEILING) + return Decimal(int(amount * Decimal(10) ** Decimal(settings.TOKEN_DECIMALS))) def get_chain_id(chain: Union[Chain, str, None]) -> Optional[int]: diff --git a/src/aleph/sdk/utils.py b/src/aleph/sdk/utils.py index ed78858e..1c7b43f8 100644 --- a/src/aleph/sdk/utils.py +++ b/src/aleph/sdk/utils.py @@ -8,7 +8,7 @@ import os import subprocess from datetime import date, datetime, time -from decimal import Decimal +from decimal import Context, Decimal, InvalidOperation from enum import Enum from pathlib import Path from shutil import make_archive @@ -414,7 +414,7 @@ def safe_getattr(obj, attr, default=None): def displayable_amount( - amount: Union[str, int, float, Decimal], decimals: Optional[int] = None + amount: Union[str, int, float, Decimal], decimals: int = 18 ) -> str: """Returns the amount as a string without unnecessary decimals.""" @@ -422,10 +422,16 @@ def displayable_amount( try: dec_amount = Decimal(amount) if decimals: - dec_amount = dec_amount.quantize(Decimal(1) / Decimal(10**decimals)) + dec_amount = dec_amount.quantize( + Decimal(1) / Decimal(10**decimals), context=Context(prec=36) + ) str_amount = str(format(dec_amount.normalize(), "f")) - except ValueError as e: - raise ValueError(f"Invalid amount: {amount}") from e + except ValueError: + logger.error(f"Invalid amount to display: {amount}") + exit(1) + except InvalidOperation: + logger.error(f"Invalid operation on amount to display: {amount}") + exit(1) return str_amount From 78163488f8afb58c274678b553283bc64cd724e9 Mon Sep 17 00:00:00 2001 From: philogicae <38438271+philogicae@users.noreply.github.com> Date: Mon, 17 Feb 2025 17:54:03 +0200 Subject: [PATCH 10/16] mypy fixes --- src/aleph/sdk/client/abstract.py | 67 +++++++++++----------- src/aleph/sdk/client/authenticated_http.py | 62 ++++++++++---------- src/aleph/sdk/client/http.py | 24 +++++--- src/aleph/sdk/utils.py | 36 +++++------- tests/unit/aleph_vm_authentication.py | 2 +- tests/unit/test_asynchronous.py | 10 +++- tests/unit/test_price.py | 12 ++-- 7 files changed, 107 insertions(+), 106 deletions(-) diff --git a/src/aleph/sdk/client/abstract.py b/src/aleph/sdk/client/abstract.py index 00636ebb..7f9fed8e 100644 --- a/src/aleph/sdk/client/abstract.py +++ b/src/aleph/sdk/client/abstract.py @@ -23,7 +23,6 @@ ExecutableContent, ItemHash, ItemType, - MessagesResponse, MessageType, Payment, PostMessage, @@ -42,7 +41,7 @@ from aleph.sdk.utils import extended_json_encoder from ..query.filters import MessageFilter, PostFilter -from ..query.responses import PostsResponse, PriceResponse +from ..query.responses import MessagesResponse, PostsResponse, PriceResponse from ..types import GenericMessage, StorageEnum from ..utils import Writable, compute_sha256 @@ -111,7 +110,7 @@ async def get_posts_iterator( ) page += 1 for post in resp.posts: - yield post + yield post # type: ignore @abstractmethod async def download_file(self, file_hash: str) -> bytes: @@ -278,7 +277,7 @@ async def create_post( post_type: str, ref: Optional[str] = None, address: Optional[str] = None, - channel: Optional[str] = None, + channel: Optional[str] = settings.DEFAULT_CHANNEL, inline: bool = True, storage_engine: StorageEnum = StorageEnum.storage, sync: bool = False, @@ -303,9 +302,9 @@ async def create_post( async def create_aggregate( self, key: str, - content: Mapping[str, Any], + content: dict[str, Any], address: Optional[str] = None, - channel: Optional[str] = None, + channel: Optional[str] = settings.DEFAULT_CHANNEL, inline: bool = True, sync: bool = False, ) -> Tuple[AlephMessage, MessageStatus]: @@ -315,7 +314,7 @@ async def create_aggregate( :param key: Key to use to store the content :param content: Content to store :param address: Address to use to sign the message - :param channel: Channel to use (Default: "TEST") + :param channel: Channel to use (Default: "ALEPH-CLOUDSOLUTIONS") :param inline: Whether to write content inside the message (Default: True) :param sync: If true, waits for the message to be processed by the API server (Default: False) """ @@ -334,7 +333,7 @@ async def create_store( ref: Optional[str] = None, storage_engine: StorageEnum = StorageEnum.storage, extra_fields: Optional[dict] = None, - channel: Optional[str] = None, + channel: Optional[str] = settings.DEFAULT_CHANNEL, sync: bool = False, ) -> Tuple[AlephMessage, MessageStatus]: """ @@ -363,22 +362,22 @@ async def create_program( program_ref: str, entrypoint: str, runtime: str, - environment_variables: Optional[Mapping[str, str]] = None, - storage_engine: StorageEnum = StorageEnum.storage, - channel: Optional[str] = None, + metadata: Optional[dict[str, Any]] = None, address: Optional[str] = None, - sync: bool = False, - memory: Optional[int] = None, vcpus: Optional[int] = None, + memory: Optional[int] = None, timeout_seconds: Optional[float] = None, - persistent: bool = False, - allow_amend: bool = False, internet: bool = True, + allow_amend: bool = False, aleph_api: bool = True, encoding: Encoding = Encoding.zip, + persistent: bool = False, volumes: Optional[List[Mapping]] = None, - subscriptions: Optional[List[Mapping]] = None, - metadata: Optional[Mapping[str, Any]] = None, + environment_variables: Optional[dict[str, str]] = None, + subscriptions: Optional[List[dict]] = None, + sync: bool = False, + channel: Optional[str] = settings.DEFAULT_CHANNEL, + storage_engine: StorageEnum = StorageEnum.storage, ) -> Tuple[AlephMessage, MessageStatus]: """ Post a (create) PROGRAM message. @@ -386,22 +385,22 @@ async def create_program( :param program_ref: Reference to the program to run :param entrypoint: Entrypoint to run :param runtime: Runtime to use - :param environment_variables: Environment variables to pass to the program - :param storage_engine: Storage engine to use (Default: "storage") - :param channel: Channel to use (Default: "TEST") + :param metadata: Metadata to attach to the message :param address: Address to use (Default: account.get_address()) - :param sync: If true, waits for the message to be processed by the API server - :param memory: Memory in MB for the VM to be allocated (Default: 128) :param vcpus: Number of vCPUs to allocate (Default: 1) + :param memory: Memory in MB for the VM to be allocated (Default: 128) :param timeout_seconds: Timeout in seconds (Default: 30.0) - :param persistent: Whether the program should be persistent or not (Default: False) - :param allow_amend: Whether the deployed VM image may be changed (Default: False) :param internet: Whether the VM should have internet connectivity. (Default: True) + :param allow_amend: Whether the deployed VM image may be changed (Default: False) :param aleph_api: Whether the VM needs access to Aleph messages API (Default: True) :param encoding: Encoding to use (Default: Encoding.zip) + :param persistent: Whether the program should be persistent or not (Default: False) :param volumes: Volumes to mount + :param environment_variables: Environment variables to pass to the program :param subscriptions: Patterns of aleph.im messages to forward to the program's event receiver - :param metadata: Metadata to attach to the message + :param sync: If true, waits for the message to be processed by the API server + :param channel: Channel to use (Default: "ALEPH-CLOUDSOLUTIONS") + :param storage_engine: Storage engine to use (Default: "storage") """ raise NotImplementedError( "Did you mean to import `AuthenticatedAlephHttpClient`?" @@ -413,9 +412,9 @@ async def create_instance( rootfs: str, rootfs_size: int, payment: Optional[Payment] = None, - environment_variables: Optional[Mapping[str, str]] = None, + environment_variables: Optional[dict[str, str]] = None, storage_engine: StorageEnum = StorageEnum.storage, - channel: Optional[str] = None, + channel: Optional[str] = settings.DEFAULT_CHANNEL, address: Optional[str] = None, sync: bool = False, memory: Optional[int] = None, @@ -429,7 +428,7 @@ async def create_instance( volumes: Optional[List[Mapping]] = None, volume_persistence: str = "host", ssh_keys: Optional[List[str]] = None, - metadata: Optional[Mapping[str, Any]] = None, + metadata: Optional[dict[str, Any]] = None, requirements: Optional[HostRequirements] = None, ) -> Tuple[AlephMessage, MessageStatus]: """ @@ -440,7 +439,7 @@ async def create_instance( :param payment: Payment method used to pay for the instance :param environment_variables: Environment variables to pass to the program :param storage_engine: Storage engine to use (Default: "storage") - :param channel: Channel to use (Default: "TEST") + :param channel: Channel to use (Default: "ALEPH-CLOUDSOLUTIONS") :param address: Address to use (Default: account.get_address()) :param sync: If true, waits for the message to be processed by the API server :param memory: Memory in MB for the VM to be allocated (Default: 2048) @@ -468,7 +467,7 @@ async def forget( hashes: List[ItemHash], reason: Optional[str], storage_engine: StorageEnum = StorageEnum.storage, - channel: Optional[str] = None, + channel: Optional[str] = settings.DEFAULT_CHANNEL, address: Optional[str] = None, sync: bool = False, ) -> Tuple[AlephMessage, MessageStatus]: @@ -481,7 +480,7 @@ async def forget( :param hashes: Hashes of the messages to forget :param reason: Reason for forgetting the messages :param storage_engine: Storage engine to use (Default: "storage") - :param channel: Channel to use (Default: "TEST") + :param channel: Channel to use (Default: "ALEPH-CLOUDSOLUTIONS") :param address: Address to use (Default: account.get_address()) :param sync: If true, waits for the message to be processed by the API server (Default: False) """ @@ -503,7 +502,7 @@ async def generate_signed_message( :param message_type: Type of the message (PostMessage, ...) :param content: User-defined content of the message - :param channel: Channel to use (Default: "TEST") + :param channel: Channel to use (Default: "ALEPH-CLOUDSOLUTIONS") :param allow_inlining: Whether to allow inlining the content of the message (Default: True) :param storage_engine: Storage engine to use (Default: "storage") """ @@ -550,7 +549,7 @@ async def submit( self, content: Dict[str, Any], message_type: MessageType, - channel: Optional[str] = None, + channel: Optional[str] = settings.DEFAULT_CHANNEL, storage_engine: StorageEnum = StorageEnum.storage, allow_inlining: bool = True, sync: bool = False, @@ -562,7 +561,7 @@ async def submit( :param content: Content of the message :param message_type: Type of the message - :param channel: Channel to use (Default: "TEST") + :param channel: Channel to use (Default: "ALEPH-CLOUDSOLUTIONS") :param storage_engine: Storage engine to use (Default: "storage") :param allow_inlining: Whether to allow inlining the content of the message (Default: True) :param sync: If true, waits for the message to be processed by the API server (Default: False) diff --git a/src/aleph/sdk/client/authenticated_http.py b/src/aleph/sdk/client/authenticated_http.py index 9d81c113..2a2f9cf4 100644 --- a/src/aleph/sdk/client/authenticated_http.py +++ b/src/aleph/sdk/client/authenticated_http.py @@ -5,7 +5,7 @@ import time from io import BytesIO from pathlib import Path -from typing import Any, Dict, List, Mapping, NoReturn, Optional, Tuple, Union +from typing import Any, Dict, Mapping, NoReturn, Optional, Tuple, Union import aiohttp from aleph_message.models import ( @@ -16,30 +16,26 @@ ForgetMessage, InstanceMessage, ItemHash, + ItemType, MessageType, PostContent, PostMessage, - ProgramContent, ProgramMessage, StoreContent, StoreMessage, ) from aleph_message.models.execution.base import Encoding, Payment from aleph_message.models.execution.environment import ( - FunctionEnvironment, HostRequirements, HypervisorType, - MachineResources, TrustedExecutionEnvironment, ) -from aleph_message.models.execution.program import CodeContent, FunctionRuntime -from aleph_message.models.execution.volume import MachineVolume from aleph_message.status import MessageStatus from ..conf import settings from ..exceptions import BroadcastError, InsufficientFundsError, InvalidMessageError from ..types import Account, StorageEnum, TokenType -from ..utils import extended_json_encoder, make_instance_content, parse_volume +from ..utils import extended_json_encoder, make_instance_content, make_program_content from .abstract import AuthenticatedAlephClient from .http import AlephHttpClient @@ -281,7 +277,7 @@ async def create_post( post_type: str, ref: Optional[str] = None, address: Optional[str] = None, - channel: Optional[str] = None, + channel: Optional[str] = settings.DEFAULT_CHANNEL, inline: bool = True, storage_engine: StorageEnum = StorageEnum.storage, sync: bool = False, @@ -304,14 +300,14 @@ async def create_post( storage_engine=storage_engine, sync=sync, ) - return message, status + return message, status # type: ignore async def create_aggregate( self, key: str, - content: Mapping[str, Any], + content: dict[str, Any], address: Optional[str] = None, - channel: Optional[str] = None, + channel: Optional[str] = settings.DEFAULT_CHANNEL, inline: bool = True, sync: bool = False, ) -> Tuple[AggregateMessage, MessageStatus]: @@ -331,7 +327,7 @@ async def create_aggregate( allow_inlining=inline, sync=sync, ) - return message, status + return message, status # type: ignore async def create_store( self, @@ -343,7 +339,7 @@ async def create_store( ref: Optional[str] = None, storage_engine: StorageEnum = StorageEnum.storage, extra_fields: Optional[dict] = None, - channel: Optional[str] = None, + channel: Optional[str] = settings.DEFAULT_CHANNEL, sync: bool = False, ) -> Tuple[StoreMessage, MessageStatus]: address = address or settings.ADDRESS_TO_USE or self.account.get_address() @@ -396,7 +392,7 @@ async def create_store( if extra_fields is not None: values.update(extra_fields) - content = StoreContent(**values) + content = StoreContent.parse_obj(values) message, status, _ = await self.submit( content=content.dict(exclude_none=True), @@ -405,7 +401,7 @@ async def create_store( allow_inlining=True, sync=sync, ) - return message, status + return message, status # type: ignore async def create_program( self, @@ -457,7 +453,7 @@ async def create_program( address=address, allow_amend=allow_amend, code=CodeContent( - encoding=encoding, + encoding=encoding, entrypoint=entrypoint, ref=program_ref, use_latest=True, @@ -498,16 +494,16 @@ async def create_program( storage_engine=storage_engine, sync=sync, ) - return message, status + return message, status # type: ignore async def create_instance( self, rootfs: str, rootfs_size: int, payment: Optional[Payment] = None, - environment_variables: Optional[Mapping[str, str]] = None, + environment_variables: Optional[dict[str, str]] = None, storage_engine: StorageEnum = StorageEnum.storage, - channel: Optional[str] = None, + channel: Optional[str] = settings.DEFAULT_CHANNEL, address: Optional[str] = None, sync: bool = False, memory: Optional[int] = None, @@ -518,10 +514,10 @@ async def create_instance( aleph_api: bool = True, hypervisor: Optional[HypervisorType] = None, trusted_execution: Optional[TrustedExecutionEnvironment] = None, - volumes: Optional[List[Mapping]] = None, + volumes: Optional[list[Mapping]] = None, volume_persistence: str = "host", - ssh_keys: Optional[List[str]] = None, - metadata: Optional[Mapping[str, Any]] = None, + ssh_keys: Optional[list[str]] = None, + metadata: Optional[dict[str, Any]] = None, requirements: Optional[HostRequirements] = None, ) -> Tuple[InstanceMessage, MessageStatus]: address = address or settings.ADDRESS_TO_USE or self.account.get_address() @@ -555,7 +551,7 @@ async def create_instance( raise_on_rejected=False, ) if status in (MessageStatus.PROCESSED, MessageStatus.PENDING): - return message, status + return message, status # type: ignore # get the reason for rejection rejected_message = await self.get_message_error(message.item_hash) @@ -578,10 +574,10 @@ async def create_instance( async def forget( self, - hashes: List[ItemHash], + hashes: list[ItemHash], reason: Optional[str], storage_engine: StorageEnum = StorageEnum.storage, - channel: Optional[str] = None, + channel: Optional[str] = settings.DEFAULT_CHANNEL, address: Optional[str] = None, sync: bool = False, ) -> Tuple[ForgetMessage, MessageStatus]: @@ -602,13 +598,13 @@ async def forget( allow_inlining=True, sync=sync, ) - return message, status + return message, status # type: ignore async def submit( self, content: Dict[str, Any], message_type: MessageType, - channel: Optional[str] = None, + channel: Optional[str] = settings.DEFAULT_CHANNEL, storage_engine: StorageEnum = StorageEnum.storage, allow_inlining: bool = True, sync: bool = False, @@ -630,7 +626,7 @@ async def _storage_push_file_with_message( self, file_content: bytes, store_content: StoreContent, - channel: Optional[str] = None, + channel: Optional[str] = settings.DEFAULT_CHANNEL, sync: bool = False, ) -> Tuple[StoreMessage, MessageStatus]: """Push a file to the storage service.""" @@ -662,7 +658,7 @@ async def _storage_push_file_with_message( message_status = ( MessageStatus.PENDING if resp.status == 202 else MessageStatus.PROCESSED ) - return message, message_status + return message, message_status # type: ignore async def _upload_file_native( self, @@ -671,7 +667,7 @@ async def _upload_file_native( guess_mime_type: bool = False, ref: Optional[str] = None, extra_fields: Optional[dict] = None, - channel: Optional[str] = None, + channel: Optional[str] = settings.DEFAULT_CHANNEL, sync: bool = False, ) -> Tuple[StoreMessage, MessageStatus]: file_hash = hashlib.sha256(file_content).hexdigest() @@ -683,9 +679,9 @@ async def _upload_file_native( store_content = StoreContent( address=address, ref=ref, - item_type=StorageEnum.storage, - item_hash=file_hash, - mime_type=mime_type, + item_type=ItemType.storage, + item_hash=ItemHash(file_hash), + mime_type=mime_type, # type: ignore time=time.time(), **extra_fields, ) diff --git a/src/aleph/sdk/client/http.py b/src/aleph/sdk/client/http.py index 73c182fa..2b1c15b0 100644 --- a/src/aleph/sdk/client/http.py +++ b/src/aleph/sdk/client/http.py @@ -368,7 +368,7 @@ async def get_messages( ) @overload - async def get_message( + async def get_message( # type: ignore self, item_hash: str, message_type: Optional[Type[GenericMessage]] = None, @@ -393,7 +393,7 @@ async def get_message( resp.raise_for_status() except aiohttp.ClientResponseError as e: if e.status == 404: - raise MessageNotFoundError(f"No such hash {item_hash}") + raise MessageNotFoundError(f"No such hash {item_hash}") from e raise e message_raw = await resp.json() if message_raw["status"] == "forgotten": @@ -409,9 +409,9 @@ async def get_message( f"does not match the expected type '{expected_type}'" ) if with_status: - return message, message_raw["status"] + return message, message_raw["status"] # type: ignore else: - return message + return message # type: ignore async def get_message_error( self, @@ -539,15 +539,21 @@ async def get_stored_content( resp = f"Invalid CID: {message.content.item_hash}" else: filename = safe_getattr(message.content, "metadata.name") - hash = message.content.item_hash + item_hash = message.content.item_hash url = ( f"{self.api_server}/api/v0/storage/raw/" - if len(hash) == 64 + if len(item_hash) == 64 else settings.IPFS_GATEWAY - ) + hash - result = StoredContent(filename=filename, hash=hash, url=url) + ) + item_hash + result = StoredContent( + filename=filename, hash=item_hash, url=url, error=None + ) except MessageNotFoundError: resp = f"Message not found: {item_hash}" except ForgottenMessageError: resp = f"Message forgotten: {item_hash}" - return result if result else StoredContent(error=resp) + return ( + result + if result + else StoredContent(error=resp, filename=None, hash=None, url=None) + ) diff --git a/src/aleph/sdk/utils.py b/src/aleph/sdk/utils.py index 1c7b43f8..b33ee058 100644 --- a/src/aleph/sdk/utils.py +++ b/src/aleph/sdk/utils.py @@ -16,7 +16,6 @@ Any, Dict, Iterable, - List, Mapping, Optional, Protocol, @@ -187,19 +186,15 @@ def extended_json_encoder(obj: Any) -> Any: def parse_volume(volume_dict: Union[Mapping, MachineVolume]) -> MachineVolume: - # Python 3.9 does not support `isinstance(volume_dict, MachineVolume)`, - # so we need to iterate over all types. - if any( + if not any( isinstance(volume_dict, volume_type) for volume_type in get_args(MachineVolume) ): - return volume_dict for volume_type in get_args(MachineVolume): try: return volume_type.parse_obj(volume_dict) - except ValueError: - continue - else: - raise ValueError(f"Could not parse volume: {volume_dict}") + except ValueError as e: + raise ValueError(f"Could not parse volume: {volume_dict}") from e + return volume_dict # type: ignore def compute_sha256(s: str) -> str: @@ -244,7 +239,7 @@ def sign_vm_control_payload(payload: Dict[str, str], ephemeral_key) -> str: async def run_in_subprocess( - command: List[str], check: bool = True, stdin_input: Optional[bytes] = None + command: list[str], check: bool = True, stdin_input: Optional[bytes] = None ) -> bytes: """Run the specified command in a subprocess, returns the stdout of the process.""" logger.debug(f"command: {' '.join(command)}") @@ -439,7 +434,7 @@ def make_instance_content( rootfs: str, rootfs_size: int, payment: Optional[Payment] = None, - environment_variables: Optional[Mapping[str, str]] = None, + environment_variables: Optional[dict[str, str]] = None, address: Optional[str] = None, memory: Optional[int] = None, vcpus: Optional[int] = None, @@ -449,9 +444,9 @@ def make_instance_content( aleph_api: bool = True, hypervisor: Optional[HypervisorType] = None, trusted_execution: Optional[TrustedExecutionEnvironment] = None, - volumes: Optional[List[Mapping]] = None, - ssh_keys: Optional[List[str]] = None, - metadata: Optional[Mapping[str, Any]] = None, + volumes: Optional[list[Mapping]] = None, + ssh_keys: Optional[list[str]] = None, + metadata: Optional[dict[str, Any]] = None, requirements: Optional[HostRequirements] = None, ) -> InstanceContent: """ @@ -459,7 +454,7 @@ def make_instance_content( """ address = address or "0x0000000000000000000000000000000000000000" - payment = payment or Payment(chain=Chain.ETH, type=PaymentType.hold) + payment = payment or Payment(chain=Chain.ETH, type=PaymentType.hold, receiver=None) selected_hypervisor: HypervisorType = hypervisor or HypervisorType.qemu vcpus = vcpus or settings.DEFAULT_VM_VCPUS memory = memory or settings.DEFAULT_VM_MEMORY @@ -478,17 +473,16 @@ def make_instance_content( variables=environment_variables, resources=MachineResources( vcpus=vcpus, - memory=memory, - seconds=timeout_seconds, + memory=Mebibytes(memory), + seconds=int(timeout_seconds), ), rootfs=RootfsVolume( parent=ParentVolume( - ref=rootfs, + ref=ItemHash(rootfs), use_latest=True, ), - size_mib=rootfs_size, - persistence="host", - use_latest=True, + size_mib=PersistentVolumeSizeMib(rootfs_size), + persistence=VolumePersistence.host, ), volumes=[parse_volume(volume) for volume in volumes], requirements=requirements, diff --git a/tests/unit/aleph_vm_authentication.py b/tests/unit/aleph_vm_authentication.py index 491da51a..6083a119 100644 --- a/tests/unit/aleph_vm_authentication.py +++ b/tests/unit/aleph_vm_authentication.py @@ -263,7 +263,7 @@ async def authenticate_websocket_message( signed_operation = SignedOperation.parse_obj(message["X-SignedOperation"]) if signed_operation.content.domain != domain_name: logger.debug( - f"Invalid domain '{signed_pubkey.content.domain}' != '{domain_name}'" + f"Invalid domain '{signed_operation.content.domain}' != '{domain_name}'" ) raise web.HTTPUnauthorized(reason="Invalid domain") return verify_signed_operation(signed_operation, signed_pubkey) diff --git a/tests/unit/test_asynchronous.py b/tests/unit/test_asynchronous.py index b044e170..e2647590 100644 --- a/tests/unit/test_asynchronous.py +++ b/tests/unit/test_asynchronous.py @@ -7,6 +7,7 @@ Chain, ForgetMessage, InstanceMessage, + ItemHash, MessageType, Payment, PaymentType, @@ -184,12 +185,16 @@ async def test_create_confidential_instance(mock_session_with_post_success): ), hypervisor=HypervisorType.qemu, trusted_execution=TrustedExecutionEnvironment( - firmware="cafecafecafecafecafecafecafecafecafecafecafecafecafecafecafecafe", + firmware=ItemHash( + "cafecafecafecafecafecafecafecafecafecafecafecafecafecafecafecafe" + ), policy=0b1, ), requirements=HostRequirements( node=NodeRequirements( - node_hash="cafecafecafecafecafecafecafecafecafecafecafecafecafecafecafecafe", + node_hash=ItemHash( + "cafecafecafecafecafecafecafecafecafecafecafecafecafecafecafecafe" + ), ) ), ) @@ -285,5 +290,6 @@ async def test_create_instance_insufficient_funds_error( payment=Payment( chain=Chain.ETH, type=PaymentType.hold, + receiver=None, ), ) diff --git a/tests/unit/test_price.py b/tests/unit/test_price.py index bed9304a..fe9e3468 100644 --- a/tests/unit/test_price.py +++ b/tests/unit/test_price.py @@ -11,14 +11,14 @@ async def test_get_program_price_valid(): Test that the get_program_price method returns the correct PriceResponse when given a valid item hash. """ - expected_response = { - "required_tokens": 3.0555555555555556e-06, - "payment_type": "superfluid", - } - mock_session = make_mock_get_session(expected_response) + expected = PriceResponse( + required_tokens=3.0555555555555556e-06, + payment_type="superfluid", + ) + mock_session = make_mock_get_session(expected.dict()) async with mock_session: response = await mock_session.get_program_price("cacacacacacaca") - assert response == PriceResponse(**expected_response) + assert response == expected @pytest.mark.asyncio From 6de3236e87f68b7670ce25fb01042d4d79564810 Mon Sep 17 00:00:00 2001 From: philogicae <38438271+philogicae@users.noreply.github.com> Date: Mon, 17 Feb 2025 18:04:31 +0200 Subject: [PATCH 11/16] Add make_program_content --- src/aleph/sdk/utils.py | 110 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 104 insertions(+), 6 deletions(-) diff --git a/src/aleph/sdk/utils.py b/src/aleph/sdk/utils.py index b33ee058..be0849c6 100644 --- a/src/aleph/sdk/utils.py +++ b/src/aleph/sdk/utils.py @@ -28,18 +28,38 @@ from uuid import UUID from zipfile import BadZipFile, ZipFile -from aleph_message.models import Chain, InstanceContent, ItemHash, MessageType +from aleph_message.models import ( + Chain, + InstanceContent, + ItemHash, + MachineType, + MessageType, + ProgramContent, +) from aleph_message.models.execution.base import Payment, PaymentType from aleph_message.models.execution.environment import ( + FunctionEnvironment, + FunctionTriggers, HostRequirements, HypervisorType, InstanceEnvironment, MachineResources, + Subscription, TrustedExecutionEnvironment, ) from aleph_message.models.execution.instance import RootfsVolume -from aleph_message.models.execution.program import Encoding -from aleph_message.models.execution.volume import MachineVolume, ParentVolume +from aleph_message.models.execution.program import ( + CodeContent, + Encoding, + FunctionRuntime, +) +from aleph_message.models.execution.volume import ( + MachineVolume, + ParentVolume, + PersistentVolumeSizeMib, + VolumePersistence, +) +from aleph_message.utils import Mebibytes from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from jwcrypto.jwa import JWA @@ -189,9 +209,9 @@ def parse_volume(volume_dict: Union[Mapping, MachineVolume]) -> MachineVolume: if not any( isinstance(volume_dict, volume_type) for volume_type in get_args(MachineVolume) ): - for volume_type in get_args(MachineVolume): - try: - return volume_type.parse_obj(volume_dict) + for volume_type in get_args(MachineVolume): + try: + return volume_type.parse_obj(volume_dict) except ValueError as e: raise ValueError(f"Could not parse volume: {volume_dict}") from e return volume_dict # type: ignore @@ -491,3 +511,81 @@ def make_instance_content( metadata=metadata, payment=payment, ) + + +def make_program_content( + program_ref: str, + entrypoint: str, + runtime: str, + metadata: Optional[dict[str, Any]] = None, + address: Optional[str] = None, + vcpus: Optional[int] = None, + memory: Optional[int] = None, + timeout_seconds: Optional[float] = None, + internet: bool = False, + aleph_api: bool = True, + allow_amend: bool = False, + encoding: Encoding = Encoding.zip, + persistent: bool = False, + volumes: Optional[list[Mapping]] = None, + environment_variables: Optional[dict[str, str]] = None, + subscriptions: Optional[list[dict]] = None, + payment: Optional[Payment] = None, +) -> ProgramContent: + """ + Create ProgramContent object given the provided fields. + """ + + address = address or "0x0000000000000000000000000000000000000000" + payment = payment or Payment(chain=Chain.ETH, type=PaymentType.hold, receiver=None) + vcpus = vcpus or settings.DEFAULT_VM_VCPUS + memory = memory or settings.DEFAULT_VM_MEMORY + timeout_seconds = timeout_seconds or settings.DEFAULT_VM_TIMEOUT + volumes = volumes if volumes is not None else [] + subscriptions = ( + [Subscription(**sub) for sub in subscriptions] + if subscriptions is not None + else None + ) + + return ProgramContent( + type=MachineType.vm_function, + address=address, + allow_amend=allow_amend, + code=CodeContent( + encoding=encoding, + entrypoint=entrypoint, + ref=ItemHash(program_ref), + use_latest=True, + ), + on=FunctionTriggers( + http=True, + persistent=persistent, + message=subscriptions, + ), + environment=FunctionEnvironment( + reproducible=False, + internet=internet, + aleph_api=aleph_api, + ), + variables=environment_variables, + resources=MachineResources( + vcpus=vcpus, + memory=Mebibytes(memory), + seconds=int(timeout_seconds), + ), + runtime=FunctionRuntime( + ref=ItemHash(runtime), + use_latest=True, + comment=( + "Official aleph.im runtime" + if runtime == settings.DEFAULT_RUNTIME_ID + else "" + ), + ), + volumes=[parse_volume(volume) for volume in volumes], + time=datetime.now().timestamp(), + metadata=metadata, + authorized_keys=[], + payment=payment, + ) From e0dabacc30be5bae5b4a7622a0e18efaefa54131 Mon Sep 17 00:00:00 2001 From: philogicae <38438271+philogicae@users.noreply.github.com> Date: Mon, 17 Feb 2025 18:04:44 +0200 Subject: [PATCH 12/16] Simplify create_program --- src/aleph/sdk/client/authenticated_http.py | 112 +++++++++------------ 1 file changed, 46 insertions(+), 66 deletions(-) diff --git a/src/aleph/sdk/client/authenticated_http.py b/src/aleph/sdk/client/authenticated_http.py index 2a2f9cf4..9bb9a1e7 100644 --- a/src/aleph/sdk/client/authenticated_http.py +++ b/src/aleph/sdk/client/authenticated_http.py @@ -408,93 +408,73 @@ async def create_program( program_ref: str, entrypoint: str, runtime: str, - environment_variables: Optional[Mapping[str, str]] = None, - storage_engine: StorageEnum = StorageEnum.storage, - channel: Optional[str] = None, + metadata: Optional[dict[str, Any]] = None, address: Optional[str] = None, - sync: bool = False, - memory: Optional[int] = None, vcpus: Optional[int] = None, + memory: Optional[int] = None, timeout_seconds: Optional[float] = None, - persistent: bool = False, - allow_amend: bool = False, internet: bool = True, + allow_amend: bool = False, aleph_api: bool = True, encoding: Encoding = Encoding.zip, - volumes: Optional[List[Mapping]] = None, - subscriptions: Optional[List[Mapping]] = None, - metadata: Optional[Mapping[str, Any]] = None, + persistent: bool = False, + volumes: Optional[list[Mapping]] = None, + environment_variables: Optional[dict[str, str]] = None, + subscriptions: Optional[list[dict]] = None, + sync: bool = False, + channel: Optional[str] = settings.DEFAULT_CHANNEL, + storage_engine: StorageEnum = StorageEnum.storage, ) -> Tuple[ProgramMessage, MessageStatus]: address = address or settings.ADDRESS_TO_USE or self.account.get_address() - volumes = volumes if volumes is not None else [] - memory = memory or settings.DEFAULT_VM_MEMORY - vcpus = vcpus or settings.DEFAULT_VM_VCPUS - timeout_seconds = timeout_seconds or settings.DEFAULT_VM_TIMEOUT - - # TODO: Check that program_ref, runtime and data_ref exist - - # Register the different ways to trigger a VM - if subscriptions: - # Trigger on HTTP calls and on aleph.im message subscriptions. - triggers = { - "http": True, - "persistent": persistent, - "message": subscriptions, - } - else: - # Trigger on HTTP calls. - triggers = {"http": True, "persistent": persistent} - - volumes: List[MachineVolume] = [parse_volume(volume) for volume in volumes] - - content = ProgramContent( - type="vm-function", + content = make_program_content( + program_ref=program_ref, + entrypoint=entrypoint, + runtime=runtime, + metadata=metadata, address=address, + vcpus=vcpus, + memory=memory, + timeout_seconds=timeout_seconds, + internet=internet, + aleph_api=aleph_api, allow_amend=allow_amend, - code=CodeContent( encoding=encoding, - entrypoint=entrypoint, - ref=program_ref, - use_latest=True, - ), - on=triggers, - environment=FunctionEnvironment( - reproducible=False, - internet=internet, - aleph_api=aleph_api, - ), - variables=environment_variables, - resources=MachineResources( - vcpus=vcpus, - memory=memory, - seconds=timeout_seconds, - ), - runtime=FunctionRuntime( - ref=runtime, - use_latest=True, - comment=( - "Official aleph.im runtime" - if runtime == settings.DEFAULT_RUNTIME_ID - else "" - ), - ), - volumes=[parse_volume(volume) for volume in volumes], - time=time.time(), - metadata=metadata, + persistent=persistent, + volumes=volumes, + environment_variables=environment_variables, + subscriptions=subscriptions, ) - # Ensure that the version of aleph-message used supports the field. - assert content.on.persistent == persistent - message, status, _ = await self.submit( content=content.dict(exclude_none=True), message_type=MessageType.program, channel=channel, storage_engine=storage_engine, sync=sync, + raise_on_rejected=False, ) - return message, status # type: ignore + if status in (MessageStatus.PROCESSED, MessageStatus.PENDING): + return message, status # type: ignore + + # get the reason for rejection + rejected_message = await self.get_message_error(message.item_hash) + assert rejected_message, "No rejected message found" + error_code = rejected_message["error_code"] + if error_code == 5: + # not enough balance + details = rejected_message["details"] + errors = details["errors"] + error = errors[0] + account_balance = float(error["account_balance"]) + required_balance = float(error["required_balance"]) + raise InsufficientFundsError( + token_type=TokenType.ALEPH, + required_funds=required_balance, + available_funds=account_balance, + ) + else: + raise ValueError(f"Unknown error code {error_code}: {rejected_message}") async def create_instance( self, From 45760275ec09b3ccc5fe13b2006f9e4c552b2d47 Mon Sep 17 00:00:00 2001 From: philogicae <38438271+philogicae@users.noreply.github.com> Date: Mon, 17 Feb 2025 18:31:08 +0200 Subject: [PATCH 13/16] pytest fixes --- src/aleph/sdk/utils.py | 16 +++++++++------- tests/unit/test_utils.py | 10 +++++++--- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/src/aleph/sdk/utils.py b/src/aleph/sdk/utils.py index be0849c6..5cbc1e8c 100644 --- a/src/aleph/sdk/utils.py +++ b/src/aleph/sdk/utils.py @@ -206,15 +206,17 @@ def extended_json_encoder(obj: Any) -> Any: def parse_volume(volume_dict: Union[Mapping, MachineVolume]) -> MachineVolume: - if not any( + if any( isinstance(volume_dict, volume_type) for volume_type in get_args(MachineVolume) ): - for volume_type in get_args(MachineVolume): - try: - return volume_type.parse_obj(volume_dict) - except ValueError as e: - raise ValueError(f"Could not parse volume: {volume_dict}") from e - return volume_dict # type: ignore + return volume_dict # type: ignore + + for volume_type in get_args(MachineVolume): + try: + return volume_type.parse_obj(volume_dict) + except ValueError: + pass + raise ValueError(f"Could not parse volume: {volume_dict}") def compute_sha256(s: str) -> str: diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index bfca23a5..c560455d 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -1,5 +1,6 @@ import base64 import datetime +from unittest.mock import MagicMock import pytest as pytest from aleph_message.models import ( @@ -158,6 +159,7 @@ def test_parse_immutable_volume(): def test_parse_ephemeral_volume(): volume_dict = { "comment": "Dummy hash", + "mount": "/opt/data", "ephemeral": True, "size_mib": 1, } @@ -169,6 +171,8 @@ def test_parse_ephemeral_volume(): def test_parse_persistent_volume(): volume_dict = { + "comment": "Dummy hash", + "mount": "/opt/data", "parent": { "ref": "QmX8K1c22WmQBAww5ShWQqwMiFif7XFrJD6iFBj7skQZXW", "use_latest": True, @@ -184,9 +188,9 @@ def test_parse_persistent_volume(): assert isinstance(volume, PersistentVolume) -def test_calculate_firmware_hash(mocker): - mock_path = mocker.Mock( - read_bytes=mocker.Mock(return_value=b"abc"), +def test_calculate_firmware_hash(): + mock_path = MagicMock( + read_bytes=MagicMock(return_value=b"abc"), ) assert ( From 5a586c1939580412f325d590231ba22c9197d97d Mon Sep 17 00:00:00 2001 From: philogicae <38438271+philogicae@users.noreply.github.com> Date: Tue, 18 Feb 2025 11:25:40 +0200 Subject: [PATCH 14/16] Add ubuntu24 for qemu and remove firecracker rootfs for instances --- src/aleph/sdk/conf.py | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/src/aleph/sdk/conf.py b/src/aleph/sdk/conf.py index 4dc7c9e7..90b32527 100644 --- a/src/aleph/sdk/conf.py +++ b/src/aleph/sdk/conf.py @@ -44,27 +44,22 @@ class Settings(BaseSettings): HTTP_REQUEST_TIMEOUT = 15.0 DEFAULT_CHANNEL: str = "ALEPH-CLOUDSOLUTIONS" + + # Firecracker runtime for programs DEFAULT_RUNTIME_ID: str = ( "63f07193e6ee9d207b7d1fcf8286f9aee34e6f12f101d2ec77c1229f92964696" ) - DEBIAN_11_ROOTFS_ID: str = ( - "887957042bb0e360da3485ed33175882ce72a70d79f1ba599400ff4802b7cee7" - ) - DEBIAN_12_ROOTFS_ID: str = ( - "6e30de68c6cedfa6b45240c2b51e52495ac6fb1bd4b36457b3d5ca307594d595" - ) - UBUNTU_22_ROOTFS_ID: str = ( - "77fef271aa6ff9825efa3186ca2e715d19e7108279b817201c69c34cedc74c27" - ) - DEBIAN_11_QEMU_ROOTFS_ID: str = ( - "f7e68c568906b4ebcd3cd3c4bfdff96c489cd2a9ef73ba2d7503f244dfd578de" - ) + + # Qemu rootfs for instances DEBIAN_12_QEMU_ROOTFS_ID: str = ( "b6ff5c3a8205d1ca4c7c3369300eeafff498b558f71b851aa2114afd0a532717" ) UBUNTU_22_QEMU_ROOTFS_ID: str = ( "4a0f62da42f4478544616519e6f5d58adb1096e069b392b151d47c3609492d0c" ) + UBUNTU_24_QEMU_ROOTFS_ID: str = ( + "5330dcefe1857bcd97b7b7f24d1420a7d46232d53f27be280c8a7071d88bd84e" + ) DEFAULT_CONFIDENTIAL_FIRMWARE: str = ( "ba5bb13f3abca960b101a759be162b229e2b7e93ecad9d1307e54de887f177ff" From 6e5225c728f9c5a70f7ca2e06f5f6d6c8c528170 Mon Sep 17 00:00:00 2001 From: philogicae <38438271+philogicae@users.noreply.github.com> Date: Tue, 18 Feb 2025 12:03:00 +0200 Subject: [PATCH 15/16] Fix wrong content on get_estimated_price --- src/aleph/sdk/client/http.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/aleph/sdk/client/http.py b/src/aleph/sdk/client/http.py index 2b1c15b0..f4e8b898 100644 --- a/src/aleph/sdk/client/http.py +++ b/src/aleph/sdk/client/http.py @@ -462,8 +462,11 @@ async def get_estimated_price( self, content: ExecutableContent, ) -> PriceResponse: + cleaned_content = content.dict(exclude_none=True) item_content: str = json.dumps( - content, separators=(",", ":"), default=extended_json_encoder + cleaned_content, + separators=(",", ":"), + default=extended_json_encoder, ) message = parse_message( dict( @@ -474,7 +477,7 @@ async def get_estimated_price( if isinstance(content, ProgramContent) else MessageType.instance ), - content=content.dict(exclude_none=True), + content=cleaned_content, item_content=item_content, time=time.time(), channel=settings.DEFAULT_CHANNEL, From 0b03df250ea869013ea8185822b13329705fbb59 Mon Sep 17 00:00:00 2001 From: philogicae <38438271+philogicae@users.noreply.github.com> Date: Tue, 18 Feb 2025 18:12:48 +0200 Subject: [PATCH 16/16] Add CRN_URL_FOR_PROGRAMS to conf --- src/aleph/sdk/conf.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/aleph/sdk/conf.py b/src/aleph/sdk/conf.py index 90b32527..c925a05e 100644 --- a/src/aleph/sdk/conf.py +++ b/src/aleph/sdk/conf.py @@ -81,6 +81,7 @@ class Settings(BaseSettings): VM_URL_PATH = "https://aleph.sh/vm/{hash}" VM_URL_HOST = "https://{hash_base32}.aleph.sh" IPFS_GATEWAY = "https://ipfs.aleph.cloud/ipfs/" + CRN_URL_FOR_PROGRAMS = "https://dchq.staging.aleph.sh/" # Web3Provider settings TOKEN_DECIMALS = 18