@@ -63,23 +63,21 @@ func (w workItem) Scheduled() time.Time {
6363 return w .scheduled
6464}
6565
66- // Defer returns a copy of this work item , rescheduled to a later time.
67- func (w workItem ) Defer (interval time.Duration ) workItem {
68- return workItem {w .userID , w . rules , w .scheduled .Add (interval )}
66+ // Defer returns a work item with updated rules , rescheduled to a later time.
67+ func (w workItem ) Defer (interval time.Duration , currentRules []rules. Rule ) workItem {
68+ return workItem {w .userID , currentRules , w .scheduled .Add (interval )}
6969}
7070
7171type scheduler struct {
7272 configsAPI configs_client.RulesAPI
7373 evaluationInterval time.Duration
7474 q * SchedulingQueue
7575
76- // All the configurations that we have. Only used for instrumentation.
77- cfgs map [string ]configs.Config
78-
7976 pollInterval time.Duration
8077
78+ cfgs map [string ][]rules.Rule // all rules for all users
8179 latestConfig configs.ID
82- latestMutex sync.RWMutex
80+ sync.RWMutex
8381
8482 stop chan struct {}
8583 done chan struct {}
@@ -92,7 +90,7 @@ func newScheduler(configsAPI configs_client.RulesAPI, evaluationInterval, pollIn
9290 evaluationInterval : evaluationInterval ,
9391 pollInterval : pollInterval ,
9492 q : NewSchedulingQueue (clockwork .NewRealClock ()),
95- cfgs : map [string ]configs. Config {},
93+ cfgs : map [string ][]rules. Rule {},
9694
9795 stop : make (chan struct {}),
9896 done : make (chan struct {}),
@@ -157,7 +155,9 @@ func (s *scheduler) updateConfigs(now time.Time) error {
157155
158156// poll the configuration server. Not re-entrant.
159157func (s * scheduler ) poll () (map [string ]configs.View , error ) {
158+ s .Lock ()
160159 configID := s .latestConfig
160+ s .Unlock ()
161161 var cfgs * configs_client.ConfigsResponse
162162 err := instrument .TimeRequestHistogram (context .Background (), "Configs.GetConfigs" , configsRequestDuration , func (_ context.Context ) error {
163163 var err error
@@ -168,9 +168,9 @@ func (s *scheduler) poll() (map[string]configs.View, error) {
168168 log .Warnf ("Scheduler: configs server poll failed: %v" , err )
169169 return nil , err
170170 }
171- s .latestMutex . Lock ()
171+ s .Lock ()
172172 s .latestConfig = cfgs .GetLatestConfigID ()
173- s .latestMutex . Unlock ()
173+ s .Unlock ()
174174 return cfgs .Configs , nil
175175}
176176
@@ -188,11 +188,16 @@ func (s *scheduler) addNewConfigs(now time.Time, cfgs map[string]configs.View) {
188188 continue
189189 }
190190
191+ s .Lock ()
192+ s .cfgs [userID ] = rules
193+ s .Unlock ()
191194 s .addWorkItem (workItem {userID , rules , now })
192- s .cfgs [userID ] = config .Config
193195 }
194196 configUpdates .Add (float64 (len (cfgs )))
195- totalConfigs .Set (float64 (len (s .cfgs )))
197+ s .Lock ()
198+ lenCfgs := len (s .cfgs )
199+ s .Unlock ()
200+ totalConfigs .Set (float64 (lenCfgs ))
196201}
197202
198203func (s * scheduler ) addWorkItem (i workItem ) {
@@ -221,7 +226,14 @@ func (s *scheduler) nextWorkItem() *workItem {
221226
222227// workItemDone marks the given item as being ready to be rescheduled.
223228func (s * scheduler ) workItemDone (i workItem ) {
224- next := i .Defer (s .evaluationInterval )
229+ s .Lock ()
230+ currentRules , found := s .cfgs [i .userID ]
231+ s .Unlock ()
232+ if ! found {
233+ log .Debugf ("Scheduler: no more work configured for %v" , i .userID )
234+ return
235+ }
236+ next := i .Defer (s .evaluationInterval , currentRules )
225237 log .Debugf ("Scheduler: work item %v rescheduled for %v" , i , next .scheduled .Format ("2006-01-02 15:04:05" ))
226238 s .addWorkItem (next )
227239}
0 commit comments