diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index dee523fe23..69ffe08ba2 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -46,6 +46,23 @@ type Cache interface { Informers } +// InformerOptions gives the caller some options for greater control over the lifecycle of the informer +type InformerOptions struct { + // StopperCh in a channel that when closed, instructs the informer to stop running. + StopperCh chan struct{} + // ErrorHandler passed to the informer's SetWatchErrorHandler and handles any errors + // from ListAndWatch calls made by the informer's underlying reflector. + ErrorHandler func(r *toolscache.Reflector, err error) +} + +// InformerInfo provides information when retrieving an informer. +type InformerInfo struct { + // Informer is the Informer retrieved. + Informer Informer + // StopCh is a channel that is closed when the informer is stopped. + StopCh <-chan struct{} +} + // Informers knows how to create or fetch informers for different // group-version-kinds, and add indices to those informers. It's safe to call // GetInformer from multiple threads. @@ -54,6 +71,12 @@ type Informers interface { // API kind and resource. GetInformer(ctx context.Context, obj client.Object) (Informer, error) + // GetInformerWithOptions retrieves an existing informer for the given object along with it's stop channel + // that fires when the informer has stopped. + // + // If the informer does not already exist, it constructs an informer with the supplied InformerOptions. + GetInformerWithOptions(ctx context.Context, obj client.Object, options *InformerOptions) (*InformerInfo, error) + // GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead // of the underlying object. GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go index 8ec3b921d9..bb587260e5 100644 --- a/pkg/cache/informer_cache.go +++ b/pkg/cache/informer_cache.go @@ -57,7 +57,7 @@ func (ip *informerCache) Get(ctx context.Context, key client.ObjectKey, out clie return err } - started, cache, err := ip.InformersMap.Get(ctx, gvk, out) + started, cache, err := ip.InformersMap.Get(ctx, gvk, out, nil, nil) if err != nil { return err } @@ -76,7 +76,7 @@ func (ip *informerCache) List(ctx context.Context, out client.ObjectList, opts . return err } - started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj) + started, cache, err := ip.InformersMap.Get(ctx, *gvk, cacheTypeObj, nil, nil) if err != nil { return err } @@ -138,7 +138,7 @@ func (ip *informerCache) GetInformerForKind(ctx context.Context, gvk schema.Grou return nil, err } - _, i, err := ip.InformersMap.Get(ctx, gvk, obj) + _, i, err := ip.InformersMap.Get(ctx, gvk, obj, nil, nil) if err != nil { return nil, err } @@ -152,13 +152,32 @@ func (ip *informerCache) GetInformer(ctx context.Context, obj client.Object) (In return nil, err } - _, i, err := ip.InformersMap.Get(ctx, gvk, obj) + _, i, err := ip.InformersMap.Get(ctx, gvk, obj, nil, nil) if err != nil { return nil, err } return i.Informer, err } +// GetInformerWithOptions retrieves the informer and its stop channel, creating and starting a +// new informer with the supplied options if necessary. +func (ip *informerCache) GetInformerWithOptions(ctx context.Context, obj client.Object, options *InformerOptions) (*InformerInfo, error) { + gvk, err := apiutil.GVKForObject(obj, ip.Scheme) + if err != nil { + return nil, err + } + + _, i, err := ip.InformersMap.Get(ctx, gvk, obj, options.StopperCh, options.ErrorHandler) + if err != nil { + return nil, err + } + return &InformerInfo{ + i.Informer, + i.StopCh, + }, nil + +} + // NeedLeaderElection implements the LeaderElectionRunnable interface // to indicate that this can be started without requiring the leader lock func (ip *informerCache) NeedLeaderElection() bool { diff --git a/pkg/cache/informertest/fake_cache.go b/pkg/cache/informertest/fake_cache.go index eb78e0bb65..abc60eefd5 100644 --- a/pkg/cache/informertest/fake_cache.go +++ b/pkg/cache/informertest/fake_cache.go @@ -79,6 +79,19 @@ func (c *FakeInformers) GetInformer(ctx context.Context, obj client.Object) (cac return c.informerFor(gvk, obj) } +// GetInformerWithOptions implements Informers +func (c *FakeInformers) GetInformerWithOptions(ctx context.Context, obj client.Object, options *cache.InformerOptions) (*cache.InformerInfo, error) { + i, err := c.GetInformer(ctx, obj) + if err != nil { + return nil, err + } + // fake informer is never started and therefore never stopped + // so stop channel is nil + return &cache.InformerInfo{ + Informer: i, + }, nil +} + // WaitForCacheSync implements Informers func (c *FakeInformers) WaitForCacheSync(ctx context.Context) bool { if c.Synced == nil { diff --git a/pkg/cache/internal/deleg_map.go b/pkg/cache/internal/deleg_map.go index 2242d9b674..047b21db1f 100644 --- a/pkg/cache/internal/deleg_map.go +++ b/pkg/cache/internal/deleg_map.go @@ -91,18 +91,18 @@ func (m *InformersMap) WaitForCacheSync(ctx context.Context) bool { // Get will create a new Informer and add it to the map of InformersMap if none exists. Returns // the Informer from the map. -func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) { +func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopperCh chan struct{}, errorHandler func(r *cache.Reflector, err error)) (bool, *MapEntry, error) { switch obj.(type) { case *unstructured.Unstructured: - return m.unstructured.Get(ctx, gvk, obj) + return m.unstructured.Get(ctx, gvk, obj, stopperCh, errorHandler) case *unstructured.UnstructuredList: - return m.unstructured.Get(ctx, gvk, obj) + return m.unstructured.Get(ctx, gvk, obj, stopperCh, errorHandler) case *metav1.PartialObjectMetadata: - return m.metadata.Get(ctx, gvk, obj) + return m.metadata.Get(ctx, gvk, obj, stopperCh, errorHandler) case *metav1.PartialObjectMetadataList: - return m.metadata.Get(ctx, gvk, obj) + return m.metadata.Get(ctx, gvk, obj, stopperCh, errorHandler) default: - return m.structured.Get(ctx, gvk, obj) + return m.structured.Get(ctx, gvk, obj, stopperCh, errorHandler) } } diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index 5c9bd0b0a0..04f3058899 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -72,6 +72,10 @@ type MapEntry struct { // CacheReader wraps Informer and implements the CacheReader interface for a single type Reader CacheReader + + // StopCh is a channel that is closed after + // the informer stops + StopCh <-chan struct{} } // specificInformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs. @@ -169,9 +173,9 @@ func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced { return syncedFuncs } -// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns +// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns // the Informer from the map. -func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) { +func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, stopperCh chan struct{}, errorHandler func(r *cache.Reflector, err error)) (bool, *MapEntry, error) { // Return the informer if it is found i, started, ok := func() (*MapEntry, bool, bool) { ip.mu.RLock() @@ -182,7 +186,7 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion if !ok { var err error - if i, started, err = ip.addInformerToMap(gvk, obj); err != nil { + if i, started, err = ip.addInformerToMap(gvk, obj, stopperCh, errorHandler); err != nil { return started, nil, err } } @@ -197,7 +201,7 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion return started, i, nil } -func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) { +func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object, stopperCh chan struct{}, errorHandler func(r *cache.Reflector, err error)) (*MapEntry, bool, error) { ip.mu.Lock() defer ip.mu.Unlock() @@ -228,17 +232,33 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob default: } + informerStop := make(chan struct{}) i := &MapEntry{ Informer: ni, Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk, scopeName: rm.Scope.Name()}, + StopCh: informerStop, } ip.informersByGVK[gvk] = i + go func() { + select { + case <-ip.stop: + close(informerStop) + case <-stopperCh: + close(informerStop) + } + }() + + i.Informer.SetWatchErrorHandler(errorHandler) + // Start the Informer if need by // TODO(seans): write thorough tests and document what happens here - can you add indexers? // can you add eventhandlers? if ip.started { - go i.Informer.Run(ip.stop) + go func() { + i.Informer.Run(informerStop) + delete(ip.informersByGVK, gvk) + }() } return i, ip.started, nil } diff --git a/pkg/cache/multi_namespace_cache.go b/pkg/cache/multi_namespace_cache.go index f3520bf8d7..3ec7e807e1 100644 --- a/pkg/cache/multi_namespace_cache.go +++ b/pkg/cache/multi_namespace_cache.go @@ -19,6 +19,7 @@ package cache import ( "context" "fmt" + "sync" "time" corev1 "k8s.io/api/core/v1" @@ -115,6 +116,35 @@ func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object return &multiNamespaceInformer{namespaceToInformer: informers}, nil } +// Methods for multiNamespaceCache to conform to the Informers interface +func (c *multiNamespaceCache) GetInformerWithOptions(ctx context.Context, obj client.Object, options *InformerOptions) (*InformerInfo, error) { + informers := map[string]Informer{} + multiStopCh := make(chan struct{}) + var wg sync.WaitGroup + for ns, cache := range c.namespaceToCache { + info, err := cache.GetInformerWithOptions(ctx, obj, options) + if err != nil { + return nil, err + } + informers[ns] = info.Informer + wg.Add(1) + go func(stopCh <-chan struct{}) { + defer wg.Done() + <-stopCh + + }(info.StopCh) + } + + go func() { + defer close(multiStopCh) + wg.Done() + }() + return &InformerInfo{ + Informer: &multiNamespaceInformer{namespaceToInformer: informers}, + StopCh: multiStopCh, + }, nil +} + func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) { informers := map[string]Informer{}