Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions pkg/controller/priorityqueue/priorityqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ type priorityqueue[T comparable] struct {
get chan item[T]

// waiters is the number of routines blocked in Get, we use it to determine
// if we can push items.
waiters atomic.Int64
// if we can push items. Every manipulation has to be protected with the lock.
waiters int64

// Configurable for testing
now func() time.Time
Expand Down Expand Up @@ -269,15 +269,15 @@ func (w *priorityqueue[T]) spin() {
}
}

if w.waiters.Load() == 0 {
if w.waiters == 0 {
// Have to keep iterating here to ensure we update metrics
// for further items that became ready and set nextReady.
return true
}

w.metrics.get(item.Key, item.Priority)
w.locked.Insert(item.Key)
w.waiters.Add(-1)
w.waiters--
delete(w.items, item.Key)
toDelete = append(toDelete, item)
w.becameReady.Delete(item.Key)
Expand Down Expand Up @@ -316,7 +316,9 @@ func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool)
return zero, 0, true
}

w.waiters.Add(1)
w.lock.Lock()
w.waiters++
w.lock.Unlock()

w.notifyItemOrWaiterAdded()

Expand Down
54 changes: 54 additions & 0 deletions pkg/controller/priorityqueue/priorityqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,60 @@ func newQueueWithTimeForwarder() (_ *priorityqueue[string], _ *fakeMetricsProvid
}
}

func TestHighPriorityItemsAreReturnedBeforeLowPriorityItemMultipleTimes(t *testing.T) {
t.Parallel()
synctest.Test(t, func(t *testing.T) {
g := NewWithT(t)

q, metrics := newQueue()
defer q.ShutDown()

const itemsPerPriority = 1000
lowPriority := 0
lowMiddlePriority := 5
middlePriority := 10
upperMiddlePriority := 15
highPriority := 20
for i := range itemsPerPriority {
q.AddWithOpts(AddOpts{Priority: &highPriority}, fmt.Sprintf("high-%d", i))
q.AddWithOpts(AddOpts{Priority: &upperMiddlePriority}, fmt.Sprintf("upperMiddle-%d", i))
q.AddWithOpts(AddOpts{Priority: &middlePriority}, fmt.Sprintf("middle-%d", i))
q.AddWithOpts(AddOpts{Priority: &lowMiddlePriority}, fmt.Sprintf("lowMiddle-%d", i))
q.AddWithOpts(AddOpts{Priority: &lowPriority}, fmt.Sprintf("low-%d", i))
}
synctest.Wait()

for range itemsPerPriority {
key, prio, _ := q.GetWithPriority()
g.Expect(prio).To(Equal(highPriority))
g.Expect(key).To(HavePrefix("high-"))
}
for range itemsPerPriority {
key, prio, _ := q.GetWithPriority()
g.Expect(prio).To(Equal(upperMiddlePriority))
g.Expect(key).To(HavePrefix("upperMiddle-"))
}
for range itemsPerPriority {
key, prio, _ := q.GetWithPriority()
g.Expect(prio).To(Equal(middlePriority))
g.Expect(key).To(HavePrefix("middle-"))
}
for range itemsPerPriority {
key, prio, _ := q.GetWithPriority()
g.Expect(prio).To(Equal(lowMiddlePriority))
g.Expect(key).To(HavePrefix("lowMiddle-"))
}
for range itemsPerPriority {
key, prio, _ := q.GetWithPriority()
g.Expect(prio).To(Equal(lowPriority))
g.Expect(key).To(HavePrefix("low-"))
}
g.Expect(metrics.depth["test"]).To(Equal(map[int]int{10: 0, 5: 0, 0: 0, 20: 0, 15: 0}))
g.Expect(metrics.adds["test"]).To(Equal(itemsPerPriority * 5))
g.Expect(metrics.retries["test"]).To(Equal(0))
})
}

func newQueue() (*priorityqueue[string], *fakeMetricsProvider) {
metrics := newFakeMetricsProvider()
q := New("test", func(o *Opts[string]) {
Expand Down