diff --git a/.gitignore b/.gitignore index c4734889..c4f3c084 100644 --- a/.gitignore +++ b/.gitignore @@ -19,6 +19,7 @@ __pycache__/* .pydevproject .settings .idea +.vscode tags # Package files diff --git a/Pipfile b/Pipfile new file mode 100644 index 00000000..02aa350e --- /dev/null +++ b/Pipfile @@ -0,0 +1,22 @@ +[[source]] +url = "https://pypi.python.org/simple" +verify_ssl = true +name = "pypi" + +[dev-packages] +pylint = "*" + +[requires] +python_version = "3.7" + +[packages] +eth-account = "*" +paho-mqtt = "*" +click = "*" +certifi = "*" +python-magic = "*" +python-docker = "*" + +[packages.e1839a8] +path = "." +editable = true diff --git a/docker/Dockerfile b/docker/Dockerfile index 787d74ff..a97cf7b8 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -21,7 +21,7 @@ RUN pip install --upgrade pip wheel twine # Preinstall dependencies for faster steps RUN pip install --upgrade secp256k1 coincurve aiohttp eciespy python-magic typer RUN pip install --upgrade 'aleph-message~=0.2.3' eth_account pynacl base58 -RUN pip install --upgrade pytest pytest-cov pytest-asyncio mypy types-setuptools pytest-asyncio fastapi requests +RUN pip install --upgrade pytest pytest-cov pytest-asyncio mypy types-setuptools pytest-asyncio fastapi requests python-docker WORKDIR /opt/aleph-client/ COPY . . diff --git a/setup.cfg b/setup.cfg index bf583aa0..3fcec780 100644 --- a/setup.cfg +++ b/setup.cfg @@ -30,7 +30,7 @@ package_dir = # DON'T CHANGE THE FOLLOWING LINE! IT WILL BE UPDATED BY PYSCAFFOLD! setup_requires = pyscaffold>=3.2a0,<3.3a0 # Add here dependencies of your project (semicolon/line-separated), e.g. -install_requires = +install_requires = coincurve aiohttp>=3.8.0 eciespy @@ -39,6 +39,7 @@ install_requires = aleph-message~=0.2.3 eth_account>=0.4.0 python-magic + python-docker # The usage of test_requires is discouraged, see `Dependency Management` docs # tests_require = pytest; pytest-cov # Require a specific Python version, e.g. Python 2.7 or >= 3.4 diff --git a/src/aleph_client/__main__.py b/src/aleph_client/__main__.py index 9506065b..1abe8432 100644 --- a/src/aleph_client/__main__.py +++ b/src/aleph_client/__main__.py @@ -1,595 +1,46 @@ -"""Aleph Client command-line interface. """ -import asyncio -import json -import logging -import os.path -import subprocess -import tempfile -from base64 import b32encode, b16decode -from enum import Enum +Aleph Client command-line interface. +""" + +from typing import Optional from pathlib import Path -from typing import Optional, Dict, List -from zipfile import BadZipFile import typer -from aleph_message.models import ( - ProgramMessage, - StoreMessage, - MessageType, - PostMessage, - ForgetMessage, - AlephMessage, - MessagesResponse, - ProgramContent, -) -from typer import echo -from aleph_client.account import _load_account from aleph_client.types import AccountFromPrivateKey -from aleph_client.utils import create_archive -from . import synchronous -from .asynchronous import ( - get_fallback_session, - StorageEnum, +from aleph_client.account import _load_account +from aleph_client.conf import settings + +from .commands.container import cli_command as container +from .commands import ( + files, + message, + program, + help_strings, + aggregate, ) -from .conf import settings - -logger = logging.getLogger(__name__) -app = typer.Typer() - -class KindEnum(str, Enum): - json = "json" -def _input_multiline() -> str: - """Prompt the user for a multiline input.""" - echo("Enter/Paste your content. 
Ctrl-D or Ctrl-Z ( windows ) to save it.") - contents = "" - while True: - try: - line = input() - except EOFError: - break - contents += line + "\n" - return contents - - -def _setup_logging(debug: bool = False): - level = logging.DEBUG if debug else logging.WARNING - logging.basicConfig(level=level) +app = typer.Typer() +app.add_typer(files.app, name="file", help="File uploading and pinning on IPFS and Aleph.im") +app.add_typer(message.app, name="message", help="Post, amend, watch and forget messages on Aleph.im") +app.add_typer(program.app, name="program", help="Upload and update programs on Aleph's VM") +app.add_typer(aggregate.app, name="aggregate", help="Manage aggregate messages on Aleph.im") +app.add_typer(container.app, name="container", help="Upload docker containers as programs on Aleph.im") @app.command() def whoami( - private_key: Optional[str] = settings.PRIVATE_KEY_STRING, - private_key_file: Optional[Path] = settings.PRIVATE_KEY_FILE, + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), ): """ Display your public address. """ account: AccountFromPrivateKey = _load_account(private_key, private_key_file) - echo(account.get_public_key()) - - -@app.command() -def post( - path: Optional[Path] = None, - type: str = "test", - ref: Optional[str] = None, - channel: str = settings.DEFAULT_CHANNEL, - private_key: Optional[str] = settings.PRIVATE_KEY_STRING, - private_key_file: Optional[Path] = settings.PRIVATE_KEY_FILE, - debug: bool = False, -): - """Post a message on Aleph.im.""" - - _setup_logging(debug) - - account: AccountFromPrivateKey = _load_account(private_key, private_key_file) - storage_engine: str - content: Dict - - if path: - if not path.is_file(): - echo(f"Error: File not found: '{path}'") - raise typer.Exit(code=1) - - file_size = os.path.getsize(path) - storage_engine = ( - StorageEnum.ipfs if file_size > 4 * 1024 * 1024 else StorageEnum.storage - ) - - with open(path, "r") as fd: - content = json.load(fd) - - else: - content_raw = _input_multiline() - storage_engine = ( - StorageEnum.ipfs - if len(content_raw) > 4 * 1024 * 1024 - else StorageEnum.storage - ) - try: - content = json.loads(content_raw) - except json.decoder.JSONDecodeError: - echo("Not valid JSON") - raise typer.Exit(code=2) - - try: - result: PostMessage = synchronous.create_post( - account=account, - post_content=content, - post_type=type, - ref=ref, - channel=channel, - inline=True, - storage_engine=storage_engine, - ) - echo(result.json(indent=4)) - finally: - # Prevent aiohttp unclosed connector warning - asyncio.run(get_fallback_session().close()) - - -@app.command() -def upload( - path: Path, - channel: str = settings.DEFAULT_CHANNEL, - private_key: Optional[str] = settings.PRIVATE_KEY_STRING, - private_key_file: Optional[Path] = settings.PRIVATE_KEY_FILE, - ref: Optional[str] = None, - debug: bool = False, -): - """Upload and store a file on Aleph.im.""" - - _setup_logging(debug) - - account: AccountFromPrivateKey = _load_account(private_key, private_key_file) - - try: - if not path.is_file(): - echo(f"Error: File not found: '{path}'") - raise typer.Exit(code=1) - - with open(path, "rb") as fd: - logger.debug("Reading file") - # TODO: Read in lazy mode instead of copying everything in memory - file_content = fd.read() - storage_engine = ( - StorageEnum.ipfs - if len(file_content) > 4 * 1024 * 1024 - else StorageEnum.storage - ) - 
logger.debug("Uploading file") - result: StoreMessage = synchronous.create_store( - account=account, - file_content=file_content, - storage_engine=storage_engine, - channel=channel, - guess_mime_type=True, - ref=ref, - ) - logger.debug("Upload finished") - echo(f"{result.json(indent=4)}") - finally: - # Prevent aiohttp unclosed connector warning - asyncio.run(get_fallback_session().close()) - - -@app.command() -def pin( - hash: str, - channel: str = settings.DEFAULT_CHANNEL, - private_key: Optional[str] = settings.PRIVATE_KEY_STRING, - private_key_file: Optional[Path] = settings.PRIVATE_KEY_FILE, - ref: Optional[str] = None, - debug: bool = False, -): - """Persist a file from IPFS on Aleph.im.""" - - _setup_logging(debug) - - account: AccountFromPrivateKey = _load_account(private_key, private_key_file) - - try: - result: StoreMessage = synchronous.create_store( - account=account, - file_hash=hash, - storage_engine=StorageEnum.ipfs, - channel=channel, - ref=ref, - ) - logger.debug("Upload finished") - echo(f"{result.json(indent=4)}") - finally: - # Prevent aiohttp unclosed connector warning - asyncio.run(get_fallback_session().close()) - - -def yes_no_input(text: str, default: Optional[bool] = None): - while True: - if default is True: - response = input(f"{text} [Y/n] ") - elif default is False: - response = input(f"{text} [y/N] ") - else: - response = input(f"{text} ") - - if response.lower() in ("y", "yes"): - return True - elif response.lower() in ("n", "no"): - return False - elif response == "" and default is not None: - return default - else: - if default is None: - echo("Please enter 'y', 'yes', 'n' or 'no'") - else: - echo("Please enter 'y', 'yes', 'n', 'no' or nothing") - continue - - -def _prompt_for_volumes(): - while yes_no_input("Add volume ?", default=False): - comment = input("Description: ") or None - mount = input("Mount: ") - persistent = yes_no_input("Persist on VM host ?", default=False) - if persistent: - name = input("Volume name: ") - size_mib = int(input("Size in MiB: ")) - yield { - "comment": comment, - "mount": mount, - "name": name, - "persistence": "host", - "size_mib": size_mib, - } - else: - ref = input("Ref: ") - use_latest = yes_no_input("Use latest version ?", default=True) - yield { - "comment": comment, - "mount": mount, - "ref": ref, - "use_latest": use_latest, - } - - -@app.command() -def program( - path: Path, - entrypoint: str, - channel: str = settings.DEFAULT_CHANNEL, - memory: int = settings.DEFAULT_VM_MEMORY, - vcpus: int = settings.DEFAULT_VM_VCPUS, - timeout_seconds: float = settings.DEFAULT_VM_TIMEOUT, - private_key: Optional[str] = settings.PRIVATE_KEY_STRING, - private_key_file: Optional[Path] = settings.PRIVATE_KEY_FILE, - print_messages: bool = False, - print_code_message: bool = False, - print_program_message: bool = False, - runtime: str = None, - beta: bool = False, - debug: bool = False, - persistent: bool = False, -): - """Register a program to run on Aleph.im virtual machines from a zip archive.""" - - _setup_logging(debug) - - path = path.absolute() - - try: - path_object, encoding = create_archive(path) - except BadZipFile: - echo("Invalid zip archive") - raise typer.Exit(3) - except FileNotFoundError: - echo("No such file or directory") - raise typer.Exit(4) - - account: AccountFromPrivateKey = _load_account(private_key, private_key_file) - - runtime = ( - runtime - or input(f"Ref of runtime ? 
[{settings.DEFAULT_RUNTIME_ID}] ") - or settings.DEFAULT_RUNTIME_ID - ) - - volumes = [] - for volume in _prompt_for_volumes(): - volumes.append(volume) - echo("\n") - - subscriptions: Optional[List[Dict]] - if beta and yes_no_input("Subscribe to messages ?", default=False): - content_raw = _input_multiline() - try: - subscriptions = json.loads(content_raw) - except json.decoder.JSONDecodeError: - echo("Not valid JSON") - raise typer.Exit(code=2) - else: - subscriptions = None - - try: - # Upload the source code - with open(path_object, "rb") as fd: - logger.debug("Reading file") - # TODO: Read in lazy mode instead of copying everything in memory - file_content = fd.read() - storage_engine = ( - StorageEnum.ipfs - if len(file_content) > 4 * 1024 * 1024 - else StorageEnum.storage - ) - logger.debug("Uploading file") - user_code: StoreMessage = synchronous.create_store( - account=account, - file_content=file_content, - storage_engine=storage_engine, - channel=channel, - guess_mime_type=True, - ref=None, - ) - logger.debug("Upload finished") - if print_messages or print_code_message: - echo(f"{user_code.json(indent=4)}") - program_ref = user_code.item_hash - - # Register the program - result: ProgramMessage = synchronous.create_program( - account=account, - program_ref=program_ref, - entrypoint=entrypoint, - runtime=runtime, - storage_engine=StorageEnum.storage, - channel=channel, - memory=memory, - vcpus=vcpus, - timeout_seconds=timeout_seconds, - persistent=persistent, - encoding=encoding, - volumes=volumes, - subscriptions=subscriptions, - ) - logger.debug("Upload finished") - if print_messages or print_program_message: - echo(f"{result.json(indent=4)}") - - hash: str = result.item_hash - hash_base32 = b32encode(b16decode(hash.upper())).strip(b"=").lower().decode() - - echo( - f"Your program has been uploaded on Aleph .\n\n" - "Available on:\n" - f" {settings.VM_URL_PATH.format(hash=hash)}\n" - f" {settings.VM_URL_HOST.format(hash_base32=hash_base32)}\n" - "Visualise on:\n https://explorer.aleph.im/address/" - f"{result.chain}/{result.sender}/message/PROGRAM/{hash}\n" - ) - - finally: - # Prevent aiohttp unclosed connector warning - asyncio.run(get_fallback_session().close()) - - -@app.command() -def update( - hash: str, - path: Path, - private_key: Optional[str] = settings.PRIVATE_KEY_STRING, - private_key_file: Optional[Path] = settings.PRIVATE_KEY_FILE, - print_message: bool = True, - debug: bool = False, -): - """Update the code of an existing program""" - - _setup_logging(debug) - - account = _load_account(private_key, private_key_file) - path = path.absolute() - - try: - program_message: ProgramMessage = synchronous.get_message( - item_hash=hash, message_type=ProgramMessage - ) - code_ref = program_message.content.code.ref - code_message: StoreMessage = synchronous.get_message( - item_hash=code_ref, message_type=StoreMessage - ) - - try: - path, encoding = create_archive(path) - except BadZipFile: - echo("Invalid zip archive") - raise typer.Exit(3) - except FileNotFoundError: - echo("No such file or directory") - raise typer.Exit(4) - - if encoding != program_message.content.code.encoding: - logger.error( - f"Code must be encoded with the same encoding as the previous version " - f"('{encoding}' vs '{program_message.content.code.encoding}'" - ) - raise typer.Exit(1) - - # Upload the source code - with open(path, "rb") as fd: - logger.debug("Reading file") - # TODO: Read in lazy mode instead of copying everything in memory - file_content = fd.read() - logger.debug("Uploading file") - 
result = synchronous.create_store( - account=account, - file_content=file_content, - storage_engine=code_message.content.item_type, - channel=code_message.channel, - guess_mime_type=True, - ref=code_message.item_hash, - ) - logger.debug("Upload finished") - if print_message: - echo(f"{result.json(indent=4)}") - finally: - # Prevent aiohttp unclosed connector warning - asyncio.run(get_fallback_session().close()) - - -@app.command() -def unpersist( - hash: str, - private_key: Optional[str] = settings.PRIVATE_KEY_STRING, - private_key_file: Optional[Path] = settings.PRIVATE_KEY_FILE, - debug: bool = False, -): - """Stop a persistent virtual machine by making it non-persistent""" - - _setup_logging(debug) - - account = _load_account(private_key, private_key_file) - - existing: MessagesResponse = synchronous.get_messages(hashes=[hash]) - message: ProgramMessage = existing.messages[0] - content: ProgramContent = message.content.copy() - - content.on.persistent = False - content.replaces = message.item_hash - - result = synchronous.submit( - account=account, - content=content.dict(exclude_none=True), - message_type=message.type, - channel=message.channel, - ) - echo(f"{result.json(indent=4)}") - - -@app.command() -def amend( - hash: str, - private_key: Optional[str] = settings.PRIVATE_KEY_STRING, - private_key_file: Optional[Path] = settings.PRIVATE_KEY_FILE, - debug: bool = False, -): - """Amend an existing Aleph message.""" - - _setup_logging(debug) - - account: AccountFromPrivateKey = _load_account(private_key, private_key_file) - - existing_message: AlephMessage = synchronous.get_message(item_hash=hash) - - editor: str = os.getenv("EDITOR", default="nano") - with tempfile.NamedTemporaryFile(suffix="json") as fd: - # Fill in message template - fd.write(existing_message.content.json(indent=4).encode()) - fd.seek(0) - - # Launch editor - subprocess.run([editor, fd.name], check=True) - - # Read new message - fd.seek(0) - new_content_json = fd.read() - - content_type = type(existing_message).__annotations__["content"] - new_content_dict = json.loads(new_content_json) - new_content = content_type(**new_content_dict) - new_content.ref = existing_message.item_hash - echo(new_content) - result = synchronous.submit( - account=account, - content=new_content.dict(), - message_type=existing_message.type, - channel=existing_message.channel, - ) - echo(f"{result.json(indent=4)}") - - -def forget_messages( - account: AccountFromPrivateKey, - hashes: List[str], - reason: Optional[str], - channel: str, -): - try: - result: ForgetMessage = synchronous.forget( - account=account, - hashes=hashes, - reason=reason, - channel=channel, - ) - echo(f"{result.json(indent=4)}") - finally: - # Prevent aiohttp unclosed connector warning - asyncio.run(get_fallback_session().close()) - - -@app.command() -def forget( - hashes: str, - reason: Optional[str] = None, - channel: str = settings.DEFAULT_CHANNEL, - private_key: Optional[str] = settings.PRIVATE_KEY_STRING, - private_key_file: Optional[Path] = settings.PRIVATE_KEY_FILE, - debug: bool = False, -): - """Forget an existing Aleph message.""" - - _setup_logging(debug) - - account: AccountFromPrivateKey = _load_account(private_key, private_key_file) - - hash_list: List[str] = hashes.split(",") - forget_messages(account, hash_list, reason, channel) - - -@app.command() -def forget_aggregate( - key: str, - reason: Optional[str] = None, - channel: str = settings.DEFAULT_CHANNEL, - private_key: Optional[str] = settings.PRIVATE_KEY_STRING, - private_key_file: Optional[Path] = 
settings.PRIVATE_KEY_FILE, - debug: bool = False, -): - """Forget all the messages composing an aggregate.""" - - _setup_logging(debug) - - account: AccountFromPrivateKey = _load_account(private_key, private_key_file) - - message_response = synchronous.get_messages( - addresses=[account.get_address()], - message_type=MessageType.aggregate.value, - content_keys=[key], - ) - hash_list = [message["item_hash"] for message in message_response["messages"]] - forget_messages(account, hash_list, reason, channel) - - -@app.command() -def watch( - ref: str, - indent: Optional[int] = None, - debug: bool = False, -): - """Watch a hash for amends and print amend hashes""" - - _setup_logging(debug) - - original: AlephMessage = synchronous.get_message(item_hash=ref) - - for message in synchronous.watch_messages( - refs=[ref], addresses=[original.content.address] - ): - echo(f"{message.json(indent=indent)}") - + typer.echo(account.get_public_key()) if __name__ == "__main__": app() diff --git a/src/aleph_client/commands/aggregate.py b/src/aleph_client/commands/aggregate.py new file mode 100644 index 00000000..ce9f3cc4 --- /dev/null +++ b/src/aleph_client/commands/aggregate.py @@ -0,0 +1,40 @@ +import typer +from typing import Optional +from aleph_client.types import AccountFromPrivateKey +from aleph_client.account import _load_account +from aleph_client.conf import settings +from pathlib import Path +from aleph_client import synchronous +from aleph_client.commands import help_strings + +from aleph_client.commands.message import forget_messages + +from aleph_client.commands.utils import setup_logging + +from aleph_message.models import MessageType + +app = typer.Typer() + +@app.command() +def forget( + key: str = typer.Argument(..., help="Aggregate item hash to be removed."), + reason: Optional[str] = typer.Option(None, help="A description of why the messages are being forgotten"), + channel: str = typer.Option(settings.DEFAULT_CHANNEL, help=help_strings.CHANNEL), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help = help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help = help_strings.PRIVATE_KEY_FILE), + debug: bool = False, +): + """Forget all the messages composing an aggregate.""" + + setup_logging(debug) + + account: AccountFromPrivateKey = _load_account(private_key, private_key_file) + + message_response = synchronous.get_messages( + addresses=[account.get_address()], + message_type=MessageType.aggregate.value, + content_keys=[key], + ) + hash_list = [message["item_hash"] for message in message_response["messages"]] + forget_messages(account, hash_list, reason, channel) + diff --git a/src/aleph_client/commands/container/.gitignore b/src/aleph_client/commands/container/.gitignore new file mode 100644 index 00000000..55720316 --- /dev/null +++ b/src/aleph_client/commands/container/.gitignore @@ -0,0 +1,2 @@ +tests/test-image.tar +docker-data diff --git a/src/aleph_client/commands/container/cli_command.py b/src/aleph_client/commands/container/cli_command.py new file mode 100644 index 00000000..732ac9df --- /dev/null +++ b/src/aleph_client/commands/container/cli_command.py @@ -0,0 +1,228 @@ +import typer +import os +import json +import logging +import asyncio + +from pathlib import Path +from typing import Optional, Dict, List +from base64 import b32encode, b16decode +from typing import Optional, Dict, List + +from aleph_message.models import ( + StoreMessage, + ProgramMessage +) + +from aleph_client import synchronous +from 
aleph_client.account import _load_account, AccountFromPrivateKey
+from aleph_client.conf import settings
+from aleph_message.models.program import Encoding  # type: ignore
+from aleph_client.commands import help_strings
+
+from aleph_client.asynchronous import (
+    get_fallback_session,
+    StorageEnum,
+)
+
+from aleph_client.commands.utils import (
+    yes_no_input,
+    input_multiline,
+    prompt_for_volumes,
+)
+
+from aleph_client.commands.container.save import save_tar
+from aleph_client.commands.container.utils import create_container_volume
+
+logger = logging.getLogger(__name__)
+app = typer.Typer()
+
+def upload_file(
+    path: str,
+    account: AccountFromPrivateKey,
+    channel: str,
+    print_messages: bool = False,
+    print_code_message: bool = False
+) -> StoreMessage:
+    with open(path, "rb") as fd:
+        logger.debug("Reading file")
+        # TODO: Read in lazy mode instead of copying everything in memory
+        file_content = fd.read()
+        storage_engine = (
+            StorageEnum.ipfs
+            if len(file_content) > 4 * 1024 * 1024
+            else StorageEnum.storage
+        )
+        logger.debug("Uploading file")
+        result: StoreMessage = synchronous.create_store(
+            account=account,
+            file_content=file_content,
+            storage_engine=storage_engine,
+            channel=channel,
+            guess_mime_type=True,
+            ref=None,
+        )
+        logger.debug("Upload finished")
+        if print_messages or print_code_message:
+            typer.echo(result.json(indent=4))
+        return result
+
+def MutuallyExclusiveBoolean():
+    marker = None
+    def callback(ctx: typer.Context, param: typer.CallbackParam, value: str):
+        # Add cli option to group if it was called with a value
+        nonlocal marker
+        if value is False:
+            return value
+        if marker is None:
+            marker = param.name
+        if param.name != marker:
+            raise typer.BadParameter(
+                f"{param.name} is mutually exclusive with {marker}")
+        return value
+    return callback
+
+exclusivity_callback = MutuallyExclusiveBoolean()
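+
+# Illustrative note (added commentary, not upstream code): every --from-*
+# option below shares this single callback instance, so the first flag seen
+# claims the group and any second flag raises. A hypothetical invocation:
+#
+#   aleph container upload image.tar run.sh --from-remote --from-daemon
+#   -> BadParameter: from_daemon is mutually exclusive with from_remote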
+
+@app.command()
+def upload(
+    image: str = typer.Argument(..., help="Path to an image archive exported with docker save."),
+    path: str = typer.Argument(..., metavar="SCRIPT", help="A small script to start your container with parameters"),
+    from_remote: bool = typer.Option(False, "--from-remote", help="If --from-remote, IMAGE is a registry to pull the image from. e.g. library/alpine, library/ubuntu:latest", callback=exclusivity_callback),
+    from_daemon: bool = typer.Option(False, "--from-daemon", help="If --from-daemon, IMAGE is an image in the local docker daemon storage. You need docker installed for this command", callback=exclusivity_callback),
+    from_created: bool = typer.Option(False, "--from-created", help="If --from-created, IMAGE is the path to a file created with 'aleph container create'", callback=exclusivity_callback),
+    channel: str = typer.Option(settings.DEFAULT_CHANNEL, help=help_strings.CHANNEL),
+    memory: int = typer.Option(settings.DEFAULT_VM_MEMORY, help="Maximum memory allocation on vm in MiB"),
+    vcpus: int = typer.Option(settings.DEFAULT_VM_VCPUS, help="Number of virtual cpus to allocate."),
+    timeout_seconds: float = typer.Option(settings.DEFAULT_VM_TIMEOUT, help="If vm is not called after [timeout_seconds] it will shutdown"),
+    private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY),
+    private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE),
+    docker_mountpoint: Optional[Path] = typer.Option(settings.DEFAULT_DOCKER_VOLUME_MOUNTPOINT, "--docker-mountpoint", help="The path where the created docker image volume will be mounted"),
+    optimize: bool = typer.Option(True, help="Activate volume size optimization"),
+    print_messages: bool = typer.Option(False),
+    print_code_message: bool = typer.Option(False),
+    print_program_message: bool = typer.Option(False),
+    beta: bool = False,
+):
+    """
+    Deploy a docker container on Aleph virtual machines.
+    Unless otherwise specified, you don't need docker on your machine to run this command.
+    """
+    typer.echo("Preparing image for vm runtime")
+    docker_data_path = image
+    if not from_created:
+        docker_data_path = os.path.abspath("docker-data")
+        try:
+            create_container_volume(image, docker_data_path, from_remote, from_daemon, optimize, settings)
+        except Exception as e:
+            typer.echo(e)
+            raise typer.Exit(1)
+    assert os.path.isfile(docker_data_path)
+    encoding = Encoding.squashfs
+    path = os.path.abspath(path)
+    entrypoint = path
+
+    account = _load_account(private_key, private_key_file)
+
+    volumes = []
+    for volume in prompt_for_volumes():
+        volumes.append(volume)
+        print()
+
+    subscriptions: Optional[List[Dict]]
+    if beta and yes_no_input("Subscribe to messages ?", default=False):
+        content_raw = input_multiline()
+        try:
+            subscriptions = json.loads(content_raw)
+        except json.decoder.JSONDecodeError:
+            typer.echo("Not valid JSON")
+            raise typer.Exit(code=2)
+    else:
+        subscriptions = None
+
+    try:
+        docker_upload_layers_result: StoreMessage = upload_file(f"{docker_data_path}/layers", account, channel, print_messages, print_code_message)
+        docker_upload_metadata_result: StoreMessage = upload_file(f"{docker_data_path}/metadata", account, channel, print_messages, print_code_message)
+        typer.echo(f"Docker image layers upload message address: {docker_upload_layers_result.item_hash}")
+        typer.echo(f"Docker image metadata upload message address: {docker_upload_metadata_result.item_hash}")
+
+        volumes.append({
+            "comment": "Docker image layers",
+            "mount": f"{str(docker_mountpoint)}/layers",
+            "ref": str(docker_upload_layers_result.item_hash),
+            "use_latest": True,
+        })
+
+        volumes.append({
+            "comment": "Docker image metadata",
+            "mount": f"{str(docker_mountpoint)}/metadata",
+            "ref": str(docker_upload_metadata_result.item_hash),
+            "use_latest": True,
+        })
+
+        program_result: StoreMessage = upload_file(path, account, channel, print_messages, print_code_message)
+
+        # Register the program
+        result: ProgramMessage = synchronous.create_program(
+            account=account,
+            program_ref=program_result.item_hash,
+            entrypoint=entrypoint,
+            runtime=settings.DEFAULT_DOCKER_RUNTIME_ID,
+            storage_engine=StorageEnum.storage,
+            channel=channel,
+            memory=memory,
+            vcpus=vcpus,
+            timeout_seconds=timeout_seconds,
+            encoding=encoding,
+            volumes=volumes,
+            subscriptions=subscriptions,
+            environment_variables={
+                "DOCKER_MOUNTPOINT": str(docker_mountpoint)
+            }
+        )
+        logger.debug("Upload finished")
+        if print_messages or print_program_message:
+            typer.echo(result.json(indent=4))
+
+        hash: str = result.item_hash
+        hash_base32 = b32encode(b16decode(hash.upper())).strip(b"=").lower().decode()
+
+        typer.echo(
+            f"Your program has been uploaded on Aleph.\n\n"
+            "Available on:\n"
+            f"  {settings.VM_URL_PATH.format(hash=hash)}\n"
+            f"  {settings.VM_URL_HOST.format(hash_base32=hash_base32)}\n"
+            "Visualise on:\n  https://explorer.aleph.im/address/"
+            f"{result.chain}/{result.sender}/message/PROGRAM/{hash}\n"
+        )
+
+    finally:
+        # Prevent aiohttp unclosed connector warning
+        asyncio.get_event_loop().run_until_complete(get_fallback_session().close())
+
+@app.command()
+def create(
+    image: str = typer.Argument(..., help="Path to an image archive exported with docker save."),
+    output: str = typer.Argument(..., help="The path where you want the container volume to be created"),
+    from_remote: bool = typer.Option(False, "--from-remote", help="If --from-remote, IMAGE is a registry to pull the image from. e.g. library/alpine, library/ubuntu:latest", callback=exclusivity_callback),
+    from_daemon: bool = typer.Option(False, "--from-daemon", help="If --from-daemon, IMAGE is an image in the local docker daemon storage. You need docker installed for this command", callback=exclusivity_callback),
+    optimize: bool = typer.Option(True, help="Activate volume size optimization"),
+):
+    """
+    Use a docker image to create an Aleph-compatible image on your local machine.
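+
+    A hypothetical invocation (archive name and output path are examples):
+        aleph container create ./test-image.tar ./my-volume
+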
+    You can later upload it with 'aleph container upload --from-created'
+    """
+    try:
+        create_container_volume(image, output, from_remote, from_daemon, optimize, settings)
+        typer.echo(f"Container volume created at {output}")
+    except Exception as e:
+        typer.echo(e)
+        raise typer.Exit(1)
+    return
\ No newline at end of file
diff --git a/src/aleph_client/commands/container/docker_conf.py b/src/aleph_client/commands/container/docker_conf.py
new file mode 100644
index 00000000..f434cc3c
--- /dev/null
+++ b/src/aleph_client/commands/container/docker_conf.py
@@ -0,0 +1,43 @@
+from dataclasses import dataclass
+from typing import Dict, NewType, Union
+from enum import Enum
+from shutil import which
+
+class StorageDriverEnum(Enum):
+    VFS = 1
+    # OVERLAY2 = 2
+
+
+@dataclass
+class VFSSettings:
+    optimize: bool = True  # Keep only last layer and delete previous ones
+    use_tarsplit: bool = which("tar-split") is not None and which("tar") is not None
+
+DriverConf = NewType("DriverConf", VFSSettings)  # Use Union to accommodate new features
+
+drivers_conf: Dict[StorageDriverEnum, DriverConf] = {
+    StorageDriverEnum.VFS: VFSSettings()
+}
+
+
+@dataclass
+class StorageDriverSettings:
+    kind: StorageDriverEnum
+    conf: DriverConf
+
+    def __init__(self, kind: StorageDriverEnum):
+        self.kind = kind
+        self.conf = drivers_conf[kind]
+
+
+@dataclass
+class DockerSettings:
+    storage_driver: StorageDriverSettings
+    populate: bool
+
+docker_settings = DockerSettings(
+    storage_driver=StorageDriverSettings(
+        kind=StorageDriverEnum.VFS
+    ),
+    populate=False
+)
\ No newline at end of file
diff --git a/src/aleph_client/commands/container/image.py b/src/aleph_client/commands/container/image.py
new file mode 100644
index 00000000..2a478ec1
--- /dev/null
+++ b/src/aleph_client/commands/container/image.py
@@ -0,0 +1,76 @@
+from typing import List, Dict, Union, NewType
+import os
+import tarfile
+from tarfile import TarFile
+import json
+from hashlib import sha256
+
+Command = NewType('Command', Dict[str, str])
+ConfigValue = NewType('ConfigValue', Union[str, bool, None, Command])
+
+
+def compute_chain_ids(diff_ids: List[str], layers_ids: List[str]) -> List[str]:
+    # diff_ids are stored sequentially, from parent to child.
+    # If the file has been tampered with, this method cannot work.
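+    #
+    # Hedged illustration of the recurrence below (digests shortened,
+    # not real values):
+    #   diff_ids  = ["sha256:aaa", "sha256:bbb"]
+    #   chain_ids = ["sha256:aaa",
+    #                "sha256:" + sha256(b"sha256:aaa sha256:bbb").hexdigest()]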
+    # ChainID(A) = DiffID(A)
+    # ChainID(A|B) = Digest(ChainID(A) + " " + DiffID(B))
+    # ChainID(A|B|C) = Digest(ChainID(A|B) + " " + DiffID(C))
+    # https://github.com/opencontainers/image-spec/blob/main/config.md
+    index = 0
+    chain_ids = []
+    diff_id = diff_ids[index]
+    chain_ids.append(diff_id)
+    index += 1
+    while index < len(layers_ids):
+        chain_id = "sha256:" + sha256(
+            chain_ids[index - 1].encode()
+            + " ".encode()
+            + diff_ids[index].encode()
+        ).hexdigest()
+        chain_ids.append(chain_id)
+        index += 1
+    return chain_ids
+
+
+class Image:
+    config: Dict[str, ConfigValue]
+    image_digest: str
+    repositories: Dict[str, object]
+    archive_path: str
+
+    # Parent at index 0, child at len(list) - 1
+    layers_ids: List[str]
+    chain_ids: List[str]
+    diff_ids: List[str]
+
+    def to_dict(self):
+        return self.__dict__
+
+    def get_tar_filenames(self, tar: TarFile) -> List[str]:
+        files = tar.getmembers()
+        filenames = []
+        for file in files:
+            filenames.append(file.get_info()["name"])
+        return filenames
+
+    def __load_metadata(self, tar: TarFile, file: str) -> Dict[str, str]:
+        return json.load(tar.extractfile(file))
+
+    def __init__(self, path: str):
+        if not os.path.exists(path):
+            raise ValueError("File does not exist")
+        if not tarfile.is_tarfile(path):
+            raise ValueError("Invalid tar archive")
+        self.archive_path = path
+        with tarfile.open(self.archive_path, "r") as tar:
+            manifest = self.__load_metadata(tar, "manifest.json")
+            self.repositories = self.__load_metadata(tar, "repositories")
+            self.image_digest = manifest[0]["Config"].split(".")[0]
+            self.config = self.__load_metadata(
+                tar, f"{self.image_digest}.json")
+            self.layers_ids = list(map(
+                lambda name: name.split('/')[0],
+                manifest[0]["Layers"]
+            ))  # Only keep the Layer id, not the file path
+            self.diff_ids = self.config["rootfs"]["diff_ids"]
+            self.chain_ids = compute_chain_ids(self.diff_ids, self.layers_ids)
diff --git a/src/aleph_client/commands/container/save.py b/src/aleph_client/commands/container/save.py
new file mode 100644
index 00000000..cf27dd46
--- /dev/null
+++ b/src/aleph_client/commands/container/save.py
@@ -0,0 +1,47 @@
+import sys
+from aleph_client.commands.container.image import Image
+from aleph_client.commands.container.storage_drivers import create_storage_driver
+import os
+from shutil import rmtree
+from aleph_client.commands.container.docker_conf import docker_settings, DockerSettings
+
+dirs = {
+    "vfs": 0o710,
+    "image": 0o700,
+    "plugins": 0o700,
+    "swarm": 0o700,
+    "runtimes": 0o700,
+    "network": 0o750,
+    "trust": 0o700,
+    "volumes": 0o701,
+    "buildkit": 0o711,
+    "containers": 0o710,
+    "tmp": 0o700,
+}
+
+
+def populate_dir(output_path: str):
+    print("populating")
+    path = os.path.abspath(output_path)
+    if os.path.exists(output_path) and os.path.isdir(output_path):
+        try:
+            rmtree(output_path)
+        except OSError:
+            raise  # TODO: handle error properly instead of re-raising
+    os.makedirs(output_path, 0o710)
+    for d, mode in dirs.items():
+        os.makedirs(os.path.join(path, d), mode)
+
+
+def save_tar(archive_path: str, output_path: str, settings: DockerSettings):
+    archive_path = os.path.abspath(archive_path)
+    output_path = os.path.abspath(output_path)
+    image = Image(archive_path)
+    if settings.populate:
+        populate_dir(output_path)
+    driver = create_storage_driver(image, output_path, settings)
+    driver.create_file_architecture()
+
+
+if __name__ == "__main__":
+    save_tar(sys.argv[1], sys.argv[2], docker_settings)
diff --git a/src/aleph_client/commands/container/storage_drivers.py b/src/aleph_client/commands/container/storage_drivers.py
new file mode 100644
index 00000000..c47bce0b
--- /dev/null
+++ b/src/aleph_client/commands/container/storage_drivers.py
@@ -0,0 +1,263 @@
+import tarfile
+from typing import Dict, List, Optional
+from .image import Image
+import os
+import json
+from uuid import uuid4
+import subprocess
+from shutil import rmtree
+import tempfile
+import gzip
+from .docker_conf import DockerSettings, StorageDriverEnum
+
+
+class IStorageDriver:
+    def create_file_architecture(self):
+        """
+        Reproduce the /var/lib/docker needed files in output_dir based on an image object.
+        """
+        pass
+
+
+class AStorageDriver(IStorageDriver):
+
+    image: Image
+    output_dir: str
+    layer_ids_dict: Dict[str, str]
+    driver_dir: str
+
+    def __init__(self, image: Image, output_dir: str, driver_dir: str):
+        self.image = image
+        self.output_dir = output_dir
+        self.layer_ids_dict = {}
+        self.driver_dir = driver_dir
+
+    def create_file_architecture(self):
+        path = os.path.join(self.output_dir, "image", self.driver_dir)
+        os.makedirs(path, 0o700)
+        self.create_distribution(path)
+        self.create_repositories_json(path)
+        self.create_imagedb(path)
+        self.create_layerdb(self.output_dir)
+        return
+
+    def create_repositories_json(self, output_dir: str):
+        """
+        Reproduce /var/lib/docker/image/{storage_driver}/repositories.json
+        in output_dir based on an image object.
+        """
+        raise NotImplementedError("You must implement this method")
+
+    def create_imagedb(self, output_dir: str):
+        """
+        Reproduce /var/lib/docker/image/{storage_driver}/imagedb
+        in output_dir based on an image object.
+        """
+        raise NotImplementedError("You must implement this method")
+
+    def create_layerdb(self, output_dir: str):
+        """
+        Reproduce /var/lib/docker/image/{storage_driver}/layerdb
+        in output_dir based on an image object after extracting the layers
+        to {output_dir}/{storage_driver}.
+        """
+        raise NotImplementedError("You must implement this method")
+
+    def create_distribution(self, output_dir: str):
+        """
+        Reproduce /var/lib/docker/image/{storage_driver}/distribution
+        in output_dir based on an image object.
+        """
+        raise NotImplementedError("You must implement this method")
+
+    def optimize(self, output_dir: str):
+        """
+        Remove intermediate layer data once the final merged layer has been
+        extracted, to reduce the size of the generated volume.
+        """
+        raise NotImplementedError("You must implement this method")
+
+
+# Since aleph vms can be running with an unknown host configuration,
+# storage drivers can be different from one machine to another.
+# Although not the most performant one, VFS is the most compatible
+# storage driver, hence the use of it.
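+#
+# Rough sketch of the tree this driver writes (inferred from the methods
+# below; not an authoritative map of /var/lib/docker):
+#   image/vfs/repositories.json
+#   image/vfs/imagedb/content/sha256/<image digest>
+#   image/vfs/layerdb/sha256/<chain id>/{diff,cache-id,size,parent}
+#   vfs/dir/<cache id>/   <- extracted layer contents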
+# Future use of another storage driver such as Overlay2 might become
+# available as compatibility checks are done on vms
+class Vfs(AStorageDriver):
+
+    def __init__(self, image: Image, output_dir: str, settings: DockerSettings):
+        super().__init__(image, output_dir, "vfs")
+        self.optimize = settings.storage_driver.conf.optimize
+        self.use_tarsplit = settings.storage_driver.conf.use_tarsplit
+
+    def create_distribution(self, output_dir: str):
+        os.makedirs(os.path.join(output_dir, "distribution"), 0o700)
+
+    def create_repositories_json(self, output_dir: str):
+        repositories = {}
+        for name, tags in self.image.repositories.items():
+            repositories[name] = {}
+            for tag in tags.keys():
+                repositories[name][f"{name}:{tag}"] = f"sha256:{self.image.image_digest}"
+        repositories = {"Repositories": repositories}
+        path = os.path.join(output_dir, "repositories.json")
+        with open(path, "w") as f:
+            f.write(json.dumps(repositories, separators=(',', ':')))
+        os.chmod(path, 0o0600)
+
+    def create_imagedb(self, output_dir: str):
+        os.makedirs(os.path.join(output_dir, "imagedb"), 0o700)
+        os.makedirs(os.path.join(output_dir, "imagedb", "content"), 0o700)
+        os.makedirs(os.path.join(output_dir, "imagedb", "metadata"), 0o700)
+        os.makedirs(os.path.join(output_dir, "imagedb",
+                    "content", "sha256"), 0o700)
+        os.makedirs(os.path.join(output_dir, "imagedb",
+                    "metadata", "sha256"), 0o700)
+        # os.makedirs(os.path.join(metadata, self.image.image_digest))
+        content = os.path.join(output_dir, "imagedb", "content", "sha256")
+        path = os.path.join(content, self.image.image_digest)
+        with open(path, "w") as f:
+            # This file must be dumped compactly in order to keep the correct sha256 digest
+            f.write(json.dumps(self.image.config, separators=(',', ':')))
+        os.chmod(path, 0o0600)
+        # with open(os.path.join(metadata, self.image.image_digest, "parent"), "w") as f:
+        #     f.write(self.image.config['config']['Image'])
+        return
+
+    def create_layerdb(self, output_dir: str):
+        assert (
+            len(self.image.chain_ids) == len(self.image.diff_ids)
+            and len(self.image.diff_ids) == len(self.image.layers_ids)
+        )
+        layers_dir = os.path.join(output_dir, "vfs", "dir")
+        layerdb_path = os.path.join(output_dir, "image", "vfs", "layerdb")
+        os.makedirs(layerdb_path, 0o700)
+        os.makedirs(os.path.join(layerdb_path, "mounts"), 0o700)
+        os.makedirs(os.path.join(layerdb_path, "tmp"), 0o700)
+        layerdb_path = os.path.join(layerdb_path, "sha256")
+        os.makedirs(layerdb_path, 0o700)
+
+        def save_layer_metadata(path: str, diff: str, cacheid: str, size: int, previous_chain_id: Optional[str]):
+            dest = os.path.join(path, "diff")
+            with open(dest, "w") as fd:
+                fd.write(diff)
+            os.chmod(dest, 0o600)
+            dest = os.path.join(path, "cache-id")
+            with open(dest, "w") as fd:
+                fd.write(cacheid)
+            os.chmod(dest, 0o600)
+            dest = os.path.join(path, "size")
+            with open(dest, "w") as fd:
+                fd.write(str(size))
+            os.chmod(dest, 0o600)
+            dest = os.path.join(path, "parent")
+            if previous_chain_id is not None:
+                with open(dest, "w") as fd:
+                    fd.write(previous_chain_id)
+                os.chmod(dest, 0o600)
+
+        def copy_layer(src: str, dest: str) -> None:
+            for folder in os.listdir(src):
+                subprocess.check_output(
+                    ["cp", "-r", os.path.join(src, folder), dest])
+
+        def compute_layer_size(tar_data_json_path: str) -> int:
+            size = 0
+            with gzip.open(tar_data_json_path, "r") as archive:
+                data = json.loads(
+                    "["
+                    + archive.read().decode().replace("}\n{", "},\n{")
+                    + "]"
+                )  # fixes poor formatting from tar-split
+                for elem in data:
+                    if "size" in elem.keys():
+                        size += elem["size"]
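+            # The key check above is an assumption about tar-split output:
+            # file entries carry a "size" field while raw segment entries
+            # do not, so only file entries contribute to the total.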
return size + + def remove_unused_layers(layers_dir: str, keep: List[str]): + return + + def extract_layer(path: str, archive_path: str, layerdb_subdir: str) -> int: + cwd = os.getcwd() + tmp_dir = tempfile.mkdtemp() + os.chdir(tmp_dir) + tar_src = os.path.join(tmp_dir, "layer.tar") + tar_dest = os.path.join(layer_id, "layer.tar") + with tarfile.open(archive_path, "r") as tar: + tar.extract(tar_dest) + os.rename(tar_dest, tar_src) + os.rmdir(layer_id) + os.chdir(path) + + # tar-split is used by docker to keep some archive metadata + # in order to compress the layer back with the exact same digest + # Mandatory if one plans to export a docker image to a tar file + # https://github.com/vbatts/tar-split + if self.use_tarsplit: + tar_data_json = os.path.join( + layerdb_subdir, "tar-split.json.gz") + os.system( + f"tar-split disasm --output {tar_data_json} {tar_src} | tar -C . -x") + # Differs from expected. Only messes with docker image size listing + size = compute_layer_size(tar_data_json) + os.remove(tar_src) + + # Also works, but won't be able to export images + else: + with tarfile.open(tar_src, "r") as tar: + os.remove(tar_src) + tar.extractall() + size = 0 + os.rmdir(tmp_dir) + os.chdir(cwd) + return size + + previous_cache_id = None + for i in range(0, len(self.image.chain_ids)): + chain_id = self.image.chain_ids[i] + layerdb_subdir = os.path.join( + layerdb_path, chain_id.replace("sha256:", "")) + os.makedirs(layerdb_subdir, 0o700) + cache_id = (str(uuid4()) + str(uuid4())).replace("-", "") + + layer_id = self.image.layers_ids[i] + current_layer_path = os.path.join(layers_dir, cache_id) + os.makedirs(current_layer_path, 0o700) + + # Merge layers + # The last layer contains changes from all the previous ones + if previous_cache_id: + previous_layer_path = os.path.join( + layers_dir, previous_cache_id) + copy_layer(previous_layer_path, current_layer_path) + if (self.optimize): + rmtree(previous_layer_path) + previous_cache_id = cache_id + size = extract_layer(current_layer_path, + self.image.archive_path, layerdb_subdir) + save_layer_metadata( + path=layerdb_subdir, + diff=self.image.diff_ids[i], + cacheid=cache_id, + size=size, + previous_chain_id=self.image.chain_ids[i - 1] + if i > 0 + else None + ) + if self.optimize: + layer_to_keep = os.path.join( + layers_dir, previous_cache_id + ) + remove_unused_layers(layers_dir, layer_to_keep) + + +def create_storage_driver( + image: Image, + output_dir: str, + settings: DockerSettings +) -> IStorageDriver: + if settings.storage_driver.kind == StorageDriverEnum.VFS: + return Vfs(image, output_dir, settings) + raise NotImplementedError("Only vfs supported now") diff --git a/src/aleph_client/commands/container/test_data/Dockerfile b/src/aleph_client/commands/container/test_data/Dockerfile new file mode 100644 index 00000000..aed8f85b --- /dev/null +++ b/src/aleph_client/commands/container/test_data/Dockerfile @@ -0,0 +1,7 @@ +FROM alpine:latest + +WORKDIR /app +RUN touch file.txt +RUN mkdir folder.d + +CMD ["/bin/sh"] \ No newline at end of file diff --git a/src/aleph_client/commands/container/test_data/create_tar_image.sh b/src/aleph_client/commands/container/test_data/create_tar_image.sh new file mode 100644 index 00000000..2f4175c2 --- /dev/null +++ b/src/aleph_client/commands/container/test_data/create_tar_image.sh @@ -0,0 +1,4 @@ +#!/bin/sh + +docker build -t test-image . 
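+# Added note: docker save can leave a partial archive behind when it fails,
+# so the "|| rm" below discards it instead of keeping a corrupt tar.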
+docker save test-image > test-image.tar || rm test-image.tar
diff --git a/src/aleph_client/commands/container/test_data/custom-dockerd b/src/aleph_client/commands/container/test_data/custom-dockerd
new file mode 100644
index 00000000..7a672d9c
--- /dev/null
+++ b/src/aleph_client/commands/container/test_data/custom-dockerd
@@ -0,0 +1,5 @@
+#!/bin/sh
+systemctl stop docker.service
+dockerd --data-root $(pwd)/docker-data
+systemctl restart docker.socket
+systemctl start docker.service
diff --git a/src/aleph_client/commands/container/tests.py b/src/aleph_client/commands/container/tests.py
new file mode 100644
index 00000000..fbde327a
--- /dev/null
+++ b/src/aleph_client/commands/container/tests.py
@@ -0,0 +1,233 @@
+import unittest
+import os
+import time
+from typing import List, Optional
+import filecmp
+import subprocess
+from shutil import rmtree
+from aleph_client.commands.container.save import save_tar
+from aleph_client.commands.container.docker_conf import docker_settings as settings
+
+TEST_DIR = os.path.abspath("test_data")
+DOCKER_DATA = os.path.join(TEST_DIR, "docker")
+IMAGE_NAME = "test-image"
+TEST_DOCKER_DATA = os.path.join(TEST_DIR, "docker.emulate")
+IMAGE_ARCHIVE = os.path.join(TEST_DIR, f"{IMAGE_NAME}.tar")
+
+# TODO: setup for following test cases:
+# - VFS optimization is turned on
+# - tar-split is not used
+
+
+def compare_folders_content(folder1: str, folder2: str):
+    dcmp = filecmp.dircmp(folder1, folder2)
+
+    def recursive_cmp(dcmp):
+        diff = dcmp.left_only + dcmp.right_only + dcmp.diff_files
+        for sub_dcmp in dcmp.subdirs.values():
+            diff += recursive_cmp(sub_dcmp)
+        return diff
+
+    return recursive_cmp(dcmp)
+
+
+docker_daemon: Optional[subprocess.Popen] = None
+
+
+class TestLoadImage(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls) -> None:
+        def cleanup_docker():
+            os.system(f"rm -rf {DOCKER_DATA}")
+            os.system("systemctl stop docker.service")
+            cls.docker_daemon = subprocess.Popen(
+                ["dockerd", "--data-root", DOCKER_DATA, "--storage-driver=vfs"], stderr=subprocess.DEVNULL)
+            time.sleep(3)
+
+        def build_test_image() -> bool:
+            if os.path.exists(IMAGE_ARCHIVE):
+                return True
+            return (
+                os.system(f"docker build -t {IMAGE_NAME} {TEST_DIR}") == 0
+                and os.system(f"docker save {IMAGE_NAME} > {IMAGE_ARCHIVE}") == 0
+            )
+
+        def load_image():
+            os.system(f"docker load -i {IMAGE_ARCHIVE}")
+
+        if not build_test_image():
+            raise Exception("Could not properly build image")
+        cleanup_docker()
+        load_image()
+        settings.storage_driver.conf.optimize = False
+        save_tar(IMAGE_ARCHIVE, TEST_DOCKER_DATA, settings)
+
+    @classmethod
+    def tearDownClass(cls) -> None:
+        rmtree(TEST_DOCKER_DATA)
+        rmtree(DOCKER_DATA)
+        if cls.docker_daemon is not None:
+            print("KILLING DOCKERD")
+            cls.docker_daemon.kill()
+            time.sleep(3)
+            os.system("systemctl restart docker.socket")
+            time.sleep(3)
+            os.system("systemctl restart docker.service")
+
+    def test_dir_creation(self) -> None:
+        self.assertTrue(os.path.isdir(f"{TEST_DOCKER_DATA}"))
+
+    def folder_cmp(self, expected_path: str, result_path: str) -> List[bool]:
+        res = []
+        expected_result = os.listdir(expected_path)
+        result = os.listdir(result_path)
+        if not settings.storage_driver.conf.use_tarsplit:
+            expected_result = list(filter(
+                lambda result: result != "tar-split.json.gz", expected_result))
+        self.assertEqual(len(expected_result), len(result))
+        for folder in expected_result:
+            res.append(folder in result)
+        return res
+
+    def permissions_cmp(self, expected_path, actual_path):
+        res = []
+        expected_files = os.listdir(expected_path)
+        for
f in expected_files: + expected_mode = os.stat(os.path.join(expected_path, f)).st_mode + actual_mode = os.stat(os.path.join(actual_path, f)).st_mode + res.append(expected_mode == actual_mode) + return res + + def test_docker_dir_same(self) -> None: + for res in self.folder_cmp(DOCKER_DATA, TEST_DOCKER_DATA): + self.assertTrue(res) + for res in self.permissions_cmp(DOCKER_DATA, TEST_DOCKER_DATA): + self.assertTrue(res) + + def test_docker_image_dir_same(self) -> None: + for res in self.folder_cmp( + os.path.join(DOCKER_DATA, "image"), + os.path.join(TEST_DOCKER_DATA, "image") + ): + self.assertTrue(res) + for res in self.permissions_cmp( + os.path.join(DOCKER_DATA, "image"), + os.path.join(TEST_DOCKER_DATA, "image") + ): + self.assertTrue(res) + + def test_docker_image_vfs_dir_same(self) -> None: + for res in self.folder_cmp( + os.path.join(DOCKER_DATA, "image", "vfs"), + os.path.join(TEST_DOCKER_DATA, "image", "vfs") + ): + self.assertTrue(res) + for res in self.permissions_cmp( + os.path.join(DOCKER_DATA, "image", "vfs"), + os.path.join(TEST_DOCKER_DATA, "image", "vfs") + ): + self.assertTrue(res) + + def test_compare_repositories_json(self) -> None: + path = os.path.join("image", "vfs", "repositories.json") + expected_result_path = os.path.join(DOCKER_DATA, path) + result_path = os.path.join(TEST_DOCKER_DATA, path) + self.assertTrue(filecmp.cmp(expected_result_path, result_path)) + + def test_imagedb_same(self) -> None: + for res in self.folder_cmp( + os.path.join(DOCKER_DATA, "image", "vfs", "imagedb"), + os.path.join(TEST_DOCKER_DATA, "image", "vfs", "imagedb") + ): + self.assertTrue(res) + for res in self.permissions_cmp( + os.path.join(DOCKER_DATA, "image", "vfs", "imagedb"), + os.path.join(TEST_DOCKER_DATA, "image", "vfs", "imagedb") + ): + self.assertTrue(res) + + def test_imagedb_content_same(self) -> None: + path = os.path.join("image", "vfs", "imagedb", "content", "sha256") + for res in self.folder_cmp( + os.path.join(DOCKER_DATA, path), + os.path.join(TEST_DOCKER_DATA, path) + ): + self.assertTrue(res) + + for res in self.permissions_cmp( + os.path.join(DOCKER_DATA, path), + os.path.join(TEST_DOCKER_DATA, path) + ): + self.assertTrue(res) + + def test_imagedb_meta_same(self) -> None: + path = os.path.join("image", "vfs", "imagedb", "metadata", "sha256") + for res in self.folder_cmp( + os.path.join(DOCKER_DATA, path), + os.path.join(TEST_DOCKER_DATA, path) + ): + self.assertTrue(res) + for res in self.permissions_cmp( + os.path.join(DOCKER_DATA, path), + os.path.join(TEST_DOCKER_DATA, path) + ): + self.assertTrue(res) + + def test_compare_imagedb_files(self) -> None: + path = os.path.join("image", "vfs", "imagedb", "content", "sha256") + expected_result_dir = os.path.join(DOCKER_DATA, path) + result_dir = os.path.join(TEST_DOCKER_DATA, path) + for f in os.listdir(expected_result_dir): + result_file = os.path.join(result_dir, f) + expected_result_file = os.path.join(expected_result_dir, f) + self.assertTrue(filecmp.cmp(expected_result_file, result_file)) + + def test_compare_layerdb_same(self) -> None: + path = os.path.join("image", "vfs", "layerdb", "sha256") + for res in self.folder_cmp( + os.path.join(DOCKER_DATA, path), + os.path.join(TEST_DOCKER_DATA, path) + ): + self.assertTrue(res) + for res in self.permissions_cmp( + os.path.join(DOCKER_DATA, path), + os.path.join(TEST_DOCKER_DATA, path) + ): + self.assertTrue(res) + + def test_compare_layerdb_files(self) -> None: + path = os.path.join("image", "vfs", "layerdb", "sha256") + for folder in 
os.listdir(os.path.join(DOCKER_DATA, path)): + for f in os.listdir(os.path.join(DOCKER_DATA, path, folder)): + if f == "tar-split.json.gz" and not settings.storage_driver.conf.use_tarsplit: + continue + if f == "size": # not ready yet + continue + result_file = os.path.join(TEST_DOCKER_DATA, path, folder, f) + expected_result_file = os.path.join( + DOCKER_DATA, path, folder, f) + res = filecmp.cmp(result_file, expected_result_file) + if f == "cache-id": # uuid should not be identical + self.assertFalse(res) + else: + self.assertTrue(res) + + def test_compare_layers(self) -> None: + path = os.path.join("image", "vfs", "layerdb", "sha256") + for folder in os.listdir(os.path.join(DOCKER_DATA, path)): + with open(os.path.join(DOCKER_DATA, path, folder, "cache-id"), "r") as f: + cache_id1 = f.read() + with open(os.path.join(TEST_DOCKER_DATA, path, folder, "cache-id"), "r") as f: + cache_id2 = f.read() + + res = compare_folders_content( + os.path.join(DOCKER_DATA, "vfs", "dir", cache_id1), + os.path.join(TEST_DOCKER_DATA, "vfs", "dir", cache_id2), + ) + self.assertEqual(len(res), 0) + + +if __name__ == '__main__': + unittest.main() diff --git a/src/aleph_client/commands/container/utils.py b/src/aleph_client/commands/container/utils.py new file mode 100644 index 00000000..c5552781 --- /dev/null +++ b/src/aleph_client/commands/container/utils.py @@ -0,0 +1,53 @@ + +import os +import logging +from shutil import rmtree + +from aleph_client.conf import Settings +from aleph_client.commands.container.save import save_tar + +logger = logging.getLogger(__name__) + + +def create_container_volume( + image: str, + output: str, + from_remote: bool, + from_daemon: bool, + optimize: bool, + settings: Settings +) -> str: + if from_remote: + raise NotImplementedError() + # echo(f"Downloading {image}") + # registry = Registry() + # tag = "latest" + # if ":" in image: + # l = image.split(":") + # tag = l[-1] + # image = l[0] + # print(tag) + # image_object = registry.pull_image(image, tag) + # manifest = registry.get_manifest_configuration(image, tag) + # image_archive = os.path.abspath(f"{str(uuid4())}.tar") + # image_object.write_filename(image_archive) + # image = image_archive + # print(manifest) + elif from_daemon: + output_from_daemon = f"{output}.image.tar" + if os.system(f"docker image inspect {image} >/dev/null 2>&1"): + raise Exception(f"Can't find image '{image}'") + if os.system(f"docker save {image} > {output_from_daemon}") != 0: + raise Exception("Error while saving docker image") + image = output_from_daemon + output = os.path.abspath(output) + tmp_output = f"{output}.tmp" + settings.DOCKER_SETTINGS.storage_driver.conf.optimize = optimize + save_tar(image, tmp_output, settings=settings.DOCKER_SETTINGS) + if not settings.CODE_USES_SQUASHFS: + raise Exception("The command mksquashfs must be installed!") + logger.debug("Creating squashfs archive...") + os.makedirs(output) + os.system(f"mksquashfs {tmp_output}/image/vfs {output}/metadata -noappend") + os.system(f"mksquashfs {tmp_output}/vfs {output}/layers -noappend") + rmtree(tmp_output) \ No newline at end of file diff --git a/src/aleph_client/commands/files.py b/src/aleph_client/commands/files.py new file mode 100644 index 00000000..85864d0b --- /dev/null +++ b/src/aleph_client/commands/files.py @@ -0,0 +1,100 @@ +import typer +import logging +from typing import Optional +from aleph_client.types import AccountFromPrivateKey +from aleph_client.account import _load_account +from aleph_client.conf import settings +from pathlib import Path +import 
asyncio +from aleph_client import synchronous + +from aleph_client.commands import help_strings + +from aleph_client.asynchronous import ( + get_fallback_session, + StorageEnum, +) + +from aleph_client.commands.utils import setup_logging + +from aleph_message.models import StoreMessage + + +logger = logging.getLogger(__name__) +app = typer.Typer() + + +@app.command() +def pin( + hash: str = typer.Argument(..., help="IPFS hash to pin on Aleph.im"), + channel: str = typer.Option(settings.DEFAULT_CHANNEL, help=help_strings.CHANNEL), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + ref: Optional[str] = typer.Option(None, help=help_strings.REF), + debug: bool = False, +): + """Persist a file from IPFS on Aleph.im.""" + + setup_logging(debug) + + account: AccountFromPrivateKey = _load_account(private_key, private_key_file) + + try: + result: StoreMessage = synchronous.create_store( + account=account, + file_hash=hash, + storage_engine=StorageEnum.ipfs, + channel=channel, + ref=ref, + ) + logger.debug("Upload finished") + typer.echo(f"{result.json(indent=4)}") + finally: + # Prevent aiohttp unclosed connector warning + asyncio.run(get_fallback_session().close()) + + + +@app.command() +def upload( + path: Path = typer.Argument(..., help="Path of the file to upload"), + channel: str = typer.Option(settings.DEFAULT_CHANNEL, help=help_strings.CHANNEL), + private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY), + private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE), + ref: Optional[str] = typer.Option(None, help=help_strings.REF), + debug: bool = False, +): + """Upload and store a file on Aleph.im.""" + + setup_logging(debug) + + account: AccountFromPrivateKey = _load_account(private_key, private_key_file) + + try: + if not path.is_file(): + typer.echo(f"Error: File not found: '{path}'") + raise typer.Exit(code=1) + + with open(path, "rb") as fd: + logger.debug("Reading file") + # TODO: Read in lazy mode instead of copying everything in memory + file_content = fd.read() + storage_engine = ( + StorageEnum.ipfs + if len(file_content) > 4 * 1024 * 1024 + else StorageEnum.storage + ) + logger.debug("Uploading file") + result: StoreMessage = synchronous.create_store( + account=account, + file_content=file_content, + storage_engine=storage_engine, + channel=channel, + guess_mime_type=True, + ref=ref, + ) + logger.debug("Upload finished") + typer.echo(f"{result.json(indent=4)}") + finally: + # Prevent aiohttp unclosed connector warning + asyncio.run(get_fallback_session().close()) diff --git a/src/aleph_client/commands/help_strings.py b/src/aleph_client/commands/help_strings.py new file mode 100644 index 00000000..76fd2dfc --- /dev/null +++ b/src/aleph_client/commands/help_strings.py @@ -0,0 +1,5 @@ +IPFS_HASH = "IPFS Content identifier (CID)" +CHANNEL = "Aleph network channel where the message is located" +PRIVATE_KEY = "Your private key. 
diff --git a/src/aleph_client/commands/message.py b/src/aleph_client/commands/message.py
new file mode 100644
index 00000000..2a6001c7
--- /dev/null
+++ b/src/aleph_client/commands/message.py
@@ -0,0 +1,192 @@
+import asyncio
+import json
+import os.path
+import subprocess
+import tempfile
+from pathlib import Path
+from typing import Optional, Dict, List
+
+import typer
+from aleph_message.models import (
+    PostMessage,
+    ForgetMessage,
+    AlephMessage,
+)
+
+from aleph_client import synchronous
+from aleph_client.account import _load_account
+from aleph_client.asynchronous import (
+    get_fallback_session,
+    StorageEnum,
+)
+from aleph_client.commands import help_strings
+from aleph_client.commands.utils import (
+    setup_logging,
+    input_multiline,
+)
+from aleph_client.conf import settings
+from aleph_client.types import AccountFromPrivateKey
+
+app = typer.Typer()
+
+
+@app.command()
+def post(
+    path: Optional[Path] = typer.Option(None, help="Path to the content you want to post. If omitted, you can input your content directly"),
+    type: str = typer.Option("test", help="Text representing the message object type"),
+    ref: Optional[str] = typer.Option(None, help=help_strings.REF),
+    channel: str = typer.Option(settings.DEFAULT_CHANNEL, help=help_strings.CHANNEL),
+    private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY),
+    private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE),
+    debug: bool = False,
+):
+    """Post a message on Aleph.im."""
+
+    setup_logging(debug)
+
+    account: AccountFromPrivateKey = _load_account(private_key, private_key_file)
+    storage_engine: str
+    content: Dict
+
+    if path:
+        if not path.is_file():
+            typer.echo(f"Error: File not found: '{path}'")
+            raise typer.Exit(code=1)
+
+        file_size = os.path.getsize(path)
+        storage_engine = (
+            StorageEnum.ipfs if file_size > 4 * 1024 * 1024 else StorageEnum.storage
+        )
+
+        with open(path, "r") as fd:
+            content = json.load(fd)
+
+    else:
+        content_raw = input_multiline()
+        storage_engine = (
+            StorageEnum.ipfs
+            if len(content_raw) > 4 * 1024 * 1024
+            else StorageEnum.storage
+        )
+        try:
+            content = json.loads(content_raw)
+        except json.decoder.JSONDecodeError:
+            typer.echo("Invalid JSON")
+            raise typer.Exit(code=2)
+
+    try:
+        result: PostMessage = synchronous.create_post(
+            account=account,
+            post_content=content,
+            post_type=type,
+            ref=ref,
+            channel=channel,
+            inline=True,
+            storage_engine=storage_engine,
+        )
+        typer.echo(result.json(indent=4))
+    finally:
+        # Prevent aiohttp unclosed connector warning
+        asyncio.run(get_fallback_session().close())
+
+
+@app.command()
+def amend(
+    hash: str = typer.Argument(..., help="Hash reference of the message to amend"),
+    private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY),
+    private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE),
+    debug: bool = False,
+):
+    """Amend an existing Aleph message."""
+
+    setup_logging(debug)
+
+    account: AccountFromPrivateKey = _load_account(private_key, private_key_file)
+
+    existing_message: AlephMessage = synchronous.get_message(item_hash=hash)
+
+    editor: str = os.getenv("EDITOR", default="nano")
+    with tempfile.NamedTemporaryFile(suffix=".json") as fd:
+        # Fill in message template
+        fd.write(existing_message.content.json(indent=4).encode())
+        fd.seek(0)
+
+        # Launch editor
+        subprocess.run([editor, fd.name], check=True)
+
+        # Read new message
+        fd.seek(0)
+        new_content_json = fd.read()
+
+    content_type = type(existing_message).__annotations__["content"]
+    new_content_dict = json.loads(new_content_json)
+    new_content = content_type(**new_content_dict)
+    new_content.ref = existing_message.item_hash
+    typer.echo(new_content)
+    result = synchronous.submit(
+        account=account,
+        content=new_content.dict(),
+        message_type=existing_message.type,
+        channel=existing_message.channel,
+    )
+    typer.echo(f"{result.json(indent=4)}")
+
+
+def forget_messages(
+    account: AccountFromPrivateKey,
+    hashes: List[str],
+    reason: Optional[str],
+    channel: str,
+):
+    try:
+        result: ForgetMessage = synchronous.forget(
+            account=account,
+            hashes=hashes,
+            reason=reason,
+            channel=channel,
+        )
+        typer.echo(f"{result.json(indent=4)}")
+    finally:
+        # Prevent aiohttp unclosed connector warning
+        asyncio.run(get_fallback_session().close())
+
+
+@app.command()
+def forget(
+    hashes: str = typer.Argument(..., help="Comma-separated list of hash references of messages to forget"),
+    reason: Optional[str] = typer.Option(None, help="A description of why the messages are being forgotten"),
+    channel: str = typer.Option(settings.DEFAULT_CHANNEL, help=help_strings.CHANNEL),
+    private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY),
+    private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE),
+    debug: bool = False,
+):
+    """Forget an existing Aleph message."""
+
+    setup_logging(debug)
+
+    account: AccountFromPrivateKey = _load_account(private_key, private_key_file)
+
+    hash_list: List[str] = hashes.split(",")
+    forget_messages(account, hash_list, reason, channel)
+
+
+@app.command()
+def watch(
+    ref: str = typer.Argument(..., help="Hash reference of the message to watch"),
+    indent: Optional[int] = typer.Option(None, help="Number of spaces used to indent the JSON output"),
+    debug: bool = False,
+):
+    """Watch a hash for amends and print amend hashes"""
+
+    setup_logging(debug)
+
+    original: AlephMessage = synchronous.get_message(item_hash=ref)
+
+    for message in synchronous.watch_messages(
+        refs=[ref], addresses=[original.content.address]
+    ):
+        typer.echo(f"{message.json(indent=indent)}")
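
The amend command resolves the concrete content model from the message class annotations rather than hard-coding one model per message type; a condensed sketch of that mechanism, with the item hash hypothetical:

    from aleph_client import synchronous

    existing = synchronous.get_message(item_hash="abc123...")  # hypothetical hash
    # e.g. PostContent for a PostMessage, ProgramContent for a ProgramMessage
    content_type = type(existing).__annotations__["content"]
    new_content = content_type(**existing.content.dict())
    new_content.ref = existing.item_hash  # an amend references the message it replaces
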
diff --git a/src/aleph_client/commands/program.py b/src/aleph_client/commands/program.py
new file mode 100644
index 00000000..dd16fc2d
--- /dev/null
+++ b/src/aleph_client/commands/program.py
@@ -0,0 +1,246 @@
+import asyncio
+import json
+import logging
+from base64 import b32encode, b16decode
+from pathlib import Path
+from typing import Optional, Dict, List
+from zipfile import BadZipFile
+
+import typer
+from aleph_message.models import (
+    ProgramMessage,
+    StoreMessage,
+    MessagesResponse,
+    ProgramContent,
+)
+
+from aleph_client import synchronous
+from aleph_client.account import _load_account
+from aleph_client.asynchronous import (
+    get_fallback_session,
+    StorageEnum,
+)
+from aleph_client.commands import help_strings
+from aleph_client.commands.utils import (
+    setup_logging,
+    input_multiline,
+    prompt_for_volumes,
+    yes_no_input,
+)
+from aleph_client.conf import settings
+from aleph_client.types import AccountFromPrivateKey
+from aleph_client.utils import create_archive
+
+logger = logging.getLogger(__name__)
+app = typer.Typer()
+
+
+@app.command()
+def upload(
+    path: Path = typer.Argument(..., help="Path to your source code"),
+    entrypoint: str = typer.Argument(..., help="Your program entrypoint"),
+    channel: str = typer.Option(settings.DEFAULT_CHANNEL, help=help_strings.CHANNEL),
+    memory: int = typer.Option(settings.DEFAULT_VM_MEMORY, help="Maximum memory allocation on the VM in MiB"),
+    vcpus: int = typer.Option(settings.DEFAULT_VM_VCPUS, help="Number of virtual CPUs to allocate"),
+    timeout_seconds: float = typer.Option(settings.DEFAULT_VM_TIMEOUT, help="If the VM is not called within [timeout_seconds], it will shut down"),
+    private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY),
+    private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE),
+    print_messages: bool = typer.Option(False),
+    print_code_message: bool = typer.Option(False),
+    print_program_message: bool = typer.Option(False),
+    runtime: str = typer.Option(None, help="Hash of the runtime to use for your program. Defaults to the Aleph Debian runtime with Python 3.8 and Node.js. You can also create your own runtime and pin it"),
+    beta: bool = typer.Option(False),
+    debug: bool = False,
+    persistent: bool = False,
+):
+    """Register a program to run on Aleph.im virtual machines from a zip archive."""
+
+    setup_logging(debug)
+
+    path = path.absolute()
+
+    try:
+        path_object, encoding = create_archive(path)
+    except BadZipFile:
+        typer.echo("Invalid zip archive")
+        raise typer.Exit(3)
+    except FileNotFoundError:
+        typer.echo("No such file or directory")
+        raise typer.Exit(4)
+
+    account: AccountFromPrivateKey = _load_account(private_key, private_key_file)
+
+    runtime = (
+        runtime
+        or input(f"Ref of runtime? [{settings.DEFAULT_RUNTIME_ID}] ")
+        or settings.DEFAULT_RUNTIME_ID
+    )
+
+    volumes = []
+    for volume in prompt_for_volumes():
+        volumes.append(volume)
+        typer.echo("\n")
+
+    subscriptions: Optional[List[Dict]]
+    if beta and yes_no_input("Subscribe to messages?", default=False):
+        content_raw = input_multiline()
+        try:
+            subscriptions = json.loads(content_raw)
+        except json.decoder.JSONDecodeError:
+            typer.echo("Invalid JSON")
+            raise typer.Exit(code=2)
+    else:
+        subscriptions = None
+
+    try:
+        # Upload the source code
+        with open(path_object, "rb") as fd:
+            logger.debug("Reading file")
+            # TODO: Read in lazy mode instead of copying everything in memory
+            file_content = fd.read()
+            storage_engine = (
+                StorageEnum.ipfs
+                if len(file_content) > 4 * 1024 * 1024
+                else StorageEnum.storage
+            )
+            logger.debug("Uploading file")
+            user_code: StoreMessage = synchronous.create_store(
+                account=account,
+                file_content=file_content,
+                storage_engine=storage_engine,
+                channel=channel,
+                guess_mime_type=True,
+                ref=None,
+            )
+            logger.debug("Upload finished")
+            if print_messages or print_code_message:
+                typer.echo(f"{user_code.json(indent=4)}")
+            program_ref = user_code.item_hash
+
+        # Register the program
+        result: ProgramMessage = synchronous.create_program(
+            account=account,
+            program_ref=program_ref,
+            entrypoint=entrypoint,
+            runtime=runtime,
+            storage_engine=StorageEnum.storage,
+            channel=channel,
+            memory=memory,
+            vcpus=vcpus,
+            timeout_seconds=timeout_seconds,
+            persistent=persistent,
+            encoding=encoding,
+            volumes=volumes,
+            subscriptions=subscriptions,
+        )
+        logger.debug("Upload finished")
+        if print_messages or print_program_message:
+            typer.echo(f"{result.json(indent=4)}")
+
+        hash: str = result.item_hash
+        hash_base32 = b32encode(b16decode(hash.upper())).strip(b"=").lower().decode()
+
+        typer.echo(
+            f"Your program has been uploaded on Aleph.\n\n"
+            "Available on:\n"
+            f"  {settings.VM_URL_PATH.format(hash=hash)}\n"
+            f"  {settings.VM_URL_HOST.format(hash_base32=hash_base32)}\n"
+            "Visualise on:\n  https://explorer.aleph.im/address/"
+            f"{result.chain}/{result.sender}/message/PROGRAM/{hash}\n"
+        )
+
+    finally:
+        # Prevent aiohttp unclosed connector warning
+        asyncio.run(get_fallback_session().close())
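
The two VM URLs printed above encode the same item hash differently: VM_URL_PATH takes the hex hash verbatim, while VM_URL_HOST needs a DNS-safe label, hence the base16-to-base32 conversion. A self-contained sketch of that step (the hash value is hypothetical):

    from base64 import b32encode, b16decode

    item_hash = "cafe" * 16  # hypothetical 64-character hex item hash
    # hex (base16) -> raw bytes -> base32, lowercased with '=' padding stripped,
    # so it is valid as a subdomain label: https://{hash_base32}.aleph.sh
    hash_base32 = b32encode(b16decode(item_hash.upper())).strip(b"=").lower().decode()
    print(hash_base32)
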
+
+
+@app.command()
+def update(
+    hash: str,
+    path: Path,
+    private_key: Optional[str] = typer.Option(settings.PRIVATE_KEY_STRING, help=help_strings.PRIVATE_KEY),
+    private_key_file: Optional[Path] = typer.Option(settings.PRIVATE_KEY_FILE, help=help_strings.PRIVATE_KEY_FILE),
+    print_message: bool = True,
+    debug: bool = False,
+):
+    """Update the code of an existing program"""
+
+    setup_logging(debug)
+
+    account = _load_account(private_key, private_key_file)
+    path = path.absolute()
+
+    try:
+        program_message: ProgramMessage = synchronous.get_message(
+            item_hash=hash, message_type=ProgramMessage
+        )
+        code_ref = program_message.content.code.ref
+        code_message: StoreMessage = synchronous.get_message(
+            item_hash=code_ref, message_type=StoreMessage
+        )
+
+        try:
+            path, encoding = create_archive(path)
+        except BadZipFile:
+            typer.echo("Invalid zip archive")
+            raise typer.Exit(3)
+        except FileNotFoundError:
+            typer.echo("No such file or directory")
+            raise typer.Exit(4)
+
+        if encoding != program_message.content.code.encoding:
+            logger.error(
+                f"Code must be encoded with the same encoding as the previous version "
+                f"('{encoding}' vs '{program_message.content.code.encoding}')"
+            )
+            raise typer.Exit(1)
+
+        # Upload the source code
+        with open(path, "rb") as fd:
+            logger.debug("Reading file")
+            # TODO: Read in lazy mode instead of copying everything in memory
+            file_content = fd.read()
+            logger.debug("Uploading file")
+            result = synchronous.create_store(
+                account=account,
+                file_content=file_content,
+                storage_engine=code_message.content.item_type,
+                channel=code_message.channel,
+                guess_mime_type=True,
+                ref=code_message.item_hash,
+            )
+            logger.debug("Upload finished")
+            if print_message:
+                typer.echo(f"{result.json(indent=4)}")
+    finally:
+        # Prevent aiohttp unclosed connector warning
+        asyncio.run(get_fallback_session().close())
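
As with the file commands, the update flow can be exercised through typer's test runner; a sketch where both the program hash and the source directory are placeholders:

    from typer.testing import CliRunner
    from aleph_client.commands.program import app

    runner = CliRunner()
    # Re-uploads ./my_program as new code for the given program message,
    # keeping the original encoding, storage engine and code ref.
    result = runner.invoke(app, ["update", "abc123...", "./my_program"])  # hypothetical
    print(result.output)
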
.\n\n" + "Available on:\n" + f" {settings.VM_URL_PATH.format(hash=hash)}\n" + f" {settings.VM_URL_HOST.format(hash_base32=hash_base32)}\n" + "Visualise on:\n https://explorer.aleph.im/address/" + f"{result.chain}/{result.sender}/message/PROGRAM/{hash}\n" + ) + + finally: + # Prevent aiohttp unclosed connector warning + asyncio.run(get_fallback_session().close()) + + +@app.command() +def update( + hash: str, + path: Path, + private_key: Optional[str] = settings.PRIVATE_KEY_STRING, + private_key_file: Optional[Path] = settings.PRIVATE_KEY_FILE, + print_message: bool = True, + debug: bool = False, +): + """Update the code of an existing program""" + + setup_logging(debug) + + account = _load_account(private_key, private_key_file) + path = path.absolute() + + try: + program_message: ProgramMessage = synchronous.get_message( + item_hash=hash, message_type=ProgramMessage + ) + code_ref = program_message.content.code.ref + code_message: StoreMessage = synchronous.get_message( + item_hash=code_ref, message_type=StoreMessage + ) + + try: + path, encoding = create_archive(path) + except BadZipFile: + typer.echo("Invalid zip archive") + raise typer.Exit(3) + except FileNotFoundError: + typer.echo("No such file or directory") + raise typer.Exit(4) + + if encoding != program_message.content.code.encoding: + logger.error( + f"Code must be encoded with the same encoding as the previous version " + f"('{encoding}' vs '{program_message.content.code.encoding}'" + ) + raise typer.Exit(1) + + # Upload the source code + with open(path, "rb") as fd: + logger.debug("Reading file") + # TODO: Read in lazy mode instead of copying everything in memory + file_content = fd.read() + logger.debug("Uploading file") + result = synchronous.create_store( + account=account, + file_content=file_content, + storage_engine=code_message.content.item_type, + channel=code_message.channel, + guess_mime_type=True, + ref=code_message.item_hash, + ) + logger.debug("Upload finished") + if print_message: + typer.echo(f"{result.json(indent=4)}") + finally: + # Prevent aiohttp unclosed connector warning + asyncio.run(get_fallback_session().close()) + +@app.command() +def unpersist( + hash: str, + private_key: Optional[str] = settings.PRIVATE_KEY_STRING, + private_key_file: Optional[Path] = settings.PRIVATE_KEY_FILE, + debug: bool = False, +): + """Stop a persistent virtual machine by making it non-persistent""" + + setup_logging(debug) + + account = _load_account(private_key, private_key_file) + + existing: MessagesResponse = synchronous.get_messages(hashes=[hash]) + message: ProgramMessage = existing.messages[0] + content: ProgramContent = message.content.copy() + + content.on.persistent = False + content.replaces = message.item_hash + + result = synchronous.submit( + account=account, + content=content.dict(exclude_none=True), + message_type=message.type, + channel=message.channel, + ) + typer.echo(f"{result.json(indent=4)}") diff --git a/src/aleph_client/commands/utils.py b/src/aleph_client/commands/utils.py new file mode 100644 index 00000000..0387cc89 --- /dev/null +++ b/src/aleph_client/commands/utils.py @@ -0,0 +1,69 @@ +import logging +from typer import echo +from typing import Optional + +def input_multiline() -> str: + """Prompt the user for a multiline input.""" + echo("Enter/Paste your content. 
diff --git a/src/aleph_client/conf.py b/src/aleph_client/conf.py
index fba55163..e6ee9f32 100644
--- a/src/aleph_client/conf.py
+++ b/src/aleph_client/conf.py
@@ -1,7 +1,7 @@
 from pathlib import Path
 from shutil import which
 from typing import Optional
-
+from .commands.container.docker_conf import docker_settings
 from pydantic import BaseSettings, Field
@@ -24,6 +24,10 @@ class Settings(BaseSettings):
     DEFAULT_RUNTIME_ID: str = (
         "bd79839bf96e595a06da5ac0b6ba51dea6f7e2591bb913deccded04d831d29f4"
     )
+    DEFAULT_DOCKER_RUNTIME_ID: str = (
+        "bd79839bf96e595a06da5ac0b6ba51dea6f7e2591bb913deccded04d831d29f4"  # TODO: Replace
+    )
+    DEFAULT_DOCKER_VOLUME_MOUNTPOINT: str = "/docker_aleph"
     DEFAULT_VM_MEMORY: int = 128
     DEFAULT_VM_VCPUS: int = 1
     DEFAULT_VM_TIMEOUT: float = 30.0
@@ -32,6 +36,7 @@ class Settings(BaseSettings):
 
     VM_URL_PATH = "https://aleph.sh/vm/{hash}"
    VM_URL_HOST = "https://{hash_base32}.aleph.sh"
+    DOCKER_SETTINGS = docker_settings
 
     class Config:
        env_prefix = "ALEPH_"
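
Since Settings is a pydantic BaseSettings with env_prefix "ALEPH_", the new Docker fields can be overridden from the environment like the existing ones; a sketch with hypothetical values:

    import os

    # Resolved by pydantic when Settings() is instantiated
    os.environ["ALEPH_DEFAULT_DOCKER_VOLUME_MOUNTPOINT"] = "/mnt/docker"
    os.environ["ALEPH_DEFAULT_VM_MEMORY"] = "256"

    from aleph_client.conf import Settings
    print(Settings().DEFAULT_DOCKER_VOLUME_MOUNTPOINT)  # /mnt/docker
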