From f3fa595961e5e0f2e08642febdeae13280c0519d Mon Sep 17 00:00:00 2001 From: Kevin Delgado Date: Fri, 23 Apr 2021 23:57:16 +0000 Subject: [PATCH 01/17] wip EOD 4/23 --- pkg/builder/controller.go | 21 +++++++++++++++- pkg/internal/controller/controller.go | 35 ++++++++++++++++++++++++++- pkg/manager/internal.go | 25 +++++++++++++++++++ pkg/manager/manager.go | 6 +++++ pkg/source/source.go | 34 ++++++++++++++++++++++++++ 5 files changed, 119 insertions(+), 2 deletions(-) diff --git a/pkg/builder/controller.go b/pkg/builder/controller.go index 81f446d62f..c4b27e9f7b 100644 --- a/pkg/builder/controller.go +++ b/pkg/builder/controller.go @@ -72,6 +72,7 @@ type ForInput struct { predicates []predicate.Predicate objectProjection objectProjection err error + sporadic bool } // For defines the type of Object being *reconciled*, and configures the ControllerManagedBy to respond to create / delete / @@ -97,6 +98,7 @@ type OwnsInput struct { object client.Object predicates []predicate.Predicate objectProjection objectProjection + sporadic bool } // Owns defines types of Objects being *generated* by the ControllerManagedBy, and configures the ControllerManagedBy to respond to @@ -118,6 +120,7 @@ type WatchesInput struct { eventhandler handler.EventHandler predicates []predicate.Predicate objectProjection objectProjection + sporadic bool } // Watches exposes the lower-level ControllerManagedBy Watches functions through the builder. Consider using @@ -222,7 +225,23 @@ func (blder *Builder) doWatch() error { if err != nil { return err } - src := &source.Kind{Type: typeForSrc} + var src source.Source + if blder.forInput.sporadic { + gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme()) + if err != nil { + return err + } + existsInDiscovery := func() bool { + if _, err := blder.mgr.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version); err != nil { + return false + } + return true + } + + src = &source.SporadicKind{Kind: source.Kind{Type: typeForSrc}, DiscoveryCheck: existsInDiscovery} + } else { + src = &source.Kind{Type: typeForSrc} + } hdler := &handler.EnqueueRequestForObject{} allPredicates := append(blder.globalPredicates, blder.forInput.predicates...) if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index f5024502d9..ee689b0e1e 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -81,7 +81,8 @@ type Controller struct { CacheSyncTimeout time.Duration // startWatches maintains a list of sources, handlers, and predicates to start when the controller is started. - startWatches []watchDescription + startWatches []watchDescription + sporadicWatches []sporadicWatchDescription // Log is used to log messages to users during reconciliation, or for example when a watch is started. Log logr.Logger @@ -94,6 +95,12 @@ type watchDescription struct { predicates []predicate.Predicate } +type sporadicWatchDescription struct { + src source.SporadicSource + handler handler.EventHandler + predicates []predicate.Predicate +} + // Reconcile implements reconcile.Reconciler func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { log := c.Log.WithValues("name", req.Name, "namespace", req.Namespace) @@ -119,6 +126,11 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc } } + if sporadicSource, ok := src.(source.SporadicSource); ok && !c.Started { + c.sporadicWatches = append(c.sporadicWatches, sporadicWatchDescription{src: sporadicSource, handler: evthdler, predicates: prct}) + return nil + } + // Controller hasn't started yet, store the watches locally and return. // // These watches are going to be held on the controller struct until the manager or user calls Start(...). @@ -131,6 +143,27 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc return src.Start(c.ctx, evthdler, c.Queue, prct...) } +// Rea +func (c *Controller) ReadyToStart(ctx context.Context) <-chan struct{} { + ready := make(chan struct{}) + if len(c.sporadicWatches) == 0 { + close(ready) + return ready + } + + var wg sync.WaitGroup + for _, w := range c.sporadicWatches { + wg.Add(1) + go w.src.Ready(ctx, &wg) + } + + go func() { + wg.Wait() + close(ready) + }() + return ready +} + // Start implements controller.Controller func (c *Controller) Start(ctx context.Context) error { // use an IIFE to get proper lock handling diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 57f95ba5b3..10023bb132 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -71,6 +71,8 @@ type controllerManager struct { // These Runnables will not be blocked by lead election. nonLeaderElectionRunnables []Runnable + sporadicRunnables []SporadicRunnable + // recorderProvider is used to generate event recorders that will be injected into Controllers // (and EventHandlers, Sources and Predicates). recorderProvider *intrec.Provider @@ -207,6 +209,10 @@ func (cm *controllerManager) Add(r Runnable) error { cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r) } else if hasCache, ok := r.(hasCache); ok { cm.caches = append(cm.caches, hasCache) + + } else if sporadicRunnable, ok := r.(SporadicRunnable); ok { + cm.sporadicRunnables = append(cm.sporadicRunnables, sporadicRunnable) + } else { shouldStart = cm.startedLeader cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r) @@ -583,6 +589,25 @@ func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelF return nil } +func (cm *controllerManager) startSporadicRunnables() { + cm.mu.Lock() + cm.waitForCache(cm.internalCtx) + cm.mu.Unlock() + + for _, sr := range cm.sporadicRunnables { + go func(sr SporadicRunnable) { + for { + select { + case <-cm.internalCtx.Done(): + return + case <-sr.Ready(cm.internalCtx): + cm.startRunnable(sr) + } + } + }(sr) + } +} + func (cm *controllerManager) startNonLeaderElectionRunnables() { cm.mu.Lock() defer cm.mu.Unlock() diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 30ff9e2516..ef48c37650 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -290,6 +290,12 @@ func (r RunnableFunc) Start(ctx context.Context) error { return r(ctx) } +type SporadicRunnable interface { + Runnable + Ready(ctx context.Context) ReadySignal +} +type ReadySignal <-chan struct{} + // LeaderElectionRunnable knows if a Runnable needs to be run in the leader election mode. type LeaderElectionRunnable interface { // NeedLeaderElection returns true if the Runnable needs to be run in the leader election mode. diff --git a/pkg/source/source.go b/pkg/source/source.go index adabbaf917..0ac922c3bf 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "sync" + "time" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/client-go/util/workqueue" @@ -57,6 +58,11 @@ type Source interface { Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error } +type SporadicSource interface { + Source + Ready(ctx context.Context, wg *sync.WaitGroup) <-chan struct{} +} + // SyncingSource is a source that needs syncing prior to being usable. The controller // will call its WaitForSync prior to starting workers. type SyncingSource interface { @@ -98,6 +104,34 @@ type Kind struct { startCancel func() } +type SporadicKind struct { + Kind + DiscoveryCheck func() bool +} + +func (sk *SporadicKind) Ready(ctx context.Context, wg *sync.WaitGroup) <-chan struct{} { + defer wg.Done() + ready := make(chan struct{}) + go func() { + for { + select { + case <-ctx.Done(): + close(ready) + return + default: + if sk.DiscoveryCheck() { + close(ready) + return + } + //TODO: parameterize this + time.Sleep(5 * time.Second) + } + } + }() + + return ready +} + var _ SyncingSource = &Kind{} // Start is internal and should be called only by the Controller to register an EventHandler with the Informer From 31d67a66f460f65e0f5df7ca86a6562d437e4ff5 Mon Sep 17 00:00:00 2001 From: Kevin Delgado Date: Mon, 26 Apr 2021 22:55:36 +0000 Subject: [PATCH 02/17] wip 4/26 pre-testing --- pkg/builder/controller.go | 5 +++++ pkg/builder/options.go | 6 ++++++ pkg/cache/internal/informers_map.go | 14 +++++++++++--- pkg/internal/controller/controller.go | 3 +++ pkg/manager/internal.go | 9 +++++++++ pkg/source/source.go | 26 ++++++++++++++++++++++++++ 6 files changed, 60 insertions(+), 3 deletions(-) diff --git a/pkg/builder/controller.go b/pkg/builder/controller.go index c4b27e9f7b..bf1a0e084a 100644 --- a/pkg/builder/controller.go +++ b/pkg/builder/controller.go @@ -233,8 +233,10 @@ func (blder *Builder) doWatch() error { } existsInDiscovery := func() bool { if _, err := blder.mgr.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version); err != nil { + fmt.Printf("NOT in discovery gvk = %+v\n", gvk) return false } + fmt.Printf("YES in discovery gvk = %+v\n", gvk) return true } @@ -255,6 +257,7 @@ func (blder *Builder) doWatch() error { return err } src := &source.Kind{Type: typeForSrc} + // TODO: handle sporadic watches for owns types too hdler := &handler.EnqueueRequestForOwner{ OwnerType: blder.forInput.object, IsController: true, @@ -271,6 +274,8 @@ func (blder *Builder) doWatch() error { allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, w.predicates...) + // TODO: handle sporadic watches for owns types too + // If the source of this watch is of type *source.Kind, project it. if srckind, ok := w.src.(*source.Kind); ok { typeForSrc, err := blder.project(srckind.Type, w.objectProjection) diff --git a/pkg/builder/options.go b/pkg/builder/options.go index 7bb4273094..4fc8db2eed 100644 --- a/pkg/builder/options.go +++ b/pkg/builder/options.go @@ -99,6 +99,12 @@ func (p projectAs) ApplyToWatches(opts *WatchesInput) { opts.objectProjection = objectProjection(p) } +type Sporadic struct{} + +func (s Sporadic) ApplyToFor(opts *ForInput) { + opts.sporadic = true +} + var ( // OnlyMetadata tells the controller to *only* cache metadata, and to watch // the the API server in metadata-only form. This is useful when watching diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index 6b57c6fa61..0d2bd377ec 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -177,7 +177,7 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion if !ok { var err error - if i, started, err = ip.addInformerToMap(gvk, obj); err != nil { + if i, started, err = ip.addInformerToMap(ctx, gvk, obj); err != nil { return started, nil, err } } @@ -192,7 +192,7 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion return started, i, nil } -func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) { +func (ip *specificInformersMap) addInformerToMap(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) { ip.mu.Lock() defer ip.mu.Unlock() @@ -225,8 +225,16 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob // Start the Informer if need by // TODO(seans): write thorough tests and document what happens here - can you add indexers? // can you add eventhandlers? + runCtx, cancel := context.WithCancel(ctx) + // TODO: necessary? + defer cancel() + go func() { + // TODO: check shomron PR for why this might not be sufficient + <-ip.stop + cancel() + }() if ip.started { - go i.Informer.Run(ip.stop) + go i.Informer.Run(runCtx.Done()) } return i, ip.started, nil } diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index ee689b0e1e..467e300543 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -145,6 +145,7 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc // Rea func (c *Controller) ReadyToStart(ctx context.Context) <-chan struct{} { + fmt.Printf("ctrl ReadyToStart len(c.sporadicWatches) = %+v\n", len(c.sporadicWatches)) ready := make(chan struct{}) if len(c.sporadicWatches) == 0 { close(ready) @@ -154,12 +155,14 @@ func (c *Controller) ReadyToStart(ctx context.Context) <-chan struct{} { var wg sync.WaitGroup for _, w := range c.sporadicWatches { wg.Add(1) + fmt.Println("ctrl checking src ready") go w.src.Ready(ctx, &wg) } go func() { wg.Wait() close(ready) + fmt.Println("ctrl all sources ready") }() return ready } diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 10023bb132..8583b7679d 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -594,15 +594,24 @@ func (cm *controllerManager) startSporadicRunnables() { cm.waitForCache(cm.internalCtx) cm.mu.Unlock() + // TODO: what locking is necessary here? + fmt.Printf("mgr len(cm.sporadicRunnables) = %+v\n", len(cm.sporadicRunnables)) + for _, sr := range cm.sporadicRunnables { go func(sr SporadicRunnable) { + fmt.Println("mgr got an sr") for { + fmt.Println("mgr waiting on sr ReadyToStart") select { case <-cm.internalCtx.Done(): + fmt.Println("mgr internal context fired") return case <-sr.Ready(cm.internalCtx): + fmt.Println("mgr ready, starting the runnable") cm.startRunnable(sr) + fmt.Println("mgr runnable done running") } + fmt.Println("mgr done running, looping back to wait on ready") } }(sr) } diff --git a/pkg/source/source.go b/pkg/source/source.go index 0ac922c3bf..014a536d3c 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -110,20 +110,24 @@ type SporadicKind struct { } func (sk *SporadicKind) Ready(ctx context.Context, wg *sync.WaitGroup) <-chan struct{} { + fmt.Println("src ready called") defer wg.Done() ready := make(chan struct{}) go func() { for { select { case <-ctx.Done(): + fmt.Println("src context shutdown") close(ready) return default: if sk.DiscoveryCheck() { + fmt.Println("src ready discovery check pass closing ready") close(ready) return } //TODO: parameterize this + fmt.Println("src ready discovery check fail, spin") time.Sleep(5 * time.Second) } } @@ -132,6 +136,28 @@ func (sk *SporadicKind) Ready(ctx context.Context, wg *sync.WaitGroup) <-chan st return ready } +func (sk *SporadicKind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) error { + // TODO: how do we cancel if we never fail the discovery check? (leak?) + fmt.Println("src start called") + infCtx, cancel := context.WithCancel(ctx) + go func() { + if !sk.DiscoveryCheck() { + fmt.Println("src start discovery check fail, cancelling") + cancel() + return + } + // todo parameterize + fmt.Println("src start discovery check pass, spinning") + time.Sleep(5 * time.Second) + }() + ret := make(chan error) + go func() { + fmt.Println("src starting the underlying Kind") + ret <- sk.Start(infCtx, handler, queue, prct...) + }() + return <-ret +} + var _ SyncingSource = &Kind{} // Start is internal and should be called only by the Controller to register an EventHandler with the Informer From ca9ee70501536e02569412743d7021ca9d73a6c6 Mon Sep 17 00:00:00 2001 From: Kevin Delgado Date: Tue, 27 Apr 2021 02:07:46 +0000 Subject: [PATCH 03/17] existsInDiscovery doesn't recognize uninstall --- pkg/builder/controller.go | 9 ++++++- pkg/cache/internal/informers_map.go | 6 +++-- pkg/internal/controller/controller.go | 13 +++++++++- pkg/manager/internal.go | 34 +++++++++++++++++++++++---- pkg/manager/manager.go | 3 ++- pkg/source/source.go | 24 ++++++++++++------- 6 files changed, 70 insertions(+), 19 deletions(-) diff --git a/pkg/builder/controller.go b/pkg/builder/controller.go index bf1a0e084a..b4dc62c239 100644 --- a/pkg/builder/controller.go +++ b/pkg/builder/controller.go @@ -226,13 +226,20 @@ func (blder *Builder) doWatch() error { return err } var src source.Source + fmt.Printf("blder.forInput.sporadic = %+v\n", blder.forInput.sporadic) if blder.forInput.sporadic { gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme()) if err != nil { return err } + fmt.Printf("gvk = %+v\n", gvk) + // Bug: once in the rest mapper always claims it exists, even when it doesn't existsInDiscovery := func() bool { - if _, err := blder.mgr.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version); err != nil { + mapper := blder.mgr.GetRESTMapper() + //fmt.Printf("mapper = %+v\n", mapper) + mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version) + fmt.Printf("mapping = %+v\n", mapping) + if err != nil { fmt.Printf("NOT in discovery gvk = %+v\n", gvk) return false } diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index 0d2bd377ec..232ed8769d 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -226,16 +226,18 @@ func (ip *specificInformersMap) addInformerToMap(ctx context.Context, gvk schema // TODO(seans): write thorough tests and document what happens here - can you add indexers? // can you add eventhandlers? runCtx, cancel := context.WithCancel(ctx) - // TODO: necessary? - defer cancel() + // TODO: not cancelling? (leak) go func() { // TODO: check shomron PR for why this might not be sufficient <-ip.stop + fmt.Println("inf ip stopped") cancel() }() if ip.started { + fmt.Println("inf Run") go i.Informer.Run(runCtx.Done()) } + fmt.Println("inf done") return i, ip.started, nil } diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 467e300543..3b75378205 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -144,7 +144,7 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc } // Rea -func (c *Controller) ReadyToStart(ctx context.Context) <-chan struct{} { +func (c *Controller) Ready(ctx context.Context) <-chan struct{} { fmt.Printf("ctrl ReadyToStart len(c.sporadicWatches) = %+v\n", len(c.sporadicWatches)) ready := make(chan struct{}) if len(c.sporadicWatches) == 0 { @@ -160,7 +160,9 @@ func (c *Controller) ReadyToStart(ctx context.Context) <-chan struct{} { } go func() { + fmt.Println("ctrl ready wg wait starting") wg.Wait() + fmt.Println("ctrl ready wg wait done closing ready") close(ready) fmt.Println("ctrl all sources ready") }() @@ -205,6 +207,15 @@ func (c *Controller) Start(ctx context.Context) error { } } + c.Log.Info("sporadic watches", "len", len(c.sporadicWatches)) + for _, watch := range c.sporadicWatches { + c.Log.Info("sporadic Starting EventSource", "source", watch.src) + + if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil { + return err + } + } + // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches c.Log.Info("Starting Controller") diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 8583b7679d..8d5116030c 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -204,16 +204,18 @@ func (cm *controllerManager) Add(r Runnable) error { var shouldStart bool // Add the runnable to the leader election or the non-leaderelection list - if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() { + if sporadicRunnable, ok := r.(SporadicRunnable); ok { + fmt.Println("mgr adding sporadic") + cm.sporadicRunnables = append(cm.sporadicRunnables, sporadicRunnable) + } else if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() { + fmt.Println("mgr adding non ler") shouldStart = cm.started cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r) } else if hasCache, ok := r.(hasCache); ok { cm.caches = append(cm.caches, hasCache) - } else if sporadicRunnable, ok := r.(SporadicRunnable); ok { - cm.sporadicRunnables = append(cm.sporadicRunnables, sporadicRunnable) - } else { + fmt.Println("mgr adding ler") shouldStart = cm.startedLeader cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r) } @@ -433,6 +435,7 @@ func (cm *controllerManager) serveHealthProbes() { // Run server cm.startRunnable(RunnableFunc(func(_ context.Context) error { + cm.logger.Info("starting health probes") if err := server.Serve(cm.healthProbeListener); err != nil && err != http.ErrServerClosed { return err } @@ -449,6 +452,7 @@ func (cm *controllerManager) serveHealthProbes() { } func (cm *controllerManager) Start(ctx context.Context) (err error) { + fmt.Println("mgr Start") if err := cm.Add(cm.cluster); err != nil { return fmt.Errorf("failed to add cluster to runnables: %w", err) } @@ -492,6 +496,8 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { go cm.serveHealthProbes() } + go cm.startSporadicRunnables() + go cm.startNonLeaderElectionRunnables() go func() { @@ -507,14 +513,19 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { } }() + fmt.Println("mgr start waiting on signal") select { case <-ctx.Done(): + fmt.Println("mgr start ctx.Done fired") // We are done return nil case err := <-cm.errChan: + fmt.Printf("mgr start errChan fired, %v\n", err) // Error starting or running a runnable return err } + fmt.Println("mgr start finished") + return nil } // engageStopProcedure signals all runnables to stop, reads potential errors @@ -608,7 +619,8 @@ func (cm *controllerManager) startSporadicRunnables() { return case <-sr.Ready(cm.internalCtx): fmt.Println("mgr ready, starting the runnable") - cm.startRunnable(sr) + // this doesn't block + cm.startBlockingRunnable(sr) fmt.Println("mgr runnable done running") } fmt.Println("mgr done running, looping back to wait on ready") @@ -625,6 +637,7 @@ func (cm *controllerManager) startNonLeaderElectionRunnables() { // Start the non-leaderelection Runnables after the cache has synced for _, c := range cm.nonLeaderElectionRunnables { + fmt.Println("nonLER") // Controllers block, but we want to return an error if any have an error starting. // Write any Start errors to a channel so we can return them cm.startRunnable(c) @@ -639,6 +652,7 @@ func (cm *controllerManager) startLeaderElectionRunnables() { // Start the leader election Runnables after the cache has synced for _, c := range cm.leaderElectionRunnables { + fmt.Println("LER") // Controllers block, but we want to return an error if any have an error starting. // Write any Start errors to a channel so we can return them cm.startRunnable(c) @@ -719,9 +733,19 @@ func (cm *controllerManager) Elected() <-chan struct{} { func (cm *controllerManager) startRunnable(r Runnable) { cm.waitForRunnable.Add(1) go func() { + fmt.Printf("starting runnable, %v", r) defer cm.waitForRunnable.Done() if err := r.Start(cm.internalCtx); err != nil { cm.errChan <- err } }() } + +func (cm *controllerManager) startBlockingRunnable(r Runnable) { + cm.waitForRunnable.Add(1) + defer cm.waitForRunnable.Done() + fmt.Printf("starting sporadic runnable, %v", r) + if err := r.Start(cm.internalCtx); err != nil { + cm.errChan <- err + } +} diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index ef48c37650..caf47df9eb 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -292,7 +292,8 @@ func (r RunnableFunc) Start(ctx context.Context) error { type SporadicRunnable interface { Runnable - Ready(ctx context.Context) ReadySignal + // TODO: rename to ReadyToStart, use ReadySignal + Ready(ctx context.Context) <-chan struct{} } type ReadySignal <-chan struct{} diff --git a/pkg/source/source.go b/pkg/source/source.go index 014a536d3c..39e2c5c253 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -111,9 +111,9 @@ type SporadicKind struct { func (sk *SporadicKind) Ready(ctx context.Context, wg *sync.WaitGroup) <-chan struct{} { fmt.Println("src ready called") - defer wg.Done() ready := make(chan struct{}) go func() { + defer wg.Done() for { select { case <-ctx.Done(): @@ -141,19 +141,23 @@ func (sk *SporadicKind) Start(ctx context.Context, handler handler.EventHandler, fmt.Println("src start called") infCtx, cancel := context.WithCancel(ctx) go func() { - if !sk.DiscoveryCheck() { - fmt.Println("src start discovery check fail, cancelling") - cancel() - return + for { + if !sk.DiscoveryCheck() { + fmt.Println("src start discovery check fail, cancelling") + cancel() + return + } + // todo parameterize + fmt.Println("src start discovery check pass, spinning") + time.Sleep(5 * time.Second) } - // todo parameterize - fmt.Println("src start discovery check pass, spinning") - time.Sleep(5 * time.Second) }() ret := make(chan error) go func() { fmt.Println("src starting the underlying Kind") - ret <- sk.Start(infCtx, handler, queue, prct...) + err := sk.Kind.Start(infCtx, handler, queue, prct...) + fmt.Printf("err = %+v\n", err) + ret <- err }() return <-ret } @@ -183,6 +187,7 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w // Lookup the Informer from the Cache and add an EventHandler which populates the Queue i, err := ks.cache.GetInformer(ctx, ks.Type) if err != nil { + fmt.Printf("kind GetInformer err = %+v\n", err) kindMatchErr := &meta.NoKindMatchError{} if errors.As(err, &kindMatchErr) { log.Error(err, "if kind is a CRD, it should be installed before calling Start", @@ -196,6 +201,7 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w // Would be great to return something more informative here ks.started <- errors.New("cache did not sync") } + fmt.Println("kind closing ks.started") close(ks.started) }() From 9feeb704ab829105b1d7e7f8f06b22f74b3fea60 Mon Sep 17 00:00:00 2001 From: Kevin Delgado Date: Tue, 27 Apr 2021 02:30:35 +0000 Subject: [PATCH 04/17] discovery check working --- pkg/builder/controller.go | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/pkg/builder/controller.go b/pkg/builder/controller.go index b4dc62c239..0da722b7dc 100644 --- a/pkg/builder/controller.go +++ b/pkg/builder/controller.go @@ -23,6 +23,7 @@ import ( "github.com/go-logr/logr" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/discovery" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" @@ -233,20 +234,31 @@ func (blder *Builder) doWatch() error { return err } fmt.Printf("gvk = %+v\n", gvk) + dc, err := discovery.NewDiscoveryClientForConfig(blder.mgr.GetConfig()) + if err != nil { + return err + } // Bug: once in the rest mapper always claims it exists, even when it doesn't existsInDiscovery := func() bool { - mapper := blder.mgr.GetRESTMapper() - //fmt.Printf("mapper = %+v\n", mapper) - mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version) - fmt.Printf("mapping = %+v\n", mapping) + //mapper := blder.mgr.GetRESTMapper() + ////fmt.Printf("mapper = %+v\n", mapper) + //mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version) + //fmt.Printf("mapping = %+v\n", mapping) + + resources, err := dc.ServerResourcesForGroupVersion(gvk.GroupVersion().String()) if err != nil { fmt.Printf("NOT in discovery gvk = %+v\n", gvk) return false } - fmt.Printf("YES in discovery gvk = %+v\n", gvk) - return true + for _, res := range resources.APIResources { + if res.Kind == gvk.Kind { + fmt.Printf("YES in discovery gvk = %+v\n", gvk) + return true + } + } + fmt.Printf("NOT in discovery kind = %+v\n", gvk) + return false } - src = &source.SporadicKind{Kind: source.Kind{Type: typeForSrc}, DiscoveryCheck: existsInDiscovery} } else { src = &source.Kind{Type: typeForSrc} From 68f114dae6debf77627f919b1c6b63a753c76256 Mon Sep 17 00:00:00 2001 From: Kevin Delgado Date: Tue, 27 Apr 2021 04:55:03 +0000 Subject: [PATCH 05/17] everything working, pre-cleanup --- pkg/cache/internal/informers_map.go | 9 +++++++-- pkg/internal/controller/controller.go | 24 +++++++++++++++++------- pkg/source/source.go | 8 +++++--- 3 files changed, 29 insertions(+), 12 deletions(-) diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index 232ed8769d..a19571be99 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -235,9 +235,14 @@ func (ip *specificInformersMap) addInformerToMap(ctx context.Context, gvk schema }() if ip.started { fmt.Println("inf Run") - go i.Informer.Run(runCtx.Done()) + go func() { + i.Informer.Run(runCtx.Done()) + fmt.Println("informer done running, remove from map") + delete(ip.informersByGVK, gvk) + + }() } - fmt.Println("inf done") + fmt.Println("inf addInformer returning") return i, ip.started, nil } diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 3b75378205..1d37d883d2 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -180,12 +180,14 @@ func (c *Controller) Start(ctx context.Context) error { c.initMetrics() + ctrlCtx, ctrlCancel := context.WithCancel(ctx) + // Set the internal context. - c.ctx = ctx + c.ctx = ctrlCtx c.Queue = c.MakeQueue() go func() { - <-ctx.Done() + <-ctrlCtx.Done() c.Queue.ShutDown() }() @@ -202,7 +204,7 @@ func (c *Controller) Start(ctx context.Context) error { for _, watch := range c.startWatches { c.Log.Info("Starting EventSource", "source", watch.src) - if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil { + if err := watch.src.Start(ctrlCtx, watch.handler, c.Queue, watch.predicates...); err != nil { return err } } @@ -211,9 +213,17 @@ func (c *Controller) Start(ctx context.Context) error { for _, watch := range c.sporadicWatches { c.Log.Info("sporadic Starting EventSource", "source", watch.src) - if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil { + done, err := watch.src.StartNotifyDone(ctrlCtx, watch.handler, c.Queue, watch.predicates...) + if err != nil { return err } + go func() { + fmt.Println("ctrl waiting for done") + <-done + fmt.Println("ctrl done fired, ctrlCancelling") + c.Started = false + ctrlCancel() + }() } // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches @@ -227,7 +237,7 @@ func (c *Controller) Start(ctx context.Context) error { if err := func() error { // use a context with timeout for launching sources and syncing caches. - sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout) + sourceStartCtx, cancel := context.WithTimeout(ctrlCtx, c.CacheSyncTimeout) defer cancel() // WaitForSync waits for a definitive timeout, and returns if there @@ -258,7 +268,7 @@ func (c *Controller) Start(ctx context.Context) error { defer wg.Done() // Run a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the reconcileHandler is never invoked concurrently with the same object. - for c.processNextWorkItem(ctx) { + for c.processNextWorkItem(ctrlCtx) { } }() } @@ -270,7 +280,7 @@ func (c *Controller) Start(ctx context.Context) error { return err } - <-ctx.Done() + <-ctrlCtx.Done() c.Log.Info("Shutdown signal received, waiting for all workers to finish") wg.Wait() c.Log.Info("All workers finished") diff --git a/pkg/source/source.go b/pkg/source/source.go index 39e2c5c253..2251d096d7 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -59,7 +59,8 @@ type Source interface { } type SporadicSource interface { - Source + //Source + StartNotifyDone(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) (<-chan struct{}, error) Ready(ctx context.Context, wg *sync.WaitGroup) <-chan struct{} } @@ -136,7 +137,7 @@ func (sk *SporadicKind) Ready(ctx context.Context, wg *sync.WaitGroup) <-chan st return ready } -func (sk *SporadicKind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) error { +func (sk *SporadicKind) StartNotifyDone(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) (<-chan struct{}, error) { // TODO: how do we cancel if we never fail the discovery check? (leak?) fmt.Println("src start called") infCtx, cancel := context.WithCancel(ctx) @@ -159,7 +160,8 @@ func (sk *SporadicKind) Start(ctx context.Context, handler handler.EventHandler, fmt.Printf("err = %+v\n", err) ret <- err }() - return <-ret + err := <-ret + return infCtx.Done(), err } var _ SyncingSource = &Kind{} From 340bdf53c674dcbecd476bb38c88e84f6d64bb83 Mon Sep 17 00:00:00 2001 From: Kevin Delgado Date: Tue, 27 Apr 2021 20:07:31 +0000 Subject: [PATCH 06/17] WIP controller integration test --- pkg/builder/controller.go | 6 - pkg/controller/controller_integration_test.go | 120 ++++++++++++++++++ pkg/controller/testdata/crds/foocrd.yaml | 17 +++ pkg/controller/testdata/foo/foo_types.go | 72 +++++++++++ pkg/internal/controller/controller.go | 3 + pkg/manager/internal.go | 1 + pkg/source/source.go | 7 +- 7 files changed, 219 insertions(+), 7 deletions(-) create mode 100644 pkg/controller/testdata/crds/foocrd.yaml create mode 100644 pkg/controller/testdata/foo/foo_types.go diff --git a/pkg/builder/controller.go b/pkg/builder/controller.go index 0da722b7dc..57757a6fff 100644 --- a/pkg/builder/controller.go +++ b/pkg/builder/controller.go @@ -238,13 +238,7 @@ func (blder *Builder) doWatch() error { if err != nil { return err } - // Bug: once in the rest mapper always claims it exists, even when it doesn't existsInDiscovery := func() bool { - //mapper := blder.mgr.GetRESTMapper() - ////fmt.Printf("mapper = %+v\n", mapper) - //mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version) - //fmt.Printf("mapping = %+v\n", mapping) - resources, err := dc.ServerResourcesForGroupVersion(gvk.GroupVersion().String()) if err != nil { fmt.Printf("NOT in discovery gvk = %+v\n", gvk) diff --git a/pkg/controller/controller_integration_test.go b/pkg/controller/controller_integration_test.go index 762b3d9fbb..e52a06e0a0 100644 --- a/pkg/controller/controller_integration_test.go +++ b/pkg/controller/controller_integration_test.go @@ -18,15 +18,22 @@ package controller_test import ( "context" + "fmt" + "path/filepath" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllertest" + "sigs.k8s.io/controller-runtime/pkg/controller/testdata/foo" + "sigs.k8s.io/controller-runtime/pkg/envtest" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" @@ -173,6 +180,119 @@ var _ = Describe("controller", func() { close(done) }, 5) }) + + FIt("should reconcile when the CRD is installed, uninstalled, reinstalled", func(done Done) { + By("Initializing the scheme and crd") + s := runtime.NewScheme() + f := &foo.Foo{} + gvk := f.GroupVersionKind() + options := manager.Options{} + //s.AddKnownTypeWithName(gvk, f) + //fl := &foo.FooList{} + //s.AddKnownTypes(gvk.GroupVersion(), fl) + foo.AddToScheme(s) + options.Scheme = s + crdPath := filepath.Join(".", "testadata", "crds", "foocrd.yaml") + crdOpts := envtest.CRDInstallOptions{ + Paths: []string{crdPath}, + } + _ = crdOpts + + By("Creating the Manager") + cm, err := manager.New(cfg, options) + Expect(err).NotTo(HaveOccurred()) + + By("Creating the Controller") + instance, err := controller.New("foo-controller", cm, controller.Options{ + Reconciler: reconcile.Func( + func(_ context.Context, request reconcile.Request) (reconcile.Result, error) { + reconciled <- request + return reconcile.Result{}, nil + }), + }) + Expect(err).NotTo(HaveOccurred()) + + By("Watching foo CRD as sporadic kinds") + dc := clientset.Discovery() + Expect(err).NotTo(HaveOccurred()) + existsInDiscovery := func() bool { + resources, err := dc.ServerResourcesForGroupVersion(gvk.GroupVersion().String()) + if err != nil { + fmt.Printf("NOT in discovery gvk = %+v\n", gvk) + return false + } + for _, res := range resources.APIResources { + if res.Kind == gvk.Kind { + fmt.Printf("YES in discovery gvk = %+v\n", gvk) + return true + } + } + fmt.Printf("NOT in discovery kind = %+v\n", gvk) + return false + } + err = instance.Watch(&source.SporadicKind{Kind: source.Kind{Type: f}, DiscoveryCheck: existsInDiscovery}, &handler.EnqueueRequestForObject{}) + Expect(err).NotTo(HaveOccurred()) + //err = cm.GetClient().Get(ctx, types.NamespacedName{Name: "foo"}, &corev1.Namespace{}) + //Expect(err).To(Equal(&cache.ErrCacheNotStarted{})) + //err = cm.GetClient().List(ctx, &corev1.NamespaceList{}) + //Expect(err).To(Equal(&cache.ErrCacheNotStarted{})) + + By("Starting the Manager") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer GinkgoRecover() + Expect(cm.Start(ctx)).NotTo(HaveOccurred()) + }() + + testFoo := &foo.Foo{ + ObjectMeta: metav1.ObjectMeta{Name: "test-foo"}, + } + + expectedReconcileRequest := reconcile.Request{NamespacedName: types.NamespacedName{ + Namespace: "default", + Name: "test-foo", + }} + _ = expectedReconcileRequest + + By("Creating the rest client") + config := *cfg + gv := gvk.GroupVersion() + config.ContentConfig.GroupVersion = &gv + config.NegotiatedSerializer = scheme.Codecs.WithoutConversion() + client, err := rest.RESTClientFor(&config) + Expect(err).NotTo(HaveOccurred()) + + By("Failing to create a foo object if the crd isn't installed") + result := foo.Foo{} + err = client.Post().Namespace("default").Resource("foos").Body(testFoo).Do(ctx).Into(&result) + Expect(err).To(HaveOccurred()) + + By("Installing the CRD") + //fmt.Println("test installing crd") + //crds, err := envtest.InstallCRDs(cfg, crdOpts) + //fmt.Println("test crd installed") + //Expect(err).NotTo(HaveOccurred()) + //Expect(len(crds)).To(Equal(1)) + + By("Invoking Reconcile for foo Create") + + By("Uninstalling the CRD") + //err = envtest.UninstallCRDs(cfg, crdOpts) + //Expect(err).NotTo(HaveOccurred()) + + By("Failing get foo object if the crd isn't installed") + + By("Reinstalling the CRD") + + By("Invoking Reconcile for foo Create") + + By("Uninstalling the CRD") + + close(done) + + }, 20) + }) func truePtr() *bool { diff --git a/pkg/controller/testdata/crds/foocrd.yaml b/pkg/controller/testdata/crds/foocrd.yaml new file mode 100644 index 0000000000..e2bddbc528 --- /dev/null +++ b/pkg/controller/testdata/crds/foocrd.yaml @@ -0,0 +1,17 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: foos.bar.example.com +spec: + group: bar.example.com + names: + kind: Foo + plural: foos + scope: Namespaced + versions: + - name: "v1" + storage: true + served: true + schema: + openAPIV3Schema: + type: object diff --git a/pkg/controller/testdata/foo/foo_types.go b/pkg/controller/testdata/foo/foo_types.go new file mode 100644 index 0000000000..6101e57934 --- /dev/null +++ b/pkg/controller/testdata/foo/foo_types.go @@ -0,0 +1,72 @@ +package foo + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/scheme" +) + +type FooSpec struct{} +type FooStatus struct{} + +//+kubebuilder:object:root=true +//+kubebuilder:subresource:status + +type Foo struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec FooSpec `json:"spec,omitempty"` + Status FooStatus `json:"status,omitempty"` +} + +//+kubebuilder:object:root=true +type FooList struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + Items []Foo `json:"items"` +} + +func init() { + SchemeBuilder.Register(&Foo{}, &FooList{}) +} + +func (f *Foo) DeepCopyObject() runtime.Object { + return nil + +} + +func (f *Foo) SetGroupVersionKind(kind schema.GroupVersionKind) {} +func (f *Foo) GroupVersionKind() schema.GroupVersionKind { + return schema.GroupVersionKind{ + Group: "bar.example.com", + Version: "v1", + Kind: "Foo", + } + +} + +func (f *Foo) GetObjectKind() schema.ObjectKind { + return f + +} + +//func (in *FooList) DeepCopy() *FooList { +// if in == nil { +// return nil +// } +// out := new(FooList) +// in.DeepCopyInto(out) +// return out +//} + +func (in *FooList) DeepCopyObject() runtime.Object { + return nil +} + +var ( + GroupVersion = schema.GroupVersion{Group: "bar.example.com", Version: "v1"} + SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion} + AddToScheme = SchemeBuilder.AddToScheme +) diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 1d37d883d2..1765a32e4e 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -126,6 +126,7 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc } } + // These get held on indefinitely if sporadicSource, ok := src.(source.SporadicSource); ok && !c.Started { c.sporadicWatches = append(c.sporadicWatches, sporadicWatchDescription{src: sporadicSource, handler: evthdler, predicates: prct}) return nil @@ -210,6 +211,8 @@ func (c *Controller) Start(ctx context.Context) error { } c.Log.Info("sporadic watches", "len", len(c.sporadicWatches)) + // TODO: do we need a waitgroup here so that we only clear c.Started once all the watches are done + // or should a single done watch trigger all the others to stop, or something else? for _, watch := range c.sporadicWatches { c.Log.Info("sporadic Starting EventSource", "source", watch.src) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 8d5116030c..09c479d348 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -741,6 +741,7 @@ func (cm *controllerManager) startRunnable(r Runnable) { }() } +// TODO: is there a better way to do this? func (cm *controllerManager) startBlockingRunnable(r Runnable) { cm.waitForRunnable.Add(1) defer cm.waitForRunnable.Done() diff --git a/pkg/source/source.go b/pkg/source/source.go index 2251d096d7..222fe8f41a 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -59,7 +59,7 @@ type Source interface { } type SporadicSource interface { - //Source + Source StartNotifyDone(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) (<-chan struct{}, error) Ready(ctx context.Context, wg *sync.WaitGroup) <-chan struct{} } @@ -164,6 +164,11 @@ func (sk *SporadicKind) StartNotifyDone(ctx context.Context, handler handler.Eve return infCtx.Done(), err } +func (sk *SporadicKind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) error { + _, err := sk.StartNotifyDone(ctx, handler, queue, prct...) + return err +} + var _ SyncingSource = &Kind{} // Start is internal and should be called only by the Controller to register an EventHandler with the Informer From 9b77f9d85f7deb2015380d5b4cf659a48d69384e Mon Sep 17 00:00:00 2001 From: Kevin Delgado Date: Tue, 27 Apr 2021 21:01:12 +0000 Subject: [PATCH 07/17] add some commentary --- pkg/cache/internal/informers_map.go | 3 +-- pkg/internal/controller/controller.go | 5 +++++ pkg/manager/internal.go | 10 ++++++++++ pkg/manager/manager.go | 2 -- pkg/source/source.go | 2 ++ 5 files changed, 18 insertions(+), 4 deletions(-) diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index a19571be99..83ace47bfc 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -225,10 +225,9 @@ func (ip *specificInformersMap) addInformerToMap(ctx context.Context, gvk schema // Start the Informer if need by // TODO(seans): write thorough tests and document what happens here - can you add indexers? // can you add eventhandlers? - runCtx, cancel := context.WithCancel(ctx) // TODO: not cancelling? (leak) + runCtx, cancel := context.WithCancel(ctx) go func() { - // TODO: check shomron PR for why this might not be sufficient <-ip.stop fmt.Println("inf ip stopped") cancel() diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 1765a32e4e..3c35aa9089 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -181,6 +181,8 @@ func (c *Controller) Start(ctx context.Context) error { c.initMetrics() + // wrap the given context in a cancellable one so that we can + // stop sporadic watches when their done signal fires ctrlCtx, ctrlCancel := context.WithCancel(ctx) // Set the internal context. @@ -216,10 +218,13 @@ func (c *Controller) Start(ctx context.Context) error { for _, watch := range c.sporadicWatches { c.Log.Info("sporadic Starting EventSource", "source", watch.src) + // Call a version of the Start method specific to SporadicSource that returns + // a done signal that fires when the CRD is no longer installed (and the informer gets shut down) done, err := watch.src.StartNotifyDone(ctrlCtx, watch.handler, c.Queue, watch.predicates...) if err != nil { return err } + // wait for done to fire and when it does, shut down the controller and reset c.Started go func() { fmt.Println("ctrl waiting for done") <-done diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 09c479d348..4a40db4f60 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -204,6 +204,8 @@ func (cm *controllerManager) Add(r Runnable) error { var shouldStart bool // Add the runnable to the leader election or the non-leaderelection list + // TODO: currently we treat sporadicRunnable as separate from LER/non-LER + // but we shouldn't right? if sporadicRunnable, ok := r.(SporadicRunnable); ok { fmt.Println("mgr adding sporadic") cm.sporadicRunnables = append(cm.sporadicRunnables, sporadicRunnable) @@ -600,6 +602,13 @@ func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelF return nil } +// For each sporadicRunnable fire off a goroutine that +// blocks on the runnable's Ready (or the shutdown context). +// +// Once ready, call a version of start runnable that blocks +// until the runnable is terminated. +// +// Once the runnable stops, loop back and wait for ready again. func (cm *controllerManager) startSporadicRunnables() { cm.mu.Lock() cm.waitForCache(cm.internalCtx) @@ -741,6 +750,7 @@ func (cm *controllerManager) startRunnable(r Runnable) { }() } +// like startRunnable, but blocking // TODO: is there a better way to do this? func (cm *controllerManager) startBlockingRunnable(r Runnable) { cm.waitForRunnable.Add(1) diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index caf47df9eb..2975182a83 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -292,10 +292,8 @@ func (r RunnableFunc) Start(ctx context.Context) error { type SporadicRunnable interface { Runnable - // TODO: rename to ReadyToStart, use ReadySignal Ready(ctx context.Context) <-chan struct{} } -type ReadySignal <-chan struct{} // LeaderElectionRunnable knows if a Runnable needs to be run in the leader election mode. type LeaderElectionRunnable interface { diff --git a/pkg/source/source.go b/pkg/source/source.go index 222fe8f41a..b648fb977f 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -137,6 +137,8 @@ func (sk *SporadicKind) Ready(ctx context.Context, wg *sync.WaitGroup) <-chan st return ready } +// StartNotifyDone starts the kind while concurrently polling discovery to confirm the CRD is still installed +// It returns a signal that fires when the CRD is uninstalled, which also triggers a cacnel of the underlying informer func (sk *SporadicKind) StartNotifyDone(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) (<-chan struct{}, error) { // TODO: how do we cancel if we never fail the discovery check? (leak?) fmt.Println("src start called") From d701263fbfb86c901ed75591e4c6d82f49f2be65 Mon Sep 17 00:00:00 2001 From: Kevin Delgado Date: Thu, 29 Apr 2021 18:28:49 +0000 Subject: [PATCH 08/17] wip broken test --- pkg/controller/controller_integration_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/controller/controller_integration_test.go b/pkg/controller/controller_integration_test.go index e52a06e0a0..731d0f7a4e 100644 --- a/pkg/controller/controller_integration_test.go +++ b/pkg/controller/controller_integration_test.go @@ -269,11 +269,11 @@ var _ = Describe("controller", func() { Expect(err).To(HaveOccurred()) By("Installing the CRD") - //fmt.Println("test installing crd") - //crds, err := envtest.InstallCRDs(cfg, crdOpts) - //fmt.Println("test crd installed") - //Expect(err).NotTo(HaveOccurred()) - //Expect(len(crds)).To(Equal(1)) + fmt.Println("test installing crd") + crds, err := envtest.InstallCRDs(cfg, crdOpts) + fmt.Println("test crd installed") + Expect(err).NotTo(HaveOccurred()) + Expect(len(crds)).To(Equal(1)) By("Invoking Reconcile for foo Create") From 4fd35e01b41fc7646512fd44c2eb4b1c2f80eb36 Mon Sep 17 00:00:00 2001 From: Kevin Delgado Date: Fri, 30 Apr 2021 06:27:41 +0000 Subject: [PATCH 09/17] wip integration test --- pkg/cache/internal/informers_map.go | 8 ++ pkg/controller/controller_integration_test.go | 119 ++++++++++++++---- pkg/controller/testdata/crds/foocrd.yaml | 17 --- pkg/controller/testdata/foo/foo_types.go | 107 +++++++++++++--- pkg/manager/internal.go | 2 +- pkg/source/source.go | 1 + 6 files changed, 191 insertions(+), 63 deletions(-) delete mode 100644 pkg/controller/testdata/crds/foocrd.yaml diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index 83ace47bfc..7ec81504e2 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -167,6 +167,7 @@ func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced { // Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns // the Informer from the map. func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) { + fmt.Println("inf Get") // Return the informer if it is found i, started, ok := func() (*MapEntry, bool, bool) { ip.mu.RLock() @@ -193,6 +194,7 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion } func (ip *specificInformersMap) addInformerToMap(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) { + fmt.Println("inf addInf") ip.mu.Lock() defer ip.mu.Unlock() @@ -203,10 +205,13 @@ func (ip *specificInformersMap) addInformerToMap(ctx context.Context, gvk schema return i, ip.started, nil } + fmt.Println("inf createLW") + fmt.Printf("inf gvk = %+v\n", gvk) // Create a NewSharedIndexInformer and add it to the map. var lw *cache.ListWatch lw, err := ip.createListWatcher(gvk, ip) if err != nil { + fmt.Printf("inf createLW err = %+v\n", err) return nil, false, err } ni := cache.NewSharedIndexInformer(lw, obj, resyncPeriod(ip.resync)(), cache.Indexers{ @@ -214,8 +219,11 @@ func (ip *specificInformersMap) addInformerToMap(ctx context.Context, gvk schema }) rm, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version) if err != nil { + fmt.Printf("RESTMapping err = %+v\n", err) + fmt.Printf("gvk = %+v\n", gvk) return nil, false, err } + fmt.Println("inf RM success") i := &MapEntry{ Informer: ni, Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk, scopeName: rm.Scope.Name()}, diff --git a/pkg/controller/controller_integration_test.go b/pkg/controller/controller_integration_test.go index 731d0f7a4e..e974ff4f50 100644 --- a/pkg/controller/controller_integration_test.go +++ b/pkg/controller/controller_integration_test.go @@ -20,19 +20,21 @@ import ( "context" "fmt" "path/filepath" + "time" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllertest" - "sigs.k8s.io/controller-runtime/pkg/controller/testdata/foo" + foo "sigs.k8s.io/controller-runtime/pkg/controller/testdata/foo/v1" "sigs.k8s.io/controller-runtime/pkg/envtest" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -184,19 +186,16 @@ var _ = Describe("controller", func() { FIt("should reconcile when the CRD is installed, uninstalled, reinstalled", func(done Done) { By("Initializing the scheme and crd") s := runtime.NewScheme() - f := &foo.Foo{} - gvk := f.GroupVersionKind() - options := manager.Options{} //s.AddKnownTypeWithName(gvk, f) //fl := &foo.FooList{} //s.AddKnownTypes(gvk.GroupVersion(), fl) - foo.AddToScheme(s) - options.Scheme = s - crdPath := filepath.Join(".", "testadata", "crds", "foocrd.yaml") - crdOpts := envtest.CRDInstallOptions{ - Paths: []string{crdPath}, - } - _ = crdOpts + err := v1beta1.AddToScheme(s) + Expect(err).NotTo(HaveOccurred()) + err = apiextensionsv1.AddToScheme(s) + Expect(err).NotTo(HaveOccurred()) + err = foo.AddToScheme(s) + Expect(err).NotTo(HaveOccurred()) + options := manager.Options{Scheme: s} By("Creating the Manager") cm, err := manager.New(cfg, options) @@ -213,6 +212,14 @@ var _ = Describe("controller", func() { Expect(err).NotTo(HaveOccurred()) By("Watching foo CRD as sporadic kinds") + f := &foo.Foo{} + //gvk := f.GroupVersionKind() + gvk := schema.GroupVersionKind{ + Group: "bar.example.com", + Version: "v1", + Kind: "Foo", + } + fmt.Printf("gvk = %+v\n", gvk) dc := clientset.Discovery() Expect(err).NotTo(HaveOccurred()) existsInDiscovery := func() bool { @@ -233,6 +240,7 @@ var _ = Describe("controller", func() { err = instance.Watch(&source.SporadicKind{Kind: source.Kind{Type: f}, DiscoveryCheck: existsInDiscovery}, &handler.EnqueueRequestForObject{}) Expect(err).NotTo(HaveOccurred()) //err = cm.GetClient().Get(ctx, types.NamespacedName{Name: "foo"}, &corev1.Namespace{}) + //err = cm.GetClient().Get(context.TODO(), types.NamespacedName{Name: "foos.bar.example.com"}, &corev1.Namespace{}) //Expect(err).To(Equal(&cache.ErrCacheNotStarted{})) //err = cm.GetClient().List(ctx, &corev1.NamespaceList{}) //Expect(err).To(Equal(&cache.ErrCacheNotStarted{})) @@ -246,7 +254,8 @@ var _ = Describe("controller", func() { }() testFoo := &foo.Foo{ - ObjectMeta: metav1.ObjectMeta{Name: "test-foo"}, + TypeMeta: metav1.TypeMeta{Kind: gvk.Kind, APIVersion: gvk.GroupVersion().String()}, + ObjectMeta: metav1.ObjectMeta{Name: "test-foo", Namespace: "default"}, } expectedReconcileRequest := reconcile.Request{NamespacedName: types.NamespacedName{ @@ -255,33 +264,92 @@ var _ = Describe("controller", func() { }} _ = expectedReconcileRequest - By("Creating the rest client") - config := *cfg - gv := gvk.GroupVersion() - config.ContentConfig.GroupVersion = &gv - config.NegotiatedSerializer = scheme.Codecs.WithoutConversion() - client, err := rest.RESTClientFor(&config) + By("Creating the client") + //config := *cfg + //gv := gvk.GroupVersion() + //config.ContentConfig.GroupVersion = &gv + //config.NegotiatedSerializer = scheme.Codecs.WithoutConversion() + //client, err := rest.RESTClientFor(&config) + // TODO: I don't think Mapper is actually necessary, recheck once working + //c, err := client.New(cfg, client.Options{Scheme: s, Mapper: cm.GetRESTMapper()}) + c := cm.GetClient() Expect(err).NotTo(HaveOccurred()) By("Failing to create a foo object if the crd isn't installed") - result := foo.Foo{} - err = client.Post().Namespace("default").Resource("foos").Body(testFoo).Do(ctx).Into(&result) + //result := foo.Foo{} + //err = client.Post().Namespace("default").Resource("foos").Body(testFoo).Do(ctx).Into(&result) + err = c.Create(ctx, testFoo) Expect(err).To(HaveOccurred()) + // TODO: remove, see if necessary + //time.Sleep(6 * time.Second) + By("Installing the CRD") fmt.Println("test installing crd") + + crdPath := filepath.Join(".", "testdata", "foo", "foocrd.yaml") + // TODO: fmt + crdOpts := envtest.CRDInstallOptions{ + Paths: []string{crdPath}, + MaxTime: 50 * time.Millisecond, + PollInterval: 15 * time.Millisecond, + } crds, err := envtest.InstallCRDs(cfg, crdOpts) fmt.Println("test crd installed") + fmt.Printf("err = %+v\n", err) + fmt.Printf("crds = %+v\n", crds[0]) Expect(err).NotTo(HaveOccurred()) Expect(len(crds)).To(Equal(1)) + By("Expecting to find the CRD") + crdv1 := &apiextensionsv1.CustomResourceDefinition{} + err = c.Get(context.TODO(), types.NamespacedName{Name: "foos.bar.example.com"}, crdv1) + Expect(err).NotTo(HaveOccurred()) + Expect(crdv1.Spec.Names.Kind).To(Equal("Foo")) + + err = envtest.WaitForCRDs(cfg, []client.Object{ + &v1beta1.CustomResourceDefinition{ + Spec: v1beta1.CustomResourceDefinitionSpec{ + Group: "bar.example.com", + Names: v1beta1.CustomResourceDefinitionNames{ + Kind: "Foo", + Plural: "foos", + }, + Versions: []v1beta1.CustomResourceDefinitionVersion{ + { + Name: "v1", + Storage: true, + Served: true, + }, + }}, + }, + }, + crdOpts, + //envtest.CRDInstallOptions{MaxTime: 50 * time.Millisecond, PollInterval: 15 * time.Millisecond}, + ) + Expect(err).NotTo(HaveOccurred()) + fmt.Println("crds done waiting") + + // sleep + // TODO: remove once working if not needed + time.Sleep(6 * time.Second) + By("Invoking Reconcile for foo Create") + fmt.Println("post foo") + //result = foo.Foo{} + //err = client.Post().Namespace("default").Resource("foos").Body(testFoo).Do(ctx).Into(&result) + err = c.Create(ctx, testFoo) + fmt.Printf("err = %+v\n", err) + Expect(err).NotTo(HaveOccurred()) + Expect(<-reconciled).To(Equal(expectedReconcileRequest)) By("Uninstalling the CRD") - //err = envtest.UninstallCRDs(cfg, crdOpts) - //Expect(err).NotTo(HaveOccurred()) + err = envtest.UninstallCRDs(cfg, crdOpts) + Expect(err).NotTo(HaveOccurred()) By("Failing get foo object if the crd isn't installed") + err = c.Create(ctx, testFoo) + Expect(err).To(HaveOccurred()) By("Reinstalling the CRD") @@ -289,9 +357,10 @@ var _ = Describe("controller", func() { By("Uninstalling the CRD") + fmt.Println("done") close(done) - }, 20) + }, 15) }) diff --git a/pkg/controller/testdata/crds/foocrd.yaml b/pkg/controller/testdata/crds/foocrd.yaml deleted file mode 100644 index e2bddbc528..0000000000 --- a/pkg/controller/testdata/crds/foocrd.yaml +++ /dev/null @@ -1,17 +0,0 @@ -apiVersion: apiextensions.k8s.io/v1 -kind: CustomResourceDefinition -metadata: - name: foos.bar.example.com -spec: - group: bar.example.com - names: - kind: Foo - plural: foos - scope: Namespaced - versions: - - name: "v1" - storage: true - served: true - schema: - openAPIV3Schema: - type: object diff --git a/pkg/controller/testdata/foo/foo_types.go b/pkg/controller/testdata/foo/foo_types.go index 6101e57934..4cd289275e 100644 --- a/pkg/controller/testdata/foo/foo_types.go +++ b/pkg/controller/testdata/foo/foo_types.go @@ -10,8 +10,7 @@ import ( type FooSpec struct{} type FooStatus struct{} -//+kubebuilder:object:root=true -//+kubebuilder:subresource:status +// +kubebuilder:object:root=true type Foo struct { metav1.TypeMeta `json:",inline"` @@ -21,18 +20,48 @@ type Foo struct { Status FooStatus `json:"status,omitempty"` } -//+kubebuilder:object:root=true +// +kubebuilder:object:root=true + type FooList struct { - metav1.TypeMeta `json:",inline"` - metav1.ObjectMeta `json:"metadata,omitempty"` - Items []Foo `json:"items"` + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []Foo `json:"items"` } func init() { SchemeBuilder.Register(&Foo{}, &FooList{}) } -func (f *Foo) DeepCopyObject() runtime.Object { +var ( + GroupVersion = schema.GroupVersion{Group: "bar.example.com", Version: "v1"} + SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion} + AddToScheme = SchemeBuilder.AddToScheme +) + +// TODO: autogen all this stuff (learn how to) and split into v1 directory +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Foo) DeepCopyInto(out *Foo) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + out.Spec = in.Spec + out.Status = in.Status +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Foo. +func (in *Foo) DeepCopy() *Foo { + if in == nil { + return nil + } + out := new(Foo) + in.DeepCopyInto(out) + return out +} + +func (in *Foo) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } return nil } @@ -52,21 +81,59 @@ func (f *Foo) GetObjectKind() schema.ObjectKind { } -//func (in *FooList) DeepCopy() *FooList { -// if in == nil { -// return nil -// } -// out := new(FooList) -// in.DeepCopyInto(out) -// return out -//} +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FooList) DeepCopyInto(out *FooList) { + *out = *in + out.TypeMeta = in.TypeMeta + out.ListMeta = in.ListMeta + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]Foo, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +func (in *FooList) DeepCopy() *FooList { + if in == nil { + return nil + } + out := new(FooList) + in.DeepCopyInto(out) + return out +} func (in *FooList) DeepCopyObject() runtime.Object { return nil } -var ( - GroupVersion = schema.GroupVersion{Group: "bar.example.com", Version: "v1"} - SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion} - AddToScheme = SchemeBuilder.AddToScheme -) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FooSpec) DeepCopyInto(out *FooSpec) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FooSpec. +func (in *FooSpec) DeepCopy() *FooSpec { + if in == nil { + return nil + } + out := new(FooSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FooStatus) DeepCopyInto(out *FooStatus) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FooStatus. +func (in *FooStatus) DeepCopy() *FooStatus { + if in == nil { + return nil + } + out := new(FooStatus) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 4a40db4f60..fc95c78057 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -755,7 +755,7 @@ func (cm *controllerManager) startRunnable(r Runnable) { func (cm *controllerManager) startBlockingRunnable(r Runnable) { cm.waitForRunnable.Add(1) defer cm.waitForRunnable.Done() - fmt.Printf("starting sporadic runnable, %v", r) + fmt.Printf("starting sporadic runnable, %v\n", r) if err := r.Start(cm.internalCtx); err != nil { cm.errChan <- err } diff --git a/pkg/source/source.go b/pkg/source/source.go index b648fb977f..45a1eaaccd 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -194,6 +194,7 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w ks.started = make(chan error) go func() { // Lookup the Informer from the Cache and add an EventHandler which populates the Queue + fmt.Printf("ks.Type = %+v\n", ks.Type) i, err := ks.cache.GetInformer(ctx, ks.Type) if err != nil { fmt.Printf("kind GetInformer err = %+v\n", err) From 533484a8e3bd91f87f98aca6105db912163a0ed9 Mon Sep 17 00:00:00 2001 From: Kevin Delgado Date: Fri, 30 Apr 2021 06:28:46 +0000 Subject: [PATCH 10/17] wip integration test --- pkg/controller/testdata/foo/foocrd.yaml | 17 ++++ pkg/controller/testdata/foo/v1/foo_types.go | 60 ++++++++++++ .../testdata/foo/v1/groupversion_info.go | 38 +++++++ .../testdata/foo/v1/zz_generated.deepcopy.go | 98 +++++++++++++++++++ 4 files changed, 213 insertions(+) create mode 100644 pkg/controller/testdata/foo/foocrd.yaml create mode 100644 pkg/controller/testdata/foo/v1/foo_types.go create mode 100644 pkg/controller/testdata/foo/v1/groupversion_info.go create mode 100644 pkg/controller/testdata/foo/v1/zz_generated.deepcopy.go diff --git a/pkg/controller/testdata/foo/foocrd.yaml b/pkg/controller/testdata/foo/foocrd.yaml new file mode 100644 index 0000000000..e2bddbc528 --- /dev/null +++ b/pkg/controller/testdata/foo/foocrd.yaml @@ -0,0 +1,17 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: foos.bar.example.com +spec: + group: bar.example.com + names: + kind: Foo + plural: foos + scope: Namespaced + versions: + - name: "v1" + storage: true + served: true + schema: + openAPIV3Schema: + type: object diff --git a/pkg/controller/testdata/foo/v1/foo_types.go b/pkg/controller/testdata/foo/v1/foo_types.go new file mode 100644 index 0000000000..380b7ce8c4 --- /dev/null +++ b/pkg/controller/testdata/foo/v1/foo_types.go @@ -0,0 +1,60 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! +// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. + +// FooSpec defines the desired state of Foo +type FooSpec struct { + // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster + // Important: Run "make" to regenerate code after modifying this file + RunAt string `json:"runAt"` +} + +// FooStatus defines the observed state of Foo +type FooStatus struct { + // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster + // Important: Run "make" to regenerate code after modifying this file +} + +// +kubebuilder:object:root=true + +// Foo is the Schema for the externaljobs API +type Foo struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec FooSpec `json:"spec,omitempty"` + Status FooStatus `json:"status,omitempty"` +} + +// +kubebuilder:object:root=true + +// FooList contains a list of Foo +type FooList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []Foo `json:"items"` +} + +func init() { + SchemeBuilder.Register(&Foo{}, &FooList{}) +} diff --git a/pkg/controller/testdata/foo/v1/groupversion_info.go b/pkg/controller/testdata/foo/v1/groupversion_info.go new file mode 100644 index 0000000000..a641b61024 --- /dev/null +++ b/pkg/controller/testdata/foo/v1/groupversion_info.go @@ -0,0 +1,38 @@ +/* +Copyright 2018 The Kubernetes authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// +kubebuilder:object:generate=true +// +groupName=chaosapps.metamagical.io +package v1 + +import ( + "k8s.io/apimachinery/pkg/runtime/schema" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/scheme" +) + +var ( + log = logf.Log.WithName("foo-resource") + + // SchemeGroupVersion is group version used to register these objects + SchemeGroupVersion = schema.GroupVersion{Group: "bar.example.com", Version: "v1"} + + // SchemeBuilder is used to add go types to the GroupVersionKind scheme + SchemeBuilder = &scheme.Builder{GroupVersion: SchemeGroupVersion} + + // AddToScheme is required by pkg/client/... + AddToScheme = SchemeBuilder.AddToScheme +) diff --git a/pkg/controller/testdata/foo/v1/zz_generated.deepcopy.go b/pkg/controller/testdata/foo/v1/zz_generated.deepcopy.go new file mode 100644 index 0000000000..01a5a9e34f --- /dev/null +++ b/pkg/controller/testdata/foo/v1/zz_generated.deepcopy.go @@ -0,0 +1,98 @@ +// +build !ignore_autogenerated + +// Code generated by controller-gen. DO NOT EDIT. + +package v1 + +import ( + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Foo) DeepCopyInto(out *Foo) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + out.Spec = in.Spec + out.Status = in.Status +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Foo. +func (in *Foo) DeepCopy() *Foo { + if in == nil { + return nil + } + out := new(Foo) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *Foo) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FooList) DeepCopyInto(out *FooList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]Foo, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FooList. +func (in *FooList) DeepCopy() *FooList { + if in == nil { + return nil + } + out := new(FooList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *FooList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FooSpec) DeepCopyInto(out *FooSpec) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FooSpec. +func (in *FooSpec) DeepCopy() *FooSpec { + if in == nil { + return nil + } + out := new(FooSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FooStatus) DeepCopyInto(out *FooStatus) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FooStatus. +func (in *FooStatus) DeepCopy() *FooStatus { + if in == nil { + return nil + } + out := new(FooStatus) + in.DeepCopyInto(out) + return out +} From 5ad277095fdf3dce72969835927b3cdfb5ffe8d4 Mon Sep 17 00:00:00 2001 From: Kevin Delgado Date: Fri, 30 Apr 2021 06:39:25 +0000 Subject: [PATCH 11/17] full hacky integration test working --- pkg/controller/controller_integration_test.go | 58 ++++++++++++++++++- 1 file changed, 56 insertions(+), 2 deletions(-) diff --git a/pkg/controller/controller_integration_test.go b/pkg/controller/controller_integration_test.go index e974ff4f50..80e37ee867 100644 --- a/pkg/controller/controller_integration_test.go +++ b/pkg/controller/controller_integration_test.go @@ -332,7 +332,7 @@ var _ = Describe("controller", func() { // sleep // TODO: remove once working if not needed - time.Sleep(6 * time.Second) + //time.Sleep(6 * time.Second) By("Invoking Reconcile for foo Create") fmt.Println("post foo") @@ -346,21 +346,75 @@ var _ = Describe("controller", func() { By("Uninstalling the CRD") err = envtest.UninstallCRDs(cfg, crdOpts) Expect(err).NotTo(HaveOccurred()) + // TODO: wait for crd to uninstall instead of sleeping + time.Sleep(6 * time.Second) By("Failing get foo object if the crd isn't installed") err = c.Create(ctx, testFoo) + // TODO: check the error is the correct one Expect(err).To(HaveOccurred()) + //Expect(err).NotTo(HaveOccurred()) By("Reinstalling the CRD") + crds, err = envtest.InstallCRDs(cfg, crdOpts) + fmt.Println("test crd installed") + fmt.Printf("err = %+v\n", err) + fmt.Printf("crds = %+v\n", crds[0]) + Expect(err).NotTo(HaveOccurred()) + Expect(len(crds)).To(Equal(1)) + + By("Expecting to find the CRD") + crdv1 = &apiextensionsv1.CustomResourceDefinition{} + err = c.Get(context.TODO(), types.NamespacedName{Name: "foos.bar.example.com"}, crdv1) + Expect(err).NotTo(HaveOccurred()) + Expect(crdv1.Spec.Names.Kind).To(Equal("Foo")) + + err = envtest.WaitForCRDs(cfg, []client.Object{ + &v1beta1.CustomResourceDefinition{ + Spec: v1beta1.CustomResourceDefinitionSpec{ + Group: "bar.example.com", + Names: v1beta1.CustomResourceDefinitionNames{ + Kind: "Foo", + Plural: "foos", + }, + Versions: []v1beta1.CustomResourceDefinitionVersion{ + { + Name: "v1", + Storage: true, + Served: true, + }, + }}, + }, + }, + crdOpts, + //envtest.CRDInstallOptions{MaxTime: 50 * time.Millisecond, PollInterval: 15 * time.Millisecond}, + ) + Expect(err).NotTo(HaveOccurred()) + fmt.Println("crds done waiting") By("Invoking Reconcile for foo Create") + fmt.Println("post foo") + //result = foo.Foo{} + //err = client.Post().Namespace("default").Resource("foos").Body(testFoo).Do(ctx).Into(&result) + // TODO: create a new testFoo? + // clear the resourceVersion + testFoo.ResourceVersion = "" + err = c.Create(ctx, testFoo) + fmt.Printf("err = %+v\n", err) + Expect(err).NotTo(HaveOccurred()) + Expect(<-reconciled).To(Equal(expectedReconcileRequest)) By("Uninstalling the CRD") + err = envtest.UninstallCRDs(cfg, crdOpts) + Expect(err).NotTo(HaveOccurred()) + // TODO: wait for crd to uninstall instead of sleeping + time.Sleep(6 * time.Second) fmt.Println("done") close(done) - }, 15) + // TODO: make the test faster? + }, 25) }) From 5ba0991a39c2cd3fd71f236e1c3860e50dc65763 Mon Sep 17 00:00:00 2001 From: Kevin Delgado Date: Fri, 30 Apr 2021 17:26:46 +0000 Subject: [PATCH 12/17] Complete integration test, pre-cleanup --- pkg/controller/controller_integration_test.go | 36 +++++++++++++++---- 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/pkg/controller/controller_integration_test.go b/pkg/controller/controller_integration_test.go index 80e37ee867..99048b929e 100644 --- a/pkg/controller/controller_integration_test.go +++ b/pkg/controller/controller_integration_test.go @@ -26,10 +26,12 @@ import ( corev1 "k8s.io/api/core/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -344,15 +346,26 @@ var _ = Describe("controller", func() { Expect(<-reconciled).To(Equal(expectedReconcileRequest)) By("Uninstalling the CRD") + errNotFound := errors.NewGenericServerResponse(404, "POST", schema.GroupResource{Group: "bar.example.com", Resource: "foos"}, "", "404 page not found", 0, true) err = envtest.UninstallCRDs(cfg, crdOpts) Expect(err).NotTo(HaveOccurred()) - // TODO: wait for crd to uninstall instead of sleeping - time.Sleep(6 * time.Second) + // wait for discovery to not recognize the resource after uninstall + wait.PollImmediate(15*time.Millisecond, 50*time.Millisecond, func() (bool, error) { + if _, err := clientset.Discovery().ServerResourcesForGroupVersion(gvk.Group + "/" + gvk.Version); err != nil { + if err.Error() == "the server could not find the requested resource" { + return true, nil + } + } + return false, nil + }) - By("Failing get foo object if the crd isn't installed") + By("Failing create foo object if the crd isn't installed") err = c.Create(ctx, testFoo) // TODO: check the error is the correct one - Expect(err).To(HaveOccurred()) + fmt.Printf("fail err = %+v\n", err) + //Expect(err).To(HaveOccurred()) + //errNotFound := errors.NewNotFound(schema.GroupResource{Group: "bar.example.com", Resource: "foos"}, "") + Expect(err).To(Equal(errNotFound)) //Expect(err).NotTo(HaveOccurred()) By("Reinstalling the CRD") @@ -407,8 +420,19 @@ var _ = Describe("controller", func() { By("Uninstalling the CRD") err = envtest.UninstallCRDs(cfg, crdOpts) Expect(err).NotTo(HaveOccurred()) - // TODO: wait for crd to uninstall instead of sleeping - time.Sleep(6 * time.Second) + // wait for discovery to not recognize the resource after uninstall + wait.PollImmediate(15*time.Millisecond, 50*time.Millisecond, func() (bool, error) { + if _, err := clientset.Discovery().ServerResourcesForGroupVersion(gvk.Group + "/" + gvk.Version); err != nil { + if err.Error() == "the server could not find the requested resource" { + return true, nil + } + } + return false, nil + }) + + By("Failing create foo object if the crd isn't installed") + err = c.Create(ctx, testFoo) + Expect(err).To(Equal(errNotFound)) fmt.Println("done") close(done) From 8685213e34402192bf1e2dcf27e0d382d0065534 Mon Sep 17 00:00:00 2001 From: Kevin Delgado Date: Fri, 30 Apr 2021 17:39:38 +0000 Subject: [PATCH 13/17] cleanup integration test --- pkg/controller/controller_integration_test.go | 93 +++---------------- 1 file changed, 15 insertions(+), 78 deletions(-) diff --git a/pkg/controller/controller_integration_test.go b/pkg/controller/controller_integration_test.go index 99048b929e..6f2e51f94d 100644 --- a/pkg/controller/controller_integration_test.go +++ b/pkg/controller/controller_integration_test.go @@ -18,7 +18,7 @@ package controller_test import ( "context" - "fmt" + goerrors "errors" "path/filepath" "time" @@ -27,6 +27,7 @@ import ( apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -188,9 +189,6 @@ var _ = Describe("controller", func() { FIt("should reconcile when the CRD is installed, uninstalled, reinstalled", func(done Done) { By("Initializing the scheme and crd") s := runtime.NewScheme() - //s.AddKnownTypeWithName(gvk, f) - //fl := &foo.FooList{} - //s.AddKnownTypes(gvk.GroupVersion(), fl) err := v1beta1.AddToScheme(s) Expect(err).NotTo(HaveOccurred()) err = apiextensionsv1.AddToScheme(s) @@ -215,37 +213,26 @@ var _ = Describe("controller", func() { By("Watching foo CRD as sporadic kinds") f := &foo.Foo{} - //gvk := f.GroupVersionKind() gvk := schema.GroupVersionKind{ Group: "bar.example.com", Version: "v1", Kind: "Foo", } - fmt.Printf("gvk = %+v\n", gvk) - dc := clientset.Discovery() Expect(err).NotTo(HaveOccurred()) existsInDiscovery := func() bool { - resources, err := dc.ServerResourcesForGroupVersion(gvk.GroupVersion().String()) + resources, err := clientset.Discovery().ServerResourcesForGroupVersion(gvk.GroupVersion().String()) if err != nil { - fmt.Printf("NOT in discovery gvk = %+v\n", gvk) return false } for _, res := range resources.APIResources { if res.Kind == gvk.Kind { - fmt.Printf("YES in discovery gvk = %+v\n", gvk) return true } } - fmt.Printf("NOT in discovery kind = %+v\n", gvk) return false } err = instance.Watch(&source.SporadicKind{Kind: source.Kind{Type: f}, DiscoveryCheck: existsInDiscovery}, &handler.EnqueueRequestForObject{}) Expect(err).NotTo(HaveOccurred()) - //err = cm.GetClient().Get(ctx, types.NamespacedName{Name: "foo"}, &corev1.Namespace{}) - //err = cm.GetClient().Get(context.TODO(), types.NamespacedName{Name: "foos.bar.example.com"}, &corev1.Namespace{}) - //Expect(err).To(Equal(&cache.ErrCacheNotStarted{})) - //err = cm.GetClient().List(ctx, &corev1.NamespaceList{}) - //Expect(err).To(Equal(&cache.ErrCacheNotStarted{})) By("Starting the Manager") ctx, cancel := context.WithCancel(context.Background()) @@ -261,51 +248,30 @@ var _ = Describe("controller", func() { } expectedReconcileRequest := reconcile.Request{NamespacedName: types.NamespacedName{ - Namespace: "default", Name: "test-foo", + Namespace: "default", }} _ = expectedReconcileRequest - By("Creating the client") - //config := *cfg - //gv := gvk.GroupVersion() - //config.ContentConfig.GroupVersion = &gv - //config.NegotiatedSerializer = scheme.Codecs.WithoutConversion() - //client, err := rest.RESTClientFor(&config) - // TODO: I don't think Mapper is actually necessary, recheck once working - //c, err := client.New(cfg, client.Options{Scheme: s, Mapper: cm.GetRESTMapper()}) - c := cm.GetClient() - Expect(err).NotTo(HaveOccurred()) - By("Failing to create a foo object if the crd isn't installed") - //result := foo.Foo{} - //err = client.Post().Namespace("default").Resource("foos").Body(testFoo).Do(ctx).Into(&result) - err = c.Create(ctx, testFoo) - Expect(err).To(HaveOccurred()) - - // TODO: remove, see if necessary - //time.Sleep(6 * time.Second) + kindMatchErr := &meta.NoKindMatchError{} + err = cm.GetClient().Create(ctx, testFoo) + Expect(goerrors.As(err, &kindMatchErr)).To(BeTrue()) By("Installing the CRD") - fmt.Println("test installing crd") - crdPath := filepath.Join(".", "testdata", "foo", "foocrd.yaml") - // TODO: fmt crdOpts := envtest.CRDInstallOptions{ Paths: []string{crdPath}, MaxTime: 50 * time.Millisecond, PollInterval: 15 * time.Millisecond, } crds, err := envtest.InstallCRDs(cfg, crdOpts) - fmt.Println("test crd installed") - fmt.Printf("err = %+v\n", err) - fmt.Printf("crds = %+v\n", crds[0]) Expect(err).NotTo(HaveOccurred()) Expect(len(crds)).To(Equal(1)) By("Expecting to find the CRD") crdv1 := &apiextensionsv1.CustomResourceDefinition{} - err = c.Get(context.TODO(), types.NamespacedName{Name: "foos.bar.example.com"}, crdv1) + err = cm.GetClient().Get(context.TODO(), types.NamespacedName{Name: "foos.bar.example.com"}, crdv1) Expect(err).NotTo(HaveOccurred()) Expect(crdv1.Spec.Names.Kind).To(Equal("Foo")) @@ -327,26 +293,15 @@ var _ = Describe("controller", func() { }, }, crdOpts, - //envtest.CRDInstallOptions{MaxTime: 50 * time.Millisecond, PollInterval: 15 * time.Millisecond}, ) Expect(err).NotTo(HaveOccurred()) - fmt.Println("crds done waiting") - - // sleep - // TODO: remove once working if not needed - //time.Sleep(6 * time.Second) By("Invoking Reconcile for foo Create") - fmt.Println("post foo") - //result = foo.Foo{} - //err = client.Post().Namespace("default").Resource("foos").Body(testFoo).Do(ctx).Into(&result) - err = c.Create(ctx, testFoo) - fmt.Printf("err = %+v\n", err) + err = cm.GetClient().Create(ctx, testFoo) Expect(err).NotTo(HaveOccurred()) Expect(<-reconciled).To(Equal(expectedReconcileRequest)) By("Uninstalling the CRD") - errNotFound := errors.NewGenericServerResponse(404, "POST", schema.GroupResource{Group: "bar.example.com", Resource: "foos"}, "", "404 page not found", 0, true) err = envtest.UninstallCRDs(cfg, crdOpts) Expect(err).NotTo(HaveOccurred()) // wait for discovery to not recognize the resource after uninstall @@ -360,25 +315,18 @@ var _ = Describe("controller", func() { }) By("Failing create foo object if the crd isn't installed") - err = c.Create(ctx, testFoo) - // TODO: check the error is the correct one - fmt.Printf("fail err = %+v\n", err) - //Expect(err).To(HaveOccurred()) - //errNotFound := errors.NewNotFound(schema.GroupResource{Group: "bar.example.com", Resource: "foos"}, "") + errNotFound := errors.NewGenericServerResponse(404, "POST", schema.GroupResource{Group: "bar.example.com", Resource: "foos"}, "", "404 page not found", 0, true) + err = cm.GetClient().Create(ctx, testFoo) Expect(err).To(Equal(errNotFound)) - //Expect(err).NotTo(HaveOccurred()) By("Reinstalling the CRD") crds, err = envtest.InstallCRDs(cfg, crdOpts) - fmt.Println("test crd installed") - fmt.Printf("err = %+v\n", err) - fmt.Printf("crds = %+v\n", crds[0]) Expect(err).NotTo(HaveOccurred()) Expect(len(crds)).To(Equal(1)) By("Expecting to find the CRD") crdv1 = &apiextensionsv1.CustomResourceDefinition{} - err = c.Get(context.TODO(), types.NamespacedName{Name: "foos.bar.example.com"}, crdv1) + err = cm.GetClient().Get(context.TODO(), types.NamespacedName{Name: "foos.bar.example.com"}, crdv1) Expect(err).NotTo(HaveOccurred()) Expect(crdv1.Spec.Names.Kind).To(Equal("Foo")) @@ -400,20 +348,12 @@ var _ = Describe("controller", func() { }, }, crdOpts, - //envtest.CRDInstallOptions{MaxTime: 50 * time.Millisecond, PollInterval: 15 * time.Millisecond}, ) Expect(err).NotTo(HaveOccurred()) - fmt.Println("crds done waiting") By("Invoking Reconcile for foo Create") - fmt.Println("post foo") - //result = foo.Foo{} - //err = client.Post().Namespace("default").Resource("foos").Body(testFoo).Do(ctx).Into(&result) - // TODO: create a new testFoo? - // clear the resourceVersion testFoo.ResourceVersion = "" - err = c.Create(ctx, testFoo) - fmt.Printf("err = %+v\n", err) + err = cm.GetClient().Create(ctx, testFoo) Expect(err).NotTo(HaveOccurred()) Expect(<-reconciled).To(Equal(expectedReconcileRequest)) @@ -431,14 +371,11 @@ var _ = Describe("controller", func() { }) By("Failing create foo object if the crd isn't installed") - err = c.Create(ctx, testFoo) + err = cm.GetClient().Create(ctx, testFoo) Expect(err).To(Equal(errNotFound)) - fmt.Println("done") close(done) - - // TODO: make the test faster? - }, 25) + }, 10) }) From 36e65f23f78f6e47f07ffb6ac4d909c677483dda Mon Sep 17 00:00:00 2001 From: Kevin Delgado Date: Tue, 11 May 2021 20:43:34 +0000 Subject: [PATCH 14/17] fix leaky goro --- pkg/manager/internal.go | 1 + pkg/source/source.go | 37 +++++++++++++++---------------------- 2 files changed, 16 insertions(+), 22 deletions(-) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index fc95c78057..15e7774a2f 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -631,6 +631,7 @@ func (cm *controllerManager) startSporadicRunnables() { // this doesn't block cm.startBlockingRunnable(sr) fmt.Println("mgr runnable done running") + return } fmt.Println("mgr done running, looping back to wait on ready") } diff --git a/pkg/source/source.go b/pkg/source/source.go index 45a1eaaccd..f3de3e44a2 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -61,7 +61,7 @@ type Source interface { type SporadicSource interface { Source StartNotifyDone(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) (<-chan struct{}, error) - Ready(ctx context.Context, wg *sync.WaitGroup) <-chan struct{} + Ready(ctx context.Context, wg *sync.WaitGroup) } // SyncingSource is a source that needs syncing prior to being usable. The controller @@ -110,31 +110,24 @@ type SporadicKind struct { DiscoveryCheck func() bool } -func (sk *SporadicKind) Ready(ctx context.Context, wg *sync.WaitGroup) <-chan struct{} { +func (sk *SporadicKind) Ready(ctx context.Context, wg *sync.WaitGroup) { fmt.Println("src ready called") - ready := make(chan struct{}) - go func() { - defer wg.Done() - for { - select { - case <-ctx.Done(): - fmt.Println("src context shutdown") - close(ready) + defer wg.Done() + for { + select { + case <-ctx.Done(): + fmt.Println("src context shutdown") + return + default: + if sk.DiscoveryCheck() { + fmt.Println("src ready discovery check pass closing ready") return - default: - if sk.DiscoveryCheck() { - fmt.Println("src ready discovery check pass closing ready") - close(ready) - return - } - //TODO: parameterize this - fmt.Println("src ready discovery check fail, spin") - time.Sleep(5 * time.Second) } + //TODO: parameterize this + fmt.Println("src ready discovery check fail, spin") + time.Sleep(5 * time.Second) } - }() - - return ready + } } // StartNotifyDone starts the kind while concurrently polling discovery to confirm the CRD is still installed From 97af49d9e66f5586c523b4c85ec5eff1c429a067 Mon Sep 17 00:00:00 2001 From: Kevin Delgado Date: Tue, 11 May 2021 21:24:26 +0000 Subject: [PATCH 15/17] fix fake_cache --- pkg/cache/informertest/fake_cache.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/cache/informertest/fake_cache.go b/pkg/cache/informertest/fake_cache.go index eb78e0bb65..fcaddb963e 100644 --- a/pkg/cache/informertest/fake_cache.go +++ b/pkg/cache/informertest/fake_cache.go @@ -79,6 +79,15 @@ func (c *FakeInformers) GetInformer(ctx context.Context, obj client.Object) (cac return c.informerFor(gvk, obj) } +// GetStoppableInformer implements informers +func (c *FakeInformers) GetStoppableInformer(ctx context.Context, obj client.Object) (cache.Informer, <-chan struct{}, error) { + // TODO: + //panic("not implemented") + i, e := c.GetInformer(ctx, obj) + return i, nil, e + +} + // WaitForCacheSync implements Informers func (c *FakeInformers) WaitForCacheSync(ctx context.Context) bool { if c.Synced == nil { From 5b6d8fb985bf454595945df96c51f8028bf64438 Mon Sep 17 00:00:00 2001 From: Kevin Delgado Date: Tue, 11 May 2021 23:06:32 +0000 Subject: [PATCH 16/17] wip, first attempt sporadic separate from le --- pkg/manager/internal.go | 152 +++++++++++++++++++++++++--------------- 1 file changed, 95 insertions(+), 57 deletions(-) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 15e7774a2f..9c58032358 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -204,18 +204,18 @@ func (cm *controllerManager) Add(r Runnable) error { var shouldStart bool // Add the runnable to the leader election or the non-leaderelection list - // TODO: currently we treat sporadicRunnable as separate from LER/non-LER - // but we shouldn't right? - if sporadicRunnable, ok := r.(SporadicRunnable); ok { - fmt.Println("mgr adding sporadic") - cm.sporadicRunnables = append(cm.sporadicRunnables, sporadicRunnable) - } else if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() { + //// TODO: currently we treat sporadicRunnable as separate from LER/non-LER + //// but we shouldn't right? + //if sporadicRunnable, ok := r.(SporadicRunnable); ok { + // fmt.Println("mgr adding sporadic") + // cm.sporadicRunnables = append(cm.sporadicRunnables, sporadicRunnable) + //} else if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() { + if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() { fmt.Println("mgr adding non ler") shouldStart = cm.started cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r) } else if hasCache, ok := r.(hasCache); ok { cm.caches = append(cm.caches, hasCache) - } else { fmt.Println("mgr adding ler") shouldStart = cm.startedLeader @@ -498,7 +498,7 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { go cm.serveHealthProbes() } - go cm.startSporadicRunnables() + //go cm.startSporadicRunnables() go cm.startNonLeaderElectionRunnables() @@ -602,42 +602,42 @@ func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelF return nil } -// For each sporadicRunnable fire off a goroutine that -// blocks on the runnable's Ready (or the shutdown context). +//// For each sporadicRunnable fire off a goroutine that +//// blocks on the runnable's Ready (or the shutdown context). +//// +//// Once ready, call a version of start runnable that blocks +//// until the runnable is terminated. +//// +//// Once the runnable stops, loop back and wait for ready again. +//func (cm *controllerManager) startSporadicRunnables() { +// cm.mu.Lock() +// cm.waitForCache(cm.internalCtx) +// cm.mu.Unlock() // -// Once ready, call a version of start runnable that blocks -// until the runnable is terminated. +// // TODO: what locking is necessary here? +// fmt.Printf("mgr len(cm.sporadicRunnables) = %+v\n", len(cm.sporadicRunnables)) // -// Once the runnable stops, loop back and wait for ready again. -func (cm *controllerManager) startSporadicRunnables() { - cm.mu.Lock() - cm.waitForCache(cm.internalCtx) - cm.mu.Unlock() - - // TODO: what locking is necessary here? - fmt.Printf("mgr len(cm.sporadicRunnables) = %+v\n", len(cm.sporadicRunnables)) - - for _, sr := range cm.sporadicRunnables { - go func(sr SporadicRunnable) { - fmt.Println("mgr got an sr") - for { - fmt.Println("mgr waiting on sr ReadyToStart") - select { - case <-cm.internalCtx.Done(): - fmt.Println("mgr internal context fired") - return - case <-sr.Ready(cm.internalCtx): - fmt.Println("mgr ready, starting the runnable") - // this doesn't block - cm.startBlockingRunnable(sr) - fmt.Println("mgr runnable done running") - return - } - fmt.Println("mgr done running, looping back to wait on ready") - } - }(sr) - } -} +// for _, sr := range cm.sporadicRunnables { +// go func(sr SporadicRunnable) { +// fmt.Println("mgr got an sr") +// for { +// fmt.Println("mgr waiting on sr ReadyToStart") +// select { +// case <-cm.internalCtx.Done(): +// fmt.Println("mgr internal context fired") +// return +// case <-sr.Ready(cm.internalCtx): +// fmt.Println("mgr ready, starting the runnable") +// // this doesn't block +// cm.startBlockingRunnable(sr) +// fmt.Println("mgr runnable done running") +// return +// } +// fmt.Println("mgr done running, looping back to wait on ready") +// } +// }(sr) +// } +//} func (cm *controllerManager) startNonLeaderElectionRunnables() { cm.mu.Lock() @@ -741,23 +741,61 @@ func (cm *controllerManager) Elected() <-chan struct{} { } func (cm *controllerManager) startRunnable(r Runnable) { - cm.waitForRunnable.Add(1) + if sporadicRunnable, ok := r.(SporadicRunnable); ok { + fmt.Printf("starting sporadic runnable = %+v\n", sporadicRunnable) + cm.startSporadicRunnable(sporadicRunnable) + } else { + cm.waitForRunnable.Add(1) + go func() { + fmt.Printf("starting non-sporadic runnable, %v\n", r) + defer cm.waitForRunnable.Done() + if err := r.Start(cm.internalCtx); err != nil { + cm.errChan <- err + } + }() + } +} + +// startSporadicRunnable fires off a goroutine that +// blocks on the runnable's Ready (or the shutdown context). +// +// Once ready, call a version of start runnable that blocks +// until the runnable is terminated. +// +// Once the runnable stops, loop back and wait for ready again. +func (cm *controllerManager) startSporadicRunnable(sr SporadicRunnable) { go func() { - fmt.Printf("starting runnable, %v", r) - defer cm.waitForRunnable.Done() - if err := r.Start(cm.internalCtx); err != nil { - cm.errChan <- err + fmt.Println("mgr got an sr") + for { + fmt.Println("mgr waiting on sr ReadyToStart") + select { + case <-cm.internalCtx.Done(): + fmt.Println("mgr internal context fired") + return + case <-sr.Ready(cm.internalCtx): + fmt.Println("mgr ready, starting the runnable") + // this doesn't block + cm.waitForRunnable.Add(1) + defer cm.waitForRunnable.Done() + fmt.Printf("starting sporadic runnable, %v\n", sr) + if err := sr.Start(cm.internalCtx); err != nil { + cm.errChan <- err + } + fmt.Println("mgr runnable done running") + return + } + fmt.Println("mgr done running, looping back to wait on ready") } }() } -// like startRunnable, but blocking -// TODO: is there a better way to do this? -func (cm *controllerManager) startBlockingRunnable(r Runnable) { - cm.waitForRunnable.Add(1) - defer cm.waitForRunnable.Done() - fmt.Printf("starting sporadic runnable, %v\n", r) - if err := r.Start(cm.internalCtx); err != nil { - cm.errChan <- err - } -} +//// like startRunnable, but blocking +//// TODO: is there a better way to do this? +//func (cm *controllerManager) startBlockingRunnable(r Runnable) { +// cm.waitForRunnable.Add(1) +// defer cm.waitForRunnable.Done() +// fmt.Printf("starting sporadic runnable, %v\n", r) +// if err := r.Start(cm.internalCtx); err != nil { +// cm.errChan <- err +// } +//} From 393e6e9a53bbc87121e76b444354ddc500810ece Mon Sep 17 00:00:00 2001 From: Kevin Delgado Date: Fri, 30 Apr 2021 21:35:19 +0000 Subject: [PATCH 17/17] Use informer stopped rather than discovery check tests leak goroutines --- pkg/cache/cache.go | 2 + pkg/cache/informer_cache.go | 20 ++++++-- pkg/cache/internal/deleg_map.go | 12 ++--- pkg/cache/internal/informers_map.go | 46 +++++++++++++---- pkg/cache/multi_namespace_cache.go | 15 ++++++ pkg/controller/controller_integration_test.go | 2 +- pkg/controller/controller_test.go | 1 + pkg/internal/controller/controller.go | 1 + pkg/source/source.go | 51 +++++++------------ 9 files changed, 96 insertions(+), 54 deletions(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 71dfbd0454..b89ca4e6ea 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -54,6 +54,8 @@ type Informers interface { // API kind and resource. GetInformer(ctx context.Context, obj client.Object) (Informer, error) + GetStoppableInformer(ctx context.Context, obj client.Object) (Informer, <-chan struct{}, error) + // GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead // of the underlying object. GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go index 8ec3b921d9..f435a93636 100644 --- a/pkg/cache/informer_cache.go +++ b/pkg/cache/informer_cache.go @@ -57,7 +57,7 @@ func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out clie return err } - started, cache, err := ip.InformersMap.Get(ctx, gvk, out) + started, cache, err := ip.InformersMap.Get(ctx, gvk, out, false) if err != nil { return err } @@ -76,7 +76,7 @@ func (ip *informerCache) List(ctx context.Context, out client.ObjectList, opts . return err } - started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj) + started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj, false) if err != nil { return err } @@ -138,7 +138,7 @@ func (ip *informerCache) GetInformerForKind(ctx context.Context, gvk schema.Grou return nil, err } - _, i, err := ip.InformersMap.Get(ctx, gvk, obj) + _, i, err := ip.InformersMap.Get(ctx, gvk, obj, false) if err != nil { return nil, err } @@ -152,13 +152,25 @@ func (ip *informerCache) GetInformer(ctx context.Context, obj client.Object) (In return nil, err } - _, i, err := ip.InformersMap.Get(ctx, gvk, obj) + _, i, err := ip.InformersMap.Get(ctx, gvk, obj, false) if err != nil { return nil, err } return i.Informer, err } +func (ip *informerCache) GetStoppableInformer(ctx context.Context, obj client.Object) (Informer, <-chan struct{}, error) { + gvk, err := apiutil.GVKForObject(obj, ip.Scheme) + if err != nil { + return nil, nil, err + } + _, i, err := ip.InformersMap.Get(ctx, gvk, obj, true) + if err != nil { + return nil, nil, err + } + return i.Informer, i.StopCh, err +} + // NeedLeaderElection implements the LeaderElectionRunnable interface // to indicate that this can be started without requiring the leader lock func (ip *informerCache) NeedLeaderElection() bool { diff --git a/pkg/cache/internal/deleg_map.go b/pkg/cache/internal/deleg_map.go index 02bb1919f7..811f3be023 100644 --- a/pkg/cache/internal/deleg_map.go +++ b/pkg/cache/internal/deleg_map.go @@ -89,18 +89,18 @@ func (m *InformersMap) WaitForCacheSync(ctx context.Context) bool { // Get will create a new Informer and add it to the map of InformersMap if none exists. Returns // the Informer from the map. -func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) { +func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopOnError bool) (bool, *MapEntry, error) { switch obj.(type) { case *unstructured.Unstructured: - return m.unstructured.Get(ctx, gvk, obj) + return m.unstructured.Get(ctx, gvk, obj, stopOnError) case *unstructured.UnstructuredList: - return m.unstructured.Get(ctx, gvk, obj) + return m.unstructured.Get(ctx, gvk, obj, stopOnError) case *metav1.PartialObjectMetadata: - return m.metadata.Get(ctx, gvk, obj) + return m.metadata.Get(ctx, gvk, obj, stopOnError) case *metav1.PartialObjectMetadataList: - return m.metadata.Get(ctx, gvk, obj) + return m.metadata.Get(ctx, gvk, obj, stopOnError) default: - return m.structured.Get(ctx, gvk, obj) + return m.structured.Get(ctx, gvk, obj, stopOnError) } } diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index 7ec81504e2..98b443d5e5 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -71,6 +71,10 @@ type MapEntry struct { // CacheReader wraps Informer and implements the CacheReader interface for a single type Reader CacheReader + + // informerDone is a channel that is closed after + // the informer stops + StopCh <-chan struct{} } // specificInformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs. @@ -166,7 +170,7 @@ func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced { // Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns // the Informer from the map. -func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) { +func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopOnError bool) (bool, *MapEntry, error) { fmt.Println("inf Get") // Return the informer if it is found i, started, ok := func() (*MapEntry, bool, bool) { @@ -178,7 +182,7 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion if !ok { var err error - if i, started, err = ip.addInformerToMap(ctx, gvk, obj); err != nil { + if i, started, err = ip.addInformerToMap(gvk, obj, stopOnError); err != nil { return started, nil, err } } @@ -193,7 +197,7 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion return started, i, nil } -func (ip *specificInformersMap) addInformerToMap(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) { +func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object, stopOnError bool) (*MapEntry, bool, error) { fmt.Println("inf addInf") ip.mu.Lock() defer ip.mu.Unlock() @@ -224,29 +228,49 @@ func (ip *specificInformersMap) addInformerToMap(ctx context.Context, gvk schema return nil, false, err } fmt.Println("inf RM success") + informerStop := make(chan struct{}) i := &MapEntry{ Informer: ni, Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk, scopeName: rm.Scope.Name()}, + StopCh: informerStop, } ip.informersByGVK[gvk] = i + go func() { + <-ip.stop + close(informerStop) + }() + + i.Informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) { + ip.mu.RLock() + defer ip.mu.RUnlock() + fmt.Printf("inf handler err = %+v\n", err) + // TODO: check the error for not found + if stopOnError { + fmt.Println("inf stopping") + close(informerStop) + } + + }) + // Start the Informer if need by // TODO(seans): write thorough tests and document what happens here - can you add indexers? // can you add eventhandlers? + // TODO: not cancelling? (leak) - runCtx, cancel := context.WithCancel(ctx) - go func() { - <-ip.stop - fmt.Println("inf ip stopped") - cancel() - }() + //runCtx, cancel := context.WithCancel(ctx) + //go func() { + // <-ip.stop + // fmt.Println("inf ip stopped") + // cancel() + //}() if ip.started { fmt.Println("inf Run") + //go i.Informer.Run(informerStop) go func() { - i.Informer.Run(runCtx.Done()) + i.Informer.Run(informerStop) fmt.Println("informer done running, remove from map") delete(ip.informersByGVK, gvk) - }() } fmt.Println("inf addInformer returning") diff --git a/pkg/cache/multi_namespace_cache.go b/pkg/cache/multi_namespace_cache.go index f0e18c09b0..1fab974638 100644 --- a/pkg/cache/multi_namespace_cache.go +++ b/pkg/cache/multi_namespace_cache.go @@ -82,6 +82,21 @@ func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object return &multiNamespaceInformer{namespaceToInformer: informers}, nil } +// TODO +func (c *multiNamespaceCache) GetStoppableInformer(ctx context.Context, obj client.Object) (Informer, <-chan struct{}, error) { + panic("not implemented") + //informers := map[string]Informer{} + //for ns, cache := range c.namespaceToCache { + + // informer, _, err := cache.GetStoppableInformer(ctx, obj) + // if err != nil { + // return nil, nil, err + // } + // informers[ns] = informer + //} + //return &multiNamespaceInformer{namespaceToInformer: informers}, nil, nil +} + func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) { informers := map[string]Informer{} for ns, cache := range c.namespaceToCache { diff --git a/pkg/controller/controller_integration_test.go b/pkg/controller/controller_integration_test.go index 6f2e51f94d..629d916f17 100644 --- a/pkg/controller/controller_integration_test.go +++ b/pkg/controller/controller_integration_test.go @@ -186,7 +186,7 @@ var _ = Describe("controller", func() { }, 5) }) - FIt("should reconcile when the CRD is installed, uninstalled, reinstalled", func(done Done) { + It("should reconcile when the CRD is installed, uninstalled, reinstalled", func(done Done) { By("Initializing the scheme and crd") s := runtime.NewScheme() err := v1beta1.AddToScheme(s) diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 4b0a2ee914..92b11a5c32 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -90,6 +90,7 @@ var _ = Describe("controller.Controller", func() { close(done) }) + // TODO: we should dupe this for sporadic controllers It("should not leak goroutines when stopped", func() { currentGRs := goleak.IgnoreCurrent() diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 3c35aa9089..3a7edb0de8 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -184,6 +184,7 @@ func (c *Controller) Start(ctx context.Context) error { // wrap the given context in a cancellable one so that we can // stop sporadic watches when their done signal fires ctrlCtx, ctrlCancel := context.WithCancel(ctx) + defer ctrlCancel() // Set the internal context. c.ctx = ctrlCtx diff --git a/pkg/source/source.go b/pkg/source/source.go index f3de3e44a2..79c385cae5 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -133,62 +133,41 @@ func (sk *SporadicKind) Ready(ctx context.Context, wg *sync.WaitGroup) { // StartNotifyDone starts the kind while concurrently polling discovery to confirm the CRD is still installed // It returns a signal that fires when the CRD is uninstalled, which also triggers a cacnel of the underlying informer func (sk *SporadicKind) StartNotifyDone(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) (<-chan struct{}, error) { - // TODO: how do we cancel if we never fail the discovery check? (leak?) - fmt.Println("src start called") - infCtx, cancel := context.WithCancel(ctx) - go func() { - for { - if !sk.DiscoveryCheck() { - fmt.Println("src start discovery check fail, cancelling") - cancel() - return - } - // todo parameterize - fmt.Println("src start discovery check pass, spinning") - time.Sleep(5 * time.Second) - } - }() - ret := make(chan error) - go func() { - fmt.Println("src starting the underlying Kind") - err := sk.Kind.Start(infCtx, handler, queue, prct...) - fmt.Printf("err = %+v\n", err) - ret <- err - }() - err := <-ret - return infCtx.Done(), err + return sk.Kind.StartNotifyDone(ctx, handler, queue, prct...) } func (sk *SporadicKind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) error { - _, err := sk.StartNotifyDone(ctx, handler, queue, prct...) - return err + return sk.Kind.Start(ctx, handler, queue, prct...) } var _ SyncingSource = &Kind{} // Start is internal and should be called only by the Controller to register an EventHandler with the Informer // to enqueue reconcile.Requests. -func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, - prct ...predicate.Predicate) error { +func (ks *Kind) StartNotifyDone(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, + prct ...predicate.Predicate) (<-chan struct{}, error) { // Type should have been specified by the user. if ks.Type == nil { - return fmt.Errorf("must specify Kind.Type") + return nil, fmt.Errorf("must specify Kind.Type") } // cache should have been injected before Start was called if ks.cache == nil { - return fmt.Errorf("must call CacheInto on Kind before calling Start") + return nil, fmt.Errorf("must call CacheInto on Kind before calling Start") } // cache.GetInformer will block until its context is cancelled if the cache was already started and it can not // sync that informer (most commonly due to RBAC issues). ctx, ks.startCancel = context.WithCancel(ctx) ks.started = make(chan error) + var stopCh <-chan struct{} go func() { // Lookup the Informer from the Cache and add an EventHandler which populates the Queue fmt.Printf("ks.Type = %+v\n", ks.Type) - i, err := ks.cache.GetInformer(ctx, ks.Type) + var err error + var i cache.Informer + i, stopCh, err = ks.cache.GetStoppableInformer(ctx, ks.Type) if err != nil { fmt.Printf("kind GetInformer err = %+v\n", err) kindMatchErr := &meta.NoKindMatchError{} @@ -208,7 +187,15 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w close(ks.started) }() - return nil + return stopCh, nil +} + +// Start is internal and should be called only by the Controller to register an EventHandler with the Informer +// to enqueue reconcile.Requests. +func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, + prct ...predicate.Predicate) error { + _, err := ks.StartNotifyDone(ctx, handler, queue, prct...) + return err } func (ks *Kind) String() string {