diff --git a/designs/conditional-controllers.md b/designs/conditional-controllers.md
new file mode 100644
index 0000000000..7212bf24bb
--- /dev/null
+++ b/designs/conditional-controllers.md
@@ -0,0 +1,326 @@
+Controller Lifecycle Management
+===============================
+
+## Summary
+
+Enable fine-grained control over the lifecycle of a controller, including the
+ability to start/stop/restart controllers and their caches, by exposing a way to
+detect when the resource a controller is watching is uninstalled/reinstalled on
+the cluster.
+
+## Background/Motivation
+
+Currently, the user does not have much control over the lifecycle of a
+controller. The user can add controllers to the manager and add informers to the
+cache, but there is no way to remove either of these.
+
+Additionally, there are restrictions that prevent a controller from running
+multiple times, such as clearing the watches slice for a controller after it
+has started. The effect of this is that users have no clean way of restarting
+controllers after they have stopped them.
+
+Greater detail is given in the "Future Work / Use Cases" section, but in summary,
+the motivating use-cases for this proposal arise whenever a controller's watched
+resource can be installed/uninstalled unexpectedly, or whenever the administrator
+of a controller is different from the administrator of the cluster (and thus has
+no control over which resources are installed).
+
+## Goals
+
+controller-runtime should support starting/stopping/restarting controllers and
+their caches based on whether or not a CRD is installed in the cluster.
+
+## Non-Goals
+
+* controller-runtime does not need to support starting/stopping/restarting
+  controllers and their caches on arbitrary conditions.
+
+* No further guarantees of multi-cluster support beyond what is already provided
+  by controller-runtime.
+
+## Proposal
+
+The following proposal offers a solution for controller/cache restarting by:
+
+1. Implementing a stop channel on the informers map `MapEntry` that can be
+   retrieved from the cache's `Informers` interface and that fires when the
+   informer has stopped.
+2. Adding a new Source called `ConditionalSource`, implemented by a
+   `ConditionalKind`, that has a `Ready()` method that blocks until the kind's
+   type is installed on the cluster and ready to be controlled, and a
+   `StartNotifyDone()` method that starts the underlying source and returns a
+   channel that fires when the source (and its underlying informer) has stopped.
+3. Having controllers maintain a list of `ConditionalSource`s known as
+   `conditionalWatches` that can be added like regular watches via the
+   controller's `Watch` method. For any `ConditionalSource` that `Watch()` is
+   called with, the controller uses `Ready()` to determine when to
+   `StartNotifyDone()` it, and uses the done channel returned by
+   `StartNotifyDone()` to determine when the watch has stopped and it should
+   wait on `Ready()` again.
+4. Having the manager recognize a special type of `Runnable` known as a
+   `ConditionalRunnable` that has a `Ready()` method to indicate when the
+   underlying `Runnable` can be run.
+5. Having the controller builder let users supply a boolean `conditional` option
+   to the builder's `For`, `Owns`, and `Watches` inputs that creates
+   `ConditionalRunnable`s to be run by the manager.
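+
+For illustration, here is a minimal end-user sketch of opting into this behavior
+through the builder. It assumes the option is exposed as `builder.Conditional{}`
+applied to `For` (as in the accompanying proof-of-concept); `mgr`, `reconciler`,
+and the `foov1.Foo` type are placeholders:
+
+```
+// ctrl is "sigs.k8s.io/controller-runtime", builder is its pkg/builder package.
+// Run the Foo controller only while the Foo CRD is installed; if the CRD is
+// uninstalled the controller (and its informer) stops, and if the CRD is
+// reinstalled the manager starts the controller again.
+err := ctrl.NewControllerManagedBy(mgr).
+	For(&foov1.Foo{}, builder.Conditional{}).
+	Complete(reconciler)
+if err != nil {
+	// handle the setup error
+}
+```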
+
+### Informer Stop Channel
+
+The cache's `Informers` interface adds a new method that returns a stop channel
+that fires when an informer is stopped:
+
+```
+type Informers interface {
+	// GetInformerStop fetches the stop channel for the informer for the given object (constructing
+	// the informer if necessary). This stop channel fires when the informer has stopped.
+	GetInformerStop(ctx context.Context, obj client.Object) (<-chan struct{}, error)
+
+	// ... other existing methods
+}
+```
+
+This is implemented by adding a stop channel to the informer cache's `MapEntry`,
+which gets populated when the informer is constructed and started in
+`specificInformersMap#addInformerToMap`:
+
+```
+type MapEntry struct {
+	// StopCh is a channel that is closed after
+	// the informer stops
+	StopCh <-chan struct{}
+
+	// ... other existing fields
+}
+```
+
+### ConditionalSource
+
+A special type of Source called a `ConditionalSource` provides mechanisms for
+determining when the Source can be started (`Ready()`) and when it has stopped
+(`StartNotifyDone()`):
+
+```
+// ConditionalSource is a Source of events that additionally offers the ability to see when the Source is ready to be
+// started, as well as a wrapper around the Source's Start method that returns a channel that fires when an
+// already started Source has stopped.
+type ConditionalSource interface {
+	Source
+
+	// StartNotifyDone runs the underlying Source's Start method and provides a channel that fires when
+	// the Source has stopped.
+	StartNotifyDone(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) (<-chan struct{}, error)
+
+	// Ready blocks until it is safe to call StartNotifyDone, meaning the Source's Kind's type has been
+	// successfully installed on the cluster and is ready to have its events watched and handled.
+	Ready(ctx context.Context, wg *sync.WaitGroup)
+}
+```
+
+### ConditionalWatches (Controller)
+
+Controllers maintain a list of `ConditionalSource`s known as `conditionalWatches`:
+
+```
+type Controller struct {
+	// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
+	startWatches []watchDescription
+
+	// conditionalWatches maintains a list of conditional sources that provide information on when the source is ready to be started/restarted and when the source has stopped.
+	conditionalWatches []conditionalWatchDescription
+
+	// ... existing fields
+}
+```
+
+This list is populated just like the existing startWatches list, by passing
+a `ConditionalSource` to the controller's `Watch()` method:
+
+```
+func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
+	// ... existing code
+
+	if conditionalSource, ok := src.(source.ConditionalSource); ok && !c.Started {
+		c.conditionalWatches = append(c.conditionalWatches, conditionalWatchDescription{src: conditionalSource, handler: evthdler, predicates: prct})
+		return nil
+	}
+
+	// ... existing code
+```
+
+Controllers now expose a `Ready` method that is called by the controller manager
+to determine when the controller can be started:
+```
+func (c *Controller) Ready(ctx context.Context) <-chan struct{} {
+	ready := make(chan struct{})
+	if len(c.conditionalWatches) == 0 {
+		close(ready)
+		return ready
+	}
+
+	var wg sync.WaitGroup
+	for _, w := range c.conditionalWatches {
+		wg.Add(1)
+		go w.src.Ready(ctx, &wg)
+	}
+
+	go func() {
+		defer close(ready)
+		wg.Wait()
+	}()
+	return ready
+}
+```
+
+The controller's `Start()` method now runs the conditionalWatches with
+`StartNotifyDone` and cancels the controller when the watch stops.
+
+### ConditionalRunnable (Manager)
+
+The manager recognizes a special type of `Runnable` known as a
+`ConditionalRunnable` that has a `Ready()` method to indicate when the
+underlying `Runnable` can be run.
+
+```
+// ConditionalRunnable wraps Runnable with an additional Ready method
+// that returns a channel that fires when the underlying Runnable can
+// be started.
+type ConditionalRunnable interface {
+	Runnable
+	Ready(ctx context.Context) <-chan struct{}
+}
+```
+
+When the manager starts a conditional runnable, it runs it in a loop,
+starting the runnable only when it is ready, and looping back to wait for
+readiness again whenever it stops.
+
+```
+// startConditionalRunnable fires off a goroutine that
+// blocks on the runnable's Ready (or the shutdown context).
+//
+// Once ready, it starts the runnable and blocks until the
+// runnable is terminated.
+//
+// Once the runnable stops, it loops back and waits for ready again.
+func (cm *controllerManager) startConditionalRunnable(cr ConditionalRunnable) {
+	go func() {
+		for {
+			select {
+			case <-cm.internalCtx.Done():
+				return
+			case <-cr.Ready(cm.internalCtx):
+				cm.waitForRunnable.Add(1)
+				if err := cr.Start(cm.internalCtx); err != nil {
+					cm.errChan <- err
+				}
+				cm.waitForRunnable.Done()
+			}
+		}
+	}()
+}
+```
+
+### Conditional Builder Option
+
+The controller builder lets users supply a boolean `conditional` option to
+the builder's `For`, `Owns`, and `Watches` inputs that creates
+`ConditionalRunnable`s to be run by the manager.
+
+```
+func (blder *Builder) doWatch() error {
+	var src source.Source
+	if blder.forInput.conditional {
+		gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme())
+		if err != nil {
+			return err
+		}
+		// existsInDiscovery is a func() bool that determines whether the gvk
+		// exists in discovery (see "Discovery Client usage in DiscoveryCheck" below).
+		src = &source.ConditionalKind{Kind: source.Kind{Type: typeForSrc}, DiscoveryCheck: existsInDiscovery}
+	} else {
+		src = &source.Kind{Type: typeForSrc}
+	}
+	// ... existing code
+```
+
+## Alternatives
+
+There are a couple of alternatives to this proposal: one that offloads all the
+responsibility to the consumer, requiring no changes to controller-runtime, and
+one that moves stoppable informers and event handler removal upstream into
+client-go.
+
+### Cache of Caches
+
+One alternative is to create a separate cache per watch (i.e. a cache of caches)
+as the end user consuming controller-runtime. The advantage is that it prevents
+us from having to modify any code in controller-runtime. The main drawback is
+that it is very clunky to maintain multiple caches (one for each informer) and
+it breaks the clean design of the cache.
+
+### Stoppable Informers and EventHandler removal natively in client-go
+
+Currently, there is no way to stop a client-go `SharedInformer`. Therefore, the
+current proposal uses the shared informer's `SetWatchErrorHandler()`
+method to determine when a CRD has been uninstalled (by inspecting ListAndWatch
+errors).
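+
+For concreteness, here is a rough sketch of that mechanism. The names are
+illustrative (the real change lives in `specificInformersMap#addInformerToMap`),
+`toolscache` is `k8s.io/client-go/tools/cache`, and `apierrors` is
+`k8s.io/apimachinery/pkg/api/errors`. The proof-of-concept leaves the exact error
+filtering as a TODO; checking for NotFound is one plausible choice:
+
+```
+// wireInformerStop registers a watch error handler on a not-yet-started
+// informer and returns a channel that is closed once the informer's watched
+// type can no longer be found (e.g. its CRD was uninstalled).
+func wireInformerStop(informer toolscache.SharedIndexInformer) (<-chan struct{}, error) {
+	informerStop := make(chan struct{})
+	err := informer.SetWatchErrorHandler(func(_ *toolscache.Reflector, err error) {
+		if apierrors.IsNotFound(err) {
+			select {
+			case <-informerStop:
+				// already closed
+			default:
+				close(informerStop)
+			}
+		}
+	})
+	if err != nil {
+		return nil, err
+	}
+	// The informer shuts itself down when informerStop is closed.
+	go informer.Run(informerStop)
+	return informerStop, nil
+}
+```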
+
+A cleaner approach is to offload this work to client-go. Then we can run the
+informer such that it knows to stop itself when the CRD has been uninstalled.
+This has been proposed to the client-go maintainers and a WIP implementation
+exists [here](https://github.com/kubernetes/kubernetes/pull/97214).
+
+## Open Questions
+
+### Discovery Client usage in DiscoveryCheck
+
+The current proposal stores a user-provided function on the `ConditionalKind`,
+called a `DiscoveryCheck`, that determines whether a kind exists in discovery or
+not.
+
+This is a little clunky, and calls out to the discovery client are slow.
+Alternatively, the RESTMapper could theoretically be used to determine whether a
+kind exists in discovery, but the current RESTMapper implementation doesn't
+remove a type from the map when its resource is uninstalled from the cluster.
+
+### Multi-Cluster Support
+
+A multi-cluster environment, where a single controller operates across multiple
+clusters, will likely be a heavy user of these new features, because a
+multi-cluster setup frequently results in a situation where the
+cluster administrator is different from the controller administrator.
+
+We lack a consistent story around multi-cluster support, and introducing changes
+such as these without fully thinking through the multi-cluster story might bind
+us for future designs.
+
+## Future Work / Use Cases
+
+Were this to move forward, it would unlock a number of potential use-cases:
+
+1. We could support a "metacontroller", or controller of controllers, that could
+   start and stop controllers based on the existence of their corresponding
+   CRDs.
+
+2. OPA/Gatekeeper could simplify its dynamic watch management by having greater
+   control over the lifecycle of a controller. See [this
+   doc](https://docs.google.com/document/d/1Wi3LM3sG6Qgfzm--bWb6R0SEKCkQCCt-ene6cO62FlM/edit).
+
+## Timeline of Events
+
+* 9/30/2020: Propose idea in design doc and proof-of-concept PR to
+  controller-runtime.
+* 10/7/2020: Design doc and proof-of-concept distilled to focus only on the
+  minimal hooks needed rather than proposing an entire conditional controller
+  solution. Alternatives added to discuss opportunities to push some of the
+  implementation to the api-machinery level.
+* 10/8/2020: Discuss idea in community meeting.
+* 10/19/2020: Proposal updated to add EventHandler count tracking and a
+  client-go based alternative.
+* 11/4/2020: Proposal to add RunWithStopOptions, RemoveEventHandler, and
+  EventHandlerCount discussed at the sig-api-machinery meeting and tentatively
+  accepted. See the [design doc](https://docs.google.com/document/d/17QrTaxfIEUNOEAt61Za3Mu0M76x-iEkcmR51q0a5lis/edit) and [WIP implementation](https://github.com/kevindelgado/kubernetes/pull/1) for more detail.
+* 5/15/2021: Proposal updated to be fully self-contained in controller-runtime,
+  with the proposed mechanism modified to use the Ready/StartNotifyDone mechanics.
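+
+To make the `DiscoveryCheck` discussed under Open Questions concrete, here is a
+minimal sketch using the discovery client, mirroring what the proof-of-concept
+builder generates (`discovery` is `k8s.io/client-go/discovery`, `rest` is
+`k8s.io/client-go/rest`, `schema` is `k8s.io/apimachinery/pkg/runtime/schema`).
+The function name and the decision to treat every discovery error as "not
+installed" are illustrative, not part of the proposal:
+
+```
+// newDiscoveryCheck returns a func() bool that reports whether the given GVK is
+// currently served by the API server, suitable for ConditionalKind.DiscoveryCheck.
+func newDiscoveryCheck(cfg *rest.Config, gvk schema.GroupVersionKind) (func() bool, error) {
+	dc, err := discovery.NewDiscoveryClientForConfig(cfg)
+	if err != nil {
+		return nil, err
+	}
+	return func() bool {
+		resources, err := dc.ServerResourcesForGroupVersion(gvk.GroupVersion().String())
+		if err != nil {
+			// Any discovery error (including "not found") is treated as "not installed".
+			return false
+		}
+		for _, res := range resources.APIResources {
+			if res.Kind == gvk.Kind {
+				return true
+			}
+		}
+		return false
+	}, nil
+}
+```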
+ diff --git a/pkg/builder/controller.go b/pkg/builder/controller.go index 81f446d62f..4fb4f4b316 100644 --- a/pkg/builder/controller.go +++ b/pkg/builder/controller.go @@ -23,6 +23,7 @@ import ( "github.com/go-logr/logr" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/discovery" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" @@ -72,6 +73,7 @@ type ForInput struct { predicates []predicate.Predicate objectProjection objectProjection err error + conditional bool } // For defines the type of Object being *reconciled*, and configures the ControllerManagedBy to respond to create / delete / @@ -97,6 +99,7 @@ type OwnsInput struct { object client.Object predicates []predicate.Predicate objectProjection objectProjection + conditional bool } // Owns defines types of Objects being *generated* by the ControllerManagedBy, and configures the ControllerManagedBy to respond to @@ -118,6 +121,7 @@ type WatchesInput struct { eventhandler handler.EventHandler predicates []predicate.Predicate objectProjection objectProjection + conditional bool } // Watches exposes the lower-level ControllerManagedBy Watches functions through the builder. Consider using @@ -216,13 +220,52 @@ func (blder *Builder) project(obj client.Object, proj objectProjection) (client. } } +func (blder *Builder) generateConditionalSource(typeForSrc client.Object) (source.Source, error) { + gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme()) + if err != nil { + return nil, err + } + fmt.Printf("gvk = %+v\n", gvk) + dc, err := discovery.NewDiscoveryClientForConfig(blder.mgr.GetConfig()) + if err != nil { + return nil, err + } + existsInDiscovery := func() bool { + resources, err := dc.ServerResourcesForGroupVersion(gvk.GroupVersion().String()) + if err != nil { + fmt.Printf("NOT in discovery gvk = %+v\n", gvk) + return false + } + for _, res := range resources.APIResources { + if res.Kind == gvk.Kind { + fmt.Printf("YES in discovery gvk = %+v\n", gvk) + return true + } + } + fmt.Printf("NOT in discovery kind = %+v\n", gvk) + return false + } + return &source.ConditionalKind{Kind: source.Kind{Type: typeForSrc}, DiscoveryCheck: existsInDiscovery}, nil + +} + func (blder *Builder) doWatch() error { // Reconcile type typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection) if err != nil { return err } - src := &source.Kind{Type: typeForSrc} + + var src source.Source + if blder.forInput.conditional { + var err error + src, err = blder.generateConditionalSource(typeForSrc) + if err != nil { + return err + } + } else { + src = &source.Kind{Type: typeForSrc} + } hdler := &handler.EnqueueRequestForObject{} allPredicates := append(blder.globalPredicates, blder.forInput.predicates...) 
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { @@ -235,7 +278,17 @@ func (blder *Builder) doWatch() error { if err != nil { return err } - src := &source.Kind{Type: typeForSrc} + + var src source.Source + if own.conditional { + var err error + src, err = blder.generateConditionalSource(typeForSrc) + if err != nil { + return err + } + } else { + src = &source.Kind{Type: typeForSrc} + } hdler := &handler.EnqueueRequestForOwner{ OwnerType: blder.forInput.object, IsController: true, diff --git a/pkg/builder/options.go b/pkg/builder/options.go index 7bb4273094..b401e7a696 100644 --- a/pkg/builder/options.go +++ b/pkg/builder/options.go @@ -99,6 +99,12 @@ func (p projectAs) ApplyToWatches(opts *WatchesInput) { opts.objectProjection = objectProjection(p) } +type Conditional struct{} + +func (s Conditional) ApplyToFor(opts *ForInput) { + opts.conditional = true +} + var ( // OnlyMetadata tells the controller to *only* cache metadata, and to watch // the the API server in metadata-only form. This is useful when watching diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 71dfbd0454..4de2509a3c 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -54,6 +54,10 @@ type Informers interface { // API kind and resource. GetInformer(ctx context.Context, obj client.Object) (Informer, error) + // GetInformerStop fetches the stop channel for the informer for the given object (constructing + // the informer if necessary). This stop channel fires when the controller has stopped. + GetInformerStop(ctx context.Context, obj client.Object) (<-chan struct{}, error) + // GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead // of the underlying object. GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go index 8ec3b921d9..ccb19d3685 100644 --- a/pkg/cache/informer_cache.go +++ b/pkg/cache/informer_cache.go @@ -57,7 +57,7 @@ func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out clie return err } - started, cache, err := ip.InformersMap.Get(ctx, gvk, out) + started, cache, err := ip.InformersMap.Get(ctx, gvk, out, false) if err != nil { return err } @@ -76,7 +76,7 @@ func (ip *informerCache) List(ctx context.Context, out client.ObjectList, opts . 
return err } - started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj) + started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj, false) if err != nil { return err } @@ -138,7 +138,7 @@ func (ip *informerCache) GetInformerForKind(ctx context.Context, gvk schema.Grou return nil, err } - _, i, err := ip.InformersMap.Get(ctx, gvk, obj) + _, i, err := ip.InformersMap.Get(ctx, gvk, obj, false) if err != nil { return nil, err } @@ -152,13 +152,38 @@ func (ip *informerCache) GetInformer(ctx context.Context, obj client.Object) (In return nil, err } - _, i, err := ip.InformersMap.Get(ctx, gvk, obj) + _, i, err := ip.InformersMap.Get(ctx, gvk, obj, false) if err != nil { return nil, err } return i.Informer, err } +//func (ip *informerCache) GetStoppableInformer(ctx context.Context, obj client.Object) (Informer, <-chan struct{}, error) { +// gvk, err := apiutil.GVKForObject(obj, ip.Scheme) +// if err != nil { +// return nil, nil, err +// } +// _, i, err := ip.InformersMap.Get(ctx, gvk, obj, true) +// if err != nil { +// return nil, nil, err +// } +// return i.Informer, i.StopCh, err +//} + +func (ip *informerCache) GetInformerStop(ctx context.Context, obj client.Object) (<-chan struct{}, error) { + gvk, err := apiutil.GVKForObject(obj, ip.Scheme) + if err != nil { + return nil, err + } + _, i, err := ip.InformersMap.Get(ctx, gvk, obj, true) + if err != nil { + return nil, err + } + return i.StopCh, err + +} + // NeedLeaderElection implements the LeaderElectionRunnable interface // to indicate that this can be started without requiring the leader lock func (ip *informerCache) NeedLeaderElection() bool { diff --git a/pkg/cache/informertest/fake_cache.go b/pkg/cache/informertest/fake_cache.go index eb78e0bb65..1491501b03 100644 --- a/pkg/cache/informertest/fake_cache.go +++ b/pkg/cache/informertest/fake_cache.go @@ -79,6 +79,12 @@ func (c *FakeInformers) GetInformer(ctx context.Context, obj client.Object) (cac return c.informerFor(gvk, obj) } +// GetStoppableInformer implements informers +func (c *FakeInformers) GetInformerStop(ctx context.Context, obj client.Object) (<-chan struct{}, error) { + // TODO: not implemented + return nil, nil +} + // WaitForCacheSync implements Informers func (c *FakeInformers) WaitForCacheSync(ctx context.Context) bool { if c.Synced == nil { diff --git a/pkg/cache/internal/deleg_map.go b/pkg/cache/internal/deleg_map.go index 02bb1919f7..811f3be023 100644 --- a/pkg/cache/internal/deleg_map.go +++ b/pkg/cache/internal/deleg_map.go @@ -89,18 +89,18 @@ func (m *InformersMap) WaitForCacheSync(ctx context.Context) bool { // Get will create a new Informer and add it to the map of InformersMap if none exists. Returns // the Informer from the map. 
-func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) { +func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopOnError bool) (bool, *MapEntry, error) { switch obj.(type) { case *unstructured.Unstructured: - return m.unstructured.Get(ctx, gvk, obj) + return m.unstructured.Get(ctx, gvk, obj, stopOnError) case *unstructured.UnstructuredList: - return m.unstructured.Get(ctx, gvk, obj) + return m.unstructured.Get(ctx, gvk, obj, stopOnError) case *metav1.PartialObjectMetadata: - return m.metadata.Get(ctx, gvk, obj) + return m.metadata.Get(ctx, gvk, obj, stopOnError) case *metav1.PartialObjectMetadataList: - return m.metadata.Get(ctx, gvk, obj) + return m.metadata.Get(ctx, gvk, obj, stopOnError) default: - return m.structured.Get(ctx, gvk, obj) + return m.structured.Get(ctx, gvk, obj, stopOnError) } } diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index 6b57c6fa61..9f640ff9a0 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -71,6 +71,10 @@ type MapEntry struct { // CacheReader wraps Informer and implements the CacheReader interface for a single type Reader CacheReader + + // StopCh is a channel that is closed after + // the informer stops + StopCh <-chan struct{} } // specificInformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs. @@ -166,7 +170,8 @@ func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced { // Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns // the Informer from the map. -func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) { +func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopOnError bool) (bool, *MapEntry, error) { + fmt.Println("inf Get") // Return the informer if it is found i, started, ok := func() (*MapEntry, bool, bool) { ip.mu.RLock() @@ -177,7 +182,7 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion if !ok { var err error - if i, started, err = ip.addInformerToMap(gvk, obj); err != nil { + if i, started, err = ip.addInformerToMap(gvk, obj, stopOnError); err != nil { return started, nil, err } } @@ -192,7 +197,8 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion return started, i, nil } -func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) { +func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object, stopOnError bool) (*MapEntry, bool, error) { + fmt.Println("inf addInf") ip.mu.Lock() defer ip.mu.Unlock() @@ -203,10 +209,13 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob return i, ip.started, nil } + fmt.Println("inf createLW") + fmt.Printf("inf gvk = %+v\n", gvk) // Create a NewSharedIndexInformer and add it to the map. 
var lw *cache.ListWatch lw, err := ip.createListWatcher(gvk, ip) if err != nil { + fmt.Printf("inf createLW err = %+v\n", err) return nil, false, err } ni := cache.NewSharedIndexInformer(lw, obj, resyncPeriod(ip.resync)(), cache.Indexers{ @@ -214,20 +223,57 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob }) rm, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version) if err != nil { + fmt.Printf("RESTMapping err = %+v\n", err) + fmt.Printf("gvk = %+v\n", gvk) return nil, false, err } + fmt.Println("inf RM success") + informerStop := make(chan struct{}) i := &MapEntry{ Informer: ni, Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk, scopeName: rm.Scope.Name()}, + StopCh: informerStop, } ip.informersByGVK[gvk] = i + go func() { + <-ip.stop + close(informerStop) + }() + + i.Informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) { + ip.mu.RLock() + defer ip.mu.RUnlock() + fmt.Printf("inf handler err = %+v\n", err) + // TODO: check the error for not found + if stopOnError { + fmt.Println("inf stopping") + close(informerStop) + } + + }) + // Start the Informer if need by // TODO(seans): write thorough tests and document what happens here - can you add indexers? // can you add eventhandlers? + + // TODO: not cancelling? (leak) + //runCtx, cancel := context.WithCancel(ctx) + //go func() { + // <-ip.stop + // fmt.Println("inf ip stopped") + // cancel() + //}() if ip.started { - go i.Informer.Run(ip.stop) + fmt.Println("inf Run") + //go i.Informer.Run(informerStop) + go func() { + i.Informer.Run(informerStop) + fmt.Println("informer done running, remove from map") + delete(ip.informersByGVK, gvk) + }() } + fmt.Println("inf addInformer returning") return i, ip.started, nil } diff --git a/pkg/cache/multi_namespace_cache.go b/pkg/cache/multi_namespace_cache.go index f0e18c09b0..5f8fd8aac2 100644 --- a/pkg/cache/multi_namespace_cache.go +++ b/pkg/cache/multi_namespace_cache.go @@ -19,6 +19,7 @@ package cache import ( "context" "fmt" + "sync" "time" corev1 "k8s.io/api/core/v1" @@ -82,6 +83,30 @@ func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object return &multiNamespaceInformer{namespaceToInformer: informers}, nil } +// TODO +func (c *multiNamespaceCache) GetInformerStop(ctx context.Context, obj client.Object) (<-chan struct{}, error) { + multiStopCh := make(chan struct{}) + var wg sync.WaitGroup + for _, cache := range c.namespaceToCache { + stopCh, err := cache.GetInformerStop(ctx, obj) + if err != nil { + return nil, err + } + wg.Add(1) + go func(stopCh <-chan struct{}) { + defer wg.Done() + <-stopCh + + }(stopCh) + } + + go func() { + defer close(multiStopCh) + wg.Done() + }() + return multiStopCh, nil +} + func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) { informers := map[string]Informer{} for ns, cache := range c.namespaceToCache { diff --git a/pkg/controller/controller_integration_test.go b/pkg/controller/controller_integration_test.go index 762b3d9fbb..8ded02c916 100644 --- a/pkg/controller/controller_integration_test.go +++ b/pkg/controller/controller_integration_test.go @@ -18,15 +18,27 @@ package controller_test import ( "context" + goerrors "errors" + "path/filepath" + "time" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + "k8s.io/apimachinery/pkg/api/errors" + 
"k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllertest" + foo "sigs.k8s.io/controller-runtime/pkg/controller/testdata/foo/v1" + "sigs.k8s.io/controller-runtime/pkg/envtest" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" @@ -173,6 +185,198 @@ var _ = Describe("controller", func() { close(done) }, 5) }) + + It("should reconcile when the CRD is installed, uninstalled, reinstalled", func(done Done) { + By("Initializing the scheme and crd") + s := runtime.NewScheme() + err := v1beta1.AddToScheme(s) + Expect(err).NotTo(HaveOccurred()) + err = apiextensionsv1.AddToScheme(s) + Expect(err).NotTo(HaveOccurred()) + err = foo.AddToScheme(s) + Expect(err).NotTo(HaveOccurred()) + options := manager.Options{Scheme: s} + + By("Creating the Manager") + cm, err := manager.New(cfg, options) + Expect(err).NotTo(HaveOccurred()) + + By("Creating the Controller") + instance, err := controller.New("foo-controller", cm, controller.Options{ + Reconciler: reconcile.Func( + func(_ context.Context, request reconcile.Request) (reconcile.Result, error) { + reconciled <- request + return reconcile.Result{}, nil + }), + }) + Expect(err).NotTo(HaveOccurred()) + + By("Watching foo CRD as conditional kinds") + f := &foo.Foo{} + gvk := schema.GroupVersionKind{ + Group: "bar.example.com", + Version: "v1", + Kind: "Foo", + } + Expect(err).NotTo(HaveOccurred()) + existsInDiscovery := func() bool { + resources, err := clientset.Discovery().ServerResourcesForGroupVersion(gvk.GroupVersion().String()) + if err != nil { + return false + } + for _, res := range resources.APIResources { + if res.Kind == gvk.Kind { + return true + } + } + return false + } + err = instance.Watch(&source.ConditionalKind{Kind: source.Kind{Type: f}, DiscoveryCheck: existsInDiscovery}, &handler.EnqueueRequestForObject{}) + Expect(err).NotTo(HaveOccurred()) + + By("Starting the Manager") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer GinkgoRecover() + Expect(cm.Start(ctx)).NotTo(HaveOccurred()) + }() + + testFoo := &foo.Foo{ + TypeMeta: metav1.TypeMeta{Kind: gvk.Kind, APIVersion: gvk.GroupVersion().String()}, + ObjectMeta: metav1.ObjectMeta{Name: "test-foo", Namespace: "default"}, + } + + expectedReconcileRequest := reconcile.Request{NamespacedName: types.NamespacedName{ + Name: "test-foo", + Namespace: "default", + }} + _ = expectedReconcileRequest + + By("Failing to create a foo object if the crd isn't installed") + kindMatchErr := &meta.NoKindMatchError{} + err = cm.GetClient().Create(ctx, testFoo) + Expect(goerrors.As(err, &kindMatchErr)).To(BeTrue()) + + By("Installing the CRD") + crdPath := filepath.Join(".", "testdata", "foo", "foocrd.yaml") + crdOpts := envtest.CRDInstallOptions{ + Paths: []string{crdPath}, + MaxTime: 50 * time.Millisecond, + PollInterval: 15 * time.Millisecond, + } + crds, err := envtest.InstallCRDs(cfg, crdOpts) + Expect(err).NotTo(HaveOccurred()) + Expect(len(crds)).To(Equal(1)) + + By("Expecting to find the CRD") + crdv1 := &apiextensionsv1.CustomResourceDefinition{} + err = cm.GetClient().Get(context.TODO(), 
types.NamespacedName{Name: "foos.bar.example.com"}, crdv1) + Expect(err).NotTo(HaveOccurred()) + Expect(crdv1.Spec.Names.Kind).To(Equal("Foo")) + + err = envtest.WaitForCRDs(cfg, []client.Object{ + &v1beta1.CustomResourceDefinition{ + Spec: v1beta1.CustomResourceDefinitionSpec{ + Group: "bar.example.com", + Names: v1beta1.CustomResourceDefinitionNames{ + Kind: "Foo", + Plural: "foos", + }, + Versions: []v1beta1.CustomResourceDefinitionVersion{ + { + Name: "v1", + Storage: true, + Served: true, + }, + }}, + }, + }, + crdOpts, + ) + Expect(err).NotTo(HaveOccurred()) + + By("Invoking Reconcile for foo Create") + err = cm.GetClient().Create(ctx, testFoo) + Expect(err).NotTo(HaveOccurred()) + Expect(<-reconciled).To(Equal(expectedReconcileRequest)) + + By("Uninstalling the CRD") + err = envtest.UninstallCRDs(cfg, crdOpts) + Expect(err).NotTo(HaveOccurred()) + // wait for discovery to not recognize the resource after uninstall + wait.PollImmediate(15*time.Millisecond, 50*time.Millisecond, func() (bool, error) { + if _, err := clientset.Discovery().ServerResourcesForGroupVersion(gvk.Group + "/" + gvk.Version); err != nil { + if err.Error() == "the server could not find the requested resource" { + return true, nil + } + } + return false, nil + }) + + By("Failing create foo object if the crd isn't installed") + errNotFound := errors.NewGenericServerResponse(404, "POST", schema.GroupResource{Group: "bar.example.com", Resource: "foos"}, "", "404 page not found", 0, true) + err = cm.GetClient().Create(ctx, testFoo) + Expect(err).To(Equal(errNotFound)) + + By("Reinstalling the CRD") + crds, err = envtest.InstallCRDs(cfg, crdOpts) + Expect(err).NotTo(HaveOccurred()) + Expect(len(crds)).To(Equal(1)) + + By("Expecting to find the CRD") + crdv1 = &apiextensionsv1.CustomResourceDefinition{} + err = cm.GetClient().Get(context.TODO(), types.NamespacedName{Name: "foos.bar.example.com"}, crdv1) + Expect(err).NotTo(HaveOccurred()) + Expect(crdv1.Spec.Names.Kind).To(Equal("Foo")) + + err = envtest.WaitForCRDs(cfg, []client.Object{ + &v1beta1.CustomResourceDefinition{ + Spec: v1beta1.CustomResourceDefinitionSpec{ + Group: "bar.example.com", + Names: v1beta1.CustomResourceDefinitionNames{ + Kind: "Foo", + Plural: "foos", + }, + Versions: []v1beta1.CustomResourceDefinitionVersion{ + { + Name: "v1", + Storage: true, + Served: true, + }, + }}, + }, + }, + crdOpts, + ) + Expect(err).NotTo(HaveOccurred()) + + By("Invoking Reconcile for foo Create") + testFoo.ResourceVersion = "" + err = cm.GetClient().Create(ctx, testFoo) + Expect(err).NotTo(HaveOccurred()) + Expect(<-reconciled).To(Equal(expectedReconcileRequest)) + + By("Uninstalling the CRD") + err = envtest.UninstallCRDs(cfg, crdOpts) + Expect(err).NotTo(HaveOccurred()) + // wait for discovery to not recognize the resource after uninstall + wait.PollImmediate(15*time.Millisecond, 50*time.Millisecond, func() (bool, error) { + if _, err := clientset.Discovery().ServerResourcesForGroupVersion(gvk.Group + "/" + gvk.Version); err != nil { + if err.Error() == "the server could not find the requested resource" { + return true, nil + } + } + return false, nil + }) + + By("Failing create foo object if the crd isn't installed") + err = cm.GetClient().Create(ctx, testFoo) + Expect(err).To(Equal(errNotFound)) + + close(done) + }, 10) + }) func truePtr() *bool { diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 4b0a2ee914..92b11a5c32 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -90,6 +90,7 @@ var 
_ = Describe("controller.Controller", func() { close(done) }) + // TODO: we should dupe this for sporadic controllers It("should not leak goroutines when stopped", func() { currentGRs := goleak.IgnoreCurrent() diff --git a/pkg/controller/testdata/foo/foo_types.go b/pkg/controller/testdata/foo/foo_types.go new file mode 100644 index 0000000000..4cd289275e --- /dev/null +++ b/pkg/controller/testdata/foo/foo_types.go @@ -0,0 +1,139 @@ +package foo + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/scheme" +) + +type FooSpec struct{} +type FooStatus struct{} + +// +kubebuilder:object:root=true + +type Foo struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec FooSpec `json:"spec,omitempty"` + Status FooStatus `json:"status,omitempty"` +} + +// +kubebuilder:object:root=true + +type FooList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []Foo `json:"items"` +} + +func init() { + SchemeBuilder.Register(&Foo{}, &FooList{}) +} + +var ( + GroupVersion = schema.GroupVersion{Group: "bar.example.com", Version: "v1"} + SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion} + AddToScheme = SchemeBuilder.AddToScheme +) + +// TODO: autogen all this stuff (learn how to) and split into v1 directory +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Foo) DeepCopyInto(out *Foo) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + out.Spec = in.Spec + out.Status = in.Status +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Foo. +func (in *Foo) DeepCopy() *Foo { + if in == nil { + return nil + } + out := new(Foo) + in.DeepCopyInto(out) + return out +} + +func (in *Foo) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil + +} + +func (f *Foo) SetGroupVersionKind(kind schema.GroupVersionKind) {} +func (f *Foo) GroupVersionKind() schema.GroupVersionKind { + return schema.GroupVersionKind{ + Group: "bar.example.com", + Version: "v1", + Kind: "Foo", + } + +} + +func (f *Foo) GetObjectKind() schema.ObjectKind { + return f + +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FooList) DeepCopyInto(out *FooList) { + *out = *in + out.TypeMeta = in.TypeMeta + out.ListMeta = in.ListMeta + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]Foo, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +func (in *FooList) DeepCopy() *FooList { + if in == nil { + return nil + } + out := new(FooList) + in.DeepCopyInto(out) + return out +} + +func (in *FooList) DeepCopyObject() runtime.Object { + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FooSpec) DeepCopyInto(out *FooSpec) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FooSpec. +func (in *FooSpec) DeepCopy() *FooSpec { + if in == nil { + return nil + } + out := new(FooSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *FooStatus) DeepCopyInto(out *FooStatus) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FooStatus. +func (in *FooStatus) DeepCopy() *FooStatus { + if in == nil { + return nil + } + out := new(FooStatus) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/controller/testdata/foo/foocrd.yaml b/pkg/controller/testdata/foo/foocrd.yaml new file mode 100644 index 0000000000..e2bddbc528 --- /dev/null +++ b/pkg/controller/testdata/foo/foocrd.yaml @@ -0,0 +1,17 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: foos.bar.example.com +spec: + group: bar.example.com + names: + kind: Foo + plural: foos + scope: Namespaced + versions: + - name: "v1" + storage: true + served: true + schema: + openAPIV3Schema: + type: object diff --git a/pkg/controller/testdata/foo/v1/foo_types.go b/pkg/controller/testdata/foo/v1/foo_types.go new file mode 100644 index 0000000000..380b7ce8c4 --- /dev/null +++ b/pkg/controller/testdata/foo/v1/foo_types.go @@ -0,0 +1,60 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! +// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. + +// FooSpec defines the desired state of Foo +type FooSpec struct { + // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster + // Important: Run "make" to regenerate code after modifying this file + RunAt string `json:"runAt"` +} + +// FooStatus defines the observed state of Foo +type FooStatus struct { + // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster + // Important: Run "make" to regenerate code after modifying this file +} + +// +kubebuilder:object:root=true + +// Foo is the Schema for the externaljobs API +type Foo struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec FooSpec `json:"spec,omitempty"` + Status FooStatus `json:"status,omitempty"` +} + +// +kubebuilder:object:root=true + +// FooList contains a list of Foo +type FooList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []Foo `json:"items"` +} + +func init() { + SchemeBuilder.Register(&Foo{}, &FooList{}) +} diff --git a/pkg/controller/testdata/foo/v1/groupversion_info.go b/pkg/controller/testdata/foo/v1/groupversion_info.go new file mode 100644 index 0000000000..a641b61024 --- /dev/null +++ b/pkg/controller/testdata/foo/v1/groupversion_info.go @@ -0,0 +1,38 @@ +/* +Copyright 2018 The Kubernetes authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// +kubebuilder:object:generate=true +// +groupName=chaosapps.metamagical.io +package v1 + +import ( + "k8s.io/apimachinery/pkg/runtime/schema" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/scheme" +) + +var ( + log = logf.Log.WithName("foo-resource") + + // SchemeGroupVersion is group version used to register these objects + SchemeGroupVersion = schema.GroupVersion{Group: "bar.example.com", Version: "v1"} + + // SchemeBuilder is used to add go types to the GroupVersionKind scheme + SchemeBuilder = &scheme.Builder{GroupVersion: SchemeGroupVersion} + + // AddToScheme is required by pkg/client/... + AddToScheme = SchemeBuilder.AddToScheme +) diff --git a/pkg/controller/testdata/foo/v1/zz_generated.deepcopy.go b/pkg/controller/testdata/foo/v1/zz_generated.deepcopy.go new file mode 100644 index 0000000000..01a5a9e34f --- /dev/null +++ b/pkg/controller/testdata/foo/v1/zz_generated.deepcopy.go @@ -0,0 +1,98 @@ +// +build !ignore_autogenerated + +// Code generated by controller-gen. DO NOT EDIT. + +package v1 + +import ( + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Foo) DeepCopyInto(out *Foo) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + out.Spec = in.Spec + out.Status = in.Status +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Foo. +func (in *Foo) DeepCopy() *Foo { + if in == nil { + return nil + } + out := new(Foo) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *Foo) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FooList) DeepCopyInto(out *FooList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]Foo, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FooList. +func (in *FooList) DeepCopy() *FooList { + if in == nil { + return nil + } + out := new(FooList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *FooList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FooSpec) DeepCopyInto(out *FooSpec) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FooSpec. 
+func (in *FooSpec) DeepCopy() *FooSpec { + if in == nil { + return nil + } + out := new(FooSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FooStatus) DeepCopyInto(out *FooStatus) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FooStatus. +func (in *FooStatus) DeepCopy() *FooStatus { + if in == nil { + return nil + } + out := new(FooStatus) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index f5024502d9..539ebe94b3 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -83,6 +83,9 @@ type Controller struct { // startWatches maintains a list of sources, handlers, and predicates to start when the controller is started. startWatches []watchDescription + // conditionalWatches maintains a list of conditional sources that provide information on when the source is ready to be started/restarted and when the source has stopped. + conditionalWatches []conditionalWatchDescription + // Log is used to log messages to users during reconciliation, or for example when a watch is started. Log logr.Logger } @@ -94,6 +97,12 @@ type watchDescription struct { predicates []predicate.Predicate } +type conditionalWatchDescription struct { + src source.ConditionalSource + handler handler.EventHandler + predicates []predicate.Predicate +} + // Reconcile implements reconcile.Reconciler func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { log := c.Log.WithValues("name", req.Name, "namespace", req.Namespace) @@ -119,6 +128,12 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc } } + // These get held on indefinitely + if conditionalSource, ok := src.(source.ConditionalSource); ok && !c.Started { + c.conditionalWatches = append(c.conditionalWatches, conditionalWatchDescription{src: conditionalSource, handler: evthdler, predicates: prct}) + return nil + } + // Controller hasn't started yet, store the watches locally and return. // // These watches are going to be held on the controller struct until the manager or user calls Start(...). @@ -131,6 +146,30 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc return src.Start(c.ctx, evthdler, c.Queue, prct...) } +// Ready is called by the controller manager to determine when all the conditionalWatches +// can be started. It blocks until ready. 
+func (c *Controller) Ready(ctx context.Context) <-chan struct{} { + fmt.Printf("ctrl ReadyToStart len(c.conditionalWatches) = %+v\n", len(c.conditionalWatches)) + ready := make(chan struct{}) + if len(c.conditionalWatches) == 0 { + close(ready) + return ready + } + + var wg sync.WaitGroup + for _, w := range c.conditionalWatches { + wg.Add(1) + fmt.Println("ctrl checking src ready") + go w.src.Ready(ctx, &wg) + } + + go func() { + defer close(ready) + wg.Wait() + }() + return ready +} + // Start implements controller.Controller func (c *Controller) Start(ctx context.Context) error { // use an IIFE to get proper lock handling @@ -142,12 +181,17 @@ func (c *Controller) Start(ctx context.Context) error { c.initMetrics() + // wrap the given context in a cancellable one so that we can + // stop conditional watches when their done signal fires + ctrlCtx, ctrlCancel := context.WithCancel(ctx) + defer ctrlCancel() + // Set the internal context. - c.ctx = ctx + c.ctx = ctrlCtx c.Queue = c.MakeQueue() go func() { - <-ctx.Done() + <-ctrlCtx.Done() c.Queue.ShutDown() }() @@ -164,9 +208,31 @@ func (c *Controller) Start(ctx context.Context) error { for _, watch := range c.startWatches { c.Log.Info("Starting EventSource", "source", watch.src) - if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil { + if err := watch.src.Start(ctrlCtx, watch.handler, c.Queue, watch.predicates...); err != nil { + return err + } + } + + c.Log.Info("conditional watches", "len", len(c.conditionalWatches)) + // TODO: do we need a waitgroup here so that we only clear c.Started once all the watches are done + // or should a single done watch trigger all the others to stop, or something else? + for _, watch := range c.conditionalWatches { + c.Log.Info("conditional Starting EventSource", "source", watch.src) + + // call a version of the Start method specific to ConditionalSource that returns + // a done signal that fires when the CRD is no longer installed (and the informer gets shut down) + done, err := watch.src.StartNotifyDone(ctrlCtx, watch.handler, c.Queue, watch.predicates...) + if err != nil { return err } + // wait for done to fire and when it does, shut down the controller and reset c.Started + go func() { + fmt.Println("ctrl waiting for done") + <-done + fmt.Println("ctrl done fired, ctrlCancelling") + c.Started = false + ctrlCancel() + }() } // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches @@ -180,7 +246,7 @@ func (c *Controller) Start(ctx context.Context) error { if err := func() error { // use a context with timeout for launching sources and syncing caches. - sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout) + sourceStartCtx, cancel := context.WithTimeout(ctrlCtx, c.CacheSyncTimeout) defer cancel() // WaitForSync waits for a definitive timeout, and returns if there @@ -211,7 +277,7 @@ func (c *Controller) Start(ctx context.Context) error { defer wg.Done() // Run a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the reconcileHandler is never invoked concurrently with the same object. 
-				for c.processNextWorkItem(ctx) {
+				for c.processNextWorkItem(ctrlCtx) {
 				}
 			}()
 		}
@@ -223,7 +289,7 @@ func (c *Controller) Start(ctx context.Context) error {
 		return err
 	}
 
-	<-ctx.Done()
+	<-ctrlCtx.Done()
 	c.Log.Info("Shutdown signal received, waiting for all workers to finish")
 	wg.Wait()
 	c.Log.Info("All workers finished")
diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go
index 57f95ba5b3..e84bf302dc 100644
--- a/pkg/manager/internal.go
+++ b/pkg/manager/internal.go
@@ -203,11 +203,13 @@ func (cm *controllerManager) Add(r Runnable) error {
 
 	// Add the runnable to the leader election or the non-leaderelection list
 	if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() {
+		fmt.Println("mgr adding non ler")
 		shouldStart = cm.started
 		cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r)
 	} else if hasCache, ok := r.(hasCache); ok {
 		cm.caches = append(cm.caches, hasCache)
 	} else {
+		fmt.Println("mgr adding ler")
 		shouldStart = cm.startedLeader
 		cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r)
 	}
@@ -427,6 +429,7 @@ func (cm *controllerManager) serveHealthProbes() {
 
 	// Run server
 	cm.startRunnable(RunnableFunc(func(_ context.Context) error {
+		cm.logger.Info("starting health probes")
 		if err := server.Serve(cm.healthProbeListener); err != nil && err != http.ErrServerClosed {
 			return err
 		}
@@ -443,6 +446,7 @@ func (cm *controllerManager) serveHealthProbes() {
 }
 
 func (cm *controllerManager) Start(ctx context.Context) (err error) {
+	fmt.Println("mgr Start")
 	if err := cm.Add(cm.cluster); err != nil {
 		return fmt.Errorf("failed to add cluster to runnables: %w", err)
 	}
@@ -501,14 +505,19 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
 		}
 	}()
 
+	fmt.Println("mgr start waiting on signal")
 	select {
 	case <-ctx.Done():
+		fmt.Println("mgr start ctx.Done fired")
 		// We are done
 		return nil
 	case err := <-cm.errChan:
+		fmt.Printf("mgr start errChan fired, %v\n", err)
 		// Error starting or running a runnable
 		return err
 	}
+	fmt.Println("mgr start finished")
+	return nil
 }
 
 // engageStopProcedure signals all runnables to stop, reads potential errors
@@ -591,6 +600,7 @@ func (cm *controllerManager) startNonLeaderElectionRunnables() {
 
 	// Start the non-leaderelection Runnables after the cache has synced
 	for _, c := range cm.nonLeaderElectionRunnables {
+		fmt.Println("nonLER")
 		// Controllers block, but we want to return an error if any have an error starting.
 		// Write any Start errors to a channel so we can return them
 		cm.startRunnable(c)
@@ -605,6 +615,7 @@ func (cm *controllerManager) startLeaderElectionRunnables() {
 
 	// Start the leader election Runnables after the cache has synced
 	for _, c := range cm.leaderElectionRunnables {
+		fmt.Println("LER")
 		// Controllers block, but we want to return an error if any have an error starting.
 		// Write any Start errors to a channel so we can return them
 		cm.startRunnable(c)
@@ -683,11 +694,51 @@ func (cm *controllerManager) Elected() <-chan struct{} {
 }
 
 func (cm *controllerManager) startRunnable(r Runnable) {
-	cm.waitForRunnable.Add(1)
+	if conditionalRunnable, ok := r.(ConditionalRunnable); ok {
+		fmt.Printf("starting conditional runnable = %+v\n", conditionalRunnable)
+		cm.startConditionalRunnable(conditionalRunnable)
+	} else {
+		cm.waitForRunnable.Add(1)
+		go func() {
+			fmt.Printf("starting non-conditional runnable, %v\n", r)
+			defer cm.waitForRunnable.Done()
+			if err := r.Start(cm.internalCtx); err != nil {
+				cm.errChan <- err
+			}
+		}()
+	}
+}
+
+// startConditionalRunnable fires off a goroutine that
+// blocks on the runnable's Ready (or the shutdown context).
+//
+// Once ready, call a version of startRunnable that blocks
+// until the runnable has terminated.
+//
+// Once the runnable stops, loop back and wait for ready again.
+func (cm *controllerManager) startConditionalRunnable(cr ConditionalRunnable) {
 	go func() {
-		defer cm.waitForRunnable.Done()
-		if err := r.Start(cm.internalCtx); err != nil {
-			cm.errChan <- err
+		fmt.Println("mgr got a cr")
+		for {
+			fmt.Println("mgr waiting on cr ReadyToStart")
+			select {
+			case <-cm.internalCtx.Done():
+				fmt.Println("mgr internal context fired")
+				return
+			case <-cr.Ready(cm.internalCtx):
+				fmt.Println("mgr ready, starting the runnable")
+				// this doesn't block
+				cm.waitForRunnable.Add(1)
+				defer cm.waitForRunnable.Done()
+				fmt.Printf("starting conditional runnable, %v\n", cr)
+				if err := cr.Start(cm.internalCtx); err != nil {
+					cm.errChan <- err
+				}
+				fmt.Println("mgr runnable done running")
+				// TODO: what happens if we don't return here?
+				return
+			}
+			fmt.Println("mgr done running, looping back to wait on ready")
 		}
 	}()
 }
diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go
index 30ff9e2516..0c0b03694d 100644
--- a/pkg/manager/manager.go
+++ b/pkg/manager/manager.go
@@ -290,6 +290,14 @@ func (r RunnableFunc) Start(ctx context.Context) error {
 	return r(ctx)
 }
 
+// ConditionalRunnable wraps Runnable with an additional Ready method
+// that returns a channel that fires when the underlying Runnable can
+// be started.
+type ConditionalRunnable interface {
+	Runnable
+	Ready(ctx context.Context) <-chan struct{}
+}
+
 // LeaderElectionRunnable knows if a Runnable needs to be run in the leader election mode.
 type LeaderElectionRunnable interface {
 	// NeedLeaderElection returns true if the Runnable needs to be run in the leader election mode.
diff --git a/pkg/source/source.go b/pkg/source/source.go
index adabbaf917..dc9298cd76 100644
--- a/pkg/source/source.go
+++ b/pkg/source/source.go
@@ -21,6 +21,7 @@ import (
 	"errors"
 	"fmt"
 	"sync"
+	"time"
 
 	"k8s.io/apimachinery/pkg/api/meta"
 	"k8s.io/client-go/util/workqueue"
@@ -57,6 +58,21 @@ type Source interface {
 	Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
 }
 
+// ConditionalSource is a Source of events that additionally offers the ability to see when the Source is ready to be
+// started, as well as offering a wrapper around the Source's Start method that returns a channel that fires when an
+// already started Source has stopped.
+type ConditionalSource interface {
+	Source
+
+	// StartNotifyDone runs the underlying Source's Start method and provides a channel that fires when
+	// the Source has stopped.
+	StartNotifyDone(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) (<-chan struct{}, error)
+
+	// Ready blocks until it is safe to call StartNotifyDone, meaning the Source's Kind's type has been
+	// successfully installed on the cluster and is ready to have its events watched and handled.
+	Ready(ctx context.Context, wg *sync.WaitGroup)
+}
+
 // SyncingSource is a source that needs syncing prior to being usable. The controller
 // will call its WaitForSync prior to starting workers.
 type SyncingSource interface {
@@ -98,31 +114,76 @@ type Kind struct {
 	startCancel func()
 }
 
+// ConditionalKind implements ConditionalSource, allowing you to set a
+// DiscoveryCheck function that is used to determine when a stopped kind
+// has been reinstalled on the cluster and is ready to be restarted.
+type ConditionalKind struct {
+	Kind
+
+	// DiscoveryCheck returns true if the Kind's Type exists on the cluster and false otherwise.
+	DiscoveryCheck func() bool
+}
+
+func (sk *ConditionalKind) Ready(ctx context.Context, wg *sync.WaitGroup) {
+	fmt.Println("src ready called")
+	defer wg.Done()
+	for {
+		select {
+		case <-ctx.Done():
+			fmt.Println("src context shutdown")
+			return
+		default:
+			if sk.DiscoveryCheck() {
+				fmt.Println("src ready discovery check pass closing ready")
+				return
+			}
+			//TODO: parameterize this
+			fmt.Println("src ready discovery check fail, spin")
+			time.Sleep(5 * time.Second)
+		}
+	}
+}
+
+// StartNotifyDone starts the kind while concurrently polling discovery to confirm the CRD is still installed.
+// It returns a signal that fires when the CRD is uninstalled, which also triggers a cancel of the underlying informer.
+func (sk *ConditionalKind) StartNotifyDone(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) (<-chan struct{}, error) {
+	return sk.Kind.StartNotifyDone(ctx, handler, queue, prct...)
+}
+
+func (sk *ConditionalKind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) error {
+	return sk.Kind.Start(ctx, handler, queue, prct...)
+}
+
 var _ SyncingSource = &Kind{}
 
 // Start is internal and should be called only by the Controller to register an EventHandler with the Informer
 // to enqueue reconcile.Requests.
-func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
-	prct ...predicate.Predicate) error {
+func (ks *Kind) StartNotifyDone(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
+	prct ...predicate.Predicate) (<-chan struct{}, error) {
 	// Type should have been specified by the user.
 	if ks.Type == nil {
-		return fmt.Errorf("must specify Kind.Type")
+		return nil, fmt.Errorf("must specify Kind.Type")
 	}
 
 	// cache should have been injected before Start was called
 	if ks.cache == nil {
-		return fmt.Errorf("must call CacheInto on Kind before calling Start")
+		return nil, fmt.Errorf("must call CacheInto on Kind before calling Start")
 	}
 
 	// cache.GetInformer will block until its context is cancelled if the cache was already started and it can not
 	// sync that informer (most commonly due to RBAC issues).
 	ctx, ks.startCancel = context.WithCancel(ctx)
 	ks.started = make(chan error)
+	var stopCh <-chan struct{}
 	go func() {
 		// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
+		fmt.Printf("ks.Type = %+v\n", ks.Type)
+		//var err error
+		//var i cache.Informer
 		i, err := ks.cache.GetInformer(ctx, ks.Type)
 		if err != nil {
+			fmt.Printf("kind GetInformer err = %+v\n", err)
 			kindMatchErr := &meta.NoKindMatchError{}
 			if errors.As(err, &kindMatchErr) {
 				log.Error(err, "if kind is a CRD, it should be installed before calling Start",
@@ -131,15 +192,29 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
 			ks.started <- err
 			return
 		}
+		stopCh, err = ks.cache.GetInformerStop(ctx, ks.Type)
+		if err != nil {
+			ks.started <- err
+			return
+		}
 		i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
 		if !ks.cache.WaitForCacheSync(ctx) {
 			// Would be great to return something more informative here
 			ks.started <- errors.New("cache did not sync")
 		}
+		fmt.Println("kind closing ks.started")
 		close(ks.started)
 	}()
 
-	return nil
+	return stopCh, nil
+}
+
+// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
+// to enqueue reconcile.Requests.
+func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
+	prct ...predicate.Predicate) error {
+	_, err := ks.StartNotifyDone(ctx, handler, queue, prct...)
+	return err
 }
 
 func (ks *Kind) String() string {
diff --git a/pkg/source/source_test.go b/pkg/source/source_test.go
index 9b0a1c9744..16d42653c9 100644
--- a/pkg/source/source_test.go
+++ b/pkg/source/source_test.go
@@ -19,6 +19,8 @@ package source_test
 import (
 	"context"
 	"fmt"
+	"sync"
+	"time"
 
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
@@ -35,6 +37,78 @@ import (
 )
 
 var _ = Describe("Source", func() {
+	Describe("ConditionalKind", func() {
+		// TODO: mock out discovery checks that both pass and fail
+		// test Ready with pass/fail DCs
+		// test StartNotifyDone with pass/fail DCs
+
+		//var c chan struct{}
+		//var p *corev1.Pod
+		//var ic *informertest.FakeInformers
+
+		//BeforeEach(func(done Done) {
+		//	ic = &informertest.FakeInformers{}
+		//	c = make(chan struct{})
+		//	p = &corev1.Pod{
+		//		Spec: corev1.PodSpec{
+		//			Containers: []corev1.Container{
+		//				{Name: "test", Image: "test"},
+		//			},
+		//		},
+		//	}
+		//	close(done)
+		//})
+
+		Context("for builtin resources", func() {
+			It("should always be ready when discovery check passes", func(done Done) {
+				yesInDiscovery := func() bool { return true }
+				//notInDiscovery := func() bool { return false }
+				kind := source.Kind{Type: &corev1.Pod{}}
+				instance := source.ConditionalKind{Kind: kind, DiscoveryCheck: yesInDiscovery}
+				ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+				defer cancel()
+				ready := make(chan struct{})
+				var wg sync.WaitGroup
+				wg.Add(1)
+				go func() {
+					instance.Ready(ctx, &wg)
+					close(ready)
+				}()
+				select {
+				case <-ctx.Done():
+					defer GinkgoRecover()
+					Fail("Unexpected context closed")
+				case <-ready:
+					break
+				}
+				close(done)
+			})
+			//It("should never be ready when discovery check fails", func(done Done) {
+			//	//yesInDiscovery := func() bool { return true }
+			//	notInDiscovery := func() bool { return false }
+			//	kind := source.Kind{Type: &corev1.Pod{}}
+			//	instance := source.ConditionalKind{Kind: kind, DiscoveryCheck: notInDiscovery}
+			//	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+			//	defer cancel()
+			//	ready := make(chan struct{})
+			//	var wg sync.WaitGroup
+			//	wg.Add(1)
+			//	go func() {
+			//		instance.Ready(ctx, &wg)
+			//		close(ready)
+			//	}()
+			//	select {
+			//	case <-ctx.Done():
+			//		break
+			//	case <-ready:
+			//		defer GinkgoRecover()
+			//		Fail("Unexpected ready")
+			//	}
+			//	close(done)
+			//})
+		})
+
+	})
 	Describe("Kind", func() {
 		var c chan struct{}
 		var p *corev1.Pod