Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions cirro/services/execution.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from typing import List, Optional
from typing import List, Optional, Dict

from cirro_api_client.v1.api.execution import run_analysis, stop_analysis, get_project_summary, \
get_tasks_for_execution, get_task_logs, get_execution_logs
from cirro_api_client.v1.api.processes import get_process_parameters
from cirro_api_client.v1.models import RunAnalysisRequest, CreateResponse, GetProjectSummaryResponse200, Task
from cirro_api_client.v1.models import RunAnalysisRequest, CreateResponse, Task

from cirro.models.form_specification import ParameterSpecification
from cirro.services.base import BaseService
Expand Down Expand Up @@ -82,7 +82,7 @@ def stop_analysis(self, project_id: str, dataset_id: str):
client=self._api_client
)

def get_project_summary(self, project_id: str) -> GetProjectSummaryResponse200:
def get_project_summary(self, project_id: str) -> Dict[str, List[Task]]:
"""
Gets an overview of the executions currently running in the project, by job queue

Expand All @@ -98,54 +98,63 @@ def get_project_summary(self, project_id: str) -> GetProjectSummaryResponse200:
client=self._api_client
).additional_properties

def get_execution_logs(self, project_id: str, dataset_id: str, force_live: bool = False) -> str:
    """
    Gets live logs from main execution task

    Args:
        project_id (str): ID of the Project
        dataset_id (str): ID of the Dataset
        force_live (bool): If True, it will fetch logs from CloudWatch,
            even if the execution is already completed

    Returns:
        str: the log events joined into a single newline-delimited string
    """
    # `get_execution_logs` here resolves to the module-level API function
    # imported from cirro_api_client, not this method.
    resp = get_execution_logs.sync(
        project_id=project_id,
        dataset_id=dataset_id,
        force_live=force_live,
        client=self._api_client
    )

    # Flatten the individual log events into one readable block of text.
    return '\n'.join(e.message for e in resp.events)

def get_tasks_for_execution(self, project_id: str, dataset_id: str, force_live: bool = False) -> Optional[List[Task]]:
    """
    Gets the tasks submitted by the workflow execution

    Args:
        project_id (str): ID of the Project
        dataset_id (str): ID of the Dataset
        force_live (bool): If True, it will try to get the list of jobs
            from the executor (i.e., AWS Batch), rather than the workflow report

    Returns:
        Optional[List[Task]]: tasks for the execution, or None if unavailable
    """
    # `get_tasks_for_execution` here resolves to the module-level API
    # function imported from cirro_api_client, not this method.
    return get_tasks_for_execution.sync(
        project_id=project_id,
        dataset_id=dataset_id,
        force_live=force_live,
        client=self._api_client
    )

def get_task_logs(self, project_id: str, dataset_id: str, task_id: str) -> str:
def get_task_logs(self, project_id: str, dataset_id: str, task_id: str, force_live=False) -> str:
"""
Gets the log output from an individual task

Args:
project_id (str): ID of the Project
dataset_id (str): ID of the Dataset
task_id (str): ID of the task
force_live (bool): If True, it will fetch logs from CloudWatch,
even if the execution is already completed

"""

resp = get_task_logs.sync(
project_id=project_id,
dataset_id=dataset_id,
task_id=task_id,
force_live=force_live,
client=self._api_client
)

Expand Down
Loading