Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 36 additions & 17 deletions modules/queue/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,22 +308,18 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc,
p.cond.Broadcast()
cancel()
}
if p.hasNoWorkerScaling() {
select {
case <-p.baseCtx.Done():
// Don't warn if the baseCtx is shutdown
default:
log.Warn(
"Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+
"The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", p.qid)
}
p.pause()
}
select {
case <-p.baseCtx.Done():
// this worker queue is shut-down don't reboost
// Don't warn or check for ongoing work if the baseCtx is shutdown
case <-p.paused:
// Don't warn or check for ongoing work if the pool is paused
default:
if p.numberOfWorkers == 0 && atomic.LoadInt64(&p.numInQueue) > 0 {
if p.hasNoWorkerScaling() {
log.Warn(
"Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+
"The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", p.qid)
p.pause()
} else if p.numberOfWorkers == 0 && atomic.LoadInt64(&p.numInQueue) > 0 {
// OK there are no workers but... there's still work to be done -> Reboost
p.zeroBoost()
// p.lock will be unlocked by zeroBoost
Expand Down Expand Up @@ -385,14 +381,37 @@ func (p *WorkerPool) pause() {

// Resume resumes the WorkerPool
func (p *WorkerPool) Resume() {
p.lock.Lock()
defer p.lock.Unlock()
p.lock.Lock() // can't defer unlock because of the zeroBoost at the end
select {
case <-p.resumed:
// already resumed - there's nothing to do
p.lock.Unlock()
return
default:
p.paused = make(chan struct{})
close(p.resumed)
}

p.paused = make(chan struct{})
close(p.resumed)

// OK now we need to check if we need to add some workers...
if p.numberOfWorkers > 0 || p.hasNoWorkerScaling() || atomic.LoadInt64(&p.numInQueue) == 0 {
// We either have workers, can't scale or there's no work to be done -> so just resume
p.lock.Unlock()
return
}

// OK we got some work but no workers we need to think about boosting
select {
case <-p.baseCtx.Done():
// don't bother boosting if the baseCtx is done
p.lock.Unlock()
return
default:
}

// OK we'd better add some boost workers!
p.zeroBoost()
// p.zeroBoost will unlock the lock
}

// CleanUp will drain the remaining contents of the channel
Expand Down