diff --git a/cirro/api/services/process.py b/cirro/api/services/process.py index 149b86f1..0f80ea84 100644 --- a/cirro/api/services/process.py +++ b/cirro/api/services/process.py @@ -55,9 +55,9 @@ def list(self, process_type: Executor = None) -> List[Process]: } } ''' - item_filter = {} + item_filter = None if process_type: - item_filter['executor'] = {'eq': process_type.value} + item_filter = {'executor': {'eq': process_type.value}} items = fetch_all_items(self._api_client, query, input_variables={'filter': item_filter}) diff --git a/cirro/cli/controller.py b/cirro/cli/controller.py index a7123913..ff43c653 100644 --- a/cirro/cli/controller.py +++ b/cirro/cli/controller.py @@ -57,12 +57,12 @@ def run_ingest(input_params: UploadArguments, interactive=False): if interactive: input_params = gather_upload_arguments(input_params, projects, processes) - + files = input_params['files'] + else: + files = get_files_in_directory(input_params['data_directory']) + if len(files) == 0: + raise RuntimeWarning("No files to upload, exiting") directory = input_params['data_directory'] - files = get_files_in_directory(directory) - if len(files) == 0: - raise RuntimeWarning("No files to upload, exiting") - process = get_item_from_name_or_id(processes, input_params['process']) cirro.process.check_dataset_files(files, process.id, directory) diff --git a/cirro/cli/interactive/upload_args.py b/cirro/cli/interactive/upload_args.py index fb8b785c..f264dc89 100644 --- a/cirro/cli/interactive/upload_args.py +++ b/cirro/cli/interactive/upload_args.py @@ -1,16 +1,18 @@ import sys +from fnmatch import fnmatch from pathlib import Path from typing import List from prompt_toolkit.shortcuts import CompleteStyle from prompt_toolkit.validation import Validator, ValidationError +from questionary import Choice from cirro.api.models.process import Process from cirro.api.models.project import Project from cirro.cli.interactive.common_args import ask_project -from cirro.cli.interactive.utils import prompt_wrapper +from cirro.cli.interactive.utils import ask from cirro.cli.models import UploadArguments -from cirro.file_utils import get_directory_stats +from cirro.file_utils import get_directory_stats, get_files_in_directory class DataDirectoryValidator(Validator): @@ -23,82 +25,83 @@ def validate(self, document): ) -def ask_data_directory(input_value: str) -> str: - directory_prompt = { - 'type': 'path', - 'name': 'data_directory', - 'message': 'Enter the full path of the data directory', - 'validate': DataDirectoryValidator, - 'default': input_value or '', - 'complete_style': CompleteStyle.READLINE_LIKE, - 'only_directories': True - } +def confirm_data_directory(directory: str, files: List[str]): + stats = get_directory_stats(directory, files) + is_accepted = ask( + 'confirm', + f'Please confirm that you wish to upload {stats["numberOfFiles"]} files ({stats["sizeFriendly"]})', + default=True + ) - answers = prompt_wrapper(directory_prompt) - return answers['data_directory'] - - -def confirm_data_directory(directory: str): - stats = get_directory_stats(directory) - answers = prompt_wrapper({ - 'type': 'confirm', - 'message': f'Please confirm that you wish to upload {stats["numberOfFiles"]} files ({stats["sizeFriendly"]})', - 'name': 'continue', - 'default': True - }) - - if not answers['continue']: + if not is_accepted: sys.exit(1) -def ask_name(input_value: str) -> str: - name_prompt = { - 'type': 'input', - 'name': 'name', - 'message': 'What is the name of this dataset?', - 'validate': lambda val: len(val.strip()) > 0 or 'Please enter a name', - 'default': input_value or '' - } - - answers = prompt_wrapper(name_prompt) - return answers['name'] - - -def ask_description(input_value: str) -> str: - description_prompt = { - 'type': 'input', - 'name': 'description', - 'message': 'Enter a description of the dataset (optional)', - 'default': input_value or '' - } - - answers = prompt_wrapper(description_prompt) - return answers['description'] - - def ask_process(processes: List[Process], input_value: str) -> str: process_names = [process.name for process in processes] - process_prompt = { - 'type': 'list', - 'name': 'process', - 'message': 'What type of files?', - 'choices': process_names, - 'default': input_value if input_value in process_names else None - } - answers = prompt_wrapper(process_prompt) - return answers['process'] + return ask( + 'select', + 'What type of files?', + default=input_value if input_value in process_names else None, + choices=process_names + ) def gather_upload_arguments(input_params: UploadArguments, projects: List[Project], processes: List[Process]): input_params['project'] = ask_project(projects, input_params.get('project')) - input_params['data_directory'] = ask_data_directory(input_params.get('data_directory')) - confirm_data_directory(input_params['data_directory']) + input_params['data_directory'] = ask( + 'path', + 'Enter the full path of the data directory', + required=True, + validate=DataDirectoryValidator, + default=input_params.get('data_directory') or '', + complete_style=CompleteStyle.READLINE_LIKE, + only_directories=True + ) + + upload_method = ask( + 'select', + 'What files would you like to upload?', + choices=[ + Choice('Upload all files in directory', 'all'), + Choice('Choose files from a list', 'select'), + Choice('Specify a glob pattern', 'glob'), + ] + ) + input_params['files'] = get_files_in_directory(input_params['data_directory']) + if upload_method == 'select': + input_params['files'] = ask( + 'checkbox', + 'Select the files you wish to upload', + choices=input_params['files'] + ) + elif upload_method == 'glob': + matching_files = None + while not matching_files: + glob_pattern = ask('text', 'Glob pattern:') + matching_files = [f for f in input_params['files'] if fnmatch(f, glob_pattern)] + if len(matching_files) == 0: + print('Glob pattern does not match any files, please specify another') + + input_params['files'] = matching_files + + confirm_data_directory(input_params['data_directory'], input_params['files']) input_params['process'] = ask_process(processes, input_params.get('process')) data_directory_name = Path(input_params['data_directory']).name default_name = input_params.get('name') or data_directory_name - input_params['name'] = ask_name(default_name) - input_params['description'] = ask_description(input_params.get('description')) + input_params['name'] = ask( + 'text', + 'What is the name of this dataset?', + default=default_name, + validate=lambda val: len(val.strip()) > 0 or 'Please enter a name' + ) + input_params['description'] = ask( + 'text', + 'Enter a description of the dataset (optional)', + default=input_params.get('description') or '' + ) + return input_params diff --git a/cirro/cli/interactive/utils.py b/cirro/cli/interactive/utils.py index c31c704f..665032f6 100644 --- a/cirro/cli/interactive/utils.py +++ b/cirro/cli/interactive/utils.py @@ -1,3 +1,5 @@ +from typing import List, Union, Callable + import questionary from questionary import prompt @@ -25,16 +27,23 @@ def type_validator(t, v): return False -def ask(fname, msg, validate_type=None, output_f=None, **kwargs) -> str: - """Wrap questionary functions to catch escapes and exit gracefully.""" +def ask(function_name: str, + msg: str, + validate_type=None, + output_transformer: Callable = None, + **kwargs) -> Union[str, List[str]]: + """ + Wrap questionary functions to catch escapes and exit gracefully. + function_name: https://questionary.readthedocs.io/en/stable/pages/types.html# + """ # Get the questionary function - questionary_f = questionary.__dict__.get(fname) + questionary_f = questionary.__dict__.get(function_name) # Make sure that the function exists - assert questionary_f is not None, f"No such questionary function: {fname}" + assert questionary_f is not None, f"No such questionary function: {function_name}" - if fname == "select": + if function_name == "select": kwargs["use_shortcuts"] = True if validate_type is not None: @@ -59,10 +68,9 @@ def ask(fname, msg, validate_type=None, output_f=None, **kwargs) -> str: raise KeyboardInterrupt() # If an output transformation function was defined - if output_f is not None: - + if output_transformer is not None: # Call the function - resp = output_f(resp) + resp = output_transformer(resp) # Otherwise return resp diff --git a/cirro/cli/models.py b/cirro/cli/models.py index ab46f96e..508df603 100644 --- a/cirro/cli/models.py +++ b/cirro/cli/models.py @@ -1,4 +1,4 @@ -from typing import TypedDict +from typing import TypedDict, List, Optional class DownloadArguments(TypedDict): @@ -15,6 +15,7 @@ class UploadArguments(TypedDict): process: str data_directory: str interactive: bool + files: Optional[List[str]] class ListArguments(TypedDict): diff --git a/cirro/file_utils.py b/cirro/file_utils.py index cffb3947..b6bd5df5 100644 --- a/cirro/file_utils.py +++ b/cirro/file_utils.py @@ -40,7 +40,7 @@ def _is_hidden_file(file_path: Path): def get_files_in_directory(directory) -> List[str]: - path = Path(directory) + path = Path(directory).expanduser() path_posix = str(path.as_posix()) paths = [] @@ -59,8 +59,11 @@ def get_files_in_directory(directory) -> List[str]: return paths -def get_directory_stats(directory) -> DirectoryStatistics: - sizes = [f.stat().st_size for f in Path(directory).glob('**/*') if f.is_file()] +def get_directory_stats(directory: str, files: List[str] = None) -> DirectoryStatistics: + if files: + sizes = [Path(directory, f).stat().st_size for f in files] + else: + sizes = [f.stat().st_size for f in Path(directory).glob('**/*') if f.is_file()] total_size = sum(sizes) / float(1 << 30) return { 'sizeFriendly': f'{total_size:,.3f} GB',