Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions pkg/controller/priorityqueue/priorityqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ type priorityqueue[T comparable] struct {
get chan item[T]

// waiters is the number of routines blocked in Get, we use it to determine
// if we can push items.
waiters atomic.Int64
// if we can push items. Every manipulation has to be protected with the lock.
waiters int64

// Configurable for testing
now func() time.Time
Expand Down Expand Up @@ -269,15 +269,15 @@ func (w *priorityqueue[T]) spin() {
}
}

if w.waiters.Load() == 0 {
if w.waiters == 0 {
// Have to keep iterating here to ensure we update metrics
// for further items that became ready and set nextReady.
return true
}

w.metrics.get(item.Key, item.Priority)
w.locked.Insert(item.Key)
w.waiters.Add(-1)
w.waiters--
delete(w.items, item.Key)
toDelete = append(toDelete, item)
w.becameReady.Delete(item.Key)
Expand Down Expand Up @@ -316,7 +316,9 @@ func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool)
return zero, 0, true
}

w.waiters.Add(1)
w.lock.Lock()
w.waiters++
w.lock.Unlock()

w.notifyItemOrWaiterAdded()

Expand Down
7 changes: 6 additions & 1 deletion pkg/controller/priorityqueue/priorityqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,12 @@ var _ = Describe("Controllerworkqueue", func() {
}()

// Verify the go routine above is now waiting for an item.
Eventually(q.(*priorityqueue[string]).waiters.Load).Should(Equal(int64(1)))
Eventually(func() int64 {
q.(*priorityqueue[string]).lock.Lock()
defer q.(*priorityqueue[string]).lock.Unlock()

return q.(*priorityqueue[string]).waiters
}).Should(Equal(int64(1)))
Consistently(getUnblocked).ShouldNot(BeClosed())

// shut down
Expand Down