Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 28 additions & 16 deletions pkg/ruler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,23 +63,21 @@ func (w workItem) Scheduled() time.Time {
return w.scheduled
}

// Defer returns a copy of this work item, rescheduled to a later time.
func (w workItem) Defer(interval time.Duration) workItem {
return workItem{w.userID, w.rules, w.scheduled.Add(interval)}
// Defer returns a work item with updated rules, rescheduled to a later time.
func (w workItem) Defer(interval time.Duration, currentRules []rules.Rule) workItem {
return workItem{w.userID, currentRules, w.scheduled.Add(interval)}
}

type scheduler struct {
configsAPI configs_client.RulesAPI
evaluationInterval time.Duration
evaluationInterval time.Duration // how often we re-evaluate each rule set
q *SchedulingQueue

// All the configurations that we have. Only used for instrumentation.
cfgs map[string]configs.Config
pollInterval time.Duration // how often we check for new config

pollInterval time.Duration

latestConfig configs.ID
latestMutex sync.RWMutex
cfgs map[string][]rules.Rule // all rules for all users
latestConfig configs.ID // # of last update received from config
sync.RWMutex

stop chan struct{}
done chan struct{}
Expand All @@ -92,7 +90,7 @@ func newScheduler(configsAPI configs_client.RulesAPI, evaluationInterval, pollIn
evaluationInterval: evaluationInterval,
pollInterval: pollInterval,
q: NewSchedulingQueue(clockwork.NewRealClock()),
cfgs: map[string]configs.Config{},
cfgs: map[string][]rules.Rule{},

stop: make(chan struct{}),
done: make(chan struct{}),
Expand Down Expand Up @@ -157,7 +155,9 @@ func (s *scheduler) updateConfigs(now time.Time) error {

// poll the configuration server. Not re-entrant.
func (s *scheduler) poll() (map[string]configs.View, error) {
s.Lock()
configID := s.latestConfig
s.Unlock()
var cfgs *configs_client.ConfigsResponse
err := instrument.TimeRequestHistogram(context.Background(), "Configs.GetConfigs", configsRequestDuration, func(_ context.Context) error {
var err error
Expand All @@ -168,9 +168,9 @@ func (s *scheduler) poll() (map[string]configs.View, error) {
log.Warnf("Scheduler: configs server poll failed: %v", err)
return nil, err
}
s.latestMutex.Lock()
s.Lock()
s.latestConfig = cfgs.GetLatestConfigID()
s.latestMutex.Unlock()
s.Unlock()
return cfgs.Configs, nil
}

Expand All @@ -188,11 +188,16 @@ func (s *scheduler) addNewConfigs(now time.Time, cfgs map[string]configs.View) {
continue
}

s.Lock()
s.cfgs[userID] = rules
s.Unlock()
s.addWorkItem(workItem{userID, rules, now})
s.cfgs[userID] = config.Config
}
configUpdates.Add(float64(len(cfgs)))
totalConfigs.Set(float64(len(s.cfgs)))
s.Lock()
lenCfgs := len(s.cfgs)
s.Unlock()
totalConfigs.Set(float64(lenCfgs))
}

func (s *scheduler) addWorkItem(i workItem) {
Expand Down Expand Up @@ -221,7 +226,14 @@ func (s *scheduler) nextWorkItem() *workItem {

// workItemDone marks the given item as being ready to be rescheduled.
func (s *scheduler) workItemDone(i workItem) {
next := i.Defer(s.evaluationInterval)
s.Lock()
currentRules, found := s.cfgs[i.userID]
s.Unlock()
if !found {
log.Debugf("Scheduler: no more work configured for %v", i.userID)
return
}
next := i.Defer(s.evaluationInterval, currentRules)
log.Debugf("Scheduler: work item %v rescheduled for %v", i, next.scheduled.Format("2006-01-02 15:04:05"))
s.addWorkItem(next)
}