diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 71dfbd0454..b42d1e6f32 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -84,6 +84,16 @@ type Informer interface { AddIndexers(indexers toolscache.Indexers) error //HasSynced return true if the informers underlying store has synced HasSynced() bool + + // ModifyEventHandlerCount updates and retrieves the number of references for an informer. + // Passing a positive delta increments the EventHandler referece count a negative delta + // decrements it (by delta) and a delta of 0 is just a getter. + + // This is just a hacky way to test out ref counting without having to pipe a bunch of + // ref counting functions down the stack. + + // Alternatly, we could (and probably should) have separate inc, dec, and get functions + ModifyEventHandlerCount(delta int) int } // Options are the optional arguments for creating a new InformersMap object diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index bc450d8781..323b7d04a6 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -63,10 +63,26 @@ func newSpecificInformersMap(config *rest.Config, return ip } +//TODO: comment +// we can get rid of this if apimachinery adds the ability to retrieve this from the SharedIndexInformer +// but until then, we have to track it ourselves +type HandlerCountingInformer struct { + // Informer is the cached informer + cache.SharedIndexInformer + + // count indicates the number of EventHandlers registered on the informer + count int +} + +func (i *HandlerCountingInformer) ModifyEventHandlerCount(delta int) int { + i.count += delta + return i.count +} + // MapEntry contains the cached data for an Informer type MapEntry struct { - // Informer is the cached informer - Informer cache.SharedIndexInformer + //Informer is the HandlerCountingInformer + Informer *HandlerCountingInformer // CacheReader wraps Informer and implements the CacheReader interface for a single type Reader CacheReader @@ -212,7 +228,7 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, }) i := &MapEntry{ - Informer: ni, + Informer: &HandlerCountingInformer{ni, 0}, Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk}, } ip.informersByGVK[gvk] = i diff --git a/pkg/cache/multi_namespace_cache.go b/pkg/cache/multi_namespace_cache.go index f0e18c09b0..5b88e4de40 100644 --- a/pkg/cache/multi_namespace_cache.go +++ b/pkg/cache/multi_namespace_cache.go @@ -186,6 +186,15 @@ type multiNamespaceInformer struct { var _ Informer = &multiNamespaceInformer{} +// ModifyEventHandlerCount TODO: comment +func (i *multiNamespaceInformer) ModifyEventHandlerCount(delta int) int { + total := 0 + for _, informer := range i.namespaceToInformer { + total += informer.ModifyEventHandlerCount(delta) + } + return total +} + // AddEventHandler adds the handler to each namespaced informer func (i *multiNamespaceInformer) AddEventHandler(handler toolscache.ResourceEventHandler) { for _, informer := range i.namespaceToInformer { diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 83866606c2..6e77623739 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -155,9 +155,16 @@ func (c *Controller) Start(ctx context.Context) error { // caches to sync so that they have a chance to register their intendeded // caches. for _, watch := range c.startWatches { - c.Log.Info("Starting EventSource", "source", watch.src) - if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil { - return err + stoppableSource, ok := watch.src.(source.StoppableSource) + if ok { + // TODO: apply daniel's suggested pattern + go stoppableSource.StartStoppable(ctx, watch.handler, c.Queue, watch.predicates...) + c.Log.Info("Starting STOPPABLE EventSource", "source", watch.src) + } else { + c.Log.Info("Starting EventSource", "source", watch.src) + if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil { + return err + } } } diff --git a/pkg/source/source.go b/pkg/source/source.go index fe0e47150a..0bb6e954c6 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -64,6 +64,11 @@ type SyncingSource interface { WaitForSync(ctx context.Context) error } +type StoppableSource interface { + Source + StartStoppable(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error +} + // NewKindWithCache creates a Source without InjectCache, so that it is assured that the given cache is used // and not overwritten. It can be used to watch objects in a different cluster by passing the cache // from that other cluster @@ -120,6 +125,27 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w return err } i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}) + //if handlerCountingInformer, ok := i.(*cache.HandlerCountingInformer); ok { + //handlerCountingInformer.Count += 1 + newCount := i.ModifyEventHandlerCount(1) + fmt.Printf("increment, newCount is%+v\n", newCount) + //} + return nil +} + +func (ks *Kind) StartStoppable(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, + prct ...predicate.Predicate) error { + i, err := ks.cache.GetInformer(ctx, ks.Type) + if err != nil { + return err + } + if err := ks.Start(ctx, handler, queue, prct...); err != nil { + return err + } + <-ctx.Done() + newCount := i.ModifyEventHandlerCount(-1) + fmt.Printf("decrement, newCount is%+v\n", newCount) + // TODO: do we need to actually stop something with the context? return nil } @@ -143,7 +169,7 @@ func (ks *Kind) WaitForSync(ctx context.Context) error { var _ inject.Cache = &Kind{} // InjectCache is internal should be called only by the Controller. InjectCache is used to inject -// the Cache dependency initialized by the ControllerManager. +// the cache dependency initialized by the ControllerManager. func (ks *Kind) InjectCache(c cache.Cache) error { if ks.cache == nil { ks.cache = c