From bef09079762e1f9e950889787dba1b8c9eecd8f8 Mon Sep 17 00:00:00 2001 From: fossedihelm Date: Tue, 28 Oct 2025 18:45:56 +0100 Subject: [PATCH] priority queue: properly sync the `waiter` manipulation As described in https://github.com/kubernetes-sigs/controller-runtime/issues/3363, there are some circumstances under which `GetWithPriority` is not returning the correct/expected element. This can happen when a `GetWithPriority` is executed and the `Ascend` of the queue is not completed yet, causing not all the items of the BTree to evaluate the same w.waiters.Load() value. Adding a lock to manipulate the waiters will solve the issue. Since the lock is required, there is no need to use an atomic.Int64 anymore. Signed-off-by: fossedihelm --- pkg/controller/priorityqueue/priorityqueue.go | 12 +++-- .../priorityqueue/priorityqueue_test.go | 54 +++++++++++++++++++ 2 files changed, 61 insertions(+), 5 deletions(-) diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index 98df84c56b..71363f0d17 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -124,8 +124,8 @@ type priorityqueue[T comparable] struct { get chan item[T] // waiters is the number of routines blocked in Get, we use it to determine - // if we can push items. - waiters atomic.Int64 + // if we can push items. Every manipulation has to be protected with the lock. + waiters int64 // Configurable for testing now func() time.Time @@ -269,7 +269,7 @@ func (w *priorityqueue[T]) spin() { } } - if w.waiters.Load() == 0 { + if w.waiters == 0 { // Have to keep iterating here to ensure we update metrics // for further items that became ready and set nextReady. return true @@ -277,7 +277,7 @@ func (w *priorityqueue[T]) spin() { w.metrics.get(item.Key, item.Priority) w.locked.Insert(item.Key) - w.waiters.Add(-1) + w.waiters-- delete(w.items, item.Key) toDelete = append(toDelete, item) w.becameReady.Delete(item.Key) @@ -316,7 +316,9 @@ func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool) return zero, 0, true } - w.waiters.Add(1) + w.lock.Lock() + w.waiters++ + w.lock.Unlock() w.notifyItemOrWaiterAdded() diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index fb186944ab..9c708e982b 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -441,6 +441,60 @@ func newQueueWithTimeForwarder() (_ *priorityqueue[string], _ *fakeMetricsProvid } } +func TestHighPriorityItemsAreReturnedBeforeLowPriorityItemMultipleTimes(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + g := NewWithT(t) + + q, metrics := newQueue() + defer q.ShutDown() + + const itemsPerPriority = 1000 + lowPriority := 0 + lowMiddlePriority := 5 + middlePriority := 10 + upperMiddlePriority := 15 + highPriority := 20 + for i := range itemsPerPriority { + q.AddWithOpts(AddOpts{Priority: &highPriority}, fmt.Sprintf("high-%d", i)) + q.AddWithOpts(AddOpts{Priority: &upperMiddlePriority}, fmt.Sprintf("upperMiddle-%d", i)) + q.AddWithOpts(AddOpts{Priority: &middlePriority}, fmt.Sprintf("middle-%d", i)) + q.AddWithOpts(AddOpts{Priority: &lowMiddlePriority}, fmt.Sprintf("lowMiddle-%d", i)) + q.AddWithOpts(AddOpts{Priority: &lowPriority}, fmt.Sprintf("low-%d", i)) + } + synctest.Wait() + + for range itemsPerPriority { + key, prio, _ := q.GetWithPriority() + g.Expect(prio).To(Equal(highPriority)) + g.Expect(key).To(HavePrefix("high-")) + } + for range itemsPerPriority { + key, prio, _ := q.GetWithPriority() + g.Expect(prio).To(Equal(upperMiddlePriority)) + g.Expect(key).To(HavePrefix("upperMiddle-")) + } + for range itemsPerPriority { + key, prio, _ := q.GetWithPriority() + g.Expect(prio).To(Equal(middlePriority)) + g.Expect(key).To(HavePrefix("middle-")) + } + for range itemsPerPriority { + key, prio, _ := q.GetWithPriority() + g.Expect(prio).To(Equal(lowMiddlePriority)) + g.Expect(key).To(HavePrefix("lowMiddle-")) + } + for range itemsPerPriority { + key, prio, _ := q.GetWithPriority() + g.Expect(prio).To(Equal(lowPriority)) + g.Expect(key).To(HavePrefix("low-")) + } + g.Expect(metrics.depth["test"]).To(Equal(map[int]int{10: 0, 5: 0, 0: 0, 20: 0, 15: 0})) + g.Expect(metrics.adds["test"]).To(Equal(itemsPerPriority * 5)) + g.Expect(metrics.retries["test"]).To(Equal(0)) + }) +} + func newQueue() (*priorityqueue[string], *fakeMetricsProvider) { metrics := newFakeMetricsProvider() q := New("test", func(o *Opts[string]) {