diff --git a/.env.example b/.env.example new file mode 100644 index 00000000..51b7a6a0 --- /dev/null +++ b/.env.example @@ -0,0 +1,5 @@ +# To modify aleph.sdk.conf's settings, create a .env file and add: +# ALEPH_= +# To modify active & rpc fields in CHAINS, follow this example: +# ALEPH_CHAINS_SEPOLIA_ACTIVE=True +# ALEPH_CHAINS_SEPOLIA_RPC=https://... \ No newline at end of file diff --git a/.gitignore b/.gitignore index 6f59cd09..d8507411 100644 --- a/.gitignore +++ b/.gitignore @@ -46,9 +46,12 @@ cover/* MANIFEST # Per-project virtualenvs -.env .venv*/ venv/* **/device.key +# Environment variables +.env +.env.local + .gitsigners diff --git a/src/aleph_client/commands/account.py b/src/aleph_client/commands/account.py index 68b95637..6b63a877 100644 --- a/src/aleph_client/commands/account.py +++ b/src/aleph_client/commands/account.py @@ -2,7 +2,6 @@ import asyncio import base64 -import json import logging import sys from pathlib import Path @@ -13,7 +12,7 @@ from aleph.sdk.account import _load_account from aleph.sdk.chains.common import generate_key from aleph.sdk.chains.ethereum import ETHAccount -from aleph.sdk.conf import settings as sdk_settings +from aleph.sdk.conf import settings from aleph.sdk.types import AccountFromPrivateKey from typer.colors import RED @@ -37,7 +36,7 @@ def create( setup_logging(debug) if private_key_file is None: - private_key_file = Path(typer.prompt("Enter file in which to save the key", sdk_settings.PRIVATE_KEY_FILE)) + private_key_file = Path(typer.prompt("Enter file in which to save the key", settings.PRIVATE_KEY_FILE)) if private_key_file.exists() and not replace: typer.secho(f"Error: key already exists: '{private_key_file}'", fg=RED) @@ -63,8 +62,8 @@ def create( @app.command() def address( - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), ): """ Display your public address. @@ -82,8 +81,8 @@ def address( @app.command() def export_private_key( - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), ): """ Display your private key. @@ -105,15 +104,15 @@ def export_private_key( @app.command() def path(): - if sdk_settings.PRIVATE_KEY_FILE: - typer.echo(sdk_settings.PRIVATE_KEY_FILE) + if settings.PRIVATE_KEY_FILE: + typer.echo(settings.PRIVATE_KEY_FILE) @app.command("sign-bytes") def sign_bytes( message: Optional[str] = typer.Option(None, help="Message to sign"), - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), debug: bool = False, ): """Sign a message using your private key.""" @@ -134,8 +133,8 @@ def sign_bytes( @app.command() async def balance( address: Optional[str] = typer.Option(None, help="Address"), - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), ): account: AccountFromPrivateKey = _load_account(private_key, private_key_file) @@ -143,7 +142,7 @@ async def balance( address = account.get_address() if address: - uri = f"{sdk_settings.API_HOST}/api/v0/addresses/{address}/balance" + uri = f"{settings.API_HOST}/api/v0/addresses/{address}/balance" async with aiohttp.ClientSession() as session: response = await session.get(uri) diff --git a/src/aleph_client/commands/aggregate.py b/src/aleph_client/commands/aggregate.py index 2050ebf4..5b1cd6f1 100644 --- a/src/aleph_client/commands/aggregate.py +++ b/src/aleph_client/commands/aggregate.py @@ -7,7 +7,7 @@ import typer from aleph.sdk.account import _load_account from aleph.sdk.client import AuthenticatedAlephHttpClient -from aleph.sdk.conf import settings as sdk_settings +from aleph.sdk.conf import settings from aleph.sdk.query.filters import MessageFilter from aleph.sdk.types import AccountFromPrivateKey from aleph.sdk.utils import extended_json_encoder @@ -15,7 +15,6 @@ from aleph_client.commands import help_strings from aleph_client.commands.utils import setup_logging -from aleph_client.conf import settings from aleph_client.utils import AsyncTyper app = AsyncTyper(no_args_is_help=True) @@ -26,8 +25,8 @@ async def forget( key: str = typer.Argument(..., help="Aggregate item hash to be removed."), reason: Optional[str] = typer.Option(None, help="A description of why the messages are being forgotten"), channel: Optional[str] = typer.Option(default=settings.DEFAULT_CHANNEL, help=help_strings.CHANNEL), - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), debug: bool = False, ): """Forget all the messages composing an aggregate.""" @@ -36,7 +35,7 @@ async def forget( account: AccountFromPrivateKey = _load_account(private_key, private_key_file) - async with AuthenticatedAlephHttpClient(account=account, api_server=sdk_settings.API_HOST) as client: + async with AuthenticatedAlephHttpClient(account=account, api_server=settings.API_HOST) as client: message_response = await client.get_messages( message_filter=MessageFilter( addresses=[account.get_address()], @@ -57,8 +56,8 @@ async def post( channel: Optional[str] = typer.Option(default=settings.DEFAULT_CHANNEL, help=help_strings.CHANNEL), inline: bool = typer.Option(False, help="inline"), sync: bool = typer.Option(False, help="Sync response"), - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), debug: bool = False, ): """Create or Update aggregate""" @@ -73,7 +72,7 @@ async def post( typer.echo("Invalid JSON for content. Please provide valid JSON.") raise typer.Exit(1) - async with AuthenticatedAlephHttpClient(account=account, api_server=sdk_settings.API_HOST) as client: + async with AuthenticatedAlephHttpClient(account=account, api_server=settings.API_HOST) as client: message, _ = await client.create_aggregate( key=key, content=content_dict, @@ -90,8 +89,8 @@ async def post( async def get( key: str = typer.Argument(..., help="Aggregate key to be fetched."), address: Optional[str] = typer.Option(default=None, help="Address"), - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), debug: bool = False, ): """Fetch an aggregate by key and content.""" @@ -103,7 +102,7 @@ async def get( # if no address we load current account as a private key address = account.get_address() if address is None else address - async with AuthenticatedAlephHttpClient(account=account, api_server=sdk_settings.API_HOST) as client: + async with AuthenticatedAlephHttpClient(account=account, api_server=settings.API_HOST) as client: aggregates = await client.fetch_aggregate(address=address, key=key) if aggregates: diff --git a/src/aleph_client/commands/domain.py b/src/aleph_client/commands/domain.py index d38ffade..c4328241 100644 --- a/src/aleph_client/commands/domain.py +++ b/src/aleph_client/commands/domain.py @@ -7,7 +7,7 @@ import typer from aleph.sdk.account import _load_account from aleph.sdk.client import AlephHttpClient, AuthenticatedAlephHttpClient -from aleph.sdk.conf import settings as sdk_settings +from aleph.sdk.conf import settings from aleph.sdk.domain import ( DomainValidator, Hostname, @@ -32,7 +32,7 @@ async def get_aggregate_domain_info(account, fqdn): - async with AlephHttpClient(api_server=sdk_settings.API_HOST) as client: + async with AlephHttpClient(api_server=settings.API_HOST) as client: aggregates = await client.get_messages( message_filter=MessageFilter( addresses=[str(account.get_address())], @@ -105,7 +105,7 @@ async def attach_resource( if (not interactive) or Confirm.ask("Continue"): """Create aggregate message""" - async with AuthenticatedAlephHttpClient(account=account, api_server=sdk_settings.API_HOST) as client: + async with AuthenticatedAlephHttpClient(account=account, api_server=settings.API_HOST) as client: options: Optional[Dict] = None if catch_all_path and catch_all_path.startswith("/"): @@ -158,7 +158,7 @@ async def detach_resource(account: AccountFromPrivateKey, fqdn: Hostname, intera if (not interactive) or Confirm.ask("Continue"): """Update aggregate message""" - async with AuthenticatedAlephHttpClient(account=account, api_server=sdk_settings.API_HOST) as client: + async with AuthenticatedAlephHttpClient(account=account, api_server=settings.API_HOST) as client: aggregate_content = {str(fqdn): None} aggregate_message, message_status = await client.create_aggregate( @@ -173,8 +173,8 @@ async def detach_resource(account: AccountFromPrivateKey, fqdn: Hostname, intera @app.command() async def add( - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), fqdn: str = typer.Argument(..., help=help_strings.CUSTOM_DOMAIN_NAME), target: Optional[TargetType] = typer.Option(None, help=help_strings.CUSTOM_DOMAIN_TARGET_TYPES), item_hash: Optional[str] = typer.Option(None, help=help_strings.CUSTOM_DOMAIN_ITEM_HASH), @@ -257,8 +257,8 @@ async def add( @app.command() async def attach( - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), fqdn: str = typer.Argument(..., help=help_strings.CUSTOM_DOMAIN_NAME), item_hash: Optional[str] = typer.Option(None, help=help_strings.CUSTOM_DOMAIN_ITEM_HASH), catch_all_path: str = typer.Option(default=None, help=help_strings.IPFS_CATCH_ALL_PATH), @@ -279,8 +279,8 @@ async def attach( @app.command() async def detach( - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), fqdn: str = typer.Argument(..., help=help_strings.CUSTOM_DOMAIN_NAME), ask: bool = typer.Option(default=True, help=help_strings.ASK_FOR_CONFIRMATION), ): @@ -293,8 +293,8 @@ async def detach( @app.command() async def info( - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), fqdn: str = typer.Argument(..., help=help_strings.CUSTOM_DOMAIN_NAME), ): """Show Custom Domain Details.""" diff --git a/src/aleph_client/commands/files.py b/src/aleph_client/commands/files.py index ddf61c1e..095202b8 100644 --- a/src/aleph_client/commands/files.py +++ b/src/aleph_client/commands/files.py @@ -10,7 +10,7 @@ import typer from aleph.sdk import AlephHttpClient, AuthenticatedAlephHttpClient from aleph.sdk.account import _load_account -from aleph.sdk.conf import settings as sdk_settings +from aleph.sdk.conf import settings from aleph.sdk.types import AccountFromPrivateKey, StorageEnum from aleph_message.models import ItemHash, StoreMessage from aleph_message.status import MessageStatus @@ -21,7 +21,6 @@ from aleph_client.commands import help_strings from aleph_client.commands.utils import setup_logging -from aleph_client.conf import settings from aleph_client.utils import AsyncTyper logger = logging.getLogger(__name__) @@ -32,8 +31,8 @@ async def pin( item_hash: str = typer.Argument(..., help="IPFS hash to pin on aleph.im"), channel: Optional[str] = typer.Option(default=settings.DEFAULT_CHANNEL, help=help_strings.CHANNEL), - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), ref: Optional[str] = typer.Option(None, help=help_strings.REF), debug: bool = False, ): @@ -43,7 +42,7 @@ async def pin( account: AccountFromPrivateKey = _load_account(private_key, private_key_file) - async with AuthenticatedAlephHttpClient(account=account, api_server=sdk_settings.API_HOST) as client: + async with AuthenticatedAlephHttpClient(account=account, api_server=settings.API_HOST) as client: result: StoreMessage status: MessageStatus result, status = await client.create_store( @@ -60,8 +59,8 @@ async def pin( async def upload( path: Path = typer.Argument(..., help="Path of the file to upload"), channel: Optional[str] = typer.Option(default=settings.DEFAULT_CHANNEL, help=help_strings.CHANNEL), - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), ref: Optional[str] = typer.Option(None, help=help_strings.REF), debug: bool = False, ): @@ -71,7 +70,7 @@ async def upload( account: AccountFromPrivateKey = _load_account(private_key, private_key_file) - async with AuthenticatedAlephHttpClient(account=account, api_server=sdk_settings.API_HOST) as client: + async with AuthenticatedAlephHttpClient(account=account, api_server=settings.API_HOST) as client: if not path.is_file(): typer.echo(f"Error: File not found: '{path}'") raise typer.Exit(code=1) @@ -115,7 +114,7 @@ async def download( output_file_path = output_path / f"{file_name}{file_extension}" - async with AlephHttpClient(api_server=sdk_settings.API_HOST) as client: + async with AlephHttpClient(api_server=settings.API_HOST) as client: logger.info(f"Downloading {hash} ...") with open(output_file_path, "wb") as fd: if not use_ipfs: @@ -131,8 +130,8 @@ async def forget( item_hash: str = typer.Argument(..., help="Hash to forget"), reason: str = typer.Argument("User deletion", help="reason to forget"), channel: Optional[str] = typer.Option(default=settings.DEFAULT_CHANNEL, help=help_strings.CHANNEL), - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), debug: bool = False, ): """forget a file and his message on aleph.im.""" @@ -141,7 +140,7 @@ async def forget( account: AccountFromPrivateKey = _load_account(private_key, private_key_file) - async with AuthenticatedAlephHttpClient(account=account, api_server=sdk_settings.API_HOST) as client: + async with AuthenticatedAlephHttpClient(account=account, api_server=settings.API_HOST) as client: value = await client.forget(hashes=[ItemHash(item_hash)], reason=reason, channel=channel) typer.echo(f"{value[0].json(indent=4)}") @@ -208,8 +207,8 @@ def _show_files(files_data: dict) -> None: @app.command() async def list( address: Optional[str] = typer.Option(None, help="Address"), - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), pagination: int = typer.Option(100, help="Maximum number of files to return."), page: int = typer.Option(1, help="Offset in pages."), sort_order: int = typer.Option( @@ -228,7 +227,7 @@ async def list( # Build the query parameters query_params = GetAccountFilesQueryParams(pagination=pagination, page=page, sort_order=sort_order) - uri = f"{sdk_settings.API_HOST}/api/v0/addresses/{address}/files" + uri = f"{settings.API_HOST}/api/v0/addresses/{address}/files" async with aiohttp.ClientSession() as session: response = await session.get(uri, params=query_params.dict()) if response.status == 200: diff --git a/src/aleph_client/commands/help_strings.py b/src/aleph_client/commands/help_strings.py index 9ba18416..f8502371 100644 --- a/src/aleph_client/commands/help_strings.py +++ b/src/aleph_client/commands/help_strings.py @@ -19,7 +19,7 @@ Example: --immutable-volume ref=25a393222692c2f73489dc6710ae87605a96742ceef7b91de4d7ec34bb688d94,mount=/lib/python3.8/site-packages""" ASK_FOR_CONFIRMATION = "Prompt user for confirmation" IPFS_CATCH_ALL_PATH = "Choose a relative path to catch all unmatched route or a 404 error" -PAYMENT_TYPE = "Payment method, either holding tokens or Pay-As-You-Go via token streaming" +PAYMENT_TYPE = "Payment method, either holding tokens, NFTs, or Pay-As-You-Go via token streaming" HYPERVISOR = "Hypervisor to use to launch your instance. Defaults to QEMU" INSTANCE_NAME = "Name of your new instance" ROOTFS = ( @@ -41,7 +41,7 @@ KEEP_SESSION = "Keeping the already initiated session" VM_SECRET = "Secret password to start the VM" CRN_URL_VM_DELETION = "Domain of the CRN where an associated VM is running. It ensures your VM will be stopped and erased on the CRN before the instance message is actually deleted" -VM_ID = "If provided, skip the instance creation" +VM_ID = "Item hash of your VM. If provided, skip the instance creation, else create a new one" VM_NOT_READY = "VM not initialized/started" VM_SCHEDULED = "VM scheduled but not available yet" VM_NOT_AVAILABLE_YET = "VM not available yet" diff --git a/src/aleph_client/commands/instance/__init__.py b/src/aleph_client/commands/instance/__init__.py index 797a149c..050d0e00 100644 --- a/src/aleph_client/commands/instance/__init__.py +++ b/src/aleph_client/commands/instance/__init__.py @@ -5,20 +5,19 @@ import logging import shutil from decimal import Decimal -from ipaddress import IPv6Interface from math import ceil from pathlib import Path -from typing import Dict, List, Optional, Tuple, Union, cast +from typing import List, Optional, Tuple, Union, cast import aiohttp import typer -from aiohttp import ClientConnectorError, ClientResponseError, ClientSession from aleph.sdk import AlephHttpClient, AuthenticatedAlephHttpClient from aleph.sdk.account import _load_account from aleph.sdk.chains.ethereum import ETHAccount from aleph.sdk.client.vm_client import VmClient from aleph.sdk.client.vm_confidential_client import VmConfidentialClient -from aleph.sdk.conf import settings as sdk_settings +from aleph.sdk.conf import settings +from aleph.sdk.evm_utils import get_chains_with_super_token from aleph.sdk.exceptions import ( ForgottenMessageError, InsufficientFundsError, @@ -47,22 +46,26 @@ from aleph_client.commands import help_strings from aleph_client.commands.instance.display import CRNTable -from aleph_client.commands.instance.network import fetch_crn_info, sanitize_url +from aleph_client.commands.instance.network import ( + fetch_crn_info, + fetch_vm_info, + find_crn_of_vm, + sanitize_url, +) +from aleph_client.commands.instance.superfluid import FlowUpdate, update_flow from aleph_client.commands.node import NodeInfo, _fetch_nodes from aleph_client.commands.utils import ( + filter_only_valid_messages, get_or_prompt_volumes, + safe_getattr, setup_logging, validated_int_prompt, validated_prompt, wait_for_confirmed_flow, wait_for_processed_instance, ) -from aleph_client.conf import settings from aleph_client.models import CRNInfo -from aleph_client.utils import AsyncTyper, fetch_json - -from ..utils import has_nested_attr -from .superfluid import FlowUpdate, update_flow +from aleph_client.utils import AsyncTyper logger = logging.getLogger(__name__) app = AsyncTyper(no_args_is_help=True) @@ -70,16 +73,23 @@ @app.command() async def create( - payment_type: PaymentType = typer.Option(None, help=help_strings.PAYMENT_TYPE), - payment_chain: Chain = typer.Option(None, help=help_strings.PAYMENT_CHAIN), - hypervisor: HypervisorType = typer.Option(None, help=help_strings.HYPERVISOR), + payment_type: Optional[str] = typer.Option( + None, + help=help_strings.PAYMENT_TYPE, + callback=lambda pt: None if pt is None else PaymentType.hold if pt == "nft" else PaymentType(pt), + metavar=f"[{'|'.join(PaymentType)}|nft]", + ), + payment_chain: Optional[Chain] = typer.Option( + None, help=help_strings.PAYMENT_CHAIN, metavar=f"[{'|'.join([Chain.ETH, Chain.AVAX, Chain.BASE])}]" + ), + hypervisor: Optional[HypervisorType] = typer.Option(None, help=help_strings.HYPERVISOR), name: Optional[str] = typer.Option(None, help=help_strings.INSTANCE_NAME), - rootfs: str = typer.Option(None, help=help_strings.ROOTFS), - rootfs_size: int = typer.Option(None, help=help_strings.ROOTFS_SIZE), - vcpus: int = typer.Option(None, help=help_strings.VCPUS), - memory: int = typer.Option(None, help=help_strings.MEMORY), + rootfs: Optional[str] = typer.Option(None, help=help_strings.ROOTFS), + rootfs_size: Optional[int] = typer.Option(None, help=help_strings.ROOTFS_SIZE), + vcpus: Optional[int] = typer.Option(None, help=help_strings.VCPUS), + memory: Optional[int] = typer.Option(None, help=help_strings.MEMORY), timeout_seconds: float = typer.Option( - sdk_settings.DEFAULT_VM_TIMEOUT, + settings.DEFAULT_VM_TIMEOUT, help=help_strings.TIMEOUT_SECONDS, ), ssh_pubkey_file: Path = typer.Option( @@ -100,8 +110,8 @@ async def create( help=help_strings.IMMUTABLE_VOLUME, ), channel: Optional[str] = typer.Option(default=settings.DEFAULT_CHANNEL, help=help_strings.CHANNEL), - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), print_messages: bool = typer.Option(False), verbose: bool = typer.Option(True), debug: bool = False, @@ -133,17 +143,15 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path: account: AccountFromPrivateKey = _load_account(private_key, private_key_file) if payment_type is None: - payment_type = PaymentType( - Prompt.ask( - "Which payment type do you want to use?", - choices=[ptype.value for ptype in PaymentType], - default=PaymentType.superfluid.value, - ) + payment_type = Prompt.ask( + "Which payment type do you want to use?", + choices=[ptype.value for ptype in PaymentType] + ["nft"], + default=PaymentType.superfluid.value, ) + payment_type = PaymentType.hold if payment_type == "nft" else PaymentType(payment_type) is_stream = payment_type != PaymentType.hold - # super_token_chains = get_chains_with_super_token() - super_token_chains = [Chain.AVAX.value] + super_token_chains = get_chains_with_super_token() if is_stream: if payment_chain is None or payment_chain not in super_token_chains: payment_chain = Chain( @@ -205,7 +213,7 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path: elif not rootfs or rootfs not in os_choices: rootfs = Prompt.ask( "Use a custom rootfs or one of the following prebuilt ones:", - default=rootfs, + default="ubuntu22", choices=[*os_choices, "custom"], ) @@ -218,7 +226,7 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path: rootfs = os_choices[rootfs] # Validate rootfs message exist - async with AlephHttpClient(api_server=sdk_settings.API_HOST) as client: + async with AlephHttpClient(api_server=settings.API_HOST) as client: rootfs_message: StoreMessage = await client.get_message(item_hash=rootfs, message_type=StoreMessage) if not rootfs_message: echo("Given rootfs volume does not exist on aleph.im") @@ -229,7 +237,7 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path: # Validate confidential firmware message exist confidential_firmware_as_hash = None if confidential: - async with AlephHttpClient(api_server=sdk_settings.API_HOST) as client: + async with AlephHttpClient(api_server=settings.API_HOST) as client: confidential_firmware_as_hash = ItemHash(confidential_firmware) firmware_message: StoreMessage = await client.get_message( item_hash=confidential_firmware, message_type=StoreMessage @@ -243,7 +251,7 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path: "Disk size in MiB", default=settings.DEFAULT_ROOTFS_SIZE, min_value=10_240, max_value=102_400 ) vcpus = vcpus or validated_int_prompt( - "Number of virtual cpus to allocate", default=sdk_settings.DEFAULT_VM_VCPUS, min_value=1, max_value=4 + "Number of virtual cpus to allocate", default=settings.DEFAULT_VM_VCPUS, min_value=1, max_value=4 ) memory = memory or validated_int_prompt( "Maximum memory allocation on vm in MiB", @@ -310,18 +318,18 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path: continue if crn: - stream_reward_address = crn.stream_reward_address if has_nested_attr(crn, "stream_reward_address") else "" + stream_reward_address = crn.stream_reward_address if hasattr(crn, "stream_reward_address") else "" if is_stream and not stream_reward_address: echo("Selected CRN does not have a defined receiver address.") raise typer.Exit(1) - if is_qemu and (not has_nested_attr(crn, "qemu_support") or not crn.qemu_support): + if is_qemu and (not hasattr(crn, "qemu_support") or not crn.qemu_support): echo("Selected CRN does not support QEMU hypervisor.") raise typer.Exit(1) - if confidential and (not has_nested_attr(crn, "confidential_computing") or not crn.confidential_computing): + if confidential and (not hasattr(crn, "confidential_computing") or not crn.confidential_computing): echo("Selected CRN does not support confidential computing.") raise typer.Exit(1) - async with AuthenticatedAlephHttpClient(account=account, api_server=sdk_settings.API_HOST) as client: + async with AuthenticatedAlephHttpClient(account=account, api_server=settings.API_HOST) as client: payment = Payment( chain=payment_chain, receiver=stream_reward_address if stream_reward_address else None, @@ -402,7 +410,7 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path: status, result = await crn_client.start_instance(vm_id=item_hash) logger.debug(status, result) if int(status) != 200: - echo(f"Could not start instance {item_hash} on CRN.") + echo(f"Could not allocate instance {item_hash} on CRN.") return item_hash, crn_url console.print(f"Your instance {item_hash_text} has been deployed on aleph.im.") if verbose: @@ -417,30 +425,23 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path: ) # All confidential instances else: - crn_url_text = Text(crn.url, style="blue") console.print( "\n\nInitialize a confidential session using:\n\n", Text.assemble( " aleph instance confidential-init-session ", item_hash_text, - " ", - crn_url_text, style="italic", ), "\n\nThen start it using:\n\n", Text.assemble( " aleph instance confidential-start ", item_hash_text, - " ", - crn_url_text, style="italic", ), "\n\nOr just use the all-in-one command:\n\n", Text.assemble( " aleph instance confidential ", item_hash_text, - " ", - crn_url_text, "\n", style="italic", ), @@ -466,9 +467,9 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path: async def delete( item_hash: str = typer.Argument(..., help="Instance item hash to forget"), reason: str = typer.Option("User deletion", help="Reason for deleting the instance"), - crn_url: str = typer.Option(None, help=help_strings.CRN_URL_VM_DELETION), - private_key: Optional[str] = sdk_settings.PRIVATE_KEY_STRING, - private_key_file: Optional[Path] = sdk_settings.PRIVATE_KEY_FILE, + crn_url: Optional[str] = typer.Option(None, help=help_strings.CRN_URL_VM_DELETION), + private_key: Optional[str] = settings.PRIVATE_KEY_STRING, + private_key_file: Optional[Path] = settings.PRIVATE_KEY_FILE, print_message: bool = typer.Option(False), debug: bool = False, ): @@ -477,7 +478,7 @@ async def delete( setup_logging(debug) account = _load_account(private_key, private_key_file) - async with AuthenticatedAlephHttpClient(account=account, api_server=sdk_settings.API_HOST) as client: + async with AuthenticatedAlephHttpClient(account=account, api_server=settings.API_HOST) as client: try: existing_message: InstanceMessage = await client.get_message( item_hash=ItemHash(item_hash), message_type=InstanceMessage @@ -493,25 +494,17 @@ async def delete( echo("You are not the owner of this instance") raise typer.Exit(code=1) - # Check for streaming payment and eventually stop it + # If PAYG, retrieve flow price payment: Optional[Payment] = existing_message.content.payment - if payment is not None and payment.type == PaymentType.superfluid: - price: PriceResponse = await client.get_program_price(item_hash) - if payment.receiver is not None: - if isinstance(account, ETHAccount): - account.switch_chain(payment.chain) - if account.superfluid_connector: - flow_hash = await update_flow( - account, payment.receiver, Decimal(price.required_tokens), FlowUpdate.REDUCE - ) - if flow_hash: - echo(f"Flow {flow_hash} has been deleted.") + price: Optional[PriceResponse] = None + if safe_getattr(payment, "type") == PaymentType.superfluid: + price = await client.get_program_price(item_hash) # Check status of the instance and eventually erase associated VM node_list: NodeInfo = await _fetch_nodes() - _, details = await _get_instance_details(existing_message, node_list) - auto_scheduled = details["allocation_type"] == help_strings.ALLOCATION_AUTO - crn_url = str(details["crn_url"]) + _, info = await fetch_vm_info(existing_message, node_list) + auto_scheduled = info["allocation_type"] == help_strings.ALLOCATION_AUTO + crn_url = str(info["crn_url"]) if not auto_scheduled and crn_url: try: status = await erase(item_hash, crn_url, private_key, private_key_file, True, debug) @@ -522,86 +515,33 @@ async def delete( else: echo(f"Instance {item_hash} was auto-scheduled, VM will be erased automatically.") + # Check for streaming payment and eventually stop it + if payment and payment.type == PaymentType.superfluid and payment.receiver and isinstance(account, ETHAccount): + account.switch_chain(payment.chain) + if account.superfluid_connector and price: + flow_hash = await update_flow( + account, payment.receiver, Decimal(price.required_tokens), FlowUpdate.REDUCE + ) + if flow_hash: + echo(f"Flow {flow_hash} has been deleted.") + message, status = await client.forget(hashes=[ItemHash(item_hash)], reason=reason) if print_message: echo(f"{message.json(indent=4)}") echo(f"Instance {item_hash} has been deleted.") -async def _get_instance_details(message: InstanceMessage, node_list: NodeInfo) -> Tuple[str, Dict[str, object]]: - async with ClientSession() as session: - hold = not message.content.payment or message.content.payment.type == PaymentType["hold"] - confidential = ( - has_nested_attr(message.content, "environment", "trusted_execution", "firmware") - and len(getattr(message.content.environment.trusted_execution, "firmware")) == 64 - ) - details = dict( - payment="hold\t " if hold else str(getattr(message.content.payment, "type").value), - confidential=confidential, - allocation_type="", - ipv6_logs="", - crn_url="", - ) - try: - # Fetch from the scheduler API directly if no payment or no receiver (hold-tier non-confidential) - if hold and not confidential: - try: - details["allocation_type"] = help_strings.ALLOCATION_AUTO - allocation = await fetch_json( - session, - f"https://scheduler.api.aleph.cloud/api/v0/allocation/{message.item_hash}", - ) - nodes = await fetch_json( - session, - "https://scheduler.api.aleph.cloud/api/v0/nodes", - ) - details["ipv6_logs"] = allocation["vm_ipv6"] - for node in nodes["nodes"]: - if node["ipv6"].split("::")[0] == ":".join(str(details["ipv6_logs"]).split(":")[:4]): - details["crn_url"] = node["url"].rstrip("/") - return message.item_hash, details - except: - details["ipv6_logs"] = help_strings.VM_SCHEDULED - details["crn_url"] = help_strings.CRN_PENDING - else: - # Fetch from the CRN API if PAYG-tier or confidential - details["allocation_type"] = help_strings.ALLOCATION_MANUAL - for node in node_list.nodes: - if ( - has_nested_attr(message.content, "payment", "receiver") - and node["stream_reward"] == getattr(message.content.payment, "receiver") - ) or ( - has_nested_attr(message.content, "requirements", "node", "node_hash") - and message.content.requirements is not None - and node["hash"] == getattr(message.content.requirements.node, "node_hash") - ): - details["crn_url"] = node["address"].rstrip("/") - path = f"{node['address'].rstrip('/')}/about/executions/list" - - executions = await fetch_json(session, path) - if message.item_hash in executions: - interface = IPv6Interface(executions[message.item_hash]["networking"]["ipv6"]) - details["ipv6_logs"] = str(interface.ip + 1) - return message.item_hash, details - details["ipv6_logs"] = help_strings.VM_NOT_READY if confidential else help_strings.VM_NOT_AVAILABLE_YET - except (ClientResponseError, ClientConnectorError) as e: - details["ipv6_logs"] = f"Not available. Server error: {e}" - return message.item_hash, details - - async def _show_instances(messages: List[InstanceMessage], node_list: NodeInfo): table = Table(box=box.SIMPLE_HEAVY) table.add_column(f"Instances [{len(messages)}]", style="blue", overflow="fold") table.add_column("Specifications", style="magenta") table.add_column("Logs", style="blue", overflow="fold") - scheduler_responses = dict( - await asyncio.gather(*[_get_instance_details(message, node_list) for message in messages]) - ) + scheduler_responses = dict(await asyncio.gather(*[fetch_vm_info(message, node_list) for message in messages])) uninitialized_confidential_found = False for message in messages: - resp = scheduler_responses[message.item_hash] - if resp["ipv6_logs"] == help_strings.VM_NOT_READY: + info = scheduler_responses[message.item_hash] + if info["ipv6_logs"] == help_strings.VM_NOT_READY: uninitialized_confidential_found = True name = Text( ( @@ -613,47 +553,50 @@ async def _show_instances(messages: List[InstanceMessage], node_list: NodeInfo): ), style="orchid", ) - item_hash_link = Text.from_markup( - f"[link={sdk_settings.API_HOST}/api/v0/messages/{message.item_hash}]{message.item_hash}[/link]", - style="bright_cyan", - ) + link = f"https://explorer.aleph.im/address/ETH/{message.sender}/message/INSTANCE/{message.item_hash}" + # link = f"{settings.API_HOST}/api/v0/messages/{message.item_hash}" + item_hash_link = Text.from_markup(f"[link={link}]{message.item_hash}[/link]", style="bright_cyan") payment = Text.assemble( "Payment: ", - Text(str(resp["payment"]).capitalize(), style="red" if resp["payment"] != PaymentType.hold else "orange3"), + Text( + str(info["payment"]).capitalize(), + style="red" if str(info["payment"]).startswith("hold") else "orange3", + ), ) confidential = ( Text.assemble("Type: ", Text("Confidential", style="green")) - if resp["confidential"] + if info["confidential"] else Text.assemble("Type: ", Text("Regular", style="grey50")) ) + chain = Text.assemble("Chain: ", Text(str(info["chain"]), style="cyan")) instance = Text.assemble( - "Item Hash ↓\t Name: ", name, "\n", item_hash_link, "\n", payment, " ", confidential + "Item Hash ↓\t Name: ", name, "\n", item_hash_link, "\n", payment, " ", confidential, "\n", chain ) specifications = ( f"vCPUs: {message.content.resources.vcpus}\n" f"RAM: {message.content.resources.memory / 1_024:.2f} GiB\n" f"Disk: {message.content.rootfs.size_mib / 1_024:.2f} GiB\n" - f"HyperV: {message.content.environment.hypervisor if has_nested_attr(message.content, 'environment', 'hypervisor') else 'firecracker'}\n" + f"HyperV: {safe_getattr(message, 'content.environment.hypervisor.value').capitalize() if safe_getattr(message, 'content.environment.hypervisor') else 'Firecracker'}\n" ) status_column = Text.assemble( Text.assemble( Text("Allocation: ", style="blue"), Text( - str(resp["allocation_type"]) + "\n", - style="magenta3" if resp["allocation_type"] == help_strings.ALLOCATION_MANUAL else "deep_sky_blue1", + str(info["allocation_type"]) + "\n", + style="magenta3" if info["allocation_type"] == help_strings.ALLOCATION_MANUAL else "deep_sky_blue1", ), ), Text.assemble( Text("Target CRN: ", style="blue"), Text( - str(resp["crn_url"]) + "\n", - style="green1" if str(resp["crn_url"]).startswith("http") else "dark_slate_gray1", + str(info["crn_url"]) + "\n", + style="green1" if str(info["crn_url"]).startswith("http") else "dark_slate_gray1", ), ), Text.assemble( Text("IPv6: ", style="blue"), - Text(str(resp["ipv6_logs"])), - style="bright_yellow" if len(str(resp["ipv6_logs"]).split(":")) == 8 else "dark_orange", + Text(str(info["ipv6_logs"])), + style="bright_yellow" if len(str(info["ipv6_logs"]).split(":")) == 8 else "dark_orange", ), ) table.add_row(instance, specifications, status_column) @@ -671,24 +614,18 @@ async def _show_instances(messages: List[InstanceMessage], node_list: NodeInfo): Text.assemble( " aleph instance confidential-init-session ", item_hash_field, - " ", - crn_url_field, "\n", style="italic", ), Text.assemble( " aleph instance confidential-start ", item_hash_field, - " ", - crn_url_field, style="italic", ), "\n\nOr just use the all-in-one command:\n\n", Text.assemble( " aleph instance confidential ", item_hash_field, - " ", - crn_url_field, "\n", style="italic", ), @@ -709,8 +646,8 @@ async def _show_instances(messages: List[InstanceMessage], node_list: NodeInfo): @app.command() async def list( address: Optional[str] = typer.Option(None, help="Owner address of the instance"), - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), json: bool = typer.Option(default=False, help="Print as json instead of rich table"), debug: bool = False, ): @@ -722,7 +659,7 @@ async def list( account = _load_account(private_key, private_key_file) address = account.get_address() - async with AlephHttpClient(api_server=sdk_settings.API_HOST) as client: + async with AlephHttpClient(api_server=settings.API_HOST) as client: resp = await client.get_messages( message_filter=MessageFilter( message_types=[MessageType.instance], @@ -730,14 +667,15 @@ async def list( ), page_size=100, ) - if not resp or len(resp.messages) == 0: + messages = await filter_only_valid_messages(resp.messages) + if not messages: echo(f"Address: {address}\n\nNo instance found\n") raise typer.Exit(code=1) if json: - echo(resp.json(indent=4)) + echo(messages.json(indent=4)) else: # Since we filtered on message type, we can safely cast as InstanceMessage. - messages = cast(List[InstanceMessage], resp.messages) + messages = cast(List[InstanceMessage], messages) resource_nodes: NodeInfo = await _fetch_nodes() await _show_instances(messages, resource_nodes) @@ -745,14 +683,21 @@ async def list( @app.command() async def expire( vm_id: str = typer.Argument(..., help="VM item hash to expire"), - domain: str = typer.Argument(..., help="CRN domain where the VM is running"), - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + domain: Optional[str] = typer.Option(None, help="CRN domain on which the VM is running"), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), debug: bool = False, ): """Expire an instance""" setup_logging(debug) + + domain = ( + (domain and sanitize_url(domain)) + or await find_crn_of_vm(vm_id) + or Prompt.ask("URL of the CRN (Compute node) on which the VM is running") + ) + account = _load_account(private_key, private_key_file) async with VmClient(account, domain) as manager: @@ -766,9 +711,9 @@ async def expire( @app.command() async def erase( vm_id: str = typer.Argument(..., help="VM item hash to erase"), - domain: str = typer.Argument(..., help="CRN domain where the VM is stored or running"), - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + domain: Optional[str] = typer.Option(None, help="CRN domain on which the VM is stored or running"), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), silent: bool = False, debug: bool = False, ): @@ -776,6 +721,12 @@ async def erase( setup_logging(debug) + domain = ( + (domain and sanitize_url(domain)) + or await find_crn_of_vm(vm_id) + or Prompt.ask("URL of the CRN (Compute node) on which the VM is stored or running") + ) + account = _load_account(private_key, private_key_file) async with VmClient(account, domain) as manager: @@ -790,15 +741,21 @@ async def erase( @app.command() async def reboot( vm_id: str = typer.Argument(..., help="VM item hash to reboot"), - domain: str = typer.Argument(..., help="CRN domain where the VM is running"), - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + domain: Optional[str] = typer.Option(None, help="CRN domain on which the VM is running"), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), debug: bool = False, ): """Reboot an instance""" setup_logging(debug) + domain = ( + (domain and sanitize_url(domain)) + or await find_crn_of_vm(vm_id) + or Prompt.ask("URL of the CRN (Compute node) on which the VM is running") + ) + account = _load_account(private_key, private_key_file) async with VmClient(account, domain) as manager: @@ -812,15 +769,21 @@ async def reboot( @app.command() async def allocate( vm_id: str = typer.Argument(..., help="VM item hash to allocate"), - domain: str = typer.Argument(..., help="CRN domain where the VM will be allocated"), - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + domain: Optional[str] = typer.Option(None, help="CRN domain on which the VM will be allocated"), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), debug: bool = False, ): """Notify a CRN to start an instance (for Pay-As-You-Go and confidential instances only)""" setup_logging(debug) + domain = ( + (domain and sanitize_url(domain)) + or await find_crn_of_vm(vm_id) + or Prompt.ask("URL of the CRN (Compute node) on which the VM will be allocated") + ) + account = _load_account(private_key, private_key_file) async with VmClient(account, domain) as manager: @@ -834,35 +797,52 @@ async def allocate( @app.command() async def logs( vm_id: str = typer.Argument(..., help="VM item hash to retrieve the logs from"), - domain: str = typer.Argument(..., help="CRN domain where the VM is running"), - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + domain: Optional[str] = typer.Option(None, help="CRN domain on which the VM is running"), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), debug: bool = False, ): """Retrieve the logs of an instance""" setup_logging(debug) + domain = ( + (domain and sanitize_url(domain)) + or await find_crn_of_vm(vm_id) + or Prompt.ask("URL of the CRN (Compute node) on which the instance is running") + ) + account = _load_account(private_key, private_key_file) async with VmClient(account, domain) as manager: - async for log in manager.get_logs(vm_id=vm_id): - log_data = json.loads(log) - if "message" in log_data: - echo(log_data["message"]) + try: + async for log in manager.get_logs(vm_id=vm_id): + log_data = json.loads(log) + if "message" in log_data: + echo(log_data["message"]) + except aiohttp.ClientConnectorError as e: + echo(f"Unable to connect to domain: {domain}\nError: {e}") + except aiohttp.ClientResponseError: + echo(f"No VM associated with {vm_id} are currently running on {domain}") @app.command() async def stop( vm_id: str = typer.Argument(..., help="VM item hash to stop"), - domain: str = typer.Argument(..., help="CRN domain where the VM is running"), - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + domain: Optional[str] = typer.Option(None, help="CRN domain on which the VM is running"), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), debug: bool = False, ): """Stop an instance""" setup_logging(debug) + domain = ( + (domain and sanitize_url(domain)) + or await find_crn_of_vm(vm_id) + or Prompt.ask("URL of the CRN (Compute node) on which the instance is running") + ) + account = _load_account(private_key, private_key_file) async with VmClient(account, domain) as manager: @@ -876,11 +856,11 @@ async def stop( @app.command() async def confidential_init_session( vm_id: str = typer.Argument(..., help="VM item hash to initialize the session for"), - domain: str = typer.Argument(..., help="CRN domain where the session will be initialized"), + domain: Optional[str] = typer.Option(None, help="CRN domain on which the session will be initialized"), policy: int = typer.Option(default=0x1), keep_session: bool = typer.Option(None, help=help_strings.KEEP_SESSION), - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), debug: bool = False, ): "Initialize a confidential communication session with the VM" @@ -890,6 +870,13 @@ async def confidential_init_session( session_dir.mkdir(exist_ok=True, parents=True) setup_logging(debug) + + domain = ( + (domain and sanitize_url(domain)) + or await find_crn_of_vm(vm_id) + or Prompt.ask("URL of the CRN (Compute node) on which the session will be initialized") + ) + account = _load_account(private_key, private_key_file) sevctl_path = find_sevctl_or_exit() @@ -945,15 +932,14 @@ def find_sevctl_or_exit() -> Path: @app.command() async def confidential_start( vm_id: str = typer.Argument(..., help="VM item hash to start"), - domain: str = typer.Argument(..., help="CRN domain where the VM will be started"), - policy: int = typer.Option(default=0x1), + domain: Optional[str] = typer.Option(None, help="CRN domain on which the VM will be started"), firmware_hash: str = typer.Option( settings.DEFAULT_CONFIDENTIAL_FIRMWARE_HASH, help=help_strings.CONFIDENTIAL_FIRMWARE_HASH ), firmware_file: str = typer.Option(None, help=help_strings.PRIVATE_KEY), vm_secret: str = typer.Option(None, help=help_strings.VM_SECRET), - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), debug: bool = False, ): "Validate the authenticity of the VM and start it" @@ -965,6 +951,12 @@ async def confidential_start( account = _load_account(private_key, private_key_file) sevctl_path = find_sevctl_or_exit() + domain = ( + (domain and sanitize_url(domain)) + or await find_crn_of_vm(vm_id) + or Prompt.ask("URL of the CRN (Compute node) on which the VM will be started") + ) + client = VmConfidentialClient(account, sevctl_path, domain) bytes.fromhex(firmware_hash) @@ -1005,8 +997,6 @@ async def confidential_start( Text.assemble( " aleph instance logs ", Text(vm_id, style="bright_cyan"), - " ", - Text(domain, style="blue"), style="italic", ), "\n\nTo get the IPv6 address of the instance, check out:\n\n", @@ -1020,7 +1010,7 @@ async def confidential_start( @app.command() async def confidential( vm_id: Optional[str] = typer.Argument(default=None, help=help_strings.VM_ID), - crn_url: Optional[str] = typer.Argument(default=None, help=help_strings.CRN_URL), + crn_url: Optional[str] = typer.Option(default=None, help=help_strings.CRN_URL), crn_hash: Optional[str] = typer.Option(default=None, help=help_strings.CRN_HASH), policy: int = typer.Option(default=0x1), confidential_firmware: str = typer.Option( @@ -1029,18 +1019,25 @@ async def confidential( firmware_hash: str = typer.Option( settings.DEFAULT_CONFIDENTIAL_FIRMWARE_HASH, help=help_strings.CONFIDENTIAL_FIRMWARE_HASH ), - firmware_file: str = typer.Option(None, help=help_strings.PRIVATE_KEY), - keep_session: bool = typer.Option(None, help=help_strings.KEEP_SESSION), - vm_secret: str = typer.Option(None, help=help_strings.VM_SECRET), - payment_type: PaymentType = typer.Option(None, help=help_strings.PAYMENT_TYPE), - payment_chain: Optional[Chain] = typer.Option(None, help=help_strings.PAYMENT_CHAIN), + firmware_file: Optional[str] = typer.Option(None, help=help_strings.PRIVATE_KEY), + keep_session: Optional[bool] = typer.Option(None, help=help_strings.KEEP_SESSION), + vm_secret: Optional[str] = typer.Option(None, help=help_strings.VM_SECRET), + payment_type: Optional[str] = typer.Option( + None, + help=help_strings.PAYMENT_TYPE, + callback=lambda pt: None if pt is None else PaymentType.hold if pt == "nft" else PaymentType(pt), + metavar=f"[{'|'.join(PaymentType)}|nft]", + ), + payment_chain: Optional[Chain] = typer.Option( + None, help=help_strings.PAYMENT_CHAIN, metavar=f"[{'|'.join([Chain.ETH, Chain.AVAX, Chain.BASE])}]" + ), name: Optional[str] = typer.Option(None, help=help_strings.INSTANCE_NAME), - rootfs: str = typer.Option("ubuntu22", help=help_strings.ROOTFS), - rootfs_size: int = typer.Option(None, help=help_strings.ROOTFS_SIZE), - vcpus: int = typer.Option(None, help=help_strings.VCPUS), - memory: int = typer.Option(None, help=help_strings.MEMORY), + rootfs: Optional[str] = typer.Option(None, help=help_strings.ROOTFS), + rootfs_size: Optional[int] = typer.Option(None, help=help_strings.ROOTFS_SIZE), + vcpus: Optional[int] = typer.Option(None, help=help_strings.VCPUS), + memory: Optional[int] = typer.Option(None, help=help_strings.MEMORY), timeout_seconds: float = typer.Option( - sdk_settings.DEFAULT_VM_TIMEOUT, + settings.DEFAULT_VM_TIMEOUT, help=help_strings.TIMEOUT_SECONDS, ), ssh_pubkey_file: Path = typer.Option( @@ -1055,8 +1052,8 @@ async def confidential( help=help_strings.IMMUTABLE_VOLUME, ), channel: Optional[str] = typer.Option(default=settings.DEFAULT_CHANNEL, help=help_strings.CHANNEL), - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), debug: bool = False, ): """Create, start and unlock a confidential VM (all-in-one command) @@ -1073,51 +1070,74 @@ async def confidential( allocated = False if not vm_id or len(vm_id) != 64: vm_id, crn_url = await create( - payment_type, - payment_chain, - None, - name, - rootfs, - rootfs_size, - vcpus, - memory, - timeout_seconds, - ssh_pubkey_file, - crn_hash, - crn_url, - True, - confidential_firmware, - skip_volume, - persistent_volume, - ephemeral_volume, - immutable_volume, - channel, - private_key, - private_key_file, - False, - False, - debug, + payment_type=payment_type, + payment_chain=payment_chain, + hypervisor=HypervisorType.qemu, + name=name, + rootfs=rootfs, + rootfs_size=rootfs_size, + vcpus=vcpus, + memory=memory, + timeout_seconds=timeout_seconds, + ssh_pubkey_file=ssh_pubkey_file, + crn_hash=crn_hash, + crn_url=crn_url, + confidential=True, + confidential_firmware=confidential_firmware, + skip_volume=skip_volume, + persistent_volume=persistent_volume, + ephemeral_volume=ephemeral_volume, + immutable_volume=immutable_volume, + channel=channel, + private_key=private_key, + private_key_file=private_key_file, + print_messages=False, + verbose=False, + debug=debug, ) if not vm_id or len(vm_id) != 64: echo("Could not create the VM") return 1 allocated = vm_id is not None - crn_url = crn_url or Prompt.ask("URL of the CRN (Compute node) on which the instance is running") + crn_url = ( + (crn_url and sanitize_url(crn_url)) + or await find_crn_of_vm(vm_id) + or Prompt.ask("URL of the CRN (Compute node) on which the instance is running") + ) if not allocated: - allocated = (await allocate(vm_id, crn_url, private_key, private_key_file, debug)) is None + allocated = ( + await allocate( + vm_id=vm_id, domain=crn_url, private_key=private_key, private_key_file=private_key_file, debug=debug + ) + ) is None if not allocated: echo("Could not allocate the VM") return 1 initialized = ( - await confidential_init_session(vm_id, crn_url, policy, keep_session, private_key, private_key_file, debug) + await confidential_init_session( + vm_id=vm_id, + domain=crn_url, + policy=policy, + keep_session=keep_session, + private_key=private_key, + private_key_file=private_key_file, + debug=debug, + ) ) is None if not initialized: echo("Could not initialize the session") return 1 await confidential_start( - vm_id, crn_url, policy, firmware_hash, firmware_file, vm_secret, private_key, private_key_file, debug + vm_id=vm_id, + domain=crn_url, + firmware_hash=firmware_hash, + firmware_file=firmware_file, + vm_secret=vm_secret, + private_key=private_key, + private_key_file=private_key_file, + debug=debug, ) diff --git a/src/aleph_client/commands/instance/network.py b/src/aleph_client/commands/instance/network.py index 3d01241c..0f5460ca 100644 --- a/src/aleph_client/commands/instance/network.py +++ b/src/aleph_client/commands/instance/network.py @@ -1,16 +1,24 @@ from __future__ import annotations import logging +from ipaddress import IPv6Interface from json import JSONDecodeError from typing import Optional from urllib.parse import ParseResult, urlparse import aiohttp -from aiohttp import InvalidURL +from aleph.sdk import AlephHttpClient +from aleph.sdk.conf import settings +from aleph_message.models import InstanceMessage +from aleph_message.models.execution.base import PaymentType +from aleph_message.models.item_hash import ItemHash from pydantic import ValidationError -from aleph_client.conf import settings +from aleph_client.commands import help_strings +from aleph_client.commands.node import NodeInfo, _fetch_nodes +from aleph_client.commands.utils import safe_getattr from aleph_client.models import MachineUsage +from aleph_client.utils import fetch_json logger = logging.getLogger(__name__) @@ -34,14 +42,36 @@ PATH_ABOUT_USAGE_SYSTEM = "/about/usage/system" -async def fetch_crn_info(node_url: str) -> Optional[dict]: +def sanitize_url(url: str) -> str: + """Ensure that the URL is valid and not obviously irrelevant. + + Args: + url: URL to sanitize. + Returns: + Sanitized URL. + """ + if not url: + raise aiohttp.InvalidURL("Empty URL") + parsed_url: ParseResult = urlparse(url) + if parsed_url.scheme not in ["http", "https"]: + raise aiohttp.InvalidURL(f"Invalid URL scheme: {parsed_url.scheme}") + if parsed_url.hostname in FORBIDDEN_HOSTS: + logger.debug( + f"Invalid URL {url} hostname {parsed_url.hostname} is in the forbidden host list " + f"({', '.join(FORBIDDEN_HOSTS)})" + ) + raise aiohttp.InvalidURL("Invalid URL host") + return url + + +async def fetch_crn_info(node_url: str) -> dict | None: """ Fetches compute node usage information and version. Args: node_url: URL of the compute node. Returns: - All CRN information. + CRN information. """ url = "" try: @@ -59,7 +89,7 @@ async def fetch_crn_info(node_url: str) -> Optional[dict]: system: dict = await resp.json() info["machine_usage"] = MachineUsage.parse_obj(system) return info - except InvalidURL as e: + except aiohttp.InvalidURL as e: logger.debug(f"Invalid CRN URL: {url}: {e}") except TimeoutError as e: logger.debug(f"Timeout while fetching CRN: {url}: {e}") @@ -76,23 +106,70 @@ async def fetch_crn_info(node_url: str) -> Optional[dict]: return None -def sanitize_url(url: str) -> str: - """Ensure that the URL is valid and not obviously irrelevant. +async def fetch_vm_info(message: InstanceMessage, node_list: NodeInfo) -> tuple[str, dict[str, object]]: + """ + Fetches VM information given an instance message and the node list. Args: - url: URL to sanitize. + message: Instance message. + node_list: Node list. Returns: - Sanitized URL. + VM information. """ - if not url: - raise InvalidURL("Empty URL") - parsed_url: ParseResult = urlparse(url) - if parsed_url.scheme not in ["http", "https"]: - raise InvalidURL(f"Invalid URL scheme: {parsed_url.scheme}") - if parsed_url.hostname in FORBIDDEN_HOSTS: - logger.debug( - f"Invalid URL {url} hostname {parsed_url.hostname} is in the forbidden host list " - f"({', '.join(FORBIDDEN_HOSTS)})" + async with aiohttp.ClientSession() as session: + hold = not message.content.payment or message.content.payment.type == PaymentType["hold"] + crn_hash = safe_getattr(message, "content.requirements.node.node_hash") + firmware = safe_getattr(message, "content.environment.trusted_execution.firmware") + confidential = firmware and len(firmware) == 64 + info = dict( + crn_hash=str(crn_hash) if crn_hash else "", + payment="hold\t " if hold else str(safe_getattr(message, "content.payment.type.value")), + chain="Any" if hold else str(safe_getattr(message, "content.payment.chain.value")), + confidential=confidential, + allocation_type="", + ipv6_logs="", + crn_url="", ) - raise InvalidURL("Invalid URL host") - return url + try: + # Fetch from the scheduler API directly if no payment or no receiver (hold-tier non-confidential) + if hold and not confidential: + try: + url = f"https://scheduler.api.aleph.cloud/api/v0/allocation/{message.item_hash}" + info["allocation_type"] = help_strings.ALLOCATION_AUTO + allocation = await fetch_json(session, url) + url = "https://scheduler.api.aleph.cloud/api/v0/nodes" + nodes = await fetch_json(session, url) + info["ipv6_logs"] = allocation["vm_ipv6"] + for node in nodes["nodes"]: + if node["ipv6"].split("::")[0] == ":".join(str(info["ipv6_logs"]).split(":")[:4]): + info["crn_url"] = node["url"].rstrip("/") + return message.item_hash, info + except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError) as e: + info["ipv6_logs"] = help_strings.VM_SCHEDULED + info["crn_url"] = help_strings.CRN_PENDING + logger.debug(f"Error while calling Scheduler API ({url}): {e}") + else: + # Fetch from the CRN API if PAYG-tier or confidential + info["allocation_type"] = help_strings.ALLOCATION_MANUAL + for node in node_list.nodes: + if node["hash"] == safe_getattr(message, "content.requirements.node.node_hash"): + info["crn_url"] = node["address"].rstrip("/") + path = f"{node['address'].rstrip('/')}/about/executions/list" + executions = await fetch_json(session, path) + if message.item_hash in executions: + interface = IPv6Interface(executions[message.item_hash]["networking"]["ipv6"]) + info["ipv6_logs"] = str(interface.ip + 1) + return message.item_hash, info + info["ipv6_logs"] = help_strings.VM_NOT_READY if confidential else help_strings.VM_NOT_AVAILABLE_YET + except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError) as e: + info["ipv6_logs"] = f"Not available. Server error: {e}" + return message.item_hash, info + + +async def find_crn_of_vm(vm_id: str) -> Optional[str]: + async with AlephHttpClient(api_server=settings.API_HOST) as client: + message: InstanceMessage = await client.get_message(item_hash=ItemHash(vm_id), message_type=InstanceMessage) + node_list: NodeInfo = await _fetch_nodes() + _, info = await fetch_vm_info(message, node_list) + is_valid = info["crn_url"] and info["crn_url"] != help_strings.CRN_PENDING + return str(info["crn_url"]) if is_valid else None diff --git a/src/aleph_client/commands/instance/superfluid.py b/src/aleph_client/commands/instance/superfluid.py index d7105993..09a67d81 100644 --- a/src/aleph_client/commands/instance/superfluid.py +++ b/src/aleph_client/commands/instance/superfluid.py @@ -4,7 +4,6 @@ from aleph.sdk.chains.ethereum import ETHAccount from aleph.sdk.conf import settings -from aleph_message.models import Chain from click import echo from eth_utils.currency import to_wei from superfluid import Web3FlowInfo @@ -57,7 +56,9 @@ async def update_flow(account: ETHAccount, receiver: str, flow: Decimal, update_ if current_flow_rate_wei > 0: # Reduce the existing flow new_flow_rate_wei = current_flow_rate_wei - flow_rate_wei - if new_flow_rate_wei > 0: + # Ensure to not leave infinitesimal flows + # Often, there were 1-10 wei remaining in the flow rate, which prevented the flow from being deleted + if new_flow_rate_wei > 99: new_flow_rate_ether = from_wei(new_flow_rate_wei) return await account.update_flow(receiver, new_flow_rate_ether) else: diff --git a/src/aleph_client/commands/message.py b/src/aleph_client/commands/message.py index 0795b5eb..db5e62fd 100644 --- a/src/aleph_client/commands/message.py +++ b/src/aleph_client/commands/message.py @@ -13,7 +13,7 @@ import typer from aleph.sdk import AlephHttpClient, AuthenticatedAlephHttpClient from aleph.sdk.account import _load_account -from aleph.sdk.conf import settings as sdk_settings +from aleph.sdk.conf import settings from aleph.sdk.query.filters import MessageFilter from aleph.sdk.query.responses import MessagesResponse from aleph.sdk.types import AccountFromPrivateKey, StorageEnum @@ -32,7 +32,6 @@ setup_logging, str_to_datetime, ) -from aleph_client.conf import settings from aleph_client.utils import AsyncTyper app = AsyncTyper(no_args_is_help=True) @@ -42,7 +41,7 @@ async def get( item_hash: str = typer.Argument(..., help="Item hash of the message"), ): - async with AlephHttpClient(api_server=sdk_settings.API_HOST) as client: + async with AlephHttpClient(api_server=settings.API_HOST) as client: message, status = await client.get_message(item_hash=ItemHash(item_hash), with_status=True) typer.echo(f"Message Status: {colorized_status(status)}") if status == MessageStatus.REJECTED: @@ -84,7 +83,7 @@ async def find( start_time = str_to_datetime(start_date) end_time = str_to_datetime(end_date) - async with AlephHttpClient(api_server=sdk_settings.API_HOST) as client: + async with AlephHttpClient(api_server=settings.API_HOST) as client: response: MessagesResponse = await client.get_messages( page_size=pagination, page=page, @@ -115,8 +114,8 @@ async def post( type: str = typer.Option("test", help="Text representing the message object type"), ref: Optional[str] = typer.Option(None, help=help_strings.REF), channel: Optional[str] = typer.Option(default=settings.DEFAULT_CHANNEL, help=help_strings.CHANNEL), - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), debug: bool = False, ): """Post a message on aleph.im.""" @@ -147,7 +146,7 @@ async def post( typer.echo("Not valid JSON") raise typer.Exit(code=2) - async with AuthenticatedAlephHttpClient(account=account, api_server=sdk_settings.API_HOST) as client: + async with AuthenticatedAlephHttpClient(account=account, api_server=settings.API_HOST) as client: result, status = await client.create_post( post_content=content, post_type=type, @@ -163,8 +162,8 @@ async def post( @app.command() async def amend( item_hash: str = typer.Argument(..., help="Hash reference of the message to amend"), - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), debug: bool = False, ): """Amend an existing aleph.im message.""" @@ -173,7 +172,7 @@ async def amend( account: AccountFromPrivateKey = _load_account(private_key, private_key_file) - async with AlephHttpClient(api_server=sdk_settings.API_HOST) as client: + async with AlephHttpClient(api_server=settings.API_HOST) as client: existing_message: AlephMessage = await client.get_message(item_hash=item_hash) editor: str = os.getenv("EDITOR", default="nano") @@ -202,7 +201,7 @@ async def amend( new_content.type = "amend" typer.echo(new_content) - async with AuthenticatedAlephHttpClient(account=account, api_server=sdk_settings.API_HOST) as client: + async with AuthenticatedAlephHttpClient(account=account, api_server=settings.API_HOST) as client: message, status, response = await client.submit( content=new_content.dict(), message_type=existing_message.type, @@ -216,8 +215,8 @@ async def forget( hashes: str = typer.Argument(..., help="Comma separated list of hash references of messages to forget"), reason: Optional[str] = typer.Option(None, help="A description of why the messages are being forgotten."), channel: Optional[str] = typer.Option(default=settings.DEFAULT_CHANNEL, help=help_strings.CHANNEL), - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), debug: bool = False, ): """Forget an existing aleph.im message.""" @@ -227,7 +226,7 @@ async def forget( hash_list: List[ItemHash] = [ItemHash(h) for h in hashes.split(",")] account: AccountFromPrivateKey = _load_account(private_key, private_key_file) - async with AuthenticatedAlephHttpClient(account=account, api_server=sdk_settings.API_HOST) as client: + async with AuthenticatedAlephHttpClient(account=account, api_server=settings.API_HOST) as client: await client.forget(hashes=hash_list, reason=reason, channel=channel) @@ -241,7 +240,7 @@ async def watch( setup_logging(debug) - async with AlephHttpClient(api_server=sdk_settings.API_HOST) as client: + async with AlephHttpClient(api_server=settings.API_HOST) as client: original: AlephMessage = await client.get_message(item_hash=ref) async for message in client.watch_messages( message_filter=MessageFilter(refs=[ref], addresses=[original.content.address]) @@ -252,8 +251,8 @@ async def watch( @app.command() def sign( message: Optional[str] = typer.Option(None, help=help_strings.SIGNABLE_MESSAGE), - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), debug: bool = False, ): """Sign an aleph message with a private key. If no --message is provided, the message will be read from stdin.""" diff --git a/src/aleph_client/commands/program.py b/src/aleph_client/commands/program.py index 25ef0e42..fd1953e7 100644 --- a/src/aleph_client/commands/program.py +++ b/src/aleph_client/commands/program.py @@ -3,14 +3,15 @@ import json import logging from base64 import b16decode, b32encode +from collections.abc import Mapping from pathlib import Path -from typing import List, Mapping, Optional +from typing import List, Optional from zipfile import BadZipFile import typer from aleph.sdk import AuthenticatedAlephHttpClient from aleph.sdk.account import _load_account -from aleph.sdk.conf import settings as sdk_settings +from aleph.sdk.conf import settings from aleph.sdk.types import AccountFromPrivateKey, StorageEnum from aleph_message.models import ProgramMessage, StoreMessage from aleph_message.models.execution.program import ProgramContent @@ -24,7 +25,6 @@ setup_logging, yes_no_input, ) -from aleph_client.conf import settings from aleph_client.utils import AsyncTyper, create_archive logger = logging.getLogger(__name__) @@ -36,14 +36,14 @@ async def upload( path: Path = typer.Argument(..., help="Path to your source code"), entrypoint: str = typer.Argument(..., help="Your program entrypoint"), channel: Optional[str] = typer.Option(default=settings.DEFAULT_CHANNEL, help=help_strings.CHANNEL), - memory: int = typer.Option(sdk_settings.DEFAULT_VM_MEMORY, help="Maximum memory allocation on vm in MiB"), - vcpus: int = typer.Option(sdk_settings.DEFAULT_VM_VCPUS, help="Number of virtual cpus to allocate."), + memory: int = typer.Option(settings.DEFAULT_VM_MEMORY, help="Maximum memory allocation on vm in MiB"), + vcpus: int = typer.Option(settings.DEFAULT_VM_VCPUS, help="Number of virtual cpus to allocate."), timeout_seconds: float = typer.Option( - sdk_settings.DEFAULT_VM_TIMEOUT, + settings.DEFAULT_VM_TIMEOUT, help="If vm is not called after [timeout_seconds] it will shutdown", ), - private_key: Optional[str] = typer.Option(sdk_settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), - private_key_file: Optional[Path] = typer.Option(sdk_settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), print_messages: bool = typer.Option(False), print_code_message: bool = typer.Option(False), print_program_message: bool = typer.Option(False), @@ -81,9 +81,7 @@ async def upload( account: AccountFromPrivateKey = _load_account(private_key, private_key_file) - runtime = ( - runtime or input(f"Ref of runtime ? [{sdk_settings.DEFAULT_RUNTIME_ID}] ") or sdk_settings.DEFAULT_RUNTIME_ID - ) + runtime = runtime or input(f"Ref of runtime ? [{settings.DEFAULT_RUNTIME_ID}] ") or settings.DEFAULT_RUNTIME_ID volumes = get_or_prompt_volumes( persistent_volume=persistent_volume, @@ -102,7 +100,7 @@ async def upload( else: subscriptions = None - async with AuthenticatedAlephHttpClient(account=account, api_server=sdk_settings.API_HOST) as client: + async with AuthenticatedAlephHttpClient(account=account, api_server=settings.API_HOST) as client: # Upload the source code with open(path_object, "rb") as fd: logger.debug("Reading file") @@ -160,8 +158,8 @@ async def upload( async def update( item_hash: str = typer.Argument(..., help="Item hash to update"), path: Path = typer.Argument(..., help="Source path to upload"), - private_key: Optional[str] = sdk_settings.PRIVATE_KEY_STRING, - private_key_file: Optional[Path] = sdk_settings.PRIVATE_KEY_FILE, + private_key: Optional[str] = settings.PRIVATE_KEY_STRING, + private_key_file: Optional[Path] = settings.PRIVATE_KEY_FILE, print_message: bool = True, debug: bool = False, ): @@ -172,7 +170,7 @@ async def update( account = _load_account(private_key, private_key_file) path = path.absolute() - async with AuthenticatedAlephHttpClient(account=account, api_server=sdk_settings.API_HOST) as client: + async with AuthenticatedAlephHttpClient(account=account, api_server=settings.API_HOST) as client: program_message: ProgramMessage = await client.get_message(item_hash=item_hash, message_type=ProgramMessage) code_ref = program_message.content.code.ref code_message: StoreMessage = await client.get_message(item_hash=code_ref, message_type=StoreMessage) @@ -215,8 +213,8 @@ async def update( @app.command() async def unpersist( item_hash: str = typer.Argument(..., help="Item hash to unpersist"), - private_key: Optional[str] = sdk_settings.PRIVATE_KEY_STRING, - private_key_file: Optional[Path] = sdk_settings.PRIVATE_KEY_FILE, + private_key: Optional[str] = settings.PRIVATE_KEY_STRING, + private_key_file: Optional[Path] = settings.PRIVATE_KEY_FILE, debug: bool = False, ): """Stop a persistent virtual machine by making it non-persistent""" @@ -225,7 +223,7 @@ async def unpersist( account = _load_account(private_key, private_key_file) - async with AuthenticatedAlephHttpClient(account=account, api_server=sdk_settings.API_HOST) as client: + async with AuthenticatedAlephHttpClient(account=account, api_server=settings.API_HOST) as client: message: ProgramMessage = await client.get_message(item_hash=item_hash, message_type=ProgramMessage) content: ProgramContent = message.content.copy() diff --git a/src/aleph_client/commands/utils.py b/src/aleph_client/commands/utils.py index 70c3d8f4..93a2db25 100644 --- a/src/aleph_client/commands/utils.py +++ b/src/aleph_client/commands/utils.py @@ -7,18 +7,19 @@ from datetime import datetime from typing import Any, Callable, Dict, List, Optional, TypeVar, Union -import typer from aiohttp import ClientSession +from aleph.sdk import AlephHttpClient from aleph.sdk.chains.ethereum import ETHAccount -from aleph.sdk.conf import settings as sdk_settings +from aleph.sdk.conf import settings +from aleph.sdk.exceptions import ForgottenMessageError, MessageNotFoundError from aleph.sdk.types import GenericMessage -from aleph_message.models import ItemHash +from aleph_message.models import AlephMessage, ItemHash from aleph_message.status import MessageStatus from pygments import highlight from pygments.formatters.terminal256 import Terminal256Formatter from pygments.lexers import JsonLexer from rich.prompt import IntPrompt, Prompt, PromptError -from typer import echo +from typer import colors, echo, style from aleph_client.utils import fetch_json @@ -37,13 +38,13 @@ def colorful_json(obj: str): def colorized_status(status: MessageStatus) -> str: """Return a colored status string based on its value.""" status_colors = { - MessageStatus.REJECTED: typer.colors.RED, - MessageStatus.PROCESSED: typer.colors.GREEN, - MessageStatus.PENDING: typer.colors.YELLOW, - MessageStatus.FORGOTTEN: typer.colors.BRIGHT_BLACK, + MessageStatus.REJECTED: colors.RED, + MessageStatus.PROCESSED: colors.GREEN, + MessageStatus.PENDING: colors.YELLOW, + MessageStatus.FORGOTTEN: colors.BRIGHT_BLACK, } - color = status_colors.get(status, typer.colors.WHITE) - return typer.style(status, fg=color, bold=True) + color = status_colors.get(status, colors.WHITE) + return style(status, fg=color, bold=True) def colorful_message_json(message: GenericMessage): @@ -123,7 +124,7 @@ def get_or_prompt_volumes(ephemeral_volume, immutable_volume, persistent_volume) if persistent_volume is None or ephemeral_volume is None or immutable_volume is None: for volume in prompt_for_volumes(): volumes.append(volume) - typer.echo("\n") + echo("\n") # else parse all the volumes that have passed as the cli parameters and put it into volume list else: @@ -222,23 +223,23 @@ def is_environment_interactive() -> bool: ) -def has_nested_attr(obj, *attr_chain) -> bool: - for attr in attr_chain: - if not hasattr(obj, attr) or getattr(obj, attr) is None: - return False - obj = getattr(obj, attr) - return True +def safe_getattr(obj, attr, default=None): + for part in attr.split("."): + obj = getattr(obj, part, default) + if obj is default: + break + return obj async def wait_for_processed_instance(session: ClientSession, item_hash: ItemHash): """Wait for a message to be processed by CCN""" while True: - url = f"{sdk_settings.API_HOST.rstrip('/')}/api/v0/messages/{item_hash}" + url = f"{settings.API_HOST.rstrip('/')}/api/v0/messages/{item_hash}" message = await fetch_json(session, url) if message["status"] == "processed": return elif message["status"] == "pending": - typer.echo(f"Message {item_hash} is still pending, waiting 10sec...") + echo(f"Message {item_hash} is still pending, waiting 10sec...") await asyncio.sleep(10) elif message["status"] == "rejected": raise Exception(f"Message {item_hash} has been rejected") @@ -250,5 +251,23 @@ async def wait_for_confirmed_flow(account: ETHAccount, receiver: str): flow = await account.get_flow(receiver) if flow: return - typer.echo("Flow transaction is still pending, waiting 10sec...") + echo("Flow transaction is still pending, waiting 10sec...") await asyncio.sleep(10) + + +async def filter_only_valid_messages(messages: List[AlephMessage]): + """Iteratively check the status of each message from the API and only return + messages whose status is processed. + """ + filtered_messages = [] + async with AlephHttpClient(api_server=settings.API_HOST) as client: + for message in messages: + item_hash: ItemHash = message.item_hash + try: + msg = await client.get_message(ItemHash(item_hash)) + filtered_messages.append(msg) + except MessageNotFoundError: + logger.debug("Message not found %s", item_hash) + except ForgottenMessageError: + logger.debug("Message not found %s", item_hash) + return filtered_messages diff --git a/src/aleph_client/conf.py b/src/aleph_client/conf.py deleted file mode 100644 index 8710b0c5..00000000 --- a/src/aleph_client/conf.py +++ /dev/null @@ -1,75 +0,0 @@ -import os -from pathlib import Path -from shutil import which -from typing import Optional - -from aleph_message.models.execution.environment import HypervisorType -from pydantic import BaseSettings, Field - - -class Settings(BaseSettings): - CONFIG_HOME: Optional[str] = None - - # In case the user does not want to bother with handling private keys himself, - # do an ugly and insecure write and read from disk to this file. - PRIVATE_KEY_FILE: Path = Field( - default=Path("ethereum.key"), - description="Path to the private key used to sign messages", - ) - - PRIVATE_KEY_STRING: Optional[str] = None - API_HOST: str = "https://api2.aleph.im" - MAX_INLINE_SIZE: int = 50000 - API_UNIX_SOCKET: Optional[str] = None - REMOTE_CRYPTO_HOST: Optional[str] = None - REMOTE_CRYPTO_UNIX_SOCKET: Optional[str] = None - ADDRESS_TO_USE: Optional[str] = None - - DEFAULT_CHANNEL: str = "ALEPH-CLOUDSOLUTIONS" - DEFAULT_RUNTIME_ID: str = "63f07193e6ee9d207b7d1fcf8286f9aee34e6f12f101d2ec77c1229f92964696" - DEBIAN_11_ROOTFS_ID: str = "887957042bb0e360da3485ed33175882ce72a70d79f1ba599400ff4802b7cee7" - DEBIAN_12_ROOTFS_ID: str = "6e30de68c6cedfa6b45240c2b51e52495ac6fb1bd4b36457b3d5ca307594d595" - UBUNTU_22_ROOTFS_ID: str = "77fef271aa6ff9825efa3186ca2e715d19e7108279b817201c69c34cedc74c27" - DEBIAN_11_QEMU_ROOTFS_ID: str = "f7e68c568906b4ebcd3cd3c4bfdff96c489cd2a9ef73ba2d7503f244dfd578de" - DEBIAN_12_QEMU_ROOTFS_ID: str = "b6ff5c3a8205d1ca4c7c3369300eeafff498b558f71b851aa2114afd0a532717" - UBUNTU_22_QEMU_ROOTFS_ID: str = "4a0f62da42f4478544616519e6f5d58adb1096e069b392b151d47c3609492d0c" - DEFAULT_ROOTFS_SIZE: int = 20_480 - DEFAULT_INSTANCE_MEMORY: int = 2_048 - DEFAULT_HYPERVISOR: HypervisorType = HypervisorType.qemu - - DEFAULT_VM_MEMORY: int = 128 - DEFAULT_VM_VCPUS: int = 1 - DEFAULT_VM_TIMEOUT: float = 30.0 - - CODE_USES_SQUASHFS: bool = which("mksquashfs") is not None # True if command exists - - VM_URL_PATH = "https://aleph.sh/vm/{hash}" - VM_URL_HOST = "https://{hash_base32}.aleph.sh" - - DEFAULT_CONFIDENTIAL_FIRMWARE = "ba5bb13f3abca960b101a759be162b229e2b7e93ecad9d1307e54de887f177ff" - DEFAULT_CONFIDENTIAL_FIRMWARE_HASH = "89b76b0e64fe9015084fbffdf8ac98185bafc688bfe7a0b398585c392d03c7ee" - - class Config: - env_prefix = "ALEPH_" - case_sensitive = False - env_file = ".env" - - HTTP_REQUEST_TIMEOUT = 10.0 - - -# Settings singleton -settings = Settings() - -if settings.CONFIG_HOME is None: - xdg_data_home = os.environ.get("XDG_DATA_HOME") - if xdg_data_home is not None: - os.environ["ALEPH_CONFIG_HOME"] = str(Path(xdg_data_home, ".aleph-im")) - else: - home = os.path.expanduser("~") - os.environ["ALEPH_CONFIG_HOME"] = str(Path(home, ".aleph-im")) - - settings = Settings() - -assert settings.CONFIG_HOME -if str(settings.PRIVATE_KEY_FILE) == "ethereum.key": - settings.PRIVATE_KEY_FILE = Path(settings.CONFIG_HOME, "private-keys", "ethereum.key") diff --git a/src/aleph_client/utils.py b/src/aleph_client/utils.py index 24566176..9f8694e5 100644 --- a/src/aleph_client/utils.py +++ b/src/aleph_client/utils.py @@ -11,12 +11,11 @@ import typer from aiohttp import ClientSession +from aleph.sdk.conf import settings from aleph.sdk.types import GenericMessage from aleph_message.models.base import MessageType from aleph_message.models.execution.base import Encoding -from aleph_client.conf import settings - logger = logging.getLogger(__name__) try: