From bdd78e2d492e8b5077cf3492be3194fdee0fa49b Mon Sep 17 00:00:00 2001 From: Kevin Delgado Date: Fri, 9 Oct 2020 21:29:04 +0000 Subject: [PATCH 1/3] Initial Ref counting --- pkg/cache/cache.go | 10 ++++++++++ pkg/cache/informer_cache.go | 9 +++++++++ pkg/cache/internal/deleg_map.go | 13 +++++++++++++ pkg/cache/internal/informers_map.go | 19 +++++++++++++++++++ pkg/cache/multi_namespace_cache.go | 5 +++++ pkg/internal/controller/controller.go | 18 ++++++++++++++++++ pkg/source/source.go | 18 ++++++++++-------- 7 files changed, 84 insertions(+), 8 deletions(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 71dfbd0454..3dc6bb8357 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -67,6 +67,16 @@ type Informers interface { // Informers knows how to add indices to the caches (informers) that it manages. client.FieldIndexer + + // ModifyEventHandlerCount updateds and retrieves the number of references for an informer. + // Passing a positive delta increments the EventHandler referece count a negative delta + // decrements it (by delta) and a delta of 0 is just a getter. + + // This is just a hacky way to test out ref counting without having to pipe a bunch of + // ref counting functions down the stack. + + // Alternatly, we could have sepearte inc, dec, get functions + ModifyEventHandlerCount(obj runtime.Object, delta int) int } // Informer - informer allows you interact with the underlying informer diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go index 8ec3b921d9..847e8adbe6 100644 --- a/pkg/cache/informer_cache.go +++ b/pkg/cache/informer_cache.go @@ -159,6 +159,15 @@ func (ip *informerCache) GetInformer(ctx context.Context, obj client.Object) (In return i.Informer, err } +func (ip *informerCache) ModifyEventHandlerCount(obj runtime.Object, delta int) int { + gvk, err := apiutil.GVKForObject(obj, ip.Scheme) + if err != nil { + return 0 + } + + return ip.InformersMap.ModifyEventHandlerCount(gvk, obj, delta) +} + // NeedLeaderElection implements the LeaderElectionRunnable interface // to indicate that this can be started without requiring the leader lock func (ip *informerCache) NeedLeaderElection() bool { diff --git a/pkg/cache/internal/deleg_map.go b/pkg/cache/internal/deleg_map.go index a7ee643482..f048ec6ed1 100644 --- a/pkg/cache/internal/deleg_map.go +++ b/pkg/cache/internal/deleg_map.go @@ -92,6 +92,19 @@ func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj return m.structured.Get(ctx, gvk, obj) } +func (m *InformersMap) ModifyEventHandlerCount(gvk schema.GroupVersionKind, obj runtime.Object, delta int) int { + _, isUnstructured := obj.(*unstructured.Unstructured) + _, isUnstructuredList := obj.(*unstructured.UnstructuredList) + isUnstructured = isUnstructured || isUnstructuredList + + switch { + case isUnstructured: + return m.unstructured.ModifyEventHandlerCount(gvk, delta) + default: + return m.structured.ModifyEventHandlerCount(gvk, delta) + } +} + // newStructuredInformersMap creates a new InformersMap for structured objects. func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap { return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createStructuredListWatch) diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index bc450d8781..67102e5e45 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -70,6 +70,10 @@ type MapEntry struct { // CacheReader wraps Informer and implements the CacheReader interface for a single type Reader CacheReader + + // we can get rid of this if apimachinery adds the ability to retrieve this from the SharedIndexInformer + // but until then, we have to track it ourselves + refCount int } // specificInformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs. @@ -121,6 +125,11 @@ type specificInformersMap struct { namespace string } +func (e *MapEntry) ModifyEventHandlerCount(delta int) int { + e.refCount += delta + return e.refCount +} + // Start calls Run on each of the informers and sets started to true. Blocks on the context. // It doesn't return start because it can't return an error, and it's not a runnable directly. func (ip *specificInformersMap) Start(ctx context.Context) { @@ -191,6 +200,16 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion return started, i, nil } +func (ip *specificInformersMap) ModifyEventHandlerCount(gvk schema.GroupVersionKind, delta int) int { + entry, ok := ip.informersByGVK[gvk] + if !ok { + return 0 + } + + return entry.ModifyEventHandlerCount(delta) + +} + func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) { ip.mu.Lock() defer ip.mu.Unlock() diff --git a/pkg/cache/multi_namespace_cache.go b/pkg/cache/multi_namespace_cache.go index f0e18c09b0..ff30560062 100644 --- a/pkg/cache/multi_namespace_cache.go +++ b/pkg/cache/multi_namespace_cache.go @@ -94,6 +94,11 @@ func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema return &multiNamespaceInformer{namespaceToInformer: informers}, nil } +// TODO: what do we need to do for multi-namspace cache wrt to MEHC()? +func (c *multiNamespaceCache) ModifyEventHandlerCount(obj runtime.Object, delta int) int { + return 0 +} + func (c *multiNamespaceCache) Start(ctx context.Context) error { for ns, cache := range c.namespaceToCache { go func(ns string, cache Cache) { diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 83866606c2..d114bf0605 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -24,9 +24,11 @@ import ( "time" "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/handler" ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics" logf "sigs.k8s.io/controller-runtime/pkg/log" @@ -145,6 +147,9 @@ func (c *Controller) Start(ctx context.Context) error { c.Queue = c.MakeQueue() defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed + refCounts := make(map[runtime.Object]int) + var cache cache.Cache + err := func() error { defer c.mu.Unlock() @@ -155,10 +160,18 @@ func (c *Controller) Start(ctx context.Context) error { // caches to sync so that they have a chance to register their intendeded // caches. for _, watch := range c.startWatches { + kindSource, ok := watch.src.(*source.Kind) + if !ok { + fmt.Println("source NOT kind?") + } + // TODO: how to not duplicate this call? + cache = kindSource.Cache c.Log.Info("Starting EventSource", "source", watch.src) if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil { return err } + refCounts[kindSource.Type] = refCounts[kindSource.Type] + 1 + fmt.Printf("bumping refcounts for kindSource.Type = %+v, to refCounts[kindSource.Type] = %+v\n", kindSource.Type, refCounts[kindSource.Type]) } // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches @@ -208,6 +221,11 @@ func (c *Controller) Start(ctx context.Context) error { } <-ctx.Done() + // decrement counts + for obj, count := range refCounts { + fmt.Printf("Decrementing refcounts for obj = %+v, by count = %+v\n", obj, count) + cache.ModifyEventHandlerCount(obj, -count) + } c.Log.Info("Stopping workers") return nil } diff --git a/pkg/source/source.go b/pkg/source/source.go index fe0e47150a..4aadd34646 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -68,7 +68,7 @@ type SyncingSource interface { // and not overwritten. It can be used to watch objects in a different cluster by passing the cache // from that other cluster func NewKindWithCache(object client.Object, cache cache.Cache) SyncingSource { - return &kindWithCache{kind: Kind{Type: object, cache: cache}} + return &kindWithCache{kind: Kind{Type: object, Cache: cache}} } type kindWithCache struct { @@ -89,8 +89,8 @@ type Kind struct { // Type is the type of object to watch. e.g. &v1.Pod{} Type client.Object - // cache used to watch APIs - cache cache.Cache + // Cache used to watch APIs + Cache cache.Cache } var _ SyncingSource = &Kind{} @@ -106,12 +106,12 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w } // cache should have been injected before Start was called - if ks.cache == nil { + if ks.Cache == nil { return fmt.Errorf("must call CacheInto on Kind before calling Start") } // Lookup the Informer from the Cache and add an EventHandler which populates the Queue - i, err := ks.cache.GetInformer(ctx, ks.Type) + i, err := ks.Cache.GetInformer(ctx, ks.Type) if err != nil { if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok { log.Error(err, "if kind is a CRD, it should be installed before calling Start", @@ -120,6 +120,8 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w return err } i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}) + fmt.Printf("bump refcount for ks.Type = %+v\n", ks.Type) + ks.Cache.ModifyEventHandlerCount(ks.Type, 1) return nil } @@ -133,7 +135,7 @@ func (ks *Kind) String() string { // WaitForSync implements SyncingSource to allow controllers to wait with starting // workers until the cache is synced. func (ks *Kind) WaitForSync(ctx context.Context) error { - if !ks.cache.WaitForCacheSync(ctx) { + if !ks.Cache.WaitForCacheSync(ctx) { // Would be great to return something more informative here return errors.New("cache did not sync") } @@ -145,8 +147,8 @@ var _ inject.Cache = &Kind{} // InjectCache is internal should be called only by the Controller. InjectCache is used to inject // the Cache dependency initialized by the ControllerManager. func (ks *Kind) InjectCache(c cache.Cache) error { - if ks.cache == nil { - ks.cache = c + if ks.Cache == nil { + ks.Cache = c } return nil } From 4d1ce3af180097bca3f395ab6e100f8f57dd62d7 Mon Sep 17 00:00:00 2001 From: Kevin Delgado Date: Fri, 16 Oct 2020 01:32:25 +0000 Subject: [PATCH 2/3] Introduce SyncingSource and store ref count on the informer --- pkg/cache/cache.go | 20 ++++----- pkg/cache/informer_cache.go | 16 ++++---- pkg/cache/internal/deleg_map.go | 24 +++++------ pkg/cache/internal/informers_map.go | 49 ++++++++++++---------- pkg/cache/multi_namespace_cache.go | 15 +++++-- pkg/internal/controller/controller.go | 51 ++++++++++++++--------- pkg/source/source.go | 59 +++++++++++++++++++++------ 7 files changed, 148 insertions(+), 86 deletions(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 3dc6bb8357..b42d1e6f32 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -67,16 +67,6 @@ type Informers interface { // Informers knows how to add indices to the caches (informers) that it manages. client.FieldIndexer - - // ModifyEventHandlerCount updateds and retrieves the number of references for an informer. - // Passing a positive delta increments the EventHandler referece count a negative delta - // decrements it (by delta) and a delta of 0 is just a getter. - - // This is just a hacky way to test out ref counting without having to pipe a bunch of - // ref counting functions down the stack. - - // Alternatly, we could have sepearte inc, dec, get functions - ModifyEventHandlerCount(obj runtime.Object, delta int) int } // Informer - informer allows you interact with the underlying informer @@ -94,6 +84,16 @@ type Informer interface { AddIndexers(indexers toolscache.Indexers) error //HasSynced return true if the informers underlying store has synced HasSynced() bool + + // ModifyEventHandlerCount updates and retrieves the number of references for an informer. + // Passing a positive delta increments the EventHandler referece count a negative delta + // decrements it (by delta) and a delta of 0 is just a getter. + + // This is just a hacky way to test out ref counting without having to pipe a bunch of + // ref counting functions down the stack. + + // Alternatly, we could (and probably should) have separate inc, dec, and get functions + ModifyEventHandlerCount(delta int) int } // Options are the optional arguments for creating a new InformersMap object diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go index 847e8adbe6..86d8385d06 100644 --- a/pkg/cache/informer_cache.go +++ b/pkg/cache/informer_cache.go @@ -159,14 +159,14 @@ func (ip *informerCache) GetInformer(ctx context.Context, obj client.Object) (In return i.Informer, err } -func (ip *informerCache) ModifyEventHandlerCount(obj runtime.Object, delta int) int { - gvk, err := apiutil.GVKForObject(obj, ip.Scheme) - if err != nil { - return 0 - } - - return ip.InformersMap.ModifyEventHandlerCount(gvk, obj, delta) -} +//func (ip *informerCache) ModifyEventHandlerCount(obj runtime.Object, delta int) int { +// gvk, err := apiutil.GVKForObject(obj, ip.Scheme) +// if err != nil { +// return 0 +// } +// +// return ip.InformersMap.ModifyEventHandlerCount(gvk, obj, delta) +//} // NeedLeaderElection implements the LeaderElectionRunnable interface // to indicate that this can be started without requiring the leader lock diff --git a/pkg/cache/internal/deleg_map.go b/pkg/cache/internal/deleg_map.go index f048ec6ed1..fe01019907 100644 --- a/pkg/cache/internal/deleg_map.go +++ b/pkg/cache/internal/deleg_map.go @@ -92,18 +92,18 @@ func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj return m.structured.Get(ctx, gvk, obj) } -func (m *InformersMap) ModifyEventHandlerCount(gvk schema.GroupVersionKind, obj runtime.Object, delta int) int { - _, isUnstructured := obj.(*unstructured.Unstructured) - _, isUnstructuredList := obj.(*unstructured.UnstructuredList) - isUnstructured = isUnstructured || isUnstructuredList - - switch { - case isUnstructured: - return m.unstructured.ModifyEventHandlerCount(gvk, delta) - default: - return m.structured.ModifyEventHandlerCount(gvk, delta) - } -} +//func (m *InformersMap) ModifyEventHandlerCount(gvk schema.GroupVersionKind, obj runtime.Object, delta int) int { +// _, isUnstructured := obj.(*unstructured.Unstructured) +// _, isUnstructuredList := obj.(*unstructured.UnstructuredList) +// isUnstructured = isUnstructured || isUnstructuredList +// +// switch { +// case isUnstructured: +// return m.unstructured.ModifyEventHandlerCount(gvk, delta) +// default: +// return m.structured.ModifyEventHandlerCount(gvk, delta) +// } +//} // newStructuredInformersMap creates a new InformersMap for structured objects. func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap { diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index 67102e5e45..8305771ed0 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -63,17 +63,29 @@ func newSpecificInformersMap(config *rest.Config, return ip } +//TODO: comment +// we can get rid of this if apimachinery adds the ability to retrieve this from the SharedIndexInformer +// but until then, we have to track it ourselves +type HandlerCountingInformer struct { + // Informer is the cached informer + cache.SharedIndexInformer + + // count indicates the number of EventHandlers registered on the informer + count int +} + +func (i *HandlerCountingInformer) ModifyEventHandlerCount(delta int) int { + i.count += delta + return i.count +} + // MapEntry contains the cached data for an Informer type MapEntry struct { - // Informer is the cached informer - Informer cache.SharedIndexInformer + //Informer is the HandlerCountingInformer + Informer *HandlerCountingInformer // CacheReader wraps Informer and implements the CacheReader interface for a single type Reader CacheReader - - // we can get rid of this if apimachinery adds the ability to retrieve this from the SharedIndexInformer - // but until then, we have to track it ourselves - refCount int } // specificInformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs. @@ -125,11 +137,6 @@ type specificInformersMap struct { namespace string } -func (e *MapEntry) ModifyEventHandlerCount(delta int) int { - e.refCount += delta - return e.refCount -} - // Start calls Run on each of the informers and sets started to true. Blocks on the context. // It doesn't return start because it can't return an error, and it's not a runnable directly. func (ip *specificInformersMap) Start(ctx context.Context) { @@ -200,15 +207,15 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion return started, i, nil } -func (ip *specificInformersMap) ModifyEventHandlerCount(gvk schema.GroupVersionKind, delta int) int { - entry, ok := ip.informersByGVK[gvk] - if !ok { - return 0 - } - - return entry.ModifyEventHandlerCount(delta) - -} +//func (ip *specificInformersMap) ModifyEventHandlerCount(gvk schema.GroupVersionKind, delta int) int { +// entry, ok := ip.informersByGVK[gvk] +// if !ok { +// return 0 +// } +// +// return entry.ModifyEventHandlerCount(delta) +// +//} func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) { ip.mu.Lock() @@ -231,7 +238,7 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, }) i := &MapEntry{ - Informer: ni, + Informer: &HandlerCountingInformer{ni, 0}, Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk}, } ip.informersByGVK[gvk] = i diff --git a/pkg/cache/multi_namespace_cache.go b/pkg/cache/multi_namespace_cache.go index ff30560062..26a4f84865 100644 --- a/pkg/cache/multi_namespace_cache.go +++ b/pkg/cache/multi_namespace_cache.go @@ -95,9 +95,9 @@ func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema } // TODO: what do we need to do for multi-namspace cache wrt to MEHC()? -func (c *multiNamespaceCache) ModifyEventHandlerCount(obj runtime.Object, delta int) int { - return 0 -} +//func (c *multiNamespaceCache) ModifyEventHandlerCount(obj runtime.Object, delta int) int { +// return 0 +//} func (c *multiNamespaceCache) Start(ctx context.Context) error { for ns, cache := range c.namespaceToCache { @@ -191,6 +191,15 @@ type multiNamespaceInformer struct { var _ Informer = &multiNamespaceInformer{} +// ModifyEventHandlerCount TODO: comment +func (i *multiNamespaceInformer) ModifyEventHandlerCount(delta int) int { + total := 0 + for _, informer := range i.namespaceToInformer { + total += informer.ModifyEventHandlerCount(delta) + } + return total +} + // AddEventHandler adds the handler to each namespaced informer func (i *multiNamespaceInformer) AddEventHandler(handler toolscache.ResourceEventHandler) { for _, informer := range i.namespaceToInformer { diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index d114bf0605..681ec1d540 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -24,11 +24,9 @@ import ( "time" "github.com/go-logr/logr" - "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" - "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/handler" ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics" logf "sigs.k8s.io/controller-runtime/pkg/log" @@ -147,9 +145,6 @@ func (c *Controller) Start(ctx context.Context) error { c.Queue = c.MakeQueue() defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed - refCounts := make(map[runtime.Object]int) - var cache cache.Cache - err := func() error { defer c.mu.Unlock() @@ -160,18 +155,28 @@ func (c *Controller) Start(ctx context.Context) error { // caches to sync so that they have a chance to register their intendeded // caches. for _, watch := range c.startWatches { - kindSource, ok := watch.src.(*source.Kind) - if !ok { - fmt.Println("source NOT kind?") - } - // TODO: how to not duplicate this call? - cache = kindSource.Cache - c.Log.Info("Starting EventSource", "source", watch.src) - if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil { - return err + //kindSource, ok := watch.src.(*source.Kind) + //if !ok { + // fmt.Println("source NOT kind?") + //} + //// TODO: how to not duplicate this call? + //cache = kindSource.Cache + + //if watch is stoppable { + // run in goro with ctx + //} + + stoppableSource, ok := watch.src.(source.StoppableSource) + if ok { + // TODO: apply daniel's suggested pattern + go stoppableSource.StartStoppable(ctx, watch.handler, c.Queue, watch.predicates...) + c.Log.Info("Starting STOPPABLE EventSource", "source", watch.src) + } else { + c.Log.Info("Starting EventSource", "source", watch.src) + if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil { + return err + } } - refCounts[kindSource.Type] = refCounts[kindSource.Type] + 1 - fmt.Printf("bumping refcounts for kindSource.Type = %+v, to refCounts[kindSource.Type] = %+v\n", kindSource.Type, refCounts[kindSource.Type]) } // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches @@ -222,10 +227,16 @@ func (c *Controller) Start(ctx context.Context) error { <-ctx.Done() // decrement counts - for obj, count := range refCounts { - fmt.Printf("Decrementing refcounts for obj = %+v, by count = %+v\n", obj, count) - cache.ModifyEventHandlerCount(obj, -count) - } + //c.Log.Info("decrementing") + //for _, watch := range c.startWatches { + // stoppableSource, ok := watch.src.(source.StoppableSource) + // //stoppableSource, ok := watch.src.(*source.Kind) + // if ok { + // continue + // fmt.Println("stopping source") + // stoppableSource.Stop(ctx) + // } + //} c.Log.Info("Stopping workers") return nil } diff --git a/pkg/source/source.go b/pkg/source/source.go index 4aadd34646..8400447582 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -64,11 +64,16 @@ type SyncingSource interface { WaitForSync(ctx context.Context) error } +type StoppableSource interface { + Source + StartStoppable(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error +} + // NewKindWithCache creates a Source without InjectCache, so that it is assured that the given cache is used // and not overwritten. It can be used to watch objects in a different cluster by passing the cache // from that other cluster func NewKindWithCache(object client.Object, cache cache.Cache) SyncingSource { - return &kindWithCache{kind: Kind{Type: object, Cache: cache}} + return &kindWithCache{kind: Kind{Type: object, cache: cache}} } type kindWithCache struct { @@ -89,8 +94,8 @@ type Kind struct { // Type is the type of object to watch. e.g. &v1.Pod{} Type client.Object - // Cache used to watch APIs - Cache cache.Cache + // cache used to watch APIs + cache cache.Cache } var _ SyncingSource = &Kind{} @@ -106,12 +111,12 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w } // cache should have been injected before Start was called - if ks.Cache == nil { + if ks.cache == nil { return fmt.Errorf("must call CacheInto on Kind before calling Start") } - // Lookup the Informer from the Cache and add an EventHandler which populates the Queue - i, err := ks.Cache.GetInformer(ctx, ks.Type) + // Lookup the Informer from the cache and add an EventHandler which populates the Queue + i, err := ks.cache.GetInformer(ctx, ks.Type) if err != nil { if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok { log.Error(err, "if kind is a CRD, it should be installed before calling Start", @@ -120,8 +125,38 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w return err } i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}) - fmt.Printf("bump refcount for ks.Type = %+v\n", ks.Type) - ks.Cache.ModifyEventHandlerCount(ks.Type, 1) + //if handlerCountingInformer, ok := i.(*cache.HandlerCountingInformer); ok { + //handlerCountingInformer.Count += 1 + newCount := i.ModifyEventHandlerCount(1) + fmt.Printf("increment, newCount is%+v\n", newCount) + //} + return nil +} + +func (ks *Kind) StartStoppable(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, + prct ...predicate.Predicate) error { + i, err := ks.cache.GetInformer(ctx, ks.Type) + if err != nil { + return err + } + if err := ks.Start(ctx, handler, queue, prct...); err != nil { + return err + } + <-ctx.Done() + newCount := i.ModifyEventHandlerCount(-1) + fmt.Printf("decrement, newCount is%+v\n", newCount) + // TODO: do we need to actually stop something with the context? + return nil +} + +func (ks *Kind) Stop(ctx context.Context) error { + i, err := ks.cache.GetInformer(ctx, ks.Type) + if err != nil { + return err + } + newCount := i.ModifyEventHandlerCount(-1) + fmt.Printf("decrement, newCount is%+v\n", newCount) + // TODO: do we need to actually stop something with the context? return nil } @@ -135,7 +170,7 @@ func (ks *Kind) String() string { // WaitForSync implements SyncingSource to allow controllers to wait with starting // workers until the cache is synced. func (ks *Kind) WaitForSync(ctx context.Context) error { - if !ks.Cache.WaitForCacheSync(ctx) { + if !ks.cache.WaitForCacheSync(ctx) { // Would be great to return something more informative here return errors.New("cache did not sync") } @@ -145,10 +180,10 @@ func (ks *Kind) WaitForSync(ctx context.Context) error { var _ inject.Cache = &Kind{} // InjectCache is internal should be called only by the Controller. InjectCache is used to inject -// the Cache dependency initialized by the ControllerManager. +// the cache dependency initialized by the ControllerManager. func (ks *Kind) InjectCache(c cache.Cache) error { - if ks.Cache == nil { - ks.Cache = c + if ks.cache == nil { + ks.cache = c } return nil } From a44710669e8bced561d27b418faeae17174a82cc Mon Sep 17 00:00:00 2001 From: Kevin Delgado Date: Fri, 16 Oct 2020 01:42:14 +0000 Subject: [PATCH 3/3] Remove commented out code --- pkg/cache/informer_cache.go | 9 --------- pkg/cache/internal/deleg_map.go | 13 ------------- pkg/cache/internal/informers_map.go | 10 ---------- pkg/cache/multi_namespace_cache.go | 5 ----- pkg/internal/controller/controller.go | 22 ---------------------- pkg/source/source.go | 13 +------------ 6 files changed, 1 insertion(+), 71 deletions(-) diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go index 86d8385d06..8ec3b921d9 100644 --- a/pkg/cache/informer_cache.go +++ b/pkg/cache/informer_cache.go @@ -159,15 +159,6 @@ func (ip *informerCache) GetInformer(ctx context.Context, obj client.Object) (In return i.Informer, err } -//func (ip *informerCache) ModifyEventHandlerCount(obj runtime.Object, delta int) int { -// gvk, err := apiutil.GVKForObject(obj, ip.Scheme) -// if err != nil { -// return 0 -// } -// -// return ip.InformersMap.ModifyEventHandlerCount(gvk, obj, delta) -//} - // NeedLeaderElection implements the LeaderElectionRunnable interface // to indicate that this can be started without requiring the leader lock func (ip *informerCache) NeedLeaderElection() bool { diff --git a/pkg/cache/internal/deleg_map.go b/pkg/cache/internal/deleg_map.go index fe01019907..a7ee643482 100644 --- a/pkg/cache/internal/deleg_map.go +++ b/pkg/cache/internal/deleg_map.go @@ -92,19 +92,6 @@ func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj return m.structured.Get(ctx, gvk, obj) } -//func (m *InformersMap) ModifyEventHandlerCount(gvk schema.GroupVersionKind, obj runtime.Object, delta int) int { -// _, isUnstructured := obj.(*unstructured.Unstructured) -// _, isUnstructuredList := obj.(*unstructured.UnstructuredList) -// isUnstructured = isUnstructured || isUnstructuredList -// -// switch { -// case isUnstructured: -// return m.unstructured.ModifyEventHandlerCount(gvk, delta) -// default: -// return m.structured.ModifyEventHandlerCount(gvk, delta) -// } -//} - // newStructuredInformersMap creates a new InformersMap for structured objects. func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap { return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createStructuredListWatch) diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index 8305771ed0..323b7d04a6 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -207,16 +207,6 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion return started, i, nil } -//func (ip *specificInformersMap) ModifyEventHandlerCount(gvk schema.GroupVersionKind, delta int) int { -// entry, ok := ip.informersByGVK[gvk] -// if !ok { -// return 0 -// } -// -// return entry.ModifyEventHandlerCount(delta) -// -//} - func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) { ip.mu.Lock() defer ip.mu.Unlock() diff --git a/pkg/cache/multi_namespace_cache.go b/pkg/cache/multi_namespace_cache.go index 26a4f84865..5b88e4de40 100644 --- a/pkg/cache/multi_namespace_cache.go +++ b/pkg/cache/multi_namespace_cache.go @@ -94,11 +94,6 @@ func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema return &multiNamespaceInformer{namespaceToInformer: informers}, nil } -// TODO: what do we need to do for multi-namspace cache wrt to MEHC()? -//func (c *multiNamespaceCache) ModifyEventHandlerCount(obj runtime.Object, delta int) int { -// return 0 -//} - func (c *multiNamespaceCache) Start(ctx context.Context) error { for ns, cache := range c.namespaceToCache { go func(ns string, cache Cache) { diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 681ec1d540..6e77623739 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -155,17 +155,6 @@ func (c *Controller) Start(ctx context.Context) error { // caches to sync so that they have a chance to register their intendeded // caches. for _, watch := range c.startWatches { - //kindSource, ok := watch.src.(*source.Kind) - //if !ok { - // fmt.Println("source NOT kind?") - //} - //// TODO: how to not duplicate this call? - //cache = kindSource.Cache - - //if watch is stoppable { - // run in goro with ctx - //} - stoppableSource, ok := watch.src.(source.StoppableSource) if ok { // TODO: apply daniel's suggested pattern @@ -226,17 +215,6 @@ func (c *Controller) Start(ctx context.Context) error { } <-ctx.Done() - // decrement counts - //c.Log.Info("decrementing") - //for _, watch := range c.startWatches { - // stoppableSource, ok := watch.src.(source.StoppableSource) - // //stoppableSource, ok := watch.src.(*source.Kind) - // if ok { - // continue - // fmt.Println("stopping source") - // stoppableSource.Stop(ctx) - // } - //} c.Log.Info("Stopping workers") return nil } diff --git a/pkg/source/source.go b/pkg/source/source.go index 8400447582..0bb6e954c6 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -115,7 +115,7 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w return fmt.Errorf("must call CacheInto on Kind before calling Start") } - // Lookup the Informer from the cache and add an EventHandler which populates the Queue + // Lookup the Informer from the Cache and add an EventHandler which populates the Queue i, err := ks.cache.GetInformer(ctx, ks.Type) if err != nil { if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok { @@ -149,17 +149,6 @@ func (ks *Kind) StartStoppable(ctx context.Context, handler handler.EventHandler return nil } -func (ks *Kind) Stop(ctx context.Context) error { - i, err := ks.cache.GetInformer(ctx, ks.Type) - if err != nil { - return err - } - newCount := i.ModifyEventHandlerCount(-1) - fmt.Printf("decrement, newCount is%+v\n", newCount) - // TODO: do we need to actually stop something with the context? - return nil -} - func (ks *Kind) String() string { if ks.Type != nil && ks.Type.GetObjectKind() != nil { return fmt.Sprintf("kind source: %v", ks.Type.GetObjectKind().GroupVersionKind().String())