diff --git a/pkg/builder/controller.go b/pkg/builder/controller.go
index 81f446d62f..57757a6fff 100644
--- a/pkg/builder/controller.go
+++ b/pkg/builder/controller.go
@@ -23,6 +23,7 @@ import (
 	"github.com/go-logr/logr"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/discovery"
 
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
@@ -72,6 +73,7 @@ type ForInput struct {
 	predicates       []predicate.Predicate
 	objectProjection objectProjection
 	err              error
+	sporadic         bool
 }
 
 // For defines the type of Object being *reconciled*, and configures the ControllerManagedBy to respond to create / delete /
@@ -97,6 +99,7 @@ type OwnsInput struct {
 	object           client.Object
 	predicates       []predicate.Predicate
 	objectProjection objectProjection
+	sporadic         bool
 }
 
 // Owns defines types of Objects being *generated* by the ControllerManagedBy, and configures the ControllerManagedBy to respond to
@@ -118,6 +121,7 @@ type WatchesInput struct {
 	eventhandler     handler.EventHandler
 	predicates       []predicate.Predicate
 	objectProjection objectProjection
+	sporadic         bool
 }
 
 // Watches exposes the lower-level ControllerManagedBy Watches functions through the builder. Consider using
@@ -222,7 +226,37 @@ func (blder *Builder) doWatch() error {
 	if err != nil {
 		return err
 	}
-	src := &source.Kind{Type: typeForSrc}
+	var src source.Source
+	fmt.Printf("blder.forInput.sporadic = %+v\n", blder.forInput.sporadic)
+	if blder.forInput.sporadic {
+		gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme())
+		if err != nil {
+			return err
+		}
+		fmt.Printf("gvk = %+v\n", gvk)
+		dc, err := discovery.NewDiscoveryClientForConfig(blder.mgr.GetConfig())
+		if err != nil {
+			return err
+		}
+		existsInDiscovery := func() bool {
+			resources, err := dc.ServerResourcesForGroupVersion(gvk.GroupVersion().String())
+			if err != nil {
+				fmt.Printf("NOT in discovery gvk = %+v\n", gvk)
+				return false
+			}
+			for _, res := range resources.APIResources {
+				if res.Kind == gvk.Kind {
+					fmt.Printf("YES in discovery gvk = %+v\n", gvk)
+					return true
+				}
+			}
+			fmt.Printf("NOT in discovery kind = %+v\n", gvk)
+			return false
+		}
+		src = &source.SporadicKind{Kind: source.Kind{Type: typeForSrc}, DiscoveryCheck: existsInDiscovery}
+	} else {
+		src = &source.Kind{Type: typeForSrc}
+	}
 	hdler := &handler.EnqueueRequestForObject{}
 	allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
 	if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
@@ -236,6 +270,7 @@ func (blder *Builder) doWatch() error {
 			return err
 		}
 		src := &source.Kind{Type: typeForSrc}
+		// TODO: handle sporadic watches for owns types too
 		hdler := &handler.EnqueueRequestForOwner{
 			OwnerType:    blder.forInput.object,
 			IsController: true,
@@ -252,6 +287,8 @@ func (blder *Builder) doWatch() error {
 		allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
 		allPredicates = append(allPredicates, w.predicates...)
 
+		// TODO: handle sporadic watches for Watches inputs too
+
 		// If the source of this watch is of type *source.Kind, project it.
 		if srckind, ok := w.src.(*source.Kind); ok {
 			typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
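Note on usage: with the new ForInput.sporadic flag and the builder.Sporadic option below, opting a controller's primary watch into the sporadic path would look roughly like this. A sketch only, not part of this diff; mgr, reconciler, and the foov1 test package are placeholders:

	// Sketch: builder.Sporadic implements ApplyToFor, so it can be passed as
	// a ForOption; the builder then wires up a source.SporadicKind with the
	// discovery check constructed in doWatch above.
	err := builder.ControllerManagedBy(mgr).
		For(&foov1.Foo{}, builder.Sporadic{}).
		Complete(reconciler)
	if err != nil {
		return err
	}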
diff --git a/pkg/builder/options.go b/pkg/builder/options.go
index 7bb4273094..4fc8db2eed 100644
--- a/pkg/builder/options.go
+++ b/pkg/builder/options.go
@@ -99,6 +99,12 @@ func (p projectAs) ApplyToWatches(opts *WatchesInput) {
 	opts.objectProjection = objectProjection(p)
 }
 
+type Sporadic struct{}
+
+func (s Sporadic) ApplyToFor(opts *ForInput) {
+	opts.sporadic = true
+}
+
 var (
 	// OnlyMetadata tells the controller to *only* cache metadata, and to watch
 	// the the API server in metadata-only form. This is useful when watching
diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go
index 71dfbd0454..b89ca4e6ea 100644
--- a/pkg/cache/cache.go
+++ b/pkg/cache/cache.go
@@ -54,6 +54,8 @@ type Informers interface {
 	// API kind and resource.
 	GetInformer(ctx context.Context, obj client.Object) (Informer, error)
 
+	GetStoppableInformer(ctx context.Context, obj client.Object) (Informer, <-chan struct{}, error)
+
 	// GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead
 	// of the underlying object.
 	GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error)
diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go
index 8ec3b921d9..f435a93636 100644
--- a/pkg/cache/informer_cache.go
+++ b/pkg/cache/informer_cache.go
@@ -57,7 +57,7 @@ func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out clie
 		return err
 	}
 
-	started, cache, err := ip.InformersMap.Get(ctx, gvk, out)
+	started, cache, err := ip.InformersMap.Get(ctx, gvk, out, false)
 	if err != nil {
 		return err
 	}
@@ -76,7 +76,7 @@ func (ip *informerCache) List(ctx context.Context, out client.ObjectList, opts .
 		return err
 	}
 
-	started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj)
+	started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj, false)
 	if err != nil {
 		return err
 	}
@@ -138,7 +138,7 @@ func (ip *informerCache) GetInformerForKind(ctx context.Context, gvk schema.Grou
 		return nil, err
 	}
 
-	_, i, err := ip.InformersMap.Get(ctx, gvk, obj)
+	_, i, err := ip.InformersMap.Get(ctx, gvk, obj, false)
 	if err != nil {
 		return nil, err
 	}
@@ -152,13 +152,25 @@ func (ip *informerCache) GetInformer(ctx context.Context, obj client.Object) (In
 		return nil, err
 	}
 
-	_, i, err := ip.InformersMap.Get(ctx, gvk, obj)
+	_, i, err := ip.InformersMap.Get(ctx, gvk, obj, false)
 	if err != nil {
 		return nil, err
 	}
 	return i.Informer, err
 }
 
+func (ip *informerCache) GetStoppableInformer(ctx context.Context, obj client.Object) (Informer, <-chan struct{}, error) {
+	gvk, err := apiutil.GVKForObject(obj, ip.Scheme)
+	if err != nil {
+		return nil, nil, err
+	}
+	_, i, err := ip.InformersMap.Get(ctx, gvk, obj, true)
+	if err != nil {
+		return nil, nil, err
+	}
+	return i.Informer, i.StopCh, err
+}
+
 // NeedLeaderElection implements the LeaderElectionRunnable interface
 // to indicate that this can be started without requiring the leader lock
 func (ip *informerCache) NeedLeaderElection() bool {
diff --git a/pkg/cache/informertest/fake_cache.go b/pkg/cache/informertest/fake_cache.go
index eb78e0bb65..fcaddb963e 100644
--- a/pkg/cache/informertest/fake_cache.go
+++ b/pkg/cache/informertest/fake_cache.go
@@ -79,6 +79,15 @@ func (c *FakeInformers) GetInformer(ctx context.Context, obj client.Object) (cac
 	return c.informerFor(gvk, obj)
 }
 
+// GetStoppableInformer implements Informers
+func (c *FakeInformers) GetStoppableInformer(ctx context.Context, obj client.Object) (cache.Informer, <-chan struct{}, error) {
+	// TODO:
+	//panic("not implemented")
+	i, e := c.GetInformer(ctx, obj)
+	return i, nil, e
+
+}
+
 // WaitForCacheSync implements Informers
 func (c *FakeInformers) WaitForCacheSync(ctx context.Context) bool {
 	if c.Synced == nil {
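For reviewers, a sketch of what a GetStoppableInformer caller looks like; hypothetical usage, not part of the diff, and mgr, ctx, myEventHandler, log, and the foov1 test package are placeholders:

	// Sketch: like GetInformer, but the extra channel reports informer
	// shutdown (for sporadic kinds, that happens when the CRD is uninstalled).
	inf, stopCh, err := mgr.GetCache().GetStoppableInformer(ctx, &foov1.Foo{})
	if err != nil {
		return err
	}
	inf.AddEventHandler(myEventHandler) // register handlers as with any Informer
	go func() {
		<-stopCh // closed when the informer stops
		log.Info("informer for Foo stopped; its CRD may have been uninstalled")
	}()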
diff --git a/pkg/cache/internal/deleg_map.go b/pkg/cache/internal/deleg_map.go
index 02bb1919f7..811f3be023 100644
--- a/pkg/cache/internal/deleg_map.go
+++ b/pkg/cache/internal/deleg_map.go
@@ -89,18 +89,18 @@ func (m *InformersMap) WaitForCacheSync(ctx context.Context) bool {
 
 // Get will create a new Informer and add it to the map of InformersMap if none exists. Returns
 // the Informer from the map.
-func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
+func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopOnError bool) (bool, *MapEntry, error) {
 	switch obj.(type) {
 	case *unstructured.Unstructured:
-		return m.unstructured.Get(ctx, gvk, obj)
+		return m.unstructured.Get(ctx, gvk, obj, stopOnError)
 	case *unstructured.UnstructuredList:
-		return m.unstructured.Get(ctx, gvk, obj)
+		return m.unstructured.Get(ctx, gvk, obj, stopOnError)
 	case *metav1.PartialObjectMetadata:
-		return m.metadata.Get(ctx, gvk, obj)
+		return m.metadata.Get(ctx, gvk, obj, stopOnError)
 	case *metav1.PartialObjectMetadataList:
-		return m.metadata.Get(ctx, gvk, obj)
+		return m.metadata.Get(ctx, gvk, obj, stopOnError)
 	default:
-		return m.structured.Get(ctx, gvk, obj)
+		return m.structured.Get(ctx, gvk, obj, stopOnError)
 	}
 }
diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go
index 6b57c6fa61..98b443d5e5 100644
--- a/pkg/cache/internal/informers_map.go
+++ b/pkg/cache/internal/informers_map.go
@@ -71,6 +71,10 @@ type MapEntry struct {
 
 	// CacheReader wraps Informer and implements the CacheReader interface for a single type
 	Reader CacheReader
+
+	// StopCh is a channel that is closed after
+	// the informer stops
+	StopCh <-chan struct{}
 }
 
 // specificInformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
@@ -166,7 +170,8 @@ func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced {
 
 // Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
 // the Informer from the map.
-func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
+func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopOnError bool) (bool, *MapEntry, error) {
+	fmt.Println("inf Get")
 	// Return the informer if it is found
 	i, started, ok := func() (*MapEntry, bool, bool) {
 		ip.mu.RLock()
@@ -177,7 +182,7 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion
 
 	if !ok {
 		var err error
-		if i, started, err = ip.addInformerToMap(gvk, obj); err != nil {
+		if i, started, err = ip.addInformerToMap(gvk, obj, stopOnError); err != nil {
 			return started, nil, err
 		}
 	}
@@ -192,7 +197,8 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion
 	return started, i, nil
 }
 
-func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) {
+func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object, stopOnError bool) (*MapEntry, bool, error) {
+	fmt.Println("inf addInf")
 	ip.mu.Lock()
 	defer ip.mu.Unlock()
 
@@ -203,10 +209,13 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
 		return i, ip.started, nil
 	}
 
+	fmt.Println("inf createLW")
+	fmt.Printf("inf gvk = %+v\n", gvk)
 	// Create a NewSharedIndexInformer and add it to the map.
 	var lw *cache.ListWatch
 	lw, err := ip.createListWatcher(gvk, ip)
 	if err != nil {
+		fmt.Printf("inf createLW err = %+v\n", err)
 		return nil, false, err
 	}
 	ni := cache.NewSharedIndexInformer(lw, obj, resyncPeriod(ip.resync)(), cache.Indexers{
@@ -214,20 +223,57 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
 	})
 	rm, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
 	if err != nil {
+		fmt.Printf("RESTMapping err = %+v\n", err)
+		fmt.Printf("gvk = %+v\n", gvk)
 		return nil, false, err
 	}
+	fmt.Println("inf RM success")
+	informerStop := make(chan struct{})
 	i := &MapEntry{
 		Informer: ni,
 		Reader:   CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk, scopeName: rm.Scope.Name()},
+		StopCh:   informerStop,
 	}
 	ip.informersByGVK[gvk] = i
 
+	go func() {
+		<-ip.stop
+		close(informerStop)
+	}()
+
+	i.Informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
+		ip.mu.RLock()
+		defer ip.mu.RUnlock()
+		fmt.Printf("inf handler err = %+v\n", err)
+		// TODO: check the error for not found
+		if stopOnError {
+			fmt.Println("inf stopping")
+			close(informerStop)
+		}
+
+	})
+
 	// Start the Informer if need by
 	// TODO(seans): write thorough tests and document what happens here - can you add indexers?
 	// can you add eventhandlers?
+
+	// TODO: not cancelling? (leak)
+	//runCtx, cancel := context.WithCancel(ctx)
+	//go func() {
+	//	<-ip.stop
+	//	fmt.Println("inf ip stopped")
+	//	cancel()
+	//}()
 	if ip.started {
-		go i.Informer.Run(ip.stop)
+		fmt.Println("inf Run")
+		//go i.Informer.Run(informerStop)
+		go func() {
+			i.Informer.Run(informerStop)
+			fmt.Println("informer done running, remove from map")
+			delete(ip.informersByGVK, gvk)
+		}()
 	}
+	fmt.Println("inf addInformer returning")
 	return i, ip.started, nil
 }
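Two review notes on the block above: SetWatchErrorHandler returns an error that is currently dropped, and the delete(ip.informersByGVK, gvk) in the run goroutine mutates the map without holding ip.mu. The `// TODO: check the error for not found` might be resolved along these lines; a sketch only, assuming apimachinery's errors helpers (aliased apierrors here):

	// Sketch: only stop the informer on a definitive "resource gone" error,
	// not on transient watch failures; guard the close so it cannot fire twice.
	var stopOnce sync.Once
	if err := ni.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
		if stopOnError && apierrors.IsNotFound(err) {
			stopOnce.Do(func() { close(informerStop) })
		}
	}); err != nil {
		return nil, false, err // the handler must be registered before Run
	}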
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllertest" + foo "sigs.k8s.io/controller-runtime/pkg/controller/testdata/foo/v1" + "sigs.k8s.io/controller-runtime/pkg/envtest" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" @@ -173,6 +185,198 @@ var _ = Describe("controller", func() { close(done) }, 5) }) + + It("should reconcile when the CRD is installed, uninstalled, reinstalled", func(done Done) { + By("Initializing the scheme and crd") + s := runtime.NewScheme() + err := v1beta1.AddToScheme(s) + Expect(err).NotTo(HaveOccurred()) + err = apiextensionsv1.AddToScheme(s) + Expect(err).NotTo(HaveOccurred()) + err = foo.AddToScheme(s) + Expect(err).NotTo(HaveOccurred()) + options := manager.Options{Scheme: s} + + By("Creating the Manager") + cm, err := manager.New(cfg, options) + Expect(err).NotTo(HaveOccurred()) + + By("Creating the Controller") + instance, err := controller.New("foo-controller", cm, controller.Options{ + Reconciler: reconcile.Func( + func(_ context.Context, request reconcile.Request) (reconcile.Result, error) { + reconciled <- request + return reconcile.Result{}, nil + }), + }) + Expect(err).NotTo(HaveOccurred()) + + By("Watching foo CRD as sporadic kinds") + f := &foo.Foo{} + gvk := schema.GroupVersionKind{ + Group: "bar.example.com", + Version: "v1", + Kind: "Foo", + } + Expect(err).NotTo(HaveOccurred()) + existsInDiscovery := func() bool { + resources, err := clientset.Discovery().ServerResourcesForGroupVersion(gvk.GroupVersion().String()) + if err != nil { + return false + } + for _, res := range resources.APIResources { + if res.Kind == gvk.Kind { + return true + } + } + return false + } + err = instance.Watch(&source.SporadicKind{Kind: source.Kind{Type: f}, DiscoveryCheck: existsInDiscovery}, &handler.EnqueueRequestForObject{}) + Expect(err).NotTo(HaveOccurred()) + + By("Starting the Manager") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer GinkgoRecover() + Expect(cm.Start(ctx)).NotTo(HaveOccurred()) + }() + + testFoo := &foo.Foo{ + TypeMeta: metav1.TypeMeta{Kind: gvk.Kind, APIVersion: gvk.GroupVersion().String()}, + ObjectMeta: metav1.ObjectMeta{Name: "test-foo", Namespace: "default"}, + } + + expectedReconcileRequest := reconcile.Request{NamespacedName: types.NamespacedName{ + Name: "test-foo", + Namespace: "default", + }} + _ = expectedReconcileRequest + + By("Failing to create a foo object if the crd isn't installed") + kindMatchErr := &meta.NoKindMatchError{} + err = cm.GetClient().Create(ctx, testFoo) + Expect(goerrors.As(err, &kindMatchErr)).To(BeTrue()) + + By("Installing the CRD") + crdPath := filepath.Join(".", "testdata", "foo", "foocrd.yaml") + crdOpts := envtest.CRDInstallOptions{ + Paths: []string{crdPath}, + MaxTime: 50 * time.Millisecond, + PollInterval: 15 * time.Millisecond, + } + crds, err := envtest.InstallCRDs(cfg, crdOpts) + Expect(err).NotTo(HaveOccurred()) + Expect(len(crds)).To(Equal(1)) + + By("Expecting to find the CRD") + crdv1 := &apiextensionsv1.CustomResourceDefinition{} + err = cm.GetClient().Get(context.TODO(), types.NamespacedName{Name: "foos.bar.example.com"}, crdv1) + Expect(err).NotTo(HaveOccurred()) + Expect(crdv1.Spec.Names.Kind).To(Equal("Foo")) 
+
+		err = envtest.WaitForCRDs(cfg, []client.Object{
+			&v1beta1.CustomResourceDefinition{
+				Spec: v1beta1.CustomResourceDefinitionSpec{
+					Group: "bar.example.com",
+					Names: v1beta1.CustomResourceDefinitionNames{
+						Kind:   "Foo",
+						Plural: "foos",
+					},
+					Versions: []v1beta1.CustomResourceDefinitionVersion{
+						{
+							Name:    "v1",
+							Storage: true,
+							Served:  true,
+						},
+					}},
+			},
+		},
+			crdOpts,
+		)
+		Expect(err).NotTo(HaveOccurred())
+
+		By("Invoking Reconcile for foo Create")
+		err = cm.GetClient().Create(ctx, testFoo)
+		Expect(err).NotTo(HaveOccurred())
+		Expect(<-reconciled).To(Equal(expectedReconcileRequest))
+
+		By("Uninstalling the CRD")
+		err = envtest.UninstallCRDs(cfg, crdOpts)
+		Expect(err).NotTo(HaveOccurred())
+		// wait for discovery to not recognize the resource after uninstall
+		wait.PollImmediate(15*time.Millisecond, 50*time.Millisecond, func() (bool, error) {
+			if _, err := clientset.Discovery().ServerResourcesForGroupVersion(gvk.Group + "/" + gvk.Version); err != nil {
+				if err.Error() == "the server could not find the requested resource" {
+					return true, nil
+				}
+			}
+			return false, nil
+		})
+
+		By("Failing to create a foo object if the crd isn't installed")
+		errNotFound := errors.NewGenericServerResponse(404, "POST", schema.GroupResource{Group: "bar.example.com", Resource: "foos"}, "", "404 page not found", 0, true)
+		err = cm.GetClient().Create(ctx, testFoo)
+		Expect(err).To(Equal(errNotFound))
+
+		By("Reinstalling the CRD")
+		crds, err = envtest.InstallCRDs(cfg, crdOpts)
+		Expect(err).NotTo(HaveOccurred())
+		Expect(len(crds)).To(Equal(1))
+
+		By("Expecting to find the CRD")
+		crdv1 = &apiextensionsv1.CustomResourceDefinition{}
+		err = cm.GetClient().Get(context.TODO(), types.NamespacedName{Name: "foos.bar.example.com"}, crdv1)
+		Expect(err).NotTo(HaveOccurred())
+		Expect(crdv1.Spec.Names.Kind).To(Equal("Foo"))
+
+		err = envtest.WaitForCRDs(cfg, []client.Object{
+			&v1beta1.CustomResourceDefinition{
+				Spec: v1beta1.CustomResourceDefinitionSpec{
+					Group: "bar.example.com",
+					Names: v1beta1.CustomResourceDefinitionNames{
+						Kind:   "Foo",
+						Plural: "foos",
+					},
+					Versions: []v1beta1.CustomResourceDefinitionVersion{
+						{
+							Name:    "v1",
+							Storage: true,
+							Served:  true,
+						},
+					}},
+			},
+		},
+			crdOpts,
+		)
+		Expect(err).NotTo(HaveOccurred())
+
+		By("Invoking Reconcile for foo Create")
+		testFoo.ResourceVersion = ""
+		err = cm.GetClient().Create(ctx, testFoo)
+		Expect(err).NotTo(HaveOccurred())
+		Expect(<-reconciled).To(Equal(expectedReconcileRequest))
+
+		By("Uninstalling the CRD")
+		err = envtest.UninstallCRDs(cfg, crdOpts)
+		Expect(err).NotTo(HaveOccurred())
+		// wait for discovery to not recognize the resource after uninstall
+		wait.PollImmediate(15*time.Millisecond, 50*time.Millisecond, func() (bool, error) {
+			if _, err := clientset.Discovery().ServerResourcesForGroupVersion(gvk.Group + "/" + gvk.Version); err != nil {
+				if err.Error() == "the server could not find the requested resource" {
+					return true, nil
+				}
+			}
+			return false, nil
+		})
+
+		By("Failing to create a foo object if the crd isn't installed")
+		err = cm.GetClient().Create(ctx, testFoo)
+		Expect(err).To(Equal(errNotFound))
+
+		close(done)
+	}, 10)
+})
 
 func truePtr() *bool {
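A possible cleanup for the test above, which runs the same discovery poll twice and drops wait.PollImmediate's error; matching with the apimachinery errors helpers is also sturdier than comparing error strings. A sketch, not part of the diff:

	// Sketch: shared helper that waits until discovery stops serving the
	// group/version (errors is k8s.io/apimachinery/pkg/api/errors, as imported
	// by this test file).
	waitForDiscoveryGone := func(gv schema.GroupVersion) error {
		return wait.PollImmediate(15*time.Millisecond, 50*time.Millisecond, func() (bool, error) {
			_, err := clientset.Discovery().ServerResourcesForGroupVersion(gv.String())
			return errors.IsNotFound(err), nil
		})
	}
	Expect(waitForDiscoveryGone(gvk.GroupVersion())).To(Succeed())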
diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go
index 4b0a2ee914..92b11a5c32 100644
--- a/pkg/controller/controller_test.go
+++ b/pkg/controller/controller_test.go
@@ -90,6 +90,7 @@ var _ = Describe("controller.Controller", func() {
 			close(done)
 		})
 
+		// TODO: we should dupe this for sporadic controllers
 		It("should not leak goroutines when stopped", func() {
 			currentGRs := goleak.IgnoreCurrent()
diff --git a/pkg/controller/testdata/foo/foo_types.go b/pkg/controller/testdata/foo/foo_types.go
new file mode 100644
index 0000000000..4cd289275e
--- /dev/null
+++ b/pkg/controller/testdata/foo/foo_types.go
@@ -0,0 +1,139 @@
+package foo
+
+import (
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"sigs.k8s.io/controller-runtime/pkg/scheme"
+)
+
+type FooSpec struct{}
+type FooStatus struct{}
+
+// +kubebuilder:object:root=true
+
+type Foo struct {
+	metav1.TypeMeta   `json:",inline"`
+	metav1.ObjectMeta `json:"metadata,omitempty"`
+
+	Spec   FooSpec   `json:"spec,omitempty"`
+	Status FooStatus `json:"status,omitempty"`
+}
+
+// +kubebuilder:object:root=true
+
+type FooList struct {
+	metav1.TypeMeta `json:",inline"`
+	metav1.ListMeta `json:"metadata,omitempty"`
+	Items           []Foo `json:"items"`
+}
+
+func init() {
+	SchemeBuilder.Register(&Foo{}, &FooList{})
+}
+
+var (
+	GroupVersion  = schema.GroupVersion{Group: "bar.example.com", Version: "v1"}
+	SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion}
+	AddToScheme   = SchemeBuilder.AddToScheme
+)
+
+// TODO: autogen all this stuff (learn how to) and split into v1 directory
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *Foo) DeepCopyInto(out *Foo) {
+	*out = *in
+	out.TypeMeta = in.TypeMeta
+	in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
+	out.Spec = in.Spec
+	out.Status = in.Status
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Foo.
+func (in *Foo) DeepCopy() *Foo {
+	if in == nil {
+		return nil
+	}
+	out := new(Foo)
+	in.DeepCopyInto(out)
+	return out
+}
+
+func (in *Foo) DeepCopyObject() runtime.Object {
+	if c := in.DeepCopy(); c != nil {
+		return c
+	}
+	return nil
+}
+
+func (f *Foo) SetGroupVersionKind(kind schema.GroupVersionKind) {}
+
+func (f *Foo) GroupVersionKind() schema.GroupVersionKind {
+	return schema.GroupVersionKind{
+		Group:   "bar.example.com",
+		Version: "v1",
+		Kind:    "Foo",
+	}
+}
+
+func (f *Foo) GetObjectKind() schema.ObjectKind {
+	return f
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *FooList) DeepCopyInto(out *FooList) {
+	*out = *in
+	out.TypeMeta = in.TypeMeta
+	out.ListMeta = in.ListMeta
+	if in.Items != nil {
+		in, out := &in.Items, &out.Items
+		*out = make([]Foo, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
+}
+
+func (in *FooList) DeepCopy() *FooList {
+	if in == nil {
+		return nil
+	}
+	out := new(FooList)
+	in.DeepCopyInto(out)
+	return out
+}
+
+func (in *FooList) DeepCopyObject() runtime.Object {
+	if c := in.DeepCopy(); c != nil {
+		return c
+	}
+	return nil
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *FooSpec) DeepCopyInto(out *FooSpec) {
+	*out = *in
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FooSpec.
+func (in *FooSpec) DeepCopy() *FooSpec {
+	if in == nil {
+		return nil
+	}
+	out := new(FooSpec)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *FooStatus) DeepCopyInto(out *FooStatus) {
+	*out = *in
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FooStatus.
+func (in *FooStatus) DeepCopy() *FooStatus {
+	if in == nil {
+		return nil
+	}
+	out := new(FooStatus)
+	in.DeepCopyInto(out)
+	return out
+}
diff --git a/pkg/controller/testdata/foo/foocrd.yaml b/pkg/controller/testdata/foo/foocrd.yaml
new file mode 100644
index 0000000000..e2bddbc528
--- /dev/null
+++ b/pkg/controller/testdata/foo/foocrd.yaml
@@ -0,0 +1,17 @@
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+  name: foos.bar.example.com
+spec:
+  group: bar.example.com
+  names:
+    kind: Foo
+    plural: foos
+  scope: Namespaced
+  versions:
+    - name: "v1"
+      storage: true
+      served: true
+      schema:
+        openAPIV3Schema:
+          type: object
diff --git a/pkg/controller/testdata/foo/v1/foo_types.go b/pkg/controller/testdata/foo/v1/foo_types.go
new file mode 100644
index 0000000000..380b7ce8c4
--- /dev/null
+++ b/pkg/controller/testdata/foo/v1/foo_types.go
@@ -0,0 +1,60 @@
+/*
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package v1
+
+import (
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
+// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
+
+// FooSpec defines the desired state of Foo
+type FooSpec struct {
+	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
+	// Important: Run "make" to regenerate code after modifying this file
+	RunAt string `json:"runAt"`
+}
+
+// FooStatus defines the observed state of Foo
+type FooStatus struct {
+	// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
+	// Important: Run "make" to regenerate code after modifying this file
+}
+
+// +kubebuilder:object:root=true
+
+// Foo is the Schema for the foos API
+type Foo struct {
+	metav1.TypeMeta   `json:",inline"`
+	metav1.ObjectMeta `json:"metadata,omitempty"`
+
+	Spec   FooSpec   `json:"spec,omitempty"`
+	Status FooStatus `json:"status,omitempty"`
+}
+
+// +kubebuilder:object:root=true
+
+// FooList contains a list of Foo
+type FooList struct {
+	metav1.TypeMeta `json:",inline"`
+	metav1.ListMeta `json:"metadata,omitempty"`
+	Items           []Foo `json:"items"`
+}
+
+func init() {
+	SchemeBuilder.Register(&Foo{}, &FooList{})
+}
diff --git a/pkg/controller/testdata/foo/v1/groupversion_info.go b/pkg/controller/testdata/foo/v1/groupversion_info.go
new file mode 100644
index 0000000000..a641b61024
--- /dev/null
+++ b/pkg/controller/testdata/foo/v1/groupversion_info.go
@@ -0,0 +1,38 @@
+/*
+Copyright 2018 The Kubernetes authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// +kubebuilder:object:generate=true
+// +groupName=bar.example.com
+package v1
+
+import (
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	logf "sigs.k8s.io/controller-runtime/pkg/log"
+	"sigs.k8s.io/controller-runtime/pkg/scheme"
+)
+
+var (
+	log = logf.Log.WithName("foo-resource")
+
+	// SchemeGroupVersion is group version used to register these objects
+	SchemeGroupVersion = schema.GroupVersion{Group: "bar.example.com", Version: "v1"}
+
+	// SchemeBuilder is used to add go types to the GroupVersionKind scheme
+	SchemeBuilder = &scheme.Builder{GroupVersion: SchemeGroupVersion}
+
+	// AddToScheme is required by pkg/client/...
+	AddToScheme = SchemeBuilder.AddToScheme
+)
diff --git a/pkg/controller/testdata/foo/v1/zz_generated.deepcopy.go b/pkg/controller/testdata/foo/v1/zz_generated.deepcopy.go
new file mode 100644
index 0000000000..01a5a9e34f
--- /dev/null
+++ b/pkg/controller/testdata/foo/v1/zz_generated.deepcopy.go
@@ -0,0 +1,98 @@
+// +build !ignore_autogenerated
+
+// Code generated by controller-gen. DO NOT EDIT.
+
+package v1
+
+import (
+	runtime "k8s.io/apimachinery/pkg/runtime"
+)
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *Foo) DeepCopyInto(out *Foo) {
+	*out = *in
+	out.TypeMeta = in.TypeMeta
+	in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
+	out.Spec = in.Spec
+	out.Status = in.Status
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Foo.
+func (in *Foo) DeepCopy() *Foo {
+	if in == nil {
+		return nil
+	}
+	out := new(Foo)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
+func (in *Foo) DeepCopyObject() runtime.Object {
+	if c := in.DeepCopy(); c != nil {
+		return c
+	}
+	return nil
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *FooList) DeepCopyInto(out *FooList) {
+	*out = *in
+	out.TypeMeta = in.TypeMeta
+	in.ListMeta.DeepCopyInto(&out.ListMeta)
+	if in.Items != nil {
+		in, out := &in.Items, &out.Items
+		*out = make([]Foo, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FooList.
+func (in *FooList) DeepCopy() *FooList {
+	if in == nil {
+		return nil
+	}
+	out := new(FooList)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
+func (in *FooList) DeepCopyObject() runtime.Object {
+	if c := in.DeepCopy(); c != nil {
+		return c
+	}
+	return nil
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *FooSpec) DeepCopyInto(out *FooSpec) {
+	*out = *in
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FooSpec.
+func (in *FooSpec) DeepCopy() *FooSpec {
+	if in == nil {
+		return nil
+	}
+	out := new(FooSpec)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *FooStatus) DeepCopyInto(out *FooStatus) {
+	*out = *in
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FooStatus.
+func (in *FooStatus) DeepCopy() *FooStatus {
+	if in == nil {
+		return nil
+	}
+	out := new(FooStatus)
+	in.DeepCopyInto(out)
+	return out
+}
diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go
index f5024502d9..3a7edb0de8 100644
--- a/pkg/internal/controller/controller.go
+++ b/pkg/internal/controller/controller.go
@@ -81,7 +81,8 @@ type Controller struct {
 	CacheSyncTimeout time.Duration
 
 	// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
-	startWatches []watchDescription
+	startWatches    []watchDescription
+	sporadicWatches []sporadicWatchDescription
 
 	// Log is used to log messages to users during reconciliation, or for example when a watch is started.
 	Log logr.Logger
@@ -94,6 +95,12 @@ type watchDescription struct {
 	predicates []predicate.Predicate
 }
 
+type sporadicWatchDescription struct {
+	src        source.SporadicSource
+	handler    handler.EventHandler
+	predicates []predicate.Predicate
+}
+
 // Reconcile implements reconcile.Reconciler
 func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
 	log := c.Log.WithValues("name", req.Name, "namespace", req.Namespace)
@@ -119,6 +126,12 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
 		}
 	}
 
+	// These get held on indefinitely
+	if sporadicSource, ok := src.(source.SporadicSource); ok && !c.Started {
+		c.sporadicWatches = append(c.sporadicWatches, sporadicWatchDescription{src: sporadicSource, handler: evthdler, predicates: prct})
+		return nil
+	}
+
 	// Controller hasn't started yet, store the watches locally and return.
 	//
 	// These watches are going to be held on the controller struct until the manager or user calls Start(...).
@@ -131,6 +144,32 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
 	return src.Start(c.ctx, evthdler, c.Queue, prct...)
 }
 
+// Ready returns a channel that is closed once every sporadic watch source passes its discovery check.
+func (c *Controller) Ready(ctx context.Context) <-chan struct{} {
+	fmt.Printf("ctrl ReadyToStart len(c.sporadicWatches) = %+v\n", len(c.sporadicWatches))
+	ready := make(chan struct{})
+	if len(c.sporadicWatches) == 0 {
+		close(ready)
+		return ready
+	}
+
+	var wg sync.WaitGroup
+	for _, w := range c.sporadicWatches {
+		wg.Add(1)
+		fmt.Println("ctrl checking src ready")
+		go w.src.Ready(ctx, &wg)
+	}
+
+	go func() {
+		fmt.Println("ctrl ready wg wait starting")
+		wg.Wait()
+		fmt.Println("ctrl ready wg wait done closing ready")
+		close(ready)
+		fmt.Println("ctrl all sources ready")
+	}()
+	return ready
+}
+
 // Start implements controller.Controller
 func (c *Controller) Start(ctx context.Context) error {
 	// use an IIFE to get proper lock handling
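Design note on Ready above: passing a *sync.WaitGroup into each source's Ready works, but is unusual for an exported interface. An alternative shape would have each source return its own channel and keep the synchronization local to the controller; a sketch under that assumption (the per-source Ready() <-chan struct{} is hypothetical, not what this diff implements):

	// Sketch: aggregate per-source ready channels into one.
	func aggregateReady(ctx context.Context, readies []<-chan struct{}) <-chan struct{} {
		all := make(chan struct{})
		go func() {
			for _, r := range readies {
				select {
				case <-r:
				case <-ctx.Done():
					return // shut down before every source became ready
				}
			}
			close(all)
		}()
		return all
	}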
@@ -142,12 +181,17 @@ func (c *Controller) Start(ctx context.Context) error {
 
 	c.initMetrics()
 
+	// wrap the given context in a cancellable one so that we can
+	// stop sporadic watches when their done signal fires
+	ctrlCtx, ctrlCancel := context.WithCancel(ctx)
+	defer ctrlCancel()
+
 	// Set the internal context.
-	c.ctx = ctx
+	c.ctx = ctrlCtx
 
 	c.Queue = c.MakeQueue()
 	go func() {
-		<-ctx.Done()
+		<-ctrlCtx.Done()
 		c.Queue.ShutDown()
 	}()
 
@@ -164,11 +208,33 @@ func (c *Controller) Start(ctx context.Context) error {
 		for _, watch := range c.startWatches {
 			c.Log.Info("Starting EventSource", "source", watch.src)
 
-			if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
+			if err := watch.src.Start(ctrlCtx, watch.handler, c.Queue, watch.predicates...); err != nil {
 				return err
 			}
 		}
 
+		c.Log.Info("sporadic watches", "len", len(c.sporadicWatches))
+		// TODO: do we need a waitgroup here so that we only clear c.Started once all the watches are done
+		// or should a single done watch trigger all the others to stop, or something else?
+		for _, watch := range c.sporadicWatches {
+			c.Log.Info("sporadic Starting EventSource", "source", watch.src)
+
+			// Call a version of the Start method specific to SporadicSource that returns
+			// a done signal that fires when the CRD is no longer installed (and the informer gets shut down)
+			done, err := watch.src.StartNotifyDone(ctrlCtx, watch.handler, c.Queue, watch.predicates...)
+			if err != nil {
+				return err
+			}
+			// wait for done to fire and when it does, shut down the controller and reset c.Started
+			go func() {
+				fmt.Println("ctrl waiting for done")
+				<-done
+				fmt.Println("ctrl done fired, ctrlCancelling")
+				c.Started = false
+				ctrlCancel()
+			}()
+		}
+
 		// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
 		c.Log.Info("Starting Controller")
 
@@ -180,7 +246,7 @@ func (c *Controller) Start(ctx context.Context) error {
 
 		if err := func() error {
 			// use a context with timeout for launching sources and syncing caches.
-			sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
+			sourceStartCtx, cancel := context.WithTimeout(ctrlCtx, c.CacheSyncTimeout)
 			defer cancel()
 
 			// WaitForSync waits for a definitive timeout, and returns if there
@@ -211,7 +277,7 @@ func (c *Controller) Start(ctx context.Context) error {
 			defer wg.Done()
 			// Run a worker thread that just dequeues items, processes them, and marks them done.
 			// It enforces that the reconcileHandler is never invoked concurrently with the same object.
-			for c.processNextWorkItem(ctx) {
+			for c.processNextWorkItem(ctrlCtx) {
 			}
 		}()
 	}
@@ -223,7 +289,7 @@ func (c *Controller) Start(ctx context.Context) error {
 		return err
 	}
 
-	<-ctx.Done()
+	<-ctrlCtx.Done()
 	c.Log.Info("Shutdown signal received, waiting for all workers to finish")
 	wg.Wait()
 	c.Log.Info("All workers finished")
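One thing to flag in the done-handler inside Start above: c.Started is cleared from a goroutine while Watch and Start read and write it under c.mu. A safer shape would take the controller's existing lock around the write; a sketch using the fields already on Controller:

	// Sketch: serialize the Started write with the paths that read it under c.mu.
	go func() {
		<-done
		c.mu.Lock()
		c.Started = false
		c.mu.Unlock()
		ctrlCancel()
	}()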
diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go
index 57f95ba5b3..9c58032358 100644
--- a/pkg/manager/internal.go
+++ b/pkg/manager/internal.go
@@ -71,6 +71,8 @@ type controllerManager struct {
 	// These Runnables will not be blocked by lead election.
 	nonLeaderElectionRunnables []Runnable
 
+	sporadicRunnables []SporadicRunnable
+
 	// recorderProvider is used to generate event recorders that will be injected into Controllers
 	// (and EventHandlers, Sources and Predicates).
 	recorderProvider *intrec.Provider
@@ -202,12 +204,20 @@ func (cm *controllerManager) Add(r Runnable) error {
 	var shouldStart bool
 
 	// Add the runnable to the leader election or the non-leaderelection list
+	//// TODO: currently we treat sporadicRunnable as separate from LER/non-LER
+	//// but we shouldn't right?
+	//if sporadicRunnable, ok := r.(SporadicRunnable); ok {
+	//	fmt.Println("mgr adding sporadic")
+	//	cm.sporadicRunnables = append(cm.sporadicRunnables, sporadicRunnable)
+	//} else if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() {
 	if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() {
+		fmt.Println("mgr adding non ler")
 		shouldStart = cm.started
 		cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r)
 	} else if hasCache, ok := r.(hasCache); ok {
 		cm.caches = append(cm.caches, hasCache)
 	} else {
+		fmt.Println("mgr adding ler")
 		shouldStart = cm.startedLeader
 		cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r)
 	}
@@ -427,6 +437,7 @@ func (cm *controllerManager) serveHealthProbes() {
 
 		// Run server
 		cm.startRunnable(RunnableFunc(func(_ context.Context) error {
+			cm.logger.Info("starting health probes")
 			if err := server.Serve(cm.healthProbeListener); err != nil && err != http.ErrServerClosed {
 				return err
 			}
@@ -443,6 +454,7 @@ func (cm *controllerManager) serveHealthProbes() {
 }
 
 func (cm *controllerManager) Start(ctx context.Context) (err error) {
+	fmt.Println("mgr Start")
 	if err := cm.Add(cm.cluster); err != nil {
 		return fmt.Errorf("failed to add cluster to runnables: %w", err)
 	}
@@ -486,6 +498,8 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
 		go cm.serveHealthProbes()
 	}
 
+	//go cm.startSporadicRunnables()
+
 	go cm.startNonLeaderElectionRunnables()
 
 	go func() {
@@ -501,14 +515,19 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
 		}
 	}()
 
+	fmt.Println("mgr start waiting on signal")
 	select {
 	case <-ctx.Done():
+		fmt.Println("mgr start ctx.Done fired")
 		// We are done
 		return nil
 	case err := <-cm.errChan:
+		fmt.Printf("mgr start errChan fired, %v\n", err)
 		// Error starting or running a runnable
 		return err
 	}
+	fmt.Println("mgr start finished")
+	return nil
 }
 
 // engageStopProcedure signals all runnables to stop, reads potential errors
@@ -583,6 +602,43 @@ func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelF
 	return nil
 }
 
+//// For each sporadicRunnable fire off a goroutine that
+//// blocks on the runnable's Ready (or the shutdown context).
+////
+//// Once ready, call a version of start runnable that blocks
+//// until the runnable is terminated.
+////
+//// Once the runnable stops, loop back and wait for ready again.
+//func (cm *controllerManager) startSporadicRunnables() {
+//	cm.mu.Lock()
+//	cm.waitForCache(cm.internalCtx)
+//	cm.mu.Unlock()
+//
+//	// TODO: what locking is necessary here?
+//	fmt.Printf("mgr len(cm.sporadicRunnables) = %+v\n", len(cm.sporadicRunnables))
+//
+//	for _, sr := range cm.sporadicRunnables {
+//		go func(sr SporadicRunnable) {
+//			fmt.Println("mgr got an sr")
+//			for {
+//				fmt.Println("mgr waiting on sr ReadyToStart")
+//				select {
+//				case <-cm.internalCtx.Done():
+//					fmt.Println("mgr internal context fired")
+//					return
+//				case <-sr.Ready(cm.internalCtx):
+//					fmt.Println("mgr ready, starting the runnable")
+//					// this doesn't block
+//					cm.startBlockingRunnable(sr)
+//					fmt.Println("mgr runnable done running")
+//					return
+//				}
+//				fmt.Println("mgr done running, looping back to wait on ready")
+//			}
+//		}(sr)
+//	}
+//}
+
 func (cm *controllerManager) startNonLeaderElectionRunnables() {
 	cm.mu.Lock()
 	defer cm.mu.Unlock()
@@ -591,6 +647,7 @@ func (cm *controllerManager) startNonLeaderElectionRunnables() {
 
 	// Start the non-leaderelection Runnables after the cache has synced
 	for _, c := range cm.nonLeaderElectionRunnables {
+		fmt.Println("nonLER")
 		// Controllers block, but we want to return an error if any have an error starting.
 		// Write any Start errors to a channel so we can return them
 		cm.startRunnable(c)
@@ -605,6 +662,7 @@ func (cm *controllerManager) startLeaderElectionRunnables() {
 
 	// Start the leader election Runnables after the cache has synced
 	for _, c := range cm.leaderElectionRunnables {
+		fmt.Println("LER")
 		// Controllers block, but we want to return an error if any have an error starting.
 		// Write any Start errors to a channel so we can return them
 		cm.startRunnable(c)
@@ -683,11 +741,61 @@ func (cm *controllerManager) Elected() <-chan struct{} {
 }
 
 func (cm *controllerManager) startRunnable(r Runnable) {
-	cm.waitForRunnable.Add(1)
+	if sporadicRunnable, ok := r.(SporadicRunnable); ok {
+		fmt.Printf("starting sporadic runnable = %+v\n", sporadicRunnable)
+		cm.startSporadicRunnable(sporadicRunnable)
+	} else {
+		cm.waitForRunnable.Add(1)
+		go func() {
+			fmt.Printf("starting non-sporadic runnable, %v\n", r)
+			defer cm.waitForRunnable.Done()
+			if err := r.Start(cm.internalCtx); err != nil {
+				cm.errChan <- err
+			}
+		}()
+	}
+}
+
+// startSporadicRunnable fires off a goroutine that
+// blocks on the runnable's Ready (or the shutdown context).
+//
+// Once ready, call a version of start runnable that blocks
+// until the runnable is terminated.
+//
+// Once the runnable stops, loop back and wait for ready again.
+func (cm *controllerManager) startSporadicRunnable(sr SporadicRunnable) {
 	go func() {
-		defer cm.waitForRunnable.Done()
-		if err := r.Start(cm.internalCtx); err != nil {
-			cm.errChan <- err
+		fmt.Println("mgr got an sr")
+		for {
+			fmt.Println("mgr waiting on sr ReadyToStart")
+			select {
+			case <-cm.internalCtx.Done():
+				fmt.Println("mgr internal context fired")
+				return
+			case <-sr.Ready(cm.internalCtx):
+				fmt.Println("mgr ready, starting the runnable")
+				// this doesn't block
+				cm.waitForRunnable.Add(1)
+				defer cm.waitForRunnable.Done()
+				fmt.Printf("starting sporadic runnable, %v\n", sr)
+				if err := sr.Start(cm.internalCtx); err != nil {
+					cm.errChan <- err
+				}
+				fmt.Println("mgr runnable done running")
+				return
+			}
+			fmt.Println("mgr done running, looping back to wait on ready")
 		}
 	}()
 }
+
+//// like startRunnable, but blocking
+//// TODO: is there a better way to do this?
+//func (cm *controllerManager) startBlockingRunnable(r Runnable) {
+//	cm.waitForRunnable.Add(1)
+//	defer cm.waitForRunnable.Done()
+//	fmt.Printf("starting sporadic runnable, %v\n", r)
+//	if err := r.Start(cm.internalCtx); err != nil {
+//		cm.errChan <- err
+//	}
+//}
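Review note on startSporadicRunnable: both select cases return, so the final "looping back to wait on ready" print is unreachable, and a controller stopped by a CRD uninstall is never restarted on reinstall. The loop the comments describe would look more like this; a sketch that assumes Start returns once the sporadic watch's done signal fires:

	// Sketch: cycle ready -> start -> stopped -> ready until manager shutdown.
	go func() {
		for {
			select {
			case <-cm.internalCtx.Done():
				return
			case <-sr.Ready(cm.internalCtx):
				cm.waitForRunnable.Add(1)
				err := sr.Start(cm.internalCtx) // returns when the sporadic watch stops
				cm.waitForRunnable.Done()
				if err != nil {
					cm.errChan <- err
					return
				}
				// fall through and wait for the CRD to be reinstalled
			}
		}
	}()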
diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go
index 30ff9e2516..2975182a83 100644
--- a/pkg/manager/manager.go
+++ b/pkg/manager/manager.go
@@ -290,6 +290,11 @@ func (r RunnableFunc) Start(ctx context.Context) error {
 	return r(ctx)
 }
 
+type SporadicRunnable interface {
+	Runnable
+	Ready(ctx context.Context) <-chan struct{}
+}
+
 // LeaderElectionRunnable knows if a Runnable needs to be run in the leader election mode.
 type LeaderElectionRunnable interface {
 	// NeedLeaderElection returns true if the Runnable needs to be run in the leader election mode.
diff --git a/pkg/source/source.go b/pkg/source/source.go
index adabbaf917..79c385cae5 100644
--- a/pkg/source/source.go
+++ b/pkg/source/source.go
@@ -21,6 +21,7 @@ import (
 	"errors"
 	"fmt"
 	"sync"
+	"time"
 
 	"k8s.io/apimachinery/pkg/api/meta"
 	"k8s.io/client-go/util/workqueue"
@@ -57,6 +58,12 @@ type Source interface {
 	Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
 }
 
+type SporadicSource interface {
+	Source
+	StartNotifyDone(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) (<-chan struct{}, error)
+	Ready(ctx context.Context, wg *sync.WaitGroup)
+}
+
 // SyncingSource is a source that needs syncing prior to being usable. The controller
 // will call its WaitForSync prior to starting workers.
 type SyncingSource interface {
@@ -98,31 +105,71 @@ type Kind struct {
 	startCancel func()
 }
 
+type SporadicKind struct {
+	Kind
+	DiscoveryCheck func() bool
+}
+
+func (sk *SporadicKind) Ready(ctx context.Context, wg *sync.WaitGroup) {
+	fmt.Println("src ready called")
+	defer wg.Done()
+	for {
+		select {
+		case <-ctx.Done():
+			fmt.Println("src context shutdown")
+			return
+		default:
+			if sk.DiscoveryCheck() {
+				fmt.Println("src ready discovery check pass closing ready")
+				return
+			}
+			//TODO: parameterize this
+			fmt.Println("src ready discovery check fail, spin")
+			time.Sleep(5 * time.Second)
+		}
+	}
+}
+
+// StartNotifyDone starts the kind and returns a done signal that fires when the CRD
+// is uninstalled, which also triggers cancellation of the underlying informer.
+func (sk *SporadicKind) StartNotifyDone(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) (<-chan struct{}, error) {
+	return sk.Kind.StartNotifyDone(ctx, handler, queue, prct...)
+}
+
+func (sk *SporadicKind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) error {
+	return sk.Kind.Start(ctx, handler, queue, prct...)
+}
+
 var _ SyncingSource = &Kind{}
 
 // Start is internal and should be called only by the Controller to register an EventHandler with the Informer
 // to enqueue reconcile.Requests.
-func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
-	prct ...predicate.Predicate) error {
+func (ks *Kind) StartNotifyDone(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
+	prct ...predicate.Predicate) (<-chan struct{}, error) {
 
 	// Type should have been specified by the user.
 	if ks.Type == nil {
-		return fmt.Errorf("must specify Kind.Type")
+		return nil, fmt.Errorf("must specify Kind.Type")
 	}
 
 	// cache should have been injected before Start was called
 	if ks.cache == nil {
-		return fmt.Errorf("must call CacheInto on Kind before calling Start")
+		return nil, fmt.Errorf("must call CacheInto on Kind before calling Start")
 	}
 
 	// cache.GetInformer will block until its context is cancelled if the cache was already started and it can not
 	// sync that informer (most commonly due to RBAC issues).
 	ctx, ks.startCancel = context.WithCancel(ctx)
 	ks.started = make(chan error)
+	var stopCh <-chan struct{}
 	go func() {
 		// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
-		i, err := ks.cache.GetInformer(ctx, ks.Type)
+		fmt.Printf("ks.Type = %+v\n", ks.Type)
+		var err error
+		var i cache.Informer
+		i, stopCh, err = ks.cache.GetStoppableInformer(ctx, ks.Type)
 		if err != nil {
+			fmt.Printf("kind GetInformer err = %+v\n", err)
 			kindMatchErr := &meta.NoKindMatchError{}
 			if errors.As(err, &kindMatchErr) {
 				log.Error(err, "if kind is a CRD, it should be installed before calling Start",
@@ -136,10 +183,19 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
 			// Would be great to return something more informative here
 			ks.started <- errors.New("cache did not sync")
 		}
+		fmt.Println("kind closing ks.started")
 		close(ks.started)
 	}()
 
-	return nil
+	return stopCh, nil
+}
+
+// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
+// to enqueue reconcile.Requests.
+func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
+	prct ...predicate.Predicate) error {
+	_, err := ks.StartNotifyDone(ctx, handler, queue, prct...)
+	return err
+}
 
 func (ks *Kind) String() string {
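Last review note, on StartNotifyDone above: stopCh is assigned inside the goroutine but returned immediately, so callers will usually receive a nil channel (which blocks forever), and the assignment races with the read. Creating the done channel up front and forwarding into it avoids both problems; a sketch of the shape, not a drop-in patch:

	// Sketch: own the returned channel; let the goroutine forward the
	// informer's stop signal into it instead of assigning a shared variable.
	done := make(chan struct{})
	go func() {
		i, stopCh, err := ks.cache.GetStoppableInformer(ctx, ks.Type)
		if err != nil {
			// ... existing error handling ...
			close(done) // hedge: surface "never started" as already done
			return
		}
		_ = i // existing handler registration and cache sync logic goes here
		go func() {
			<-stopCh // informer stopped (e.g. CRD uninstalled)
			close(done)
		}()
	}()
	return done, nil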