diff --git a/src/codeflare_sdk/cluster/cluster.py b/src/codeflare_sdk/cluster/cluster.py index 129bebb3..70347522 100644 --- a/src/codeflare_sdk/cluster/cluster.py +++ b/src/codeflare_sdk/cluster/cluster.py @@ -1,21 +1,25 @@ -from .config import ClusterConfiguration -from .model import RayCluster, AppWrapper +from os import stat +from typing import List, Optional, Tuple + +import openshift as oc + from ..utils import pretty_print from ..utils.generate_yaml import generate_appwrapper -import openshift as oc -from typing import List, Optional +from .config import ClusterConfiguration +from .model import (AppWrapper, AppWrapperStatus, CodeFlareClusterStatus, + RayCluster, RayClusterStatus) class Cluster: def __init__(self, config: ClusterConfiguration): self.config = config - self.app_wrapper_yaml = self.create_app_wrapper() + self.app_wrapper_yaml = self.create_app_wrapper() def create_app_wrapper(self): min_cpu=self.config.min_cpus max_cpu=self.config.max_cpus min_memory=self.config.min_memory - max_memory=self.config,max_memory + max_memory=self.config.max_memory gpu=self.config.gpu workers=self.config.max_worker template=self.config.template @@ -30,12 +34,12 @@ def create_app_wrapper(self): # creates a new cluster with the provided or default spec def up(self, namespace='default'): with oc.project(namespace): - oc.invoke("apply", ["-f", self.app_wrapper_yaml ]) + oc.invoke("apply", ["-f", self.app_wrapper_yaml]) def down(self, namespace='default'): with oc.project(namespace): - oc.invoke("delete",["AppWrapper", self.app_wrapper_yaml]) - + oc.invoke("delete", ["AppWrapper", self.app_wrapper_yaml]) + def status(self, print_to_console=True): cluster = _ray_cluster_status(self.config.name) if cluster: @@ -45,6 +49,37 @@ def status(self, print_to_console=True): else: return None + # checks whether the ray cluster is ready + def is_ready(self): + ready = False + status = CodeFlareClusterStatus.UNKNOWN + # check the app wrapper status + appwrapper = _app_wrapper_status(self.config.name) + if appwrapper: + if appwrapper.status in [AppWrapperStatus.RUNNING, AppWrapperStatus.COMPLETED, AppWrapperStatus.RUNNING_HOLD_COMPLETION]: + ready = False + status = CodeFlareClusterStatus.QUEUED + elif appwrapper.status in [AppWrapperStatus.FAILED, AppWrapperStatus.DELETED]: + ready = False + status = CodeFlareClusterStatus.FAILED #should deleted be separate + return ready, status #exit early, no need to check ray status + elif appwrapper.status in [AppWrapperStatus.PENDING]: + ready = False + status = CodeFlareClusterStatus.QUEUED + return ready, status# no need to check the ray status since still in queue + + # check the ray cluster status + cluster = _ray_cluster_status(self.config.name) + if cluster: + if cluster.status == RayClusterStatus.READY: + ready = True + status = CodeFlareClusterStatus.READY + elif cluster.status in [RayClusterStatus.UNHEALTHY, RayClusterStatus.FAILED]: + ready = False + status = CodeFlareClusterStatus.FAILED + + return status, ready + def list_all_clusters(print_to_console=True): clusters = _get_ray_clusters() @@ -60,13 +95,14 @@ def _get_appwrappers(namespace='default'): app_wrappers = oc.selector('appwrappers').qnames() return app_wrappers - + def _app_wrapper_status(name, namespace='default') -> Optional[AppWrapper]: with oc.project(namespace), oc.timeout(10*60): cluster = oc.selector(f'appwrapper/{name}').object() if cluster: return _map_to_app_wrapper(cluster) - + + def _ray_cluster_status(name, namespace='default') -> Optional[RayCluster]: # FIXME should we check the appwrapper first with oc.project(namespace), oc.timeout(10*60): @@ -87,10 +123,10 @@ def _get_ray_clusters(namespace='default') -> List[RayCluster]: return list_of_clusters -def _map_to_ray_cluster(cluster)->RayCluster: +def _map_to_ray_cluster(cluster) -> RayCluster: cluster_model = cluster.model return RayCluster( - name=cluster.name(), status=cluster_model.status.state, + name=cluster.name(), status=RayClusterStatus(cluster_model.status.state.lower()), min_workers=cluster_model.spec.workerGroupSpecs[0].replicas, max_workers=cluster_model.spec.workerGroupSpecs[0].replicas, worker_mem_max=cluster_model.spec.workerGroupSpecs[ @@ -101,9 +137,9 @@ def _map_to_ray_cluster(cluster)->RayCluster: worker_gpu=0) -def _map_to_app_wrapper(cluster)->AppWrapper: +def _map_to_app_wrapper(cluster) -> AppWrapper: cluster_model = cluster.model return AppWrapper( - name=cluster.name(), status=cluster_model.status.state, + name=cluster.name(), status=AppWrapperStatus(cluster_model.status.state.lower()), can_run=cluster_model.status.canrun, job_state=cluster_model.status.queuejobstate) diff --git a/src/codeflare_sdk/cluster/model.py b/src/codeflare_sdk/cluster/model.py index 670d4d6d..5ce657a0 100644 --- a/src/codeflare_sdk/cluster/model.py +++ b/src/codeflare_sdk/cluster/model.py @@ -1,9 +1,30 @@ from dataclasses import dataclass +from enum import Enum +class RayClusterStatus(Enum): + #https://github.com/ray-project/kuberay/blob/master/ray-operator/apis/ray/v1alpha1/raycluster_types.go#L95 + READY = "ready" + UNHEALTHY = "unhealthy" + FAILED = "failed" + UNKNOWN = "unknown" + +class AppWrapperStatus(Enum): + PENDING = "pending" + RUNNING = "running" + FAILED = "failed" + DELETED = "deleted" + COMPLETED = "completed" + RUNNING_HOLD_COMPLETION = "runningholdcompletion" + +class CodeFlareClusterStatus(Enum): + READY = 1 + QUEUED = 2 + FAILED = 3 + UNKNOWN = 4 @dataclass class RayCluster: name: str - status: str + status: RayClusterStatus min_workers: int max_workers: int worker_mem_min: str @@ -14,6 +35,7 @@ class RayCluster: @dataclass class AppWrapper: name: str - status:str + status:AppWrapperStatus can_run: bool job_state: str + diff --git a/tests/test_clusters.py b/tests/test_clusters.py index e4155865..6e86ef9f 100644 --- a/tests/test_clusters.py +++ b/tests/test_clusters.py @@ -4,7 +4,7 @@ def test_cluster_up(): cluster = Cluster(ClusterConfiguration(name='raycluster-autoscaler')) - cluster.up() + cluster.up() time.sleep(15) def test_list_clusters(): @@ -19,5 +19,5 @@ def test_app_wrapper_status(): def test_cluster_down(): cluster = Cluster(ClusterConfiguration(name='raycluster-autoscaler')) - cluster.down(name='raycluster-autoscaler') + cluster.down()