From b79cd30565f3fa4696d2fc08683b41fdfbb1c4b4 Mon Sep 17 00:00:00 2001 From: Nathan Thorpe Date: Wed, 4 Dec 2024 09:55:33 -0700 Subject: [PATCH] add force live parameter --- cirro/services/execution.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/cirro/services/execution.py b/cirro/services/execution.py index d5090f4c..ad047f79 100644 --- a/cirro/services/execution.py +++ b/cirro/services/execution.py @@ -1,9 +1,9 @@ -from typing import List, Optional +from typing import List, Optional, Dict from cirro_api_client.v1.api.execution import run_analysis, stop_analysis, get_project_summary, \ get_tasks_for_execution, get_task_logs, get_execution_logs from cirro_api_client.v1.api.processes import get_process_parameters -from cirro_api_client.v1.models import RunAnalysisRequest, CreateResponse, GetProjectSummaryResponse200, Task +from cirro_api_client.v1.models import RunAnalysisRequest, CreateResponse, Task from cirro.models.form_specification import ParameterSpecification from cirro.services.base import BaseService @@ -82,7 +82,7 @@ def stop_analysis(self, project_id: str, dataset_id: str): client=self._api_client ) - def get_project_summary(self, project_id: str) -> GetProjectSummaryResponse200: + def get_project_summary(self, project_id: str) -> Dict[str, List[Task]]: """ Gets an overview of the executions currently running in the project, by job queue @@ -98,40 +98,46 @@ def get_project_summary(self, project_id: str) -> GetProjectSummaryResponse200: client=self._api_client ).additional_properties - def get_execution_logs(self, project_id: str, dataset_id: str) -> str: + def get_execution_logs(self, project_id: str, dataset_id: str, force_live=False) -> str: """ Gets live logs from main execution task Args: project_id (str): ID of the Project dataset_id (str): ID of the Dataset + force_live (bool): If True, it will fetch logs from CloudWatch, + even if the execution is already completed """ resp = get_execution_logs.sync( project_id=project_id, dataset_id=dataset_id, + force_live=force_live, client=self._api_client ) return '\n'.join(e.message for e in resp.events) - def get_tasks_for_execution(self, project_id: str, dataset_id: str) -> Optional[List[Task]]: + def get_tasks_for_execution(self, project_id: str, dataset_id: str, force_live=False) -> Optional[List[Task]]: """ Gets the tasks submitted by the workflow execution Args: project_id (str): ID of the Project dataset_id (str): ID of the Dataset + force_live (bool): If True, it will try to get the list of jobs + from the executor (i.e., AWS Batch), rather than the workflow report """ return get_tasks_for_execution.sync( project_id=project_id, dataset_id=dataset_id, + force_live=force_live, client=self._api_client ) - def get_task_logs(self, project_id: str, dataset_id: str, task_id: str) -> str: + def get_task_logs(self, project_id: str, dataset_id: str, task_id: str, force_live=False) -> str: """ Gets the log output from an individual task @@ -139,6 +145,8 @@ def get_task_logs(self, project_id: str, dataset_id: str, task_id: str) -> str: project_id (str): ID of the Project dataset_id (str): ID of the Dataset task_id (str): ID of the task + force_live (bool): If True, it will fetch logs from CloudWatch, + even if the execution is already completed """ @@ -146,6 +154,7 @@ def get_task_logs(self, project_id: str, dataset_id: str, task_id: str) -> str: project_id=project_id, dataset_id=dataset_id, task_id=task_id, + force_live=force_live, client=self._api_client )