From 3281b7ddae12db85a994bae8278fc297a2a4ff8b Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 10 Nov 2017 17:35:11 +0000 Subject: [PATCH 1/2] Use latest configured rules when rescheduling work Hold on to the parsed rules for each instance instead of having them live entirely in the queue, so that we can use the latest set of rules when we reschedule. This requires that we lock around the `cfgs` map, so the existing lock was re-purposed to do that. --- pkg/ruler/scheduler.go | 38 +++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/pkg/ruler/scheduler.go b/pkg/ruler/scheduler.go index c43e2b8cf4d..1dc220406bb 100644 --- a/pkg/ruler/scheduler.go +++ b/pkg/ruler/scheduler.go @@ -63,9 +63,9 @@ func (w workItem) Scheduled() time.Time { return w.scheduled } -// Defer returns a copy of this work item, rescheduled to a later time. -func (w workItem) Defer(interval time.Duration) workItem { - return workItem{w.userID, w.rules, w.scheduled.Add(interval)} +// Defer returns a work item with updated rules, rescheduled to a later time. +func (w workItem) Defer(interval time.Duration, currentRules []rules.Rule) workItem { + return workItem{w.userID, currentRules, w.scheduled.Add(interval)} } type scheduler struct { @@ -73,13 +73,11 @@ type scheduler struct { evaluationInterval time.Duration q *SchedulingQueue - // All the configurations that we have. Only used for instrumentation. - cfgs map[string]configs.Config - pollInterval time.Duration + cfgs map[string][]rules.Rule // all rules for all users latestConfig configs.ID - latestMutex sync.RWMutex + sync.RWMutex stop chan struct{} done chan struct{} @@ -92,7 +90,7 @@ func newScheduler(configsAPI configs_client.RulesAPI, evaluationInterval, pollIn evaluationInterval: evaluationInterval, pollInterval: pollInterval, q: NewSchedulingQueue(clockwork.NewRealClock()), - cfgs: map[string]configs.Config{}, + cfgs: map[string][]rules.Rule{}, stop: make(chan struct{}), done: make(chan struct{}), @@ -157,7 +155,9 @@ func (s *scheduler) updateConfigs(now time.Time) error { // poll the configuration server. Not re-entrant. func (s *scheduler) poll() (map[string]configs.View, error) { + s.Lock() configID := s.latestConfig + s.Unlock() var cfgs *configs_client.ConfigsResponse err := instrument.TimeRequestHistogram(context.Background(), "Configs.GetConfigs", configsRequestDuration, func(_ context.Context) error { var err error @@ -168,9 +168,9 @@ func (s *scheduler) poll() (map[string]configs.View, error) { log.Warnf("Scheduler: configs server poll failed: %v", err) return nil, err } - s.latestMutex.Lock() + s.Lock() s.latestConfig = cfgs.GetLatestConfigID() - s.latestMutex.Unlock() + s.Unlock() return cfgs.Configs, nil } @@ -188,11 +188,16 @@ func (s *scheduler) addNewConfigs(now time.Time, cfgs map[string]configs.View) { continue } + s.Lock() + s.cfgs[userID] = rules + s.Unlock() s.addWorkItem(workItem{userID, rules, now}) - s.cfgs[userID] = config.Config } configUpdates.Add(float64(len(cfgs))) - totalConfigs.Set(float64(len(s.cfgs))) + s.Lock() + lenCfgs := len(s.cfgs) + s.Unlock() + totalConfigs.Set(float64(lenCfgs)) } func (s *scheduler) addWorkItem(i workItem) { @@ -221,7 +226,14 @@ func (s *scheduler) nextWorkItem() *workItem { // workItemDone marks the given item as being ready to be rescheduled. func (s *scheduler) workItemDone(i workItem) { - next := i.Defer(s.evaluationInterval) + s.Lock() + currentRules, found := s.cfgs[i.userID] + s.Unlock() + if !found { + log.Debugf("Scheduler: no more work configured for %v", i.userID) + return + } + next := i.Defer(s.evaluationInterval, currentRules) log.Debugf("Scheduler: work item %v rescheduled for %v", i, next.scheduled.Format("2006-01-02 15:04:05")) s.addWorkItem(next) } From 22e590da2589b7b2ab0044059a55b030e8b39a74 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 10 Nov 2017 17:54:25 +0000 Subject: [PATCH 2/2] Comments --- pkg/ruler/scheduler.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/ruler/scheduler.go b/pkg/ruler/scheduler.go index 1dc220406bb..e4730594e4c 100644 --- a/pkg/ruler/scheduler.go +++ b/pkg/ruler/scheduler.go @@ -70,13 +70,13 @@ func (w workItem) Defer(interval time.Duration, currentRules []rules.Rule) workI type scheduler struct { configsAPI configs_client.RulesAPI - evaluationInterval time.Duration + evaluationInterval time.Duration // how often we re-evaluate each rule set q *SchedulingQueue - pollInterval time.Duration + pollInterval time.Duration // how often we check for new config cfgs map[string][]rules.Rule // all rules for all users - latestConfig configs.ID + latestConfig configs.ID // # of last update received from config sync.RWMutex stop chan struct{}