Skip to content

Commit 0f4ef72

Browse files
Watch for RayCluster CRD then start RayCluster controller
1 parent 4606269 commit 0f4ef72

File tree

2 files changed

+72
-35
lines changed

2 files changed

+72
-35
lines changed

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ require (
1212
github.com/project-codeflare/codeflare-common v0.0.0-20240207083912-d7a229270a0a
1313
github.com/ray-project/kuberay/ray-operator v1.1.0
1414
go.uber.org/zap v1.26.0
15+
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
1516
k8s.io/api v0.28.4
17+
k8s.io/apiextensions-apiserver v0.28.4
1618
k8s.io/apimachinery v0.28.4
1719
k8s.io/client-go v11.0.0+incompatible
1820
k8s.io/component-base v0.28.4
@@ -78,7 +80,6 @@ require (
7880
github.com/spf13/pflag v1.0.5 // indirect
7981
go.uber.org/atomic v1.11.0 // indirect
8082
go.uber.org/multierr v1.11.0 // indirect
81-
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
8283
golang.org/x/net v0.23.0 // indirect
8384
golang.org/x/oauth2 v0.16.0 // indirect
8485
golang.org/x/sys v0.18.0 // indirect
@@ -92,7 +93,6 @@ require (
9293
gopkg.in/inf.v0 v0.9.1 // indirect
9394
gopkg.in/yaml.v2 v2.4.0 // indirect
9495
gopkg.in/yaml.v3 v3.0.1 // indirect
95-
k8s.io/apiextensions-apiserver v0.28.4 // indirect
9696
k8s.io/kube-openapi v0.0.0-20230901164831-6c774f458599 // indirect
9797
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
9898
sigs.k8s.io/structured-merge-diff/v4 v4.3.0 // indirect

main.go

Lines changed: 70 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,24 @@ import (
3030
dsciv1 "github.com/opendatahub-io/opendatahub-operator/v2/apis/dscinitialization/v1"
3131
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
3232
"go.uber.org/zap/zapcore"
33+
"golang.org/x/exp/slices"
3334

3435
corev1 "k8s.io/api/core/v1"
36+
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
37+
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
3538
apierrors "k8s.io/apimachinery/pkg/api/errors"
3639
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3740
"k8s.io/apimachinery/pkg/runtime"
38-
"k8s.io/apimachinery/pkg/runtime/schema"
3941
"k8s.io/apimachinery/pkg/types"
4042
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
43+
"k8s.io/apimachinery/pkg/watch"
4144
"k8s.io/client-go/discovery"
4245
"k8s.io/client-go/kubernetes"
4346
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
4447
_ "k8s.io/client-go/plugin/pkg/client/auth"
4548
"k8s.io/client-go/rest"
49+
"k8s.io/client-go/tools/cache"
50+
retrywatch "k8s.io/client-go/tools/watch"
4651
configv1alpha1 "k8s.io/component-base/config/v1alpha1"
4752
"k8s.io/klog/v2"
4853
"k8s.io/utils/ptr"
@@ -169,34 +174,83 @@ func main() {
169174
exitOnError(err, cfg.KubeRay.IngressDomain)
170175
}
171176

172-
go setupControllers(mgr, kubeClient, cfg, isOpenShift(ctx, kubeClient.DiscoveryClient), certsReady)
173-
174177
setupLog.Info("setting up health endpoints")
175178
exitOnError(setupProbeEndpoints(mgr, cfg, certsReady), "unable to set up health check")
176179

180+
setupLog.Info("setting up RayCluster controller")
181+
exitOnError(waitForRayClusterAPIandSetupController(ctx, mgr, cfg, isOpenShift(ctx, kubeClient.DiscoveryClient), certsReady), "unable to setup RayCluster controller")
182+
177183
setupLog.Info("starting manager")
178184
exitOnError(mgr.Start(ctx), "error running manager")
179185
}
180186

181-
func setupControllers(mgr ctrl.Manager, dc discovery.DiscoveryInterface, cfg *config.CodeFlareOperatorConfiguration, isOpenShift bool, certsReady chan struct{}) {
187+
func setupRayClusterController(mgr ctrl.Manager, cfg *config.CodeFlareOperatorConfiguration, isOpenShift bool, certsReady chan struct{}) error {
182188
setupLog.Info("Waiting for certificate generation to complete")
183189
<-certsReady
184190
setupLog.Info("Certs ready")
185191

186-
exitOnError(controllers.SetupRayClusterWebhookWithManager(mgr, cfg.KubeRay), "error setting up RayCluster webhook")
192+
err := controllers.SetupRayClusterWebhookWithManager(mgr, cfg.KubeRay)
193+
if err != nil {
194+
return err
195+
}
187196

188-
ok, err := hasAPIResourceForGVK(dc, rayv1.GroupVersion.WithKind("RayCluster"))
189-
if ok {
190-
rayClusterController := controllers.RayClusterReconciler{
191-
Client: mgr.GetClient(),
192-
Scheme: mgr.GetScheme(),
193-
Config: cfg.KubeRay,
194-
IsOpenShift: isOpenShift,
195-
}
196-
exitOnError(rayClusterController.SetupWithManager(mgr), "Error setting up RayCluster controller")
197-
} else if err != nil {
198-
exitOnError(err, "Could not determine if RayCluster CR present on cluster.")
197+
rayClusterController := controllers.RayClusterReconciler{
198+
Client: mgr.GetClient(),
199+
Scheme: mgr.GetScheme(),
200+
Config: cfg.KubeRay,
201+
IsOpenShift: isOpenShift,
202+
}
203+
return rayClusterController.SetupWithManager(mgr)
204+
}
205+
206+
func waitForRayClusterAPIandSetupController(ctx context.Context, mgr ctrl.Manager, cfg *config.CodeFlareOperatorConfiguration, isOpenShift bool, certsReady chan struct{}) error {
207+
crdClient, err := apiextensionsclientset.NewForConfig(mgr.GetConfig())
208+
if err != nil {
209+
return err
210+
}
211+
crdList, err := crdClient.ApiextensionsV1().CustomResourceDefinitions().List(ctx, metav1.ListOptions{})
212+
if err != nil {
213+
return err
214+
}
215+
if slices.ContainsFunc(crdList.Items, func(crd apiextensionsv1.CustomResourceDefinition) bool {
216+
return crd.Name == "rayclusters.ray.io"
217+
}) {
218+
return setupRayClusterController(mgr, cfg, isOpenShift, certsReady)
219+
}
220+
221+
retryWatcher, err := retrywatch.NewRetryWatcher(crdList.ResourceVersion, &cache.ListWatch{
222+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
223+
return crdClient.ApiextensionsV1().CustomResourceDefinitions().List(ctx, metav1.ListOptions{})
224+
},
225+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
226+
return crdClient.ApiextensionsV1().CustomResourceDefinitions().Watch(ctx, metav1.ListOptions{})
227+
},
228+
})
229+
if err != nil {
230+
return err
199231
}
232+
233+
go func() {
234+
defer retryWatcher.Stop()
235+
for {
236+
select {
237+
case <-ctx.Done():
238+
return
239+
case event := <-retryWatcher.ResultChan():
240+
switch event.Type {
241+
case watch.Error:
242+
exitOnError(apierrors.FromObject(event.Object), "error watching for RayCluster API")
243+
244+
case watch.Added:
245+
setupLog.Info("RayCluster API installed, setting up controller")
246+
exitOnError(setupRayClusterController(mgr, cfg, isOpenShift, certsReady), "unable to setup RayCluster controller")
247+
return
248+
}
249+
}
250+
}
251+
}()
252+
253+
return nil
200254
}
201255

202256
// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;update
@@ -289,23 +343,6 @@ func createConfigMap(ctx context.Context, client kubernetes.Interface, ns, name
289343
return err
290344
}
291345

292-
func hasAPIResourceForGVK(dc discovery.DiscoveryInterface, gvk schema.GroupVersionKind) (bool, error) {
293-
gv, kind := gvk.ToAPIVersionAndKind()
294-
if resources, err := dc.ServerResourcesForGroupVersion(gv); err != nil {
295-
if apierrors.IsNotFound(err) {
296-
return false, nil
297-
}
298-
return false, err
299-
} else {
300-
for _, res := range resources.APIResources {
301-
if res.Kind == kind {
302-
return true, nil
303-
}
304-
}
305-
}
306-
return false, nil
307-
}
308-
309346
func namespaceOrDie() string {
310347
// This way assumes you've set the NAMESPACE environment variable either manually, when running
311348
// the operator standalone, or using the downward API, when running the operator in-cluster.

0 commit comments

Comments
 (0)