From 24e1649fea392a1c6d0964808ceab2558a45d778 Mon Sep 17 00:00:00 2001 From: Kevin Delgado Date: Sat, 15 May 2021 01:00:14 +0000 Subject: [PATCH 1/6] Informer Stop Channel --- pkg/cache/cache.go | 4 ++++ pkg/cache/informer_cache.go | 22 +++++++++++++++++---- pkg/cache/informertest/fake_cache.go | 6 ++++++ pkg/cache/internal/deleg_map.go | 12 ++++++------ pkg/cache/internal/informers_map.go | 29 ++++++++++++++++++++++++---- pkg/cache/multi_namespace_cache.go | 24 +++++++++++++++++++++++ 6 files changed, 83 insertions(+), 14 deletions(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index dee523fe23..c9aa351c1c 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -54,6 +54,10 @@ type Informers interface { // API kind and resource. GetInformer(ctx context.Context, obj client.Object) (Informer, error) + // GetInformerStop fetches the stop channel of the informer for the given object (constructing + // the informer if necessary). This stop channel fires when the controller has stopped. + GetInformerStop(ctx context.Context, obj client.Object) (<-chan struct{}, error) + // GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead // of the underlying object. GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go index 8ec3b921d9..abe25f5cb9 100644 --- a/pkg/cache/informer_cache.go +++ b/pkg/cache/informer_cache.go @@ -57,7 +57,7 @@ func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out clie return err } - started, cache, err := ip.InformersMap.Get(ctx, gvk, out) + started, cache, err := ip.InformersMap.Get(ctx, gvk, out, false) if err != nil { return err } @@ -76,7 +76,7 @@ func (ip *informerCache) List(ctx context.Context, out client.ObjectList, opts . return err } - started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj) + started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj, false) if err != nil { return err } @@ -138,7 +138,7 @@ func (ip *informerCache) GetInformerForKind(ctx context.Context, gvk schema.Grou return nil, err } - _, i, err := ip.InformersMap.Get(ctx, gvk, obj) + _, i, err := ip.InformersMap.Get(ctx, gvk, obj, false) if err != nil { return nil, err } @@ -152,13 +152,27 @@ func (ip *informerCache) GetInformer(ctx context.Context, obj client.Object) (In return nil, err } - _, i, err := ip.InformersMap.Get(ctx, gvk, obj) + _, i, err := ip.InformersMap.Get(ctx, gvk, obj, false) if err != nil { return nil, err } return i.Informer, err } +// GetInformerStop returns the stopChannel of the informer for the obj +func (ip *informerCache) GetInformerStop(ctx context.Context, obj client.Object) (<-chan struct{}, error) { + gvk, err := apiutil.GVKForObject(obj, ip.Scheme) + if err != nil { + return nil, err + } + _, i, err := ip.InformersMap.Get(ctx, gvk, obj, true) + if err != nil { + return nil, err + } + return i.StopCh, err + +} + // NeedLeaderElection implements the LeaderElectionRunnable interface // to indicate that this can be started without requiring the leader lock func (ip *informerCache) NeedLeaderElection() bool { diff --git a/pkg/cache/informertest/fake_cache.go b/pkg/cache/informertest/fake_cache.go index eb78e0bb65..1491501b03 100644 --- a/pkg/cache/informertest/fake_cache.go +++ b/pkg/cache/informertest/fake_cache.go @@ -79,6 +79,12 @@ func (c *FakeInformers) GetInformer(ctx context.Context, obj client.Object) (cac return c.informerFor(gvk, obj) } +// GetStoppableInformer implements informers +func (c *FakeInformers) GetInformerStop(ctx context.Context, obj client.Object) (<-chan struct{}, error) { + // TODO: not implemented + return nil, nil +} + // WaitForCacheSync implements Informers func (c *FakeInformers) WaitForCacheSync(ctx context.Context) bool { if c.Synced == nil { diff --git a/pkg/cache/internal/deleg_map.go b/pkg/cache/internal/deleg_map.go index 2242d9b674..1f6151c39f 100644 --- a/pkg/cache/internal/deleg_map.go +++ b/pkg/cache/internal/deleg_map.go @@ -91,18 +91,18 @@ func (m *InformersMap) WaitForCacheSync(ctx context.Context) bool { // Get will create a new Informer and add it to the map of InformersMap if none exists. Returns // the Informer from the map. -func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) { +func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopOnError bool) (bool, *MapEntry, error) { switch obj.(type) { case *unstructured.Unstructured: - return m.unstructured.Get(ctx, gvk, obj) + return m.unstructured.Get(ctx, gvk, obj, stopOnError) case *unstructured.UnstructuredList: - return m.unstructured.Get(ctx, gvk, obj) + return m.unstructured.Get(ctx, gvk, obj, stopOnError) case *metav1.PartialObjectMetadata: - return m.metadata.Get(ctx, gvk, obj) + return m.metadata.Get(ctx, gvk, obj, stopOnError) case *metav1.PartialObjectMetadataList: - return m.metadata.Get(ctx, gvk, obj) + return m.metadata.Get(ctx, gvk, obj, stopOnError) default: - return m.structured.Get(ctx, gvk, obj) + return m.structured.Get(ctx, gvk, obj, stopOnError) } } diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index 5c9bd0b0a0..ec9eb637ab 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -72,6 +72,10 @@ type MapEntry struct { // CacheReader wraps Informer and implements the CacheReader interface for a single type Reader CacheReader + + // StopCh is a channel that is closed after + // the informer stops + StopCh <-chan struct{} } // specificInformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs. @@ -171,7 +175,7 @@ func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced { // Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns // the Informer from the map. -func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) { +func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopOnError bool) (bool, *MapEntry, error) { // Return the informer if it is found i, started, ok := func() (*MapEntry, bool, bool) { ip.mu.RLock() @@ -182,7 +186,7 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion if !ok { var err error - if i, started, err = ip.addInformerToMap(gvk, obj); err != nil { + if i, started, err = ip.addInformerToMap(gvk, obj, stopOnError); err != nil { return started, nil, err } } @@ -197,7 +201,7 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion return started, i, nil } -func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) { +func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object, stopOnError bool) (*MapEntry, bool, error) { ip.mu.Lock() defer ip.mu.Unlock() @@ -228,17 +232,34 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob default: } + informerStop := make(chan struct{}) i := &MapEntry{ Informer: ni, Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk, scopeName: rm.Scope.Name()}, + StopCh: informerStop, } ip.informersByGVK[gvk] = i + go func() { + <-ip.stop + close(informerStop) + }() + + i.Informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) { + // TODO: ensure the error is a kind not found error before stopping + if stopOnError { + close(informerStop) + } + }) + // Start the Informer if need by // TODO(seans): write thorough tests and document what happens here - can you add indexers? // can you add eventhandlers? if ip.started { - go i.Informer.Run(ip.stop) + go func() { + i.Informer.Run(informerStop) + delete(ip.informersByGVK, gvk) + }() } return i, ip.started, nil } diff --git a/pkg/cache/multi_namespace_cache.go b/pkg/cache/multi_namespace_cache.go index f3520bf8d7..266419b083 100644 --- a/pkg/cache/multi_namespace_cache.go +++ b/pkg/cache/multi_namespace_cache.go @@ -19,6 +19,7 @@ package cache import ( "context" "fmt" + "sync" "time" corev1 "k8s.io/api/core/v1" @@ -115,6 +116,29 @@ func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object return &multiNamespaceInformer{namespaceToInformer: informers}, nil } +func (c *multiNamespaceCache) GetInformerStop(ctx context.Context, obj client.Object) (<-chan struct{}, error) { + multiStopCh := make(chan struct{}) + var wg sync.WaitGroup + for _, cache := range c.namespaceToCache { + stopCh, err := cache.GetInformerStop(ctx, obj) + if err != nil { + return nil, err + } + wg.Add(1) + go func(stopCh <-chan struct{}) { + defer wg.Done() + <-stopCh + + }(stopCh) + } + + go func() { + defer close(multiStopCh) + wg.Done() + }() + return multiStopCh, nil +} + func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) { informers := map[string]Informer{} From e551adec9960a657e6061e058caff3ea9c11a408 Mon Sep 17 00:00:00 2001 From: Kevin Delgado Date: Thu, 3 Jun 2021 16:01:15 +0000 Subject: [PATCH 2/6] very basic GetInformerWithOptions --- pkg/cache/cache.go | 7 ++ pkg/cache/informer_cache.go | 16 +++++ pkg/cache/informertest/fake_cache.go | 4 ++ pkg/cache/internal/deleg_map.go | 15 +++++ pkg/cache/internal/informers_map.go | 97 ++++++++++++++++++++++++++++ pkg/cache/multi_namespace_cache.go | 5 ++ 6 files changed, 144 insertions(+) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index c9aa351c1c..41e560e18e 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -46,6 +46,8 @@ type Cache interface { Informers } +type ErrorHandler func(r *toolscache.Reflector, err error) + // Informers knows how to create or fetch informers for different // group-version-kinds, and add indices to those informers. It's safe to call // GetInformer from multiple threads. @@ -54,6 +56,11 @@ type Informers interface { // API kind and resource. GetInformer(ctx context.Context, obj client.Object) (Informer, error) + // GetInformerWithOptions + // TODO: return a struct? + // TODO: pass options? + GetInformerWithOptions(ctx context.Context, obj client.Object, stopperCh chan struct{}, handler func(r *toolscache.Reflector, err error)) (Informer, <-chan struct{}, error) + // GetInformerStop fetches the stop channel of the informer for the given object (constructing // the informer if necessary). This stop channel fires when the controller has stopped. GetInformerStop(ctx context.Context, obj client.Object) (<-chan struct{}, error) diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go index abe25f5cb9..275ee4e7dc 100644 --- a/pkg/cache/informer_cache.go +++ b/pkg/cache/informer_cache.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/tools/cache" + toolscache "k8s.io/client-go/tools/cache" "sigs.k8s.io/controller-runtime/pkg/cache/internal" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" @@ -173,6 +174,21 @@ func (ip *informerCache) GetInformerStop(ctx context.Context, obj client.Object) } +// GetInformerWithOptions +func (ip *informerCache) GetInformerWithOptions(ctx context.Context, obj client.Object, stopperCh chan struct{}, handler func(r *toolscache.Reflector, err error)) (Informer, <-chan struct{}, error) { + gvk, err := apiutil.GVKForObject(obj, ip.Scheme) + if err != nil { + return nil, nil, err + } + + _, i, err := ip.InformersMap.Get2(ctx, gvk, obj, stopperCh, handler) + if err != nil { + return nil, nil, err + } + return i.Informer, i.StopCh, err + +} + // NeedLeaderElection implements the LeaderElectionRunnable interface // to indicate that this can be started without requiring the leader lock func (ip *informerCache) NeedLeaderElection() bool { diff --git a/pkg/cache/informertest/fake_cache.go b/pkg/cache/informertest/fake_cache.go index 1491501b03..a260634249 100644 --- a/pkg/cache/informertest/fake_cache.go +++ b/pkg/cache/informertest/fake_cache.go @@ -84,6 +84,10 @@ func (c *FakeInformers) GetInformerStop(ctx context.Context, obj client.Object) // TODO: not implemented return nil, nil } +func (c *FakeInformers) GetInformerWithOptions(ctx context.Context, obj client.Object, stopperCh chan<- struct{}, handler func(r *toolscache.Reflector, err error)) (cache.Informer, <-chan struct{}, error) { + // TODO: not implemented + return nil, nil, nil +} // WaitForCacheSync implements Informers func (c *FakeInformers) WaitForCacheSync(ctx context.Context) bool { diff --git a/pkg/cache/internal/deleg_map.go b/pkg/cache/internal/deleg_map.go index 1f6151c39f..74e1cf3c4c 100644 --- a/pkg/cache/internal/deleg_map.go +++ b/pkg/cache/internal/deleg_map.go @@ -106,6 +106,21 @@ func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj } } +func (m *InformersMap) Get2(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopperCh chan struct{}, handler func(r *cache.Reflector, err error)) (bool, *MapEntry, error) { + switch obj.(type) { + case *unstructured.Unstructured: + return m.unstructured.Get2(ctx, gvk, obj, stopperCh, handler) + case *unstructured.UnstructuredList: + return m.unstructured.Get2(ctx, gvk, obj, stopperCh, handler) + case *metav1.PartialObjectMetadata: + return m.metadata.Get2(ctx, gvk, obj, stopperCh, handler) + case *metav1.PartialObjectMetadataList: + return m.metadata.Get2(ctx, gvk, obj, stopperCh, handler) + default: + return m.structured.Get2(ctx, gvk, obj, stopperCh, handler) + } +} + // newStructuredInformersMap creates a new InformersMap for structured objects. func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string, selectors SelectorsByGVK) *specificInformersMap { diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index ec9eb637ab..962807e848 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -264,6 +264,103 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob return i, ip.started, nil } +type ErrorHandler func(r *cache.Reflector, err error) + +func (ip *specificInformersMap) Get2(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopperCh chan struct{}, handler func(r *cache.Reflector, err error)) (bool, *MapEntry, error) { + // Return the informer if it is found + i, started, ok := func() (*MapEntry, bool, bool) { + ip.mu.RLock() + defer ip.mu.RUnlock() + i, ok := ip.informersByGVK[gvk] + return i, ip.started, ok + }() + + if !ok { + var err error + if i, started, err = ip.addInformerToMap2(gvk, obj, stopperCh, handler); err != nil { + return started, nil, err + } + } + + if started && !i.Informer.HasSynced() { + // Wait for it to sync before returning the Informer so that folks don't read from a stale cache. + if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) { + return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0) + } + } + + return started, i, nil +} + +func (ip *specificInformersMap) addInformerToMap2(gvk schema.GroupVersionKind, obj runtime.Object, stopperCh chan struct{}, handler func(r *cache.Reflector, err error)) (*MapEntry, bool, error) { + ip.mu.Lock() + defer ip.mu.Unlock() + + // Check the cache to see if we already have an Informer. If we do, return the Informer. + // This is for the case where 2 routines tried to get the informer when it wasn't in the map + // so neither returned early, but the first one created it. + if i, ok := ip.informersByGVK[gvk]; ok { + return i, ip.started, nil + } + + // Create a NewSharedIndexInformer and add it to the map. + var lw *cache.ListWatch + lw, err := ip.createListWatcher(gvk, ip) + if err != nil { + return nil, false, err + } + ni := cache.NewSharedIndexInformer(lw, obj, resyncPeriod(ip.resync)(), cache.Indexers{ + cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, + }) + rm, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version) + if err != nil { + return nil, false, err + } + + switch obj.(type) { + case *metav1.PartialObjectMetadata, *metav1.PartialObjectMetadataList: + ni = metadataSharedIndexInformerPreserveGVK(gvk, ni) + default: + } + + informerStop := make(chan struct{}) + i := &MapEntry{ + Informer: ni, + Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk, scopeName: rm.Scope.Name()}, + StopCh: informerStop, + } + ip.informersByGVK[gvk] = i + + go func() { + select { + case <-ip.stop: + close(informerStop) + case <-stopperCh: + close(informerStop) + } + }() + + //i.Informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) { + // // TODO: ensure the error is a kind not found error before stopping + // //if stopOnError { + // // close(informerStop) + // //} + // handler(r, err) + //}) + i.Informer.SetWatchErrorHandler(handler) + + // Start the Informer if need by + // TODO(seans): write thorough tests and document what happens here - can you add indexers? + // can you add eventhandlers? + if ip.started { + go func() { + i.Informer.Run(informerStop) + delete(ip.informersByGVK, gvk) + }() + } + return i, ip.started, nil +} + // newListWatch returns a new ListWatch object that can be used to create a SharedIndexInformer. func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) { // Kubernetes APIs work against Resources, not GroupVersionKinds. Map the diff --git a/pkg/cache/multi_namespace_cache.go b/pkg/cache/multi_namespace_cache.go index 266419b083..ba14d1fef8 100644 --- a/pkg/cache/multi_namespace_cache.go +++ b/pkg/cache/multi_namespace_cache.go @@ -139,6 +139,11 @@ func (c *multiNamespaceCache) GetInformerStop(ctx context.Context, obj client.Ob return multiStopCh, nil } +// TODO +func (c *multiNamespaceCache) GetInformerWithOptions(ctx context.Context, obj client.Object, stopperCh chan struct{}, handler func(r *toolscache.Reflector, err error)) (Informer, <-chan struct{}, error) { + panic("TODO") +} + func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) { informers := map[string]Informer{} From 0f885d1f3bfadc23f36301c180543e2de0bb6b0d Mon Sep 17 00:00:00 2001 From: Kevin Delgado Date: Fri, 4 Jun 2021 17:24:45 +0000 Subject: [PATCH 3/6] remove refereces to GetInformerStop --- pkg/cache/cache.go | 2 +- pkg/cache/informer_cache.go | 38 +++--- pkg/cache/internal/deleg_map.go | 32 ++--- pkg/cache/internal/informers_map.go | 180 ++++++++++++++-------------- pkg/cache/multi_namespace_cache.go | 45 ++++--- 5 files changed, 150 insertions(+), 147 deletions(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 41e560e18e..64cc2c92f4 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -63,7 +63,7 @@ type Informers interface { // GetInformerStop fetches the stop channel of the informer for the given object (constructing // the informer if necessary). This stop channel fires when the controller has stopped. - GetInformerStop(ctx context.Context, obj client.Object) (<-chan struct{}, error) + //GetInformerStop(ctx context.Context, obj client.Object) (<-chan struct{}, error) // GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead // of the underlying object. diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go index 275ee4e7dc..895b4e8b98 100644 --- a/pkg/cache/informer_cache.go +++ b/pkg/cache/informer_cache.go @@ -58,7 +58,8 @@ func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out clie return err } - started, cache, err := ip.InformersMap.Get(ctx, gvk, out, false) + //started, cache, err := ip.InformersMap.Get(ctx, gvk, out, false) + started, cache, err := ip.InformersMap.Get2(ctx, gvk, out, nil, nil) if err != nil { return err } @@ -77,7 +78,8 @@ func (ip *informerCache) List(ctx context.Context, out client.ObjectList, opts . return err } - started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj, false) + //started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj, false) + started, cache, err := ip.InformersMap.Get2(ctx, *gvk, cacheTypeObj, nil, nil) if err != nil { return err } @@ -139,7 +141,8 @@ func (ip *informerCache) GetInformerForKind(ctx context.Context, gvk schema.Grou return nil, err } - _, i, err := ip.InformersMap.Get(ctx, gvk, obj, false) + //_, i, err := ip.InformersMap.Get(ctx, gvk, obj, false) + _, i, err := ip.InformersMap.Get2(ctx, gvk, obj, nil, nil) if err != nil { return nil, err } @@ -153,26 +156,27 @@ func (ip *informerCache) GetInformer(ctx context.Context, obj client.Object) (In return nil, err } - _, i, err := ip.InformersMap.Get(ctx, gvk, obj, false) + //_, i, err := ip.InformersMap.Get(ctx, gvk, obj, false) + _, i, err := ip.InformersMap.Get2(ctx, gvk, obj, nil, nil) if err != nil { return nil, err } return i.Informer, err } -// GetInformerStop returns the stopChannel of the informer for the obj -func (ip *informerCache) GetInformerStop(ctx context.Context, obj client.Object) (<-chan struct{}, error) { - gvk, err := apiutil.GVKForObject(obj, ip.Scheme) - if err != nil { - return nil, err - } - _, i, err := ip.InformersMap.Get(ctx, gvk, obj, true) - if err != nil { - return nil, err - } - return i.StopCh, err - -} +//// GetInformerStop returns the stopChannel of the informer for the obj +//func (ip *informerCache) GetInformerStop(ctx context.Context, obj client.Object) (<-chan struct{}, error) { +// gvk, err := apiutil.GVKForObject(obj, ip.Scheme) +// if err != nil { +// return nil, err +// } +// _, i, err := ip.InformersMap.Get(ctx, gvk, obj, true) +// if err != nil { +// return nil, err +// } +// return i.StopCh, err +// +//} // GetInformerWithOptions func (ip *informerCache) GetInformerWithOptions(ctx context.Context, obj client.Object, stopperCh chan struct{}, handler func(r *toolscache.Reflector, err error)) (Informer, <-chan struct{}, error) { diff --git a/pkg/cache/internal/deleg_map.go b/pkg/cache/internal/deleg_map.go index 74e1cf3c4c..a28f735d6f 100644 --- a/pkg/cache/internal/deleg_map.go +++ b/pkg/cache/internal/deleg_map.go @@ -89,22 +89,22 @@ func (m *InformersMap) WaitForCacheSync(ctx context.Context) bool { return cache.WaitForCacheSync(ctx.Done(), syncedFuncs...) } -// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns -// the Informer from the map. -func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopOnError bool) (bool, *MapEntry, error) { - switch obj.(type) { - case *unstructured.Unstructured: - return m.unstructured.Get(ctx, gvk, obj, stopOnError) - case *unstructured.UnstructuredList: - return m.unstructured.Get(ctx, gvk, obj, stopOnError) - case *metav1.PartialObjectMetadata: - return m.metadata.Get(ctx, gvk, obj, stopOnError) - case *metav1.PartialObjectMetadataList: - return m.metadata.Get(ctx, gvk, obj, stopOnError) - default: - return m.structured.Get(ctx, gvk, obj, stopOnError) - } -} +//// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns +//// the Informer from the map. +//func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopOnError bool) (bool, *MapEntry, error) { +// switch obj.(type) { +// case *unstructured.Unstructured: +// return m.unstructured.Get(ctx, gvk, obj, stopOnError) +// case *unstructured.UnstructuredList: +// return m.unstructured.Get(ctx, gvk, obj, stopOnError) +// case *metav1.PartialObjectMetadata: +// return m.metadata.Get(ctx, gvk, obj, stopOnError) +// case *metav1.PartialObjectMetadataList: +// return m.metadata.Get(ctx, gvk, obj, stopOnError) +// default: +// return m.structured.Get(ctx, gvk, obj, stopOnError) +// } +//} func (m *InformersMap) Get2(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopperCh chan struct{}, handler func(r *cache.Reflector, err error)) (bool, *MapEntry, error) { switch obj.(type) { diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index 962807e848..ffb8efb00a 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -173,96 +173,96 @@ func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced { return syncedFuncs } -// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns -// the Informer from the map. -func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopOnError bool) (bool, *MapEntry, error) { - // Return the informer if it is found - i, started, ok := func() (*MapEntry, bool, bool) { - ip.mu.RLock() - defer ip.mu.RUnlock() - i, ok := ip.informersByGVK[gvk] - return i, ip.started, ok - }() - - if !ok { - var err error - if i, started, err = ip.addInformerToMap(gvk, obj, stopOnError); err != nil { - return started, nil, err - } - } - - if started && !i.Informer.HasSynced() { - // Wait for it to sync before returning the Informer so that folks don't read from a stale cache. - if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) { - return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0) - } - } - - return started, i, nil -} - -func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object, stopOnError bool) (*MapEntry, bool, error) { - ip.mu.Lock() - defer ip.mu.Unlock() - - // Check the cache to see if we already have an Informer. If we do, return the Informer. - // This is for the case where 2 routines tried to get the informer when it wasn't in the map - // so neither returned early, but the first one created it. - if i, ok := ip.informersByGVK[gvk]; ok { - return i, ip.started, nil - } - - // Create a NewSharedIndexInformer and add it to the map. - var lw *cache.ListWatch - lw, err := ip.createListWatcher(gvk, ip) - if err != nil { - return nil, false, err - } - ni := cache.NewSharedIndexInformer(lw, obj, resyncPeriod(ip.resync)(), cache.Indexers{ - cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, - }) - rm, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version) - if err != nil { - return nil, false, err - } - - switch obj.(type) { - case *metav1.PartialObjectMetadata, *metav1.PartialObjectMetadataList: - ni = metadataSharedIndexInformerPreserveGVK(gvk, ni) - default: - } - - informerStop := make(chan struct{}) - i := &MapEntry{ - Informer: ni, - Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk, scopeName: rm.Scope.Name()}, - StopCh: informerStop, - } - ip.informersByGVK[gvk] = i - - go func() { - <-ip.stop - close(informerStop) - }() - - i.Informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) { - // TODO: ensure the error is a kind not found error before stopping - if stopOnError { - close(informerStop) - } - }) - - // Start the Informer if need by - // TODO(seans): write thorough tests and document what happens here - can you add indexers? - // can you add eventhandlers? - if ip.started { - go func() { - i.Informer.Run(informerStop) - delete(ip.informersByGVK, gvk) - }() - } - return i, ip.started, nil -} +//// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns +//// the Informer from the map. +//func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopOnError bool) (bool, *MapEntry, error) { +// // Return the informer if it is found +// i, started, ok := func() (*MapEntry, bool, bool) { +// ip.mu.RLock() +// defer ip.mu.RUnlock() +// i, ok := ip.informersByGVK[gvk] +// return i, ip.started, ok +// }() +// +// if !ok { +// var err error +// if i, started, err = ip.addInformerToMap(gvk, obj, stopOnError); err != nil { +// return started, nil, err +// } +// } +// +// if started && !i.Informer.HasSynced() { +// // Wait for it to sync before returning the Informer so that folks don't read from a stale cache. +// if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) { +// return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0) +// } +// } +// +// return started, i, nil +//} +// +//func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object, stopOnError bool) (*MapEntry, bool, error) { +// ip.mu.Lock() +// defer ip.mu.Unlock() +// +// // Check the cache to see if we already have an Informer. If we do, return the Informer. +// // This is for the case where 2 routines tried to get the informer when it wasn't in the map +// // so neither returned early, but the first one created it. +// if i, ok := ip.informersByGVK[gvk]; ok { +// return i, ip.started, nil +// } +// +// // Create a NewSharedIndexInformer and add it to the map. +// var lw *cache.ListWatch +// lw, err := ip.createListWatcher(gvk, ip) +// if err != nil { +// return nil, false, err +// } +// ni := cache.NewSharedIndexInformer(lw, obj, resyncPeriod(ip.resync)(), cache.Indexers{ +// cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, +// }) +// rm, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version) +// if err != nil { +// return nil, false, err +// } +// +// switch obj.(type) { +// case *metav1.PartialObjectMetadata, *metav1.PartialObjectMetadataList: +// ni = metadataSharedIndexInformerPreserveGVK(gvk, ni) +// default: +// } +// +// informerStop := make(chan struct{}) +// i := &MapEntry{ +// Informer: ni, +// Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk, scopeName: rm.Scope.Name()}, +// StopCh: informerStop, +// } +// ip.informersByGVK[gvk] = i +// +// go func() { +// <-ip.stop +// close(informerStop) +// }() +// +// i.Informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) { +// // TODO: ensure the error is a kind not found error before stopping +// if stopOnError { +// close(informerStop) +// } +// }) +// +// // Start the Informer if need by +// // TODO(seans): write thorough tests and document what happens here - can you add indexers? +// // can you add eventhandlers? +// if ip.started { +// go func() { +// i.Informer.Run(informerStop) +// delete(ip.informersByGVK, gvk) +// }() +// } +// return i, ip.started, nil +//} type ErrorHandler func(r *cache.Reflector, err error) diff --git a/pkg/cache/multi_namespace_cache.go b/pkg/cache/multi_namespace_cache.go index ba14d1fef8..262d79646f 100644 --- a/pkg/cache/multi_namespace_cache.go +++ b/pkg/cache/multi_namespace_cache.go @@ -19,7 +19,6 @@ package cache import ( "context" "fmt" - "sync" "time" corev1 "k8s.io/api/core/v1" @@ -116,28 +115,28 @@ func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object return &multiNamespaceInformer{namespaceToInformer: informers}, nil } -func (c *multiNamespaceCache) GetInformerStop(ctx context.Context, obj client.Object) (<-chan struct{}, error) { - multiStopCh := make(chan struct{}) - var wg sync.WaitGroup - for _, cache := range c.namespaceToCache { - stopCh, err := cache.GetInformerStop(ctx, obj) - if err != nil { - return nil, err - } - wg.Add(1) - go func(stopCh <-chan struct{}) { - defer wg.Done() - <-stopCh - - }(stopCh) - } - - go func() { - defer close(multiStopCh) - wg.Done() - }() - return multiStopCh, nil -} +//func (c *multiNamespaceCache) GetInformerStop(ctx context.Context, obj client.Object) (<-chan struct{}, error) { +// multiStopCh := make(chan struct{}) +// var wg sync.WaitGroup +// for _, cache := range c.namespaceToCache { +// stopCh, err := cache.GetInformerStop(ctx, obj) +// if err != nil { +// return nil, err +// } +// wg.Add(1) +// go func(stopCh <-chan struct{}) { +// defer wg.Done() +// <-stopCh +// +// }(stopCh) +// } +// +// go func() { +// defer close(multiStopCh) +// wg.Done() +// }() +// return multiStopCh, nil +//} // TODO func (c *multiNamespaceCache) GetInformerWithOptions(ctx context.Context, obj client.Object, stopperCh chan struct{}, handler func(r *toolscache.Reflector, err error)) (Informer, <-chan struct{}, error) { From 375bf1c20737969430234b38e3140aab1bbc16b0 Mon Sep 17 00:00:00 2001 From: Kevin Delgado Date: Fri, 4 Jun 2021 17:59:20 +0000 Subject: [PATCH 4/6] Refactor into InformerOptions --- pkg/cache/cache.go | 14 ++++++++++++-- pkg/cache/informer_cache.go | 22 ++++++++++++---------- pkg/cache/internal/deleg_map.go | 12 ++++++------ pkg/cache/internal/informers_map.go | 6 +++--- pkg/cache/multi_namespace_cache.go | 2 +- 5 files changed, 34 insertions(+), 22 deletions(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 64cc2c92f4..5fe3363f3f 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -46,7 +46,17 @@ type Cache interface { Informers } -type ErrorHandler func(r *toolscache.Reflector, err error) +//type ErrorHandler func(r *toolscache.Reflector, err error) + +type InformerOptions struct { + StopperCh chan struct{} + ErrorHandler func(r *toolscache.Reflector, err error) +} + +type InformerInfo struct { + Informer Informer + StopCh <-chan struct{} +} // Informers knows how to create or fetch informers for different // group-version-kinds, and add indices to those informers. It's safe to call @@ -59,7 +69,7 @@ type Informers interface { // GetInformerWithOptions // TODO: return a struct? // TODO: pass options? - GetInformerWithOptions(ctx context.Context, obj client.Object, stopperCh chan struct{}, handler func(r *toolscache.Reflector, err error)) (Informer, <-chan struct{}, error) + GetInformerWithOptions(ctx context.Context, obj client.Object, options *InformerOptions) (*InformerInfo, error) // GetInformerStop fetches the stop channel of the informer for the given object (constructing // the informer if necessary). This stop channel fires when the controller has stopped. diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go index 895b4e8b98..1ca52f42ca 100644 --- a/pkg/cache/informer_cache.go +++ b/pkg/cache/informer_cache.go @@ -27,7 +27,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/tools/cache" - toolscache "k8s.io/client-go/tools/cache" "sigs.k8s.io/controller-runtime/pkg/cache/internal" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" @@ -59,7 +58,7 @@ func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out clie } //started, cache, err := ip.InformersMap.Get(ctx, gvk, out, false) - started, cache, err := ip.InformersMap.Get2(ctx, gvk, out, nil, nil) + started, cache, err := ip.InformersMap.Get(ctx, gvk, out, nil, nil) if err != nil { return err } @@ -79,7 +78,7 @@ func (ip *informerCache) List(ctx context.Context, out client.ObjectList, opts . } //started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj, false) - started, cache, err := ip.InformersMap.Get2(ctx, *gvk, cacheTypeObj, nil, nil) + started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj, nil, nil) if err != nil { return err } @@ -142,7 +141,7 @@ func (ip *informerCache) GetInformerForKind(ctx context.Context, gvk schema.Grou } //_, i, err := ip.InformersMap.Get(ctx, gvk, obj, false) - _, i, err := ip.InformersMap.Get2(ctx, gvk, obj, nil, nil) + _, i, err := ip.InformersMap.Get(ctx, gvk, obj, nil, nil) if err != nil { return nil, err } @@ -157,7 +156,7 @@ func (ip *informerCache) GetInformer(ctx context.Context, obj client.Object) (In } //_, i, err := ip.InformersMap.Get(ctx, gvk, obj, false) - _, i, err := ip.InformersMap.Get2(ctx, gvk, obj, nil, nil) + _, i, err := ip.InformersMap.Get(ctx, gvk, obj, nil, nil) if err != nil { return nil, err } @@ -179,17 +178,20 @@ func (ip *informerCache) GetInformer(ctx context.Context, obj client.Object) (In //} // GetInformerWithOptions -func (ip *informerCache) GetInformerWithOptions(ctx context.Context, obj client.Object, stopperCh chan struct{}, handler func(r *toolscache.Reflector, err error)) (Informer, <-chan struct{}, error) { +func (ip *informerCache) GetInformerWithOptions(ctx context.Context, obj client.Object, options *InformerOptions) (*InformerInfo, error) { gvk, err := apiutil.GVKForObject(obj, ip.Scheme) if err != nil { - return nil, nil, err + return nil, err } - _, i, err := ip.InformersMap.Get2(ctx, gvk, obj, stopperCh, handler) + _, i, err := ip.InformersMap.Get(ctx, gvk, obj, options.StopperCh, options.ErrorHandler) if err != nil { - return nil, nil, err + return nil, err } - return i.Informer, i.StopCh, err + return &InformerInfo{ + i.Informer, + i.StopCh, + }, nil } diff --git a/pkg/cache/internal/deleg_map.go b/pkg/cache/internal/deleg_map.go index a28f735d6f..c7f76bae00 100644 --- a/pkg/cache/internal/deleg_map.go +++ b/pkg/cache/internal/deleg_map.go @@ -106,18 +106,18 @@ func (m *InformersMap) WaitForCacheSync(ctx context.Context) bool { // } //} -func (m *InformersMap) Get2(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopperCh chan struct{}, handler func(r *cache.Reflector, err error)) (bool, *MapEntry, error) { +func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopperCh chan struct{}, handler func(r *cache.Reflector, err error)) (bool, *MapEntry, error) { switch obj.(type) { case *unstructured.Unstructured: - return m.unstructured.Get2(ctx, gvk, obj, stopperCh, handler) + return m.unstructured.Get(ctx, gvk, obj, stopperCh, handler) case *unstructured.UnstructuredList: - return m.unstructured.Get2(ctx, gvk, obj, stopperCh, handler) + return m.unstructured.Get(ctx, gvk, obj, stopperCh, handler) case *metav1.PartialObjectMetadata: - return m.metadata.Get2(ctx, gvk, obj, stopperCh, handler) + return m.metadata.Get(ctx, gvk, obj, stopperCh, handler) case *metav1.PartialObjectMetadataList: - return m.metadata.Get2(ctx, gvk, obj, stopperCh, handler) + return m.metadata.Get(ctx, gvk, obj, stopperCh, handler) default: - return m.structured.Get2(ctx, gvk, obj, stopperCh, handler) + return m.structured.Get(ctx, gvk, obj, stopperCh, handler) } } diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index ffb8efb00a..2d6c5e9e94 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -266,7 +266,7 @@ func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced { type ErrorHandler func(r *cache.Reflector, err error) -func (ip *specificInformersMap) Get2(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopperCh chan struct{}, handler func(r *cache.Reflector, err error)) (bool, *MapEntry, error) { +func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopperCh chan struct{}, handler func(r *cache.Reflector, err error)) (bool, *MapEntry, error) { // Return the informer if it is found i, started, ok := func() (*MapEntry, bool, bool) { ip.mu.RLock() @@ -277,7 +277,7 @@ func (ip *specificInformersMap) Get2(ctx context.Context, gvk schema.GroupVersio if !ok { var err error - if i, started, err = ip.addInformerToMap2(gvk, obj, stopperCh, handler); err != nil { + if i, started, err = ip.addInformerToMap(gvk, obj, stopperCh, handler); err != nil { return started, nil, err } } @@ -292,7 +292,7 @@ func (ip *specificInformersMap) Get2(ctx context.Context, gvk schema.GroupVersio return started, i, nil } -func (ip *specificInformersMap) addInformerToMap2(gvk schema.GroupVersionKind, obj runtime.Object, stopperCh chan struct{}, handler func(r *cache.Reflector, err error)) (*MapEntry, bool, error) { +func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object, stopperCh chan struct{}, handler func(r *cache.Reflector, err error)) (*MapEntry, bool, error) { ip.mu.Lock() defer ip.mu.Unlock() diff --git a/pkg/cache/multi_namespace_cache.go b/pkg/cache/multi_namespace_cache.go index 262d79646f..826dc12e5e 100644 --- a/pkg/cache/multi_namespace_cache.go +++ b/pkg/cache/multi_namespace_cache.go @@ -139,7 +139,7 @@ func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object //} // TODO -func (c *multiNamespaceCache) GetInformerWithOptions(ctx context.Context, obj client.Object, stopperCh chan struct{}, handler func(r *toolscache.Reflector, err error)) (Informer, <-chan struct{}, error) { +func (c *multiNamespaceCache) GetInformerWithOptions(ctx context.Context, obj client.Object, options *InformerOptions) (*InformerInfo, error) { panic("TODO") } From 20ffe287c1dd71010080c2f3c7f679a00e187885 Mon Sep 17 00:00:00 2001 From: Kevin Delgado Date: Fri, 4 Jun 2021 19:04:08 +0000 Subject: [PATCH 5/6] cleanup and commentary --- pkg/cache/cache.go | 20 +++-- pkg/cache/internal/deleg_map.go | 31 ++------ pkg/cache/internal/informers_map.go | 110 ++-------------------------- 3 files changed, 27 insertions(+), 134 deletions(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 5fe3363f3f..e01164039b 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -46,16 +46,21 @@ type Cache interface { Informers } -//type ErrorHandler func(r *toolscache.Reflector, err error) - +// InformerOptions gives the caller some options for greater control over the lifecycle of the informer type InformerOptions struct { - StopperCh chan struct{} + // StopperCh in a channel that when closed, instructs the informer to stop running. + StopperCh chan struct{} + // ErrorHandler passed to the informer's SetWatchErrorHandler and handles any errors + // from ListAndWatch calls made by the informer's underlying reflector. ErrorHandler func(r *toolscache.Reflector, err error) } +// InformerInfo provides information when retrieving an informer. type InformerInfo struct { + // Informer is the Informer retrieved. Informer Informer - StopCh <-chan struct{} + // StopCh is a channel that is closed when the informer is stopped. + StopCh <-chan struct{} } // Informers knows how to create or fetch informers for different @@ -66,9 +71,10 @@ type Informers interface { // API kind and resource. GetInformer(ctx context.Context, obj client.Object) (Informer, error) - // GetInformerWithOptions - // TODO: return a struct? - // TODO: pass options? + // GetInformerWithOptions retrieves an existing informer for the given object along with it's stop channel + // that fires when the informer has stopped. + // + // If the informer does not already exist, it constructs an informer with the supplied InformerOptions. GetInformerWithOptions(ctx context.Context, obj client.Object, options *InformerOptions) (*InformerInfo, error) // GetInformerStop fetches the stop channel of the informer for the given object (constructing diff --git a/pkg/cache/internal/deleg_map.go b/pkg/cache/internal/deleg_map.go index c7f76bae00..047b21db1f 100644 --- a/pkg/cache/internal/deleg_map.go +++ b/pkg/cache/internal/deleg_map.go @@ -89,35 +89,20 @@ func (m *InformersMap) WaitForCacheSync(ctx context.Context) bool { return cache.WaitForCacheSync(ctx.Done(), syncedFuncs...) } -//// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns -//// the Informer from the map. -//func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopOnError bool) (bool, *MapEntry, error) { -// switch obj.(type) { -// case *unstructured.Unstructured: -// return m.unstructured.Get(ctx, gvk, obj, stopOnError) -// case *unstructured.UnstructuredList: -// return m.unstructured.Get(ctx, gvk, obj, stopOnError) -// case *metav1.PartialObjectMetadata: -// return m.metadata.Get(ctx, gvk, obj, stopOnError) -// case *metav1.PartialObjectMetadataList: -// return m.metadata.Get(ctx, gvk, obj, stopOnError) -// default: -// return m.structured.Get(ctx, gvk, obj, stopOnError) -// } -//} - -func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopperCh chan struct{}, handler func(r *cache.Reflector, err error)) (bool, *MapEntry, error) { +// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns +// the Informer from the map. +func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopperCh chan struct{}, errorHandler func(r *cache.Reflector, err error)) (bool, *MapEntry, error) { switch obj.(type) { case *unstructured.Unstructured: - return m.unstructured.Get(ctx, gvk, obj, stopperCh, handler) + return m.unstructured.Get(ctx, gvk, obj, stopperCh, errorHandler) case *unstructured.UnstructuredList: - return m.unstructured.Get(ctx, gvk, obj, stopperCh, handler) + return m.unstructured.Get(ctx, gvk, obj, stopperCh, errorHandler) case *metav1.PartialObjectMetadata: - return m.metadata.Get(ctx, gvk, obj, stopperCh, handler) + return m.metadata.Get(ctx, gvk, obj, stopperCh, errorHandler) case *metav1.PartialObjectMetadataList: - return m.metadata.Get(ctx, gvk, obj, stopperCh, handler) + return m.metadata.Get(ctx, gvk, obj, stopperCh, errorHandler) default: - return m.structured.Get(ctx, gvk, obj, stopperCh, handler) + return m.structured.Get(ctx, gvk, obj, stopperCh, errorHandler) } } diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index 2d6c5e9e94..04f3058899 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -173,100 +173,9 @@ func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced { return syncedFuncs } -//// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns -//// the Informer from the map. -//func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopOnError bool) (bool, *MapEntry, error) { -// // Return the informer if it is found -// i, started, ok := func() (*MapEntry, bool, bool) { -// ip.mu.RLock() -// defer ip.mu.RUnlock() -// i, ok := ip.informersByGVK[gvk] -// return i, ip.started, ok -// }() -// -// if !ok { -// var err error -// if i, started, err = ip.addInformerToMap(gvk, obj, stopOnError); err != nil { -// return started, nil, err -// } -// } -// -// if started && !i.Informer.HasSynced() { -// // Wait for it to sync before returning the Informer so that folks don't read from a stale cache. -// if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) { -// return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0) -// } -// } -// -// return started, i, nil -//} -// -//func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object, stopOnError bool) (*MapEntry, bool, error) { -// ip.mu.Lock() -// defer ip.mu.Unlock() -// -// // Check the cache to see if we already have an Informer. If we do, return the Informer. -// // This is for the case where 2 routines tried to get the informer when it wasn't in the map -// // so neither returned early, but the first one created it. -// if i, ok := ip.informersByGVK[gvk]; ok { -// return i, ip.started, nil -// } -// -// // Create a NewSharedIndexInformer and add it to the map. -// var lw *cache.ListWatch -// lw, err := ip.createListWatcher(gvk, ip) -// if err != nil { -// return nil, false, err -// } -// ni := cache.NewSharedIndexInformer(lw, obj, resyncPeriod(ip.resync)(), cache.Indexers{ -// cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, -// }) -// rm, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version) -// if err != nil { -// return nil, false, err -// } -// -// switch obj.(type) { -// case *metav1.PartialObjectMetadata, *metav1.PartialObjectMetadataList: -// ni = metadataSharedIndexInformerPreserveGVK(gvk, ni) -// default: -// } -// -// informerStop := make(chan struct{}) -// i := &MapEntry{ -// Informer: ni, -// Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk, scopeName: rm.Scope.Name()}, -// StopCh: informerStop, -// } -// ip.informersByGVK[gvk] = i -// -// go func() { -// <-ip.stop -// close(informerStop) -// }() -// -// i.Informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) { -// // TODO: ensure the error is a kind not found error before stopping -// if stopOnError { -// close(informerStop) -// } -// }) -// -// // Start the Informer if need by -// // TODO(seans): write thorough tests and document what happens here - can you add indexers? -// // can you add eventhandlers? -// if ip.started { -// go func() { -// i.Informer.Run(informerStop) -// delete(ip.informersByGVK, gvk) -// }() -// } -// return i, ip.started, nil -//} - -type ErrorHandler func(r *cache.Reflector, err error) - -func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopperCh chan struct{}, handler func(r *cache.Reflector, err error)) (bool, *MapEntry, error) { +// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns +// the Informer from the map. +func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopperCh chan struct{}, errorHandler func(r *cache.Reflector, err error)) (bool, *MapEntry, error) { // Return the informer if it is found i, started, ok := func() (*MapEntry, bool, bool) { ip.mu.RLock() @@ -277,7 +186,7 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion if !ok { var err error - if i, started, err = ip.addInformerToMap(gvk, obj, stopperCh, handler); err != nil { + if i, started, err = ip.addInformerToMap(gvk, obj, stopperCh, errorHandler); err != nil { return started, nil, err } } @@ -292,7 +201,7 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion return started, i, nil } -func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object, stopperCh chan struct{}, handler func(r *cache.Reflector, err error)) (*MapEntry, bool, error) { +func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object, stopperCh chan struct{}, errorHandler func(r *cache.Reflector, err error)) (*MapEntry, bool, error) { ip.mu.Lock() defer ip.mu.Unlock() @@ -340,14 +249,7 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob } }() - //i.Informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) { - // // TODO: ensure the error is a kind not found error before stopping - // //if stopOnError { - // // close(informerStop) - // //} - // handler(r, err) - //}) - i.Informer.SetWatchErrorHandler(handler) + i.Informer.SetWatchErrorHandler(errorHandler) // Start the Informer if need by // TODO(seans): write thorough tests and document what happens here - can you add indexers? From 3995913de53bdde6d6fb998e49884b48d072d6dd Mon Sep 17 00:00:00 2001 From: Kevin Delgado Date: Fri, 4 Jun 2021 20:03:21 +0000 Subject: [PATCH 6/6] fix fakeInformers and multNamespace for options --- pkg/cache/cache.go | 4 --- pkg/cache/informer_cache.go | 21 ++--------- pkg/cache/informertest/fake_cache.go | 19 +++++----- pkg/cache/multi_namespace_cache.go | 52 +++++++++++++++------------- 4 files changed, 40 insertions(+), 56 deletions(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index e01164039b..69ffe08ba2 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -77,10 +77,6 @@ type Informers interface { // If the informer does not already exist, it constructs an informer with the supplied InformerOptions. GetInformerWithOptions(ctx context.Context, obj client.Object, options *InformerOptions) (*InformerInfo, error) - // GetInformerStop fetches the stop channel of the informer for the given object (constructing - // the informer if necessary). This stop channel fires when the controller has stopped. - //GetInformerStop(ctx context.Context, obj client.Object) (<-chan struct{}, error) - // GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead // of the underlying object. GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go index 1ca52f42ca..bb587260e5 100644 --- a/pkg/cache/informer_cache.go +++ b/pkg/cache/informer_cache.go @@ -57,7 +57,6 @@ func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out clie return err } - //started, cache, err := ip.InformersMap.Get(ctx, gvk, out, false) started, cache, err := ip.InformersMap.Get(ctx, gvk, out, nil, nil) if err != nil { return err @@ -77,7 +76,6 @@ func (ip *informerCache) List(ctx context.Context, out client.ObjectList, opts . return err } - //started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj, false) started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj, nil, nil) if err != nil { return err @@ -140,7 +138,6 @@ func (ip *informerCache) GetInformerForKind(ctx context.Context, gvk schema.Grou return nil, err } - //_, i, err := ip.InformersMap.Get(ctx, gvk, obj, false) _, i, err := ip.InformersMap.Get(ctx, gvk, obj, nil, nil) if err != nil { return nil, err @@ -155,7 +152,6 @@ func (ip *informerCache) GetInformer(ctx context.Context, obj client.Object) (In return nil, err } - //_, i, err := ip.InformersMap.Get(ctx, gvk, obj, false) _, i, err := ip.InformersMap.Get(ctx, gvk, obj, nil, nil) if err != nil { return nil, err @@ -163,21 +159,8 @@ func (ip *informerCache) GetInformer(ctx context.Context, obj client.Object) (In return i.Informer, err } -//// GetInformerStop returns the stopChannel of the informer for the obj -//func (ip *informerCache) GetInformerStop(ctx context.Context, obj client.Object) (<-chan struct{}, error) { -// gvk, err := apiutil.GVKForObject(obj, ip.Scheme) -// if err != nil { -// return nil, err -// } -// _, i, err := ip.InformersMap.Get(ctx, gvk, obj, true) -// if err != nil { -// return nil, err -// } -// return i.StopCh, err -// -//} - -// GetInformerWithOptions +// GetInformerWithOptions retrieves the informer and its stop channel, creating and starting a +// new informer with the supplied options if necessary. func (ip *informerCache) GetInformerWithOptions(ctx context.Context, obj client.Object, options *InformerOptions) (*InformerInfo, error) { gvk, err := apiutil.GVKForObject(obj, ip.Scheme) if err != nil { diff --git a/pkg/cache/informertest/fake_cache.go b/pkg/cache/informertest/fake_cache.go index a260634249..abc60eefd5 100644 --- a/pkg/cache/informertest/fake_cache.go +++ b/pkg/cache/informertest/fake_cache.go @@ -79,14 +79,17 @@ func (c *FakeInformers) GetInformer(ctx context.Context, obj client.Object) (cac return c.informerFor(gvk, obj) } -// GetStoppableInformer implements informers -func (c *FakeInformers) GetInformerStop(ctx context.Context, obj client.Object) (<-chan struct{}, error) { - // TODO: not implemented - return nil, nil -} -func (c *FakeInformers) GetInformerWithOptions(ctx context.Context, obj client.Object, stopperCh chan<- struct{}, handler func(r *toolscache.Reflector, err error)) (cache.Informer, <-chan struct{}, error) { - // TODO: not implemented - return nil, nil, nil +// GetInformerWithOptions implements Informers +func (c *FakeInformers) GetInformerWithOptions(ctx context.Context, obj client.Object, options *cache.InformerOptions) (*cache.InformerInfo, error) { + i, err := c.GetInformer(ctx, obj) + if err != nil { + return nil, err + } + // fake informer is never started and therefore never stopped + // so stop channel is nil + return &cache.InformerInfo{ + Informer: i, + }, nil } // WaitForCacheSync implements Informers diff --git a/pkg/cache/multi_namespace_cache.go b/pkg/cache/multi_namespace_cache.go index 826dc12e5e..3ec7e807e1 100644 --- a/pkg/cache/multi_namespace_cache.go +++ b/pkg/cache/multi_namespace_cache.go @@ -19,6 +19,7 @@ package cache import ( "context" "fmt" + "sync" "time" corev1 "k8s.io/api/core/v1" @@ -115,32 +116,33 @@ func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object return &multiNamespaceInformer{namespaceToInformer: informers}, nil } -//func (c *multiNamespaceCache) GetInformerStop(ctx context.Context, obj client.Object) (<-chan struct{}, error) { -// multiStopCh := make(chan struct{}) -// var wg sync.WaitGroup -// for _, cache := range c.namespaceToCache { -// stopCh, err := cache.GetInformerStop(ctx, obj) -// if err != nil { -// return nil, err -// } -// wg.Add(1) -// go func(stopCh <-chan struct{}) { -// defer wg.Done() -// <-stopCh -// -// }(stopCh) -// } -// -// go func() { -// defer close(multiStopCh) -// wg.Done() -// }() -// return multiStopCh, nil -//} - -// TODO +// Methods for multiNamespaceCache to conform to the Informers interface func (c *multiNamespaceCache) GetInformerWithOptions(ctx context.Context, obj client.Object, options *InformerOptions) (*InformerInfo, error) { - panic("TODO") + informers := map[string]Informer{} + multiStopCh := make(chan struct{}) + var wg sync.WaitGroup + for ns, cache := range c.namespaceToCache { + info, err := cache.GetInformerWithOptions(ctx, obj, options) + if err != nil { + return nil, err + } + informers[ns] = info.Informer + wg.Add(1) + go func(stopCh <-chan struct{}) { + defer wg.Done() + <-stopCh + + }(info.StopCh) + } + + go func() { + defer close(multiStopCh) + wg.Done() + }() + return &InformerInfo{ + Informer: &multiNamespaceInformer{namespaceToInformer: informers}, + StopCh: multiStopCh, + }, nil } func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) {