Merged
15 changes: 12 additions & 3 deletions pkg/controller/priorityqueue/priorityqueue.go
@@ -290,9 +290,18 @@ func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool)

 	w.notifyItemOrWaiterAdded()

-	item := <-w.get
-
-	return item.Key, item.Priority, w.shutdown.Load()
+	select {
Member Author

Moved from #3332 (comment)

This was way too tricky to debug :)

Once PQ was enabled by default, this test started to fail:

  • manager stop failed with "failed waiting for all runnable to end within grace period"
  • because not all workers of the leader election runnable group stopped
  • and they didn't stop because they were stuck in GetWithPriority()

Just tricky to figure out:

  1. which test even caused this (I just got an apiserver stop timeout, the test itself did not fail)
  2. which runnable group did not stop
  3. which runnable did not stop
  4. why does it not stop (is reconcile blocking or something else)

I guess our logging could use some improvements ...

Member Author

@alvaroaleman:

Yeah, let's please do that, might also be worth backporting this. I don't fully understand it though I think, at this point we would've given them an item anyways, what difference does it make? Definitely worth go-docing that and I think it might be worth a test as well

Member Author

Definitely worth go-docing that and I think it might be worth a test as well

Extended the godoc and added a unit test

I don't fully understand it though I think, at this point we would've given them an item anyways, what difference does it make?

We can only give the caller an item if there are still items in the queue; otherwise the caller stays blocked forever.

Example:

  • controller is started
  • controller workers are calling GetWithPriority and are waiting for items
  • queue is empty and stays empty
  • controller (and accordingly the queue) is shut down
  • previous code:
    • Shutdown() closes w.done
    • workers remain blocked in l.293 (item := <-w.get)
    • accordingly the controller and the runnable group cannot shut down
  • new code:
    • Shutdown() closes w.done
    • l.301 (return zero, 0, true) returns, so the workers are unblocked

Member

I misread the code the last time and thought this was where we send, not where we get 🤦 Makes total sense

+	case <-w.done:
+		// Return if the queue was shutdown while we were already waiting for an item here.
+		// For example controller workers are continuously calling GetWithPriority and
+		// GetWithPriority is blocking the workers if there are no items in the queue.
+		// If the controller and accordingly the queue is then shut down, without this code
+		// branch the controller workers remain blocked here and are unable to shut down.
+		var zero T
+		return zero, 0, true
+	case item := <-w.get:
+		return item.Key, item.Priority, w.shutdown.Load()
+	}
 }

 func (w *priorityqueue[T]) Get() (item T, shutdown bool) {
26 changes: 26 additions & 0 deletions pkg/controller/priorityqueue/priorityqueue_test.go
@@ -296,6 +296,32 @@ var _ = Describe("Controllerworkqueue", func() {
 		Expect(isShutDown).To(BeTrue())
 	})

+	It("Get from priority queue should get unblocked when the priority queue is shut down", func() {
+		q, _ := newQueue()
+
+		getUnblocked := make(chan struct{})
+
+		go func() {
+			defer GinkgoRecover()
+			defer close(getUnblocked)
+
+			item, priority, isShutDown := q.GetWithPriority()
+			Expect(item).To(Equal(""))
+			Expect(priority).To(Equal(0))
+			Expect(isShutDown).To(BeTrue())
+		}()
+
+		// Verify the go routine above is now waiting for an item.
+		Eventually(q.waiters.Load).Should(Equal(int64(1)))
+		Consistently(getUnblocked).ShouldNot(BeClosed())
+
+		// shut down
+		q.ShutDown()
+
+		// Verify the shutdown unblocked the go routine.
+		Eventually(getUnblocked).Should(BeClosed())
+	})
+
 	It("items are included in Len() and the queueDepth metric once they are ready", func() {
 		q, metrics := newQueue()
 		defer q.ShutDown()