Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 51 additions & 15 deletions src/codeflare_sdk/cluster/cluster.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
from .config import ClusterConfiguration
from .model import RayCluster, AppWrapper
from os import stat
from typing import List, Optional, Tuple

import openshift as oc

from ..utils import pretty_print
from ..utils.generate_yaml import generate_appwrapper
import openshift as oc
from typing import List, Optional
from .config import ClusterConfiguration
from .model import (AppWrapper, AppWrapperStatus, CodeFlareClusterStatus,
RayCluster, RayClusterStatus)


class Cluster:
def __init__(self, config: ClusterConfiguration):
self.config = config
self.app_wrapper_yaml = self.create_app_wrapper()
self.app_wrapper_yaml = self.create_app_wrapper()

def create_app_wrapper(self):
min_cpu=self.config.min_cpus
max_cpu=self.config.max_cpus
min_memory=self.config.min_memory
max_memory=self.config,max_memory
max_memory=self.config.max_memory
gpu=self.config.gpu
workers=self.config.max_worker
template=self.config.template
Expand All @@ -30,12 +34,12 @@ def create_app_wrapper(self):
# creates a new cluster with the provided or default spec
def up(self, namespace='default'):
with oc.project(namespace):
oc.invoke("apply", ["-f", self.app_wrapper_yaml ])
oc.invoke("apply", ["-f", self.app_wrapper_yaml])

def down(self, namespace='default'):
with oc.project(namespace):
oc.invoke("delete",["AppWrapper", self.app_wrapper_yaml])
oc.invoke("delete", ["AppWrapper", self.app_wrapper_yaml])

def status(self, print_to_console=True):
cluster = _ray_cluster_status(self.config.name)
if cluster:
Expand All @@ -45,6 +49,37 @@ def status(self, print_to_console=True):
else:
return None

# checks whether the ray cluster is ready
def is_ready(self):
ready = False
status = CodeFlareClusterStatus.UNKNOWN
# check the app wrapper status
appwrapper = _app_wrapper_status(self.config.name)
if appwrapper:
if appwrapper.status in [AppWrapperStatus.RUNNING, AppWrapperStatus.COMPLETED, AppWrapperStatus.RUNNING_HOLD_COMPLETION]:
ready = False
status = CodeFlareClusterStatus.QUEUED
elif appwrapper.status in [AppWrapperStatus.FAILED, AppWrapperStatus.DELETED]:
ready = False
status = CodeFlareClusterStatus.FAILED #should deleted be separate
return ready, status #exit early, no need to check ray status
elif appwrapper.status in [AppWrapperStatus.PENDING]:
ready = False
status = CodeFlareClusterStatus.QUEUED
return ready, status# no need to check the ray status since still in queue

# check the ray cluster status
cluster = _ray_cluster_status(self.config.name)
if cluster:
if cluster.status == RayClusterStatus.READY:
ready = True
status = CodeFlareClusterStatus.READY
elif cluster.status in [RayClusterStatus.UNHEALTHY, RayClusterStatus.FAILED]:
ready = False
status = CodeFlareClusterStatus.FAILED

return status, ready


def list_all_clusters(print_to_console=True):
clusters = _get_ray_clusters()
Expand All @@ -60,13 +95,14 @@ def _get_appwrappers(namespace='default'):
app_wrappers = oc.selector('appwrappers').qnames()
return app_wrappers


def _app_wrapper_status(name, namespace='default') -> Optional[AppWrapper]:
with oc.project(namespace), oc.timeout(10*60):
cluster = oc.selector(f'appwrapper/{name}').object()
if cluster:
return _map_to_app_wrapper(cluster)



def _ray_cluster_status(name, namespace='default') -> Optional[RayCluster]:
# FIXME should we check the appwrapper first
with oc.project(namespace), oc.timeout(10*60):
Expand All @@ -87,10 +123,10 @@ def _get_ray_clusters(namespace='default') -> List[RayCluster]:
return list_of_clusters


def _map_to_ray_cluster(cluster)->RayCluster:
def _map_to_ray_cluster(cluster) -> RayCluster:
cluster_model = cluster.model
return RayCluster(
name=cluster.name(), status=cluster_model.status.state,
name=cluster.name(), status=RayClusterStatus(cluster_model.status.state.lower()),
min_workers=cluster_model.spec.workerGroupSpecs[0].replicas,
max_workers=cluster_model.spec.workerGroupSpecs[0].replicas,
worker_mem_max=cluster_model.spec.workerGroupSpecs[
Expand All @@ -101,9 +137,9 @@ def _map_to_ray_cluster(cluster)->RayCluster:
worker_gpu=0)


def _map_to_app_wrapper(cluster)->AppWrapper:
def _map_to_app_wrapper(cluster) -> AppWrapper:
cluster_model = cluster.model
return AppWrapper(
name=cluster.name(), status=cluster_model.status.state,
name=cluster.name(), status=AppWrapperStatus(cluster_model.status.state.lower()),
can_run=cluster_model.status.canrun,
job_state=cluster_model.status.queuejobstate)
26 changes: 24 additions & 2 deletions src/codeflare_sdk/cluster/model.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,30 @@
from dataclasses import dataclass
from enum import Enum

class RayClusterStatus(Enum):
#https://github.com/ray-project/kuberay/blob/master/ray-operator/apis/ray/v1alpha1/raycluster_types.go#L95
READY = "ready"
UNHEALTHY = "unhealthy"
FAILED = "failed"
UNKNOWN = "unknown"

class AppWrapperStatus(Enum):
PENDING = "pending"
RUNNING = "running"
FAILED = "failed"
DELETED = "deleted"
COMPLETED = "completed"
RUNNING_HOLD_COMPLETION = "runningholdcompletion"

class CodeFlareClusterStatus(Enum):
READY = 1
QUEUED = 2
FAILED = 3
UNKNOWN = 4
@dataclass
class RayCluster:
name: str
status: str
status: RayClusterStatus
min_workers: int
max_workers: int
worker_mem_min: str
Expand All @@ -14,6 +35,7 @@ class RayCluster:
@dataclass
class AppWrapper:
name: str
status:str
status:AppWrapperStatus
can_run: bool
job_state: str

4 changes: 2 additions & 2 deletions tests/test_clusters.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

def test_cluster_up():
cluster = Cluster(ClusterConfiguration(name='raycluster-autoscaler'))
cluster.up()
cluster.up()
time.sleep(15)

def test_list_clusters():
Expand All @@ -19,5 +19,5 @@ def test_app_wrapper_status():

def test_cluster_down():
cluster = Cluster(ClusterConfiguration(name='raycluster-autoscaler'))
cluster.down(name='raycluster-autoscaler')
cluster.down()