From b45c3ae744b05306688f6b3105e2927584a725f1 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Fri, 30 Apr 2021 22:14:17 +0100 Subject: [PATCH 01/24] move shutdownfns, terminatefns and hammerfns out of separate goroutines Coalesce the shutdownfns etc into a list of functions that get run at shutdown rather then have them run at goroutines blocked on selects. This may help reduce the background select/poll load in certain configurations. Signed-off-by: Andrew Thornton --- modules/graceful/context.go | 23 +---- modules/graceful/manager.go | 147 ++++++++++++++++------------ modules/graceful/manager_unix.go | 24 +++-- modules/graceful/manager_windows.go | 16 ++- 4 files changed, 117 insertions(+), 93 deletions(-) diff --git a/modules/graceful/context.go b/modules/graceful/context.go index 1ad1109b4e5bd..8cebd407a85f4 100644 --- a/modules/graceful/context.go +++ b/modules/graceful/context.go @@ -6,17 +6,9 @@ package graceful import ( "context" - "fmt" "time" ) -// Errors for context.Err() -var ( - ErrShutdown = fmt.Errorf("Graceful Manager called Shutdown") - ErrHammer = fmt.Errorf("Graceful Manager called Hammer") - ErrTerminate = fmt.Errorf("Graceful Manager called Terminate") -) - // ChannelContext is a context that wraps a channel and error as a context type ChannelContext struct { done <-chan struct{} @@ -63,28 +55,19 @@ func (ctx *ChannelContext) Value(key interface{}) interface{} { // Callers using this context should ensure that they are registered as a running server // in order that they are waited for. func (g *Manager) ShutdownContext() context.Context { - return &ChannelContext{ - done: g.IsShutdown(), - err: ErrShutdown, - } + return g.shutdown } // HammerContext returns a context.Context that is Done at hammer // Callers using this context should ensure that they are registered as a running server // in order that they are waited for. func (g *Manager) HammerContext() context.Context { - return &ChannelContext{ - done: g.IsHammer(), - err: ErrHammer, - } + return g.hammer } // TerminateContext returns a context.Context that is Done at terminate // Callers using this context should ensure that they are registered as a terminating server // in order that they are waited for. 
func (g *Manager) TerminateContext() context.Context { - return &ChannelContext{ - done: g.IsTerminate(), - err: ErrTerminate, - } + return g.terminate } diff --git a/modules/graceful/manager.go b/modules/graceful/manager.go index 903d05ed21f41..a72facdf868b6 100644 --- a/modules/graceful/manager.go +++ b/modules/graceful/manager.go @@ -81,14 +81,23 @@ func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) { } }() run(func(ctx context.Context, atShutdown func()) { - go func() { - select { - case <-g.IsShutdown(): - atShutdown() - case <-ctx.Done(): - return - } - }() + g.lock.Lock() + defer g.lock.Unlock() + g.toRunAtShutdown = append(g.toRunAtShutdown, + func() { + defer func() { + if err := recover(); err != nil { + log.Critical("PANIC during RunWithShutdownFns: %v\nStacktrace: %s", err, log.Stack(2)) + g.doShutdown() + } + }() + select { + case <-ctx.Done(): + return + default: + atShutdown() + } + }) }, func(ctx context.Context, atTerminate func()) { g.RunAtTerminate(ctx, atTerminate) }) @@ -138,58 +147,73 @@ func (g *Manager) RunWithShutdownContext(run func(context.Context)) { // RunAtTerminate adds to the terminate wait group and creates a go-routine to run the provided function at termination func (g *Manager) RunAtTerminate(ctx context.Context, terminate func()) { g.terminateWaitGroup.Add(1) - go func() { - defer g.terminateWaitGroup.Done() - defer func() { - if err := recover(); err != nil { - log.Critical("PANIC during RunAtTerminate: %v\nStacktrace: %s", err, log.Stack(2)) + g.lock.Lock() + defer g.lock.Unlock() + g.toRunAtTerminate = append(g.toRunAtTerminate, + func() { + defer g.terminateWaitGroup.Done() + defer func() { + if err := recover(); err != nil { + log.Critical("PANIC during RunAtTerminate: %v\nStacktrace: %s", err, log.Stack(2)) + } + }() + select { + case <-ctx.Done(): + return + default: + terminate() } - }() - select { - case <-g.IsTerminate(): - terminate() - case <-ctx.Done(): - } - }() + }) } // RunAtShutdown creates a go-routine to run the provided function at shutdown func (g *Manager) RunAtShutdown(ctx context.Context, shutdown func()) { - go func() { - defer func() { - if err := recover(); err != nil { - log.Critical("PANIC during RunAtShutdown: %v\nStacktrace: %s", err, log.Stack(2)) + g.lock.Lock() + defer g.lock.Unlock() + g.toRunAtShutdown = append(g.toRunAtShutdown, + func() { + defer func() { + if err := recover(); err != nil { + log.Critical("PANIC during RunAtShutdown: %v\nStacktrace: %s", err, log.Stack(2)) + } + }() + select { + case <-ctx.Done(): + return + default: + shutdown() } - }() - select { - case <-g.IsShutdown(): - shutdown() - case <-ctx.Done(): - } - }() + }) } // RunAtHammer creates a go-routine to run the provided function at shutdown func (g *Manager) RunAtHammer(ctx context.Context, hammer func()) { - go func() { - defer func() { - if err := recover(); err != nil { - log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2)) + g.lock.Lock() + defer g.lock.Unlock() + g.toRunAtHammer = append(g.toRunAtHammer, + func() { + defer func() { + if err := recover(); err != nil { + log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2)) + } + }() + select { + case <-ctx.Done(): + return + default: + hammer() } - }() - select { - case <-g.IsHammer(): - hammer() - case <-ctx.Done(): - } - }() + }) } func (g *Manager) doShutdown() { if !g.setStateTransition(stateRunning, stateShuttingDown) { return } g.lock.Lock() - close(g.shutdown) + g.shutdownCancel() + for _, fn := range 
g.toRunAtShutdown { + go fn() + } g.lock.Unlock() if setting.GracefulHammerTime >= 0 { @@ -203,7 +227,7 @@ func (g *Manager) doShutdown() { g.doTerminate() g.WaitForTerminate() g.lock.Lock() - close(g.done) + g.doneCancel() g.lock.Unlock() }() } @@ -212,10 +236,13 @@ func (g *Manager) doHammerTime(d time.Duration) { time.Sleep(d) g.lock.Lock() select { - case <-g.hammer: + case <-g.hammer.Done(): default: log.Warn("Setting Hammer condition") - close(g.hammer) + g.hammerCancel() + for _, fn := range g.toRunAtHammer { + go fn() + } } g.lock.Unlock() } @@ -226,10 +253,13 @@ func (g *Manager) doTerminate() { } g.lock.Lock() select { - case <-g.terminate: + case <-g.terminate.Done(): default: log.Warn("Terminating") - close(g.terminate) + g.terminateCancel() + for _, fn := range g.toRunAtTerminate { + go fn() + } } g.lock.Unlock() } @@ -242,7 +272,7 @@ func (g *Manager) IsChild() bool { // IsShutdown returns a channel which will be closed at shutdown. // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate func (g *Manager) IsShutdown() <-chan struct{} { - return g.shutdown + return g.shutdown.Done() } // IsHammer returns a channel which will be closed at hammer @@ -250,14 +280,14 @@ func (g *Manager) IsShutdown() <-chan struct{} { // Servers running within the running server wait group should respond to IsHammer // if not shutdown already func (g *Manager) IsHammer() <-chan struct{} { - return g.hammer + return g.hammer.Done() } // IsTerminate returns a channel which will be closed at terminate // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate // IsTerminate will only close once all running servers have stopped func (g *Manager) IsTerminate() <-chan struct{} { - return g.terminate + return g.terminate.Done() } // ServerDone declares a running server done and subtracts one from the @@ -314,25 +344,20 @@ func (g *Manager) InformCleanup() { // Done allows the manager to be viewed as a context.Context, it returns a channel that is closed when the server is finished terminating func (g *Manager) Done() <-chan struct{} { - return g.done + return g.done.Done() } -// Err allows the manager to be viewed as a context.Context done at Terminate, it returns ErrTerminate +// Err allows the manager to be viewed as a context.Context done at Terminate func (g *Manager) Err() error { - select { - case <-g.Done(): - return ErrTerminate - default: - return nil - } + return g.done.Err() } // Value allows the manager to be viewed as a context.Context done at Terminate, it has no values func (g *Manager) Value(key interface{}) interface{} { - return nil + return g.done.Value(key) } // Deadline returns nil as there is no fixed Deadline for the manager, it allows the manager to be viewed as a context.Context func (g *Manager) Deadline() (deadline time.Time, ok bool) { - return + return g.done.Deadline() } diff --git a/modules/graceful/manager_unix.go b/modules/graceful/manager_unix.go index 540974454c34c..e8c6188dfa7a8 100644 --- a/modules/graceful/manager_unix.go +++ b/modules/graceful/manager_unix.go @@ -25,13 +25,21 @@ type Manager struct { forked bool lock *sync.RWMutex state state - shutdown chan struct{} - hammer chan struct{} - terminate chan struct{} - done chan struct{} + shutdown context.Context + hammer context.Context + terminate context.Context + done context.Context + shutdownCancel context.CancelFunc + hammerCancel context.CancelFunc + terminateCancel context.CancelFunc + doneCancel context.CancelFunc runningServerWaitGroup sync.WaitGroup createServerWaitGroup 
sync.WaitGroup terminateWaitGroup sync.WaitGroup + + toRunAtShutdown []func() + toRunAtHammer []func() + toRunAtTerminate []func() } func newGracefulManager(ctx context.Context) *Manager { @@ -46,10 +54,10 @@ func newGracefulManager(ctx context.Context) *Manager { func (g *Manager) start(ctx context.Context) { // Make channels - g.terminate = make(chan struct{}) - g.shutdown = make(chan struct{}) - g.hammer = make(chan struct{}) - g.done = make(chan struct{}) + g.terminate, g.terminateCancel = context.WithCancel(ctx) + g.shutdown, g.shutdownCancel = context.WithCancel(ctx) + g.hammer, g.hammerCancel = context.WithCancel(ctx) + g.done, g.doneCancel = context.WithCancel(ctx) // Set the running state & handle signals g.setState(stateRunning) diff --git a/modules/graceful/manager_windows.go b/modules/graceful/manager_windows.go index b0e0d1ce38e30..b5806181c7891 100644 --- a/modules/graceful/manager_windows.go +++ b/modules/graceful/manager_windows.go @@ -36,14 +36,22 @@ type Manager struct { isChild bool lock *sync.RWMutex state state - shutdown chan struct{} - hammer chan struct{} - terminate chan struct{} - done chan struct{} + shutdown context.Context + hammer context.Context + terminate context.Context + done context.Context + shutdownCancel context.CancelFunc + hammerCancel context.CancelFunc + terminateCancel context.CancelFunc + doneCancel context.CancelFunc runningServerWaitGroup sync.WaitGroup createServerWaitGroup sync.WaitGroup terminateWaitGroup sync.WaitGroup shutdownRequested chan struct{} + + toRunAtShutdown []func() + toRunAtHammer []func() + toRunAtTerminate []func() } func newGracefulManager(ctx context.Context) *Manager { From 105a121c263000869985b0fc79ac39cd45284129 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Fri, 30 Apr 2021 22:26:22 +0100 Subject: [PATCH 02/24] oops windows; Signed-off-by: Andrew Thornton --- modules/graceful/manager_windows.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/modules/graceful/manager_windows.go b/modules/graceful/manager_windows.go index b5806181c7891..3cf5c5d60c001 100644 --- a/modules/graceful/manager_windows.go +++ b/modules/graceful/manager_windows.go @@ -66,11 +66,14 @@ func newGracefulManager(ctx context.Context) *Manager { } func (g *Manager) start() { + + // Make channels + g.terminate, g.terminateCancel = context.WithCancel(ctx) + g.shutdown, g.shutdownCancel = context.WithCancel(ctx) + g.hammer, g.hammerCancel = context.WithCancel(ctx) + g.done, g.doneCancel = context.WithCancel(ctx) + // Make channels - g.terminate = make(chan struct{}) - g.shutdown = make(chan struct{}) - g.hammer = make(chan struct{}) - g.done = make(chan struct{}) g.shutdownRequested = make(chan struct{}) // Set the running state From 7976e99424f85b04b43a85d0c5c478d67c78caef Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Fri, 30 Apr 2021 22:31:20 +0100 Subject: [PATCH 03/24] comment changes Signed-off-by: Andrew Thornton --- modules/graceful/manager_unix.go | 2 +- modules/graceful/manager_windows.go | 11 +++++------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/modules/graceful/manager_unix.go b/modules/graceful/manager_unix.go index e8c6188dfa7a8..60d4f54280de7 100644 --- a/modules/graceful/manager_unix.go +++ b/modules/graceful/manager_unix.go @@ -53,7 +53,7 @@ func newGracefulManager(ctx context.Context) *Manager { } func (g *Manager) start(ctx context.Context) { - // Make channels + // Make contexts g.terminate, g.terminateCancel = context.WithCancel(ctx) g.shutdown, g.shutdownCancel = 
context.WithCancel(ctx) g.hammer, g.hammerCancel = context.WithCancel(ctx) diff --git a/modules/graceful/manager_windows.go b/modules/graceful/manager_windows.go index 3cf5c5d60c001..21b50c0ef4eba 100644 --- a/modules/graceful/manager_windows.go +++ b/modules/graceful/manager_windows.go @@ -66,12 +66,11 @@ func newGracefulManager(ctx context.Context) *Manager { } func (g *Manager) start() { - - // Make channels - g.terminate, g.terminateCancel = context.WithCancel(ctx) - g.shutdown, g.shutdownCancel = context.WithCancel(ctx) - g.hammer, g.hammerCancel = context.WithCancel(ctx) - g.done, g.doneCancel = context.WithCancel(ctx) + // Make contexts + g.terminate, g.terminateCancel = context.WithCancel(g.ctx) + g.shutdown, g.shutdownCancel = context.WithCancel(g.ctx) + g.hammer, g.hammerCancel = context.WithCancel(g.ctx) + g.done, g.doneCancel = context.WithCancel(g.ctx) // Make channels g.shutdownRequested = make(chan struct{}) From 8815d904e20b4c31a250a5b476d52d0d19fc71ab Mon Sep 17 00:00:00 2001 From: zeripath Date: Fri, 30 Apr 2021 22:49:29 +0100 Subject: [PATCH 04/24] Update manager_windows.go --- modules/graceful/manager_windows.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/graceful/manager_windows.go b/modules/graceful/manager_windows.go index 21b50c0ef4eba..01ada4d85cc49 100644 --- a/modules/graceful/manager_windows.go +++ b/modules/graceful/manager_windows.go @@ -179,7 +179,7 @@ hammerLoop: default: log.Debug("Unexpected control request: %v", change.Cmd) } - case <-g.hammer: + case <-g.hammer.Done(): break hammerLoop } } From 159c3a194473a4112e3a258244d3507c125df677 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 1 May 2021 18:30:06 +0100 Subject: [PATCH 05/24] The LevelDB queues can actually wait on empty instead of polling Slight refactor to cause leveldb queues to wait on empty instead of polling. 
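The pattern adopted here is a one-slot wake-up channel: Push sends a non-blocking signal into a buffered channel of capacity one, and the reader blocks on that signal (or on shutdown) whenever the FIFO is empty, instead of sleeping and re-checking. A minimal, self-contained sketch of the pattern — using illustrative names such as waitQueue, not the actual ByteFIFOQueue code — looks like this:

package main

import (
	"fmt"
	"sync"
	"time"
)

type waitQueue struct {
	mu     sync.Mutex
	items  []string
	pushed chan struct{} // capacity 1: at most one pending wake-up signal
	closed chan struct{}
}

func newWaitQueue() *waitQueue {
	return &waitQueue{pushed: make(chan struct{}, 1), closed: make(chan struct{})}
}

func (q *waitQueue) Push(item string) {
	q.mu.Lock()
	q.items = append(q.items, item)
	q.mu.Unlock()
	select {
	case q.pushed <- struct{}{}: // wake the reader if it is waiting
	default: // a signal is already pending; no need for another
	}
}

func (q *waitQueue) run(handle func(string)) {
	for {
		q.mu.Lock()
		if len(q.items) == 0 {
			q.mu.Unlock()
			select {
			case <-q.pushed: // block until new data arrives instead of polling
				continue
			case <-q.closed:
				return
			}
		}
		item := q.items[0]
		q.items = q.items[1:]
		q.mu.Unlock()
		handle(item)
	}
}

func main() {
	q := newWaitQueue()
	go q.run(func(s string) { fmt.Println("handled:", s) })
	q.Push("a")
	q.Push("b")
	time.Sleep(100 * time.Millisecond)
	close(q.closed)
}

Because the channel holds at most one pending signal, any number of pushes that arrive while the reader is busy collapse into a single wake-up, which is enough: the reader drains the FIFO before waiting again.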
Signed-off-by: Andrew Thornton --- modules/queue/queue_bytefifo.go | 59 +++++++++++++++++++++--------- modules/queue/queue_disk.go | 1 + modules/queue/unique_queue_disk.go | 1 + 3 files changed, 43 insertions(+), 18 deletions(-) diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go index bc86078493307..e4ac38bd50702 100644 --- a/modules/queue/queue_bytefifo.go +++ b/modules/queue/queue_bytefifo.go @@ -17,8 +17,9 @@ import ( // ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue type ByteFIFOQueueConfiguration struct { WorkerPoolConfiguration - Workers int - Name string + Workers int + Name string + WaitOnEmpty bool } var _ Queue = &ByteFIFOQueue{} @@ -26,14 +27,16 @@ var _ Queue = &ByteFIFOQueue{} // ByteFIFOQueue is a Queue formed from a ByteFIFO and WorkerPool type ByteFIFOQueue struct { *WorkerPool - byteFIFO ByteFIFO - typ Type - closed chan struct{} - terminated chan struct{} - exemplar interface{} - workers int - name string - lock sync.Mutex + byteFIFO ByteFIFO + typ Type + closed chan struct{} + terminated chan struct{} + exemplar interface{} + workers int + name string + lock sync.Mutex + waitOnEmpty bool + pushed chan struct{} } // NewByteFIFOQueue creates a new ByteFIFOQueue @@ -45,14 +48,16 @@ func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exem config := configInterface.(ByteFIFOQueueConfiguration) return &ByteFIFOQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), - byteFIFO: byteFIFO, - typ: typ, - closed: make(chan struct{}), - terminated: make(chan struct{}), - exemplar: exemplar, - workers: config.Workers, - name: config.Name, + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + byteFIFO: byteFIFO, + typ: typ, + closed: make(chan struct{}), + terminated: make(chan struct{}), + exemplar: exemplar, + workers: config.Workers, + name: config.Name, + waitOnEmpty: config.WaitOnEmpty, + pushed: make(chan struct{}, 1), }, nil } @@ -76,6 +81,14 @@ func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error { if err != nil { return err } + if q.waitOnEmpty { + defer func() { + select { + case q.pushed <- struct{}{}: + default: + } + }() + } return q.byteFIFO.PushFunc(bs, fn) } @@ -131,6 +144,16 @@ func (q *ByteFIFOQueue) readToChan() { } if len(bs) == 0 { + if q.waitOnEmpty && q.byteFIFO.Len() == 0 { + q.lock.Unlock() + log.Trace("%s: %s Waiting on Empty", q.typ, q.name) + select { + case <-q.pushed: + continue + case <-q.closed: + continue + } + } q.lock.Unlock() time.Sleep(time.Millisecond * 100) continue diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go index 6c15a8e63be29..98127b5bf132d 100644 --- a/modules/queue/queue_disk.go +++ b/modules/queue/queue_disk.go @@ -37,6 +37,7 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) if len(config.ConnectionString) == 0 { config.ConnectionString = config.DataDir } + config.WaitOnEmpty = true byteFIFO, err := NewLevelQueueByteFIFO(config.ConnectionString, config.QueueName) if err != nil { diff --git a/modules/queue/unique_queue_disk.go b/modules/queue/unique_queue_disk.go index 8ec8848bc498b..e914133678031 100644 --- a/modules/queue/unique_queue_disk.go +++ b/modules/queue/unique_queue_disk.go @@ -41,6 +41,7 @@ func NewLevelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, if len(config.ConnectionString) == 0 { config.ConnectionString = config.DataDir } + config.WaitOnEmpty = true byteFIFO, err := NewLevelUniqueQueueByteFIFO(config.ConnectionString, 
config.QueueName) if err != nil { From ec99e10b2c5ea28b13ef39290db9067585846db0 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sun, 2 May 2021 15:21:28 +0100 Subject: [PATCH 06/24] Shutdown the shadow level queue once it is empty Signed-off-by: Andrew Thornton --- modules/queue/queue_disk_channel.go | 17 +++++++++++++++-- modules/queue/unique_queue_disk_channel.go | 16 ++++++++++++++-- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index 801fd8a12235c..0f3774dbca44e 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -150,8 +150,21 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Conte atShutdown(context.Background(), q.Shutdown) atTerminate(context.Background(), q.Terminate) - // Just run the level queue - we shut it down later - go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {}) + if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len() != 0 { + // Just run the level queue - we shut it down once it's flushed + go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {}) + go func() { + _ = q.internal.Flush(0) + log.Debug("LevelQueue: %s flushed so shutting down", q.internal.(*LevelQueue).Name()) + q.internal.(*LevelQueue).Shutdown() + GetManager().Remove(q.internal.(*LevelQueue).qid) + + }() + } else { + log.Debug("PersistableChannelQueue: %s Skipping running the empty level queue", q.delayedStarter.name) + q.internal.(*LevelQueue).Shutdown() + GetManager().Remove(q.internal.(*LevelQueue).qid) + } go func() { _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0) diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go index 47c4f2bdd574d..6fe19ff029f98 100644 --- a/modules/queue/unique_queue_disk_channel.go +++ b/modules/queue/unique_queue_disk_channel.go @@ -182,8 +182,20 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(context atShutdown(context.Background(), q.Shutdown) atTerminate(context.Background(), q.Terminate) - // Just run the level queue - we shut it down later - go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {}) + if luq, ok := q.internal.(*LevelUniqueQueue); ok && luq.ByteFIFOUniqueQueue.byteFIFO.Len() != 0 { + // Just run the level queue - we shut it down once it's flushed + go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {}) + go func() { + _ = q.internal.Flush(0) + log.Debug("LevelUniqueQueue: %s flushed so shutting down", q.internal.(*LevelQueue).Name()) + q.internal.(*LevelUniqueQueue).Shutdown() + GetManager().Remove(q.internal.(*LevelUniqueQueue).qid) + }() + } else { + log.Debug("PersistableChannelUniqueQueue: %s Skipping running the empty level queue", q.delayedStarter.name) + q.internal.(*LevelUniqueQueue).Shutdown() + GetManager().Remove(q.internal.(*LevelUniqueQueue).qid) + } go func() { _ = q.ChannelUniqueQueue.AddWorkers(q.workers, 0) From 0c107aa0cbb5f46598187fcc104ba554ba27436a Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sun, 2 May 2021 22:13:29 +0100 Subject: [PATCH 07/24] Remove bytefifo additional goroutine for readToChan as it can just be run in run Signed-off-by: Andrew Thornton --- modules/queue/queue_bytefifo.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/queue/queue_bytefifo.go 
b/modules/queue/queue_bytefifo.go index e4ac38bd50702..195de8a92dc80 100644 --- a/modules/queue/queue_bytefifo.go +++ b/modules/queue/queue_bytefifo.go @@ -112,9 +112,9 @@ func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func() _ = q.AddWorkers(q.workers, 0) }() - go q.readToChan() + log.Trace("%s: %s Now running", q.typ, q.name) + q.readToChan() - log.Trace("%s: %s Waiting til closed", q.typ, q.name) <-q.closed log.Trace("%s: %s Waiting til done", q.typ, q.name) q.Wait() From c41b52a4be2c7637d78f95c4526467c815a5231c Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sun, 2 May 2021 22:14:14 +0100 Subject: [PATCH 08/24] Remove additional removeWorkers goroutine for workers Signed-off-by: Andrew Thornton --- modules/queue/workerpool.go | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 0f15ccac9efd7..b3f7cd89f03fa 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -113,7 +113,7 @@ func (p *WorkerPool) zeroBoost() { }() } p.lock.Unlock() - p.addWorkers(ctx, boost) + p.addWorkers(ctx, cancel, boost) } func (p *WorkerPool) pushBoost(data Data) { @@ -167,7 +167,7 @@ func (p *WorkerPool) pushBoost(data Data) { p.lock.Unlock() }() p.lock.Unlock() - p.addWorkers(ctx, boost) + p.addWorkers(ctx, cancel, boost) p.dataChan <- data } } @@ -243,28 +243,25 @@ func (p *WorkerPool) commonRegisterWorkers(number int, timeout time.Duration, is mq := GetManager().GetManagedQueue(p.qid) if mq != nil { pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel, isFlusher) - go func() { - <-ctx.Done() - mq.RemoveWorkers(pid) - cancel() - }() log.Trace("WorkerPool: %d (for %s) adding %d workers with group id: %d", p.qid, mq.Name, number, pid) - } else { - log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number) - + return ctx, func() { + mq.RemoveWorkers(pid) + } } + log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number) + return ctx, cancel } // AddWorkers adds workers to the pool - this allows the number of workers to go above the limit func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc { ctx, cancel := p.commonRegisterWorkers(number, timeout, false) - p.addWorkers(ctx, number) + p.addWorkers(ctx, cancel, number) return cancel } // addWorkers adds workers to the pool -func (p *WorkerPool) addWorkers(ctx context.Context, number int) { +func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc, number int) { for i := 0; i < number; i++ { p.lock.Lock() if p.cond == nil { @@ -279,11 +276,13 @@ func (p *WorkerPool) addWorkers(ctx context.Context, number int) { p.numberOfWorkers-- if p.numberOfWorkers == 0 { p.cond.Broadcast() + cancel() } else if p.numberOfWorkers < 0 { // numberOfWorkers can't go negative but... 
log.Warn("Number of Workers < 0 for QID %d - this shouldn't happen", p.qid) p.numberOfWorkers = 0 p.cond.Broadcast() + cancel() } p.lock.Unlock() }() From 5e35339e312eee98f4d22c23806a8afeb1a855d0 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sun, 2 May 2021 22:22:24 +0100 Subject: [PATCH 09/24] simplify zero boost Signed-off-by: Andrew Thornton --- modules/queue/workerpool.go | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index b3f7cd89f03fa..1b5a936826b3f 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -83,7 +83,7 @@ func (p *WorkerPool) Push(data Data) { } func (p *WorkerPool) zeroBoost() { - ctx, cancel := context.WithCancel(p.baseCtx) + ctx, cancel := context.WithTimeout(p.baseCtx, p.boostTimeout) mq := GetManager().GetManagedQueue(p.qid) boost := p.boostWorkers if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 { @@ -94,23 +94,11 @@ func (p *WorkerPool) zeroBoost() { start := time.Now() pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false) - go func() { - select { - case <-ctx.Done(): - case <-time.After(p.boostTimeout): - } + cancel = func() { mq.RemoveWorkers(pid) - cancel() - }() + } } else { log.Warn("WorkerPool: %d has zero workers - adding %d temporary workers for %s", p.qid, p.boostWorkers, p.boostTimeout) - go func() { - select { - case <-ctx.Done(): - case <-time.After(p.boostTimeout): - } - cancel() - }() } p.lock.Unlock() p.addWorkers(ctx, cancel, boost) From eda962dd08a9664d53af80adc46bb3a373656de2 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Mon, 3 May 2021 21:26:57 +0100 Subject: [PATCH 10/24] Simplify the AtShutdown and AtTerminate functions and add Channel Flusher Signed-off-by: Andrew Thornton --- modules/graceful/manager.go | 43 +++------ modules/indexer/code/indexer.go | 8 +- modules/indexer/issues/indexer.go | 4 +- modules/queue/queue.go | 6 +- modules/queue/queue_bytefifo.go | 103 +++++++++++---------- modules/queue/queue_channel.go | 62 ++++++++++--- modules/queue/queue_channel_test.go | 5 +- modules/queue/queue_disk_channel.go | 8 +- modules/queue/queue_disk_channel_test.go | 9 +- modules/queue/queue_disk_test.go | 9 +- modules/queue/queue_wrapped.go | 8 +- modules/queue/unique_queue_channel.go | 7 +- modules/queue/unique_queue_disk_channel.go | 9 +- services/pull/check_test.go | 5 +- 14 files changed, 154 insertions(+), 132 deletions(-) diff --git a/modules/graceful/manager.go b/modules/graceful/manager.go index a72facdf868b6..b7ae0e8c137d4 100644 --- a/modules/graceful/manager.go +++ b/modules/graceful/manager.go @@ -54,8 +54,8 @@ func InitManager(ctx context.Context) { }) } -// CallbackWithContext is combined runnable and context to watch to see if the caller has finished -type CallbackWithContext func(ctx context.Context, callback func()) +// WithCallback is a runnable to call when the caller has finished +type WithCallback func(callback func()) // RunnableWithShutdownFns is a runnable with functions to run at shutdown and terminate // After the callback to atShutdown is called and is complete, the main function must return. @@ -63,7 +63,7 @@ type CallbackWithContext func(ctx context.Context, callback func()) // Please note that use of the atShutdown and atTerminate callbacks will create go-routines that will wait till their respective signals // - users must therefore be careful to only call these as necessary. 
// If run is not expected to run indefinitely RunWithShutdownChan is likely to be more appropriate. -type RunnableWithShutdownFns func(atShutdown, atTerminate func(context.Context, func())) +type RunnableWithShutdownFns func(atShutdown, atTerminate func(func())) // RunWithShutdownFns takes a function that has both atShutdown and atTerminate callbacks // After the callback to atShutdown is called and is complete, the main function must return. @@ -80,7 +80,7 @@ func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) { g.doShutdown() } }() - run(func(ctx context.Context, atShutdown func()) { + run(func(atShutdown func()) { g.lock.Lock() defer g.lock.Unlock() g.toRunAtShutdown = append(g.toRunAtShutdown, @@ -91,15 +91,10 @@ func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) { g.doShutdown() } }() - select { - case <-ctx.Done(): - return - default: - atShutdown() - } + atShutdown() }) - }, func(ctx context.Context, atTerminate func()) { - g.RunAtTerminate(ctx, atTerminate) + }, func(atTerminate func()) { + g.RunAtTerminate(atTerminate) }) } @@ -108,7 +103,7 @@ func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) { // (Optionally IsHammer may be waited for instead however, this should be avoided if possible.) // The callback function provided to atTerminate must return once termination is complete. // Please note that use of the atTerminate function will create a go-routine that will wait till terminate - users must therefore be careful to only call this as necessary. -type RunnableWithShutdownChan func(atShutdown <-chan struct{}, atTerminate CallbackWithContext) +type RunnableWithShutdownChan func(atShutdown <-chan struct{}, atTerminate WithCallback) // RunWithShutdownChan takes a function that has channel to watch for shutdown and atTerminate callbacks // After the atShutdown channel is closed, the main function must return once shutdown is complete. 
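With this change the atShutdown and atTerminate callbacks no longer take a context and no longer park one goroutine per registration; the manager appends each callback to a list under its lock and invokes the list when shutdown (or terminate) actually happens. A rough, self-contained sketch of that registration pattern — illustrative only, with a made-up miniManager type rather than the real graceful.Manager — is:

package main

import (
	"fmt"
	"sync"
)

type miniManager struct {
	lock            sync.Mutex
	toRunAtShutdown []func()
}

// runAtShutdown only records the callback; no goroutine is started here.
func (m *miniManager) runAtShutdown(fn func()) {
	m.lock.Lock()
	defer m.lock.Unlock()
	m.toRunAtShutdown = append(m.toRunAtShutdown, fn)
}

// doShutdown fires every registered callback once shutdown actually begins
// and waits for them to finish.
func (m *miniManager) doShutdown() {
	m.lock.Lock()
	fns := m.toRunAtShutdown
	m.lock.Unlock()

	var wg sync.WaitGroup
	for _, fn := range fns {
		wg.Add(1)
		go func(fn func()) {
			defer wg.Done()
			fn()
		}(fn)
	}
	wg.Wait()
}

func main() {
	m := &miniManager{}
	m.runAtShutdown(func() { fmt.Println("closing queue") })
	m.runAtShutdown(func() { fmt.Println("closing indexer") })
	m.doShutdown()
}

Callers keep the same shape of API (atShutdown(q.Shutdown), atTerminate(q.Terminate)), but registration is now just a slice append, so an idle process no longer carries a blocked goroutine for every registered callback.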
@@ -124,8 +119,8 @@ func (g *Manager) RunWithShutdownChan(run RunnableWithShutdownChan) { g.doShutdown() } }() - run(g.IsShutdown(), func(ctx context.Context, atTerminate func()) { - g.RunAtTerminate(ctx, atTerminate) + run(g.IsShutdown(), func(atTerminate func()) { + g.RunAtTerminate(atTerminate) }) } @@ -145,7 +140,7 @@ func (g *Manager) RunWithShutdownContext(run func(context.Context)) { } // RunAtTerminate adds to the terminate wait group and creates a go-routine to run the provided function at termination -func (g *Manager) RunAtTerminate(ctx context.Context, terminate func()) { +func (g *Manager) RunAtTerminate(terminate func()) { g.terminateWaitGroup.Add(1) g.lock.Lock() defer g.lock.Unlock() @@ -157,12 +152,7 @@ func (g *Manager) RunAtTerminate(ctx context.Context, terminate func()) { log.Critical("PANIC during RunAtTerminate: %v\nStacktrace: %s", err, log.Stack(2)) } }() - select { - case <-ctx.Done(): - return - default: - terminate() - } + terminate() }) } @@ -187,7 +177,7 @@ func (g *Manager) RunAtShutdown(ctx context.Context, shutdown func()) { } // RunAtHammer creates a go-routine to run the provided function at shutdown -func (g *Manager) RunAtHammer(ctx context.Context, hammer func()) { +func (g *Manager) RunAtHammer(hammer func()) { g.lock.Lock() defer g.lock.Unlock() g.toRunAtHammer = append(g.toRunAtHammer, @@ -197,12 +187,7 @@ func (g *Manager) RunAtHammer(ctx context.Context, hammer func()) { log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2)) } }() - select { - case <-ctx.Done(): - return - default: - hammer() - } + hammer() }) } func (g *Manager) doShutdown() { diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go index a7d78e9fdc82c..67fa43eda89dc 100644 --- a/modules/indexer/code/indexer.go +++ b/modules/indexer/code/indexer.go @@ -115,7 +115,13 @@ func Init() { ctx, cancel := context.WithCancel(context.Background()) - graceful.GetManager().RunAtTerminate(ctx, func() { + graceful.GetManager().RunAtTerminate(func() { + select { + case <-ctx.Done(): + return + default: + } + cancel() log.Debug("Closing repository indexer") indexer.Close() log.Info("PID: %d Repository Indexer closed", os.Getpid()) diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index 9edaef6bdd017..676b6686ea5b2 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -160,7 +160,7 @@ func InitIssueIndexer(syncReindex bool) { } populate = !exist holder.set(issueIndexer) - graceful.GetManager().RunAtTerminate(context.Background(), func() { + graceful.GetManager().RunAtTerminate(func() { log.Debug("Closing issue indexer") issueIndexer := holder.get() if issueIndexer != nil { @@ -170,7 +170,7 @@ func InitIssueIndexer(syncReindex bool) { }) log.Debug("Created Bleve Indexer") case "elasticsearch": - graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(context.Context, func())) { + graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(func())) { issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName) if err != nil { log.Fatal("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err) diff --git a/modules/queue/queue.go b/modules/queue/queue.go index d08cba35a1ea5..7159048c11689 100644 --- a/modules/queue/queue.go +++ b/modules/queue/queue.go @@ -57,7 +57,7 @@ type Named interface { // Queues will handle their own contents in the Run method type Queue 
interface { Flushable - Run(atShutdown, atTerminate func(context.Context, func())) + Run(atShutdown, atTerminate func(func())) Push(Data) error } @@ -74,7 +74,7 @@ type DummyQueue struct { } // Run does nothing -func (*DummyQueue) Run(_, _ func(context.Context, func())) {} +func (*DummyQueue) Run(_, _ func(func())) {} // Push fakes a push of data to the queue func (*DummyQueue) Push(Data) error { @@ -122,7 +122,7 @@ type Immediate struct { } // Run does nothing -func (*Immediate) Run(_, _ func(context.Context, func())) {} +func (*Immediate) Run(_, _ func(func())) {} // Push fakes a push of data to the queue func (q *Immediate) Push(data Data) error { diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go index 195de8a92dc80..818180723fa38 100644 --- a/modules/queue/queue_bytefifo.go +++ b/modules/queue/queue_bytefifo.go @@ -27,16 +27,18 @@ var _ Queue = &ByteFIFOQueue{} // ByteFIFOQueue is a Queue formed from a ByteFIFO and WorkerPool type ByteFIFOQueue struct { *WorkerPool - byteFIFO ByteFIFO - typ Type - closed chan struct{} - terminated chan struct{} - exemplar interface{} - workers int - name string - lock sync.Mutex - waitOnEmpty bool - pushed chan struct{} + byteFIFO ByteFIFO + typ Type + shutdownCtx context.Context + shutdownCancel context.CancelFunc + terminateCtx context.Context + terminateCancel context.CancelFunc + exemplar interface{} + workers int + name string + lock sync.Mutex + waitOnEmpty bool + pushed chan struct{} } // NewByteFIFOQueue creates a new ByteFIFOQueue @@ -47,17 +49,22 @@ func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exem } config := configInterface.(ByteFIFOQueueConfiguration) + terminateCtx, terminateCancel := context.WithCancel(context.Background()) + shutdownCtx, shutdownCancel := context.WithCancel(terminateCtx) + return &ByteFIFOQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), - byteFIFO: byteFIFO, - typ: typ, - closed: make(chan struct{}), - terminated: make(chan struct{}), - exemplar: exemplar, - workers: config.Workers, - name: config.Name, - waitOnEmpty: config.WaitOnEmpty, - pushed: make(chan struct{}, 1), + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + byteFIFO: byteFIFO, + typ: typ, + shutdownCtx: shutdownCtx, + shutdownCancel: shutdownCancel, + terminateCtx: terminateCtx, + terminateCancel: terminateCancel, + exemplar: exemplar, + workers: config.Workers, + name: config.Name, + waitOnEmpty: config.WaitOnEmpty, + pushed: make(chan struct{}, 1), }, nil } @@ -103,9 +110,9 @@ func (q *ByteFIFOQueue) IsEmpty() bool { } // Run runs the bytefifo queue -func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func())) { - atShutdown(context.Background(), q.Shutdown) - atTerminate(context.Background(), q.Terminate) +func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(func())) { + atShutdown(q.Shutdown) + atTerminate(q.Terminate) log.Debug("%s: %s Starting", q.typ, q.name) go func() { @@ -115,21 +122,19 @@ func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func() log.Trace("%s: %s Now running", q.typ, q.name) q.readToChan() - <-q.closed + <-q.shutdownCtx.Done() log.Trace("%s: %s Waiting til done", q.typ, q.name) q.Wait() log.Trace("%s: %s Waiting til cleaned", q.typ, q.name) - ctx, cancel := context.WithCancel(context.Background()) - atTerminate(ctx, cancel) - q.CleanUp(ctx) - cancel() + q.CleanUp(q.terminateCtx) + q.terminateCancel() } func (q *ByteFIFOQueue) readToChan() { for { select { - case <-q.closed: + 
case <-q.shutdownCtx.Done(): // tell the pool to shutdown. q.cancel() return @@ -150,7 +155,7 @@ func (q *ByteFIFOQueue) readToChan() { select { case <-q.pushed: continue - case <-q.closed: + case <-q.shutdownCtx.Done(): continue } } @@ -177,34 +182,30 @@ func (q *ByteFIFOQueue) readToChan() { // Shutdown processing from this queue func (q *ByteFIFOQueue) Shutdown() { log.Trace("%s: %s Shutting down", q.typ, q.name) - q.lock.Lock() select { - case <-q.closed: + case <-q.shutdownCtx.Done(): + return default: - close(q.closed) } - q.lock.Unlock() + q.shutdownCancel() log.Debug("%s: %s Shutdown", q.typ, q.name) } // IsShutdown returns a channel which is closed when this Queue is shutdown func (q *ByteFIFOQueue) IsShutdown() <-chan struct{} { - return q.closed + return q.shutdownCtx.Done() } // Terminate this queue and close the queue func (q *ByteFIFOQueue) Terminate() { log.Trace("%s: %s Terminating", q.typ, q.name) q.Shutdown() - q.lock.Lock() select { - case <-q.terminated: - q.lock.Unlock() + case <-q.terminateCtx.Done(): return default: } - close(q.terminated) - q.lock.Unlock() + q.terminateCancel() if log.IsDebug() { log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len()) } @@ -216,7 +217,7 @@ func (q *ByteFIFOQueue) Terminate() { // IsTerminated returns a channel which is closed when this Queue is terminated func (q *ByteFIFOQueue) IsTerminated() <-chan struct{} { - return q.terminated + return q.terminateCtx.Done() } var _ UniqueQueue = &ByteFIFOUniqueQueue{} @@ -233,17 +234,21 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun return nil, err } config := configInterface.(ByteFIFOQueueConfiguration) + terminateCtx, terminateCancel := context.WithCancel(context.Background()) + shutdownCtx, shutdownCancel := context.WithCancel(terminateCtx) return &ByteFIFOUniqueQueue{ ByteFIFOQueue: ByteFIFOQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), - byteFIFO: byteFIFO, - typ: typ, - closed: make(chan struct{}), - terminated: make(chan struct{}), - exemplar: exemplar, - workers: config.Workers, - name: config.Name, + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + byteFIFO: byteFIFO, + typ: typ, + shutdownCtx: shutdownCtx, + shutdownCancel: shutdownCancel, + terminateCtx: terminateCtx, + terminateCancel: terminateCancel, + exemplar: exemplar, + workers: config.Workers, + name: config.Name, }, }, nil } diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index d7a11e79f5dc6..8477a775f4260 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -27,9 +27,13 @@ type ChannelQueueConfiguration struct { // It is basically a very thin wrapper around a WorkerPool type ChannelQueue struct { *WorkerPool - exemplar interface{} - workers int - name string + shutdownCtx context.Context + shutdownCancel context.CancelFunc + terminateCtx context.Context + terminateCancel context.CancelFunc + exemplar interface{} + workers int + name string } // NewChannelQueue creates a memory channel queue @@ -42,24 +46,28 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro if config.BatchLength == 0 { config.BatchLength = 1 } + + terminateCtx, terminateCancel := context.WithCancel(context.Background()) + shutdownCtx, shutdownCancel := context.WithCancel(terminateCtx) + queue := &ChannelQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), - exemplar: exemplar, - workers: config.Workers, - name: config.Name, + 
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + shutdownCtx: shutdownCtx, + shutdownCancel: shutdownCancel, + terminateCtx: terminateCtx, + terminateCancel: terminateCancel, + exemplar: exemplar, + workers: config.Workers, + name: config.Name, } queue.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar) return queue, nil } // Run starts to run the queue -func (q *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { - atShutdown(context.Background(), func() { - log.Warn("ChannelQueue: %s is not shutdownable!", q.name) - }) - atTerminate(context.Background(), func() { - log.Warn("ChannelQueue: %s is not terminatable!", q.name) - }) +func (q *ChannelQueue) Run(atShutdown, atTerminate func(func())) { + atShutdown(q.Shutdown) + atTerminate(q.Terminate) log.Debug("ChannelQueue: %s Starting", q.name) go func() { _ = q.AddWorkers(q.workers, 0) @@ -75,6 +83,32 @@ func (q *ChannelQueue) Push(data Data) error { return nil } +// Shutdown processing from this queue +func (q *ChannelQueue) Shutdown() { + log.Trace("ChannelQueue: %s Shutting down", q.name) + select { + case <-q.shutdownCtx.Done(): + return + default: + } + go q.FlushWithContext(q.terminateCtx) + q.shutdownCancel() + log.Debug("ChannelQueue: %s Shutdown", q.name) +} + +// Terminate this queue and close the queue +func (q *ChannelQueue) Terminate() { + log.Trace("ChannelQueue: %s Terminating", q.name) + q.Shutdown() + select { + case <-q.terminateCtx.Done(): + return + default: + } + q.terminateCancel() + log.Debug("ChannelQueue: %s Terminated", q.name) +} + // Name returns the name of this queue func (q *ChannelQueue) Name() string { return q.name diff --git a/modules/queue/queue_channel_test.go b/modules/queue/queue_channel_test.go index 08a64c0ab86c6..1b0f76da3753a 100644 --- a/modules/queue/queue_channel_test.go +++ b/modules/queue/queue_channel_test.go @@ -5,7 +5,6 @@ package queue import ( - "context" "testing" "time" @@ -21,7 +20,7 @@ func TestChannelQueue(t *testing.T) { } } - nilFn := func(_ context.Context, _ func()) {} + nilFn := func(_ func()) {} queue, err := NewChannelQueue(handle, ChannelQueueConfiguration{ @@ -61,7 +60,7 @@ func TestChannelQueue_Batch(t *testing.T) { } } - nilFn := func(_ context.Context, _ func()) {} + nilFn := func(_ func()) {} queue, err := NewChannelQueue(handle, ChannelQueueConfiguration{ diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index 0f3774dbca44e..9b96867838ac2 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -133,7 +133,7 @@ func (q *PersistableChannelQueue) Push(data Data) error { } // Run starts to run the queue -func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { +func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) { log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name) q.lock.Lock() @@ -147,12 +147,12 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Conte } else { q.lock.Unlock() } - atShutdown(context.Background(), q.Shutdown) - atTerminate(context.Background(), q.Terminate) + atShutdown(q.Shutdown) + atTerminate(q.Terminate) if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len() != 0 { // Just run the level queue - we shut it down once it's flushed - go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {}) + go q.internal.Run(func(_ func()) {}, func(_ func()) {}) go func() { _ = 
q.internal.Flush(0) log.Debug("LevelQueue: %s flushed so shutting down", q.internal.(*LevelQueue).Name()) diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go index 93061bffc6586..72e00d6982eae 100644 --- a/modules/queue/queue_disk_channel_test.go +++ b/modules/queue/queue_disk_channel_test.go @@ -5,7 +5,6 @@ package queue import ( - "context" "io/ioutil" "testing" "time" @@ -40,9 +39,9 @@ func TestPersistableChannelQueue(t *testing.T) { }, &testData{}) assert.NoError(t, err) - go queue.Run(func(_ context.Context, shutdown func()) { + go queue.Run(func(shutdown func()) { queueShutdown = append(queueShutdown, shutdown) - }, func(_ context.Context, terminate func()) { + }, func(terminate func()) { queueTerminate = append(queueTerminate, terminate) }) @@ -94,9 +93,9 @@ func TestPersistableChannelQueue(t *testing.T) { }, &testData{}) assert.NoError(t, err) - go queue.Run(func(_ context.Context, shutdown func()) { + go queue.Run(func(shutdown func()) { queueShutdown = append(queueShutdown, shutdown) - }, func(_ context.Context, terminate func()) { + }, func(terminate func()) { queueTerminate = append(queueTerminate, terminate) }) diff --git a/modules/queue/queue_disk_test.go b/modules/queue/queue_disk_test.go index edaed49a52396..1f884d4f8d76d 100644 --- a/modules/queue/queue_disk_test.go +++ b/modules/queue/queue_disk_test.go @@ -5,7 +5,6 @@ package queue import ( - "context" "io/ioutil" "sync" "testing" @@ -49,11 +48,11 @@ func TestLevelQueue(t *testing.T) { }, &testData{}) assert.NoError(t, err) - go queue.Run(func(_ context.Context, shutdown func()) { + go queue.Run(func(shutdown func()) { lock.Lock() queueShutdown = append(queueShutdown, shutdown) lock.Unlock() - }, func(_ context.Context, terminate func()) { + }, func(terminate func()) { lock.Lock() queueTerminate = append(queueTerminate, terminate) lock.Unlock() @@ -123,11 +122,11 @@ func TestLevelQueue(t *testing.T) { }, &testData{}) assert.NoError(t, err) - go queue.Run(func(_ context.Context, shutdown func()) { + go queue.Run(func(shutdown func()) { lock.Lock() queueShutdown = append(queueShutdown, shutdown) lock.Unlock() - }, func(_ context.Context, terminate func()) { + }, func(terminate func()) { lock.Lock() queueTerminate = append(queueTerminate, terminate) lock.Unlock() diff --git a/modules/queue/queue_wrapped.go b/modules/queue/queue_wrapped.go index 88d64e82464f4..ec30ab0281972 100644 --- a/modules/queue/queue_wrapped.go +++ b/modules/queue/queue_wrapped.go @@ -38,7 +38,7 @@ type delayedStarter struct { } // setInternal must be called with the lock locked. 
-func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), handle HandlerFunc, exemplar interface{}) error { +func (q *delayedStarter) setInternal(atShutdown func(func()), handle HandlerFunc, exemplar interface{}) error { var ctx context.Context var cancel context.CancelFunc if q.timeout > 0 { @@ -49,9 +49,7 @@ func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), h defer cancel() // Ensure we also stop at shutdown - atShutdown(ctx, func() { - cancel() - }) + atShutdown(cancel) i := 1 for q.internal == nil { @@ -221,7 +219,7 @@ func (q *WrappedQueue) IsEmpty() bool { } // Run starts to run the queue and attempts to create the internal queue -func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func())) { +func (q *WrappedQueue) Run(atShutdown, atTerminate func(func())) { log.Debug("WrappedQueue: %s Starting", q.name) q.lock.Lock() if q.internal == nil { diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go index dec1cfc5c06e3..e4b759413b589 100644 --- a/modules/queue/unique_queue_channel.go +++ b/modules/queue/unique_queue_channel.go @@ -5,7 +5,6 @@ package queue import ( - "context" "fmt" "sync" @@ -65,11 +64,11 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue } // Run starts to run the queue -func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) { - atShutdown(context.Background(), func() { +func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) { + atShutdown(func() { log.Warn("ChannelUniqueQueue: %s is not shutdownable!", q.name) }) - atTerminate(context.Background(), func() { + atTerminate(func() { log.Warn("ChannelUniqueQueue: %s is not terminatable!", q.name) }) log.Debug("ChannelUniqueQueue: %s Starting", q.name) diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go index 6fe19ff029f98..bb36a3702e22d 100644 --- a/modules/queue/unique_queue_disk_channel.go +++ b/modules/queue/unique_queue_disk_channel.go @@ -5,7 +5,6 @@ package queue import ( - "context" "sync" "time" @@ -158,7 +157,7 @@ func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) { } // Run starts to run the queue -func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) { +func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) { log.Debug("PersistableChannelUniqueQueue: %s Starting", q.delayedStarter.name) q.lock.Lock() @@ -179,12 +178,12 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(context } else { q.lock.Unlock() } - atShutdown(context.Background(), q.Shutdown) - atTerminate(context.Background(), q.Terminate) + atShutdown(q.Shutdown) + atTerminate(q.Terminate) if luq, ok := q.internal.(*LevelUniqueQueue); ok && luq.ByteFIFOUniqueQueue.byteFIFO.Len() != 0 { // Just run the level queue - we shut it down once it's flushed - go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {}) + go q.internal.Run(func(_ func()) {}, func(_ func()) {}) go func() { _ = q.internal.Flush(0) log.Debug("LevelUniqueQueue: %s flushed so shutting down", q.internal.(*LevelQueue).Name()) diff --git a/services/pull/check_test.go b/services/pull/check_test.go index 33a230e5ab86b..f6614ea0ad27f 100644 --- a/services/pull/check_test.go +++ b/services/pull/check_test.go @@ -6,7 +6,6 @@ package pull import ( - "context" "strconv" "testing" "time" @@ -54,9 +53,9 @@ func 
TestPullRequest_AddToTaskQueue(t *testing.T) { assert.True(t, has) assert.NoError(t, err) - prQueue.Run(func(_ context.Context, shutdown func()) { + prQueue.Run(func(shutdown func()) { queueShutdown = append(queueShutdown, shutdown) - }, func(_ context.Context, terminate func()) { + }, func(terminate func()) { queueTerminate = append(queueTerminate, terminate) }) From 5fc1c804f2b2c6be881bff6f52ee4252be9af8b3 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Mon, 3 May 2021 21:31:30 +0100 Subject: [PATCH 11/24] add logging Signed-off-by: Andrew Thornton --- modules/queue/queue_channel.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index 8477a775f4260..40744cb68f445 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -91,7 +91,14 @@ func (q *ChannelQueue) Shutdown() { return default: } - go q.FlushWithContext(q.terminateCtx) + go func() { + log.Trace("ChannelQueue: %s Flushing", q.name) + if err := q.FlushWithContext(q.terminateCtx); err != nil { + log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name) + return + } + log.Debug("ChannelQueue: %s Flushed", q.name) + }() q.shutdownCancel() log.Debug("ChannelQueue: %s Shutdown", q.name) } From 5ea5882194e5e04f03d208e1f2ecfe9b126b9adb Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Mon, 3 May 2021 21:34:42 +0100 Subject: [PATCH 12/24] Add shutdown flusher to CUQ Signed-off-by: Andrew Thornton --- modules/queue/unique_queue_channel.go | 72 +++++++++++++++++++++------ 1 file changed, 57 insertions(+), 15 deletions(-) diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go index e4b759413b589..ff9e448b7e875 100644 --- a/modules/queue/unique_queue_channel.go +++ b/modules/queue/unique_queue_channel.go @@ -5,6 +5,7 @@ package queue import ( + "context" "fmt" "sync" @@ -27,11 +28,15 @@ type ChannelUniqueQueueConfiguration ChannelQueueConfiguration // only guaranteed whilst the task is waiting in the queue. 
type ChannelUniqueQueue struct { *WorkerPool - lock sync.Mutex - table map[Data]bool - exemplar interface{} - workers int - name string + lock sync.Mutex + table map[Data]bool + shutdownCtx context.Context + shutdownCancel context.CancelFunc + terminateCtx context.Context + terminateCancel context.CancelFunc + exemplar interface{} + workers int + name string } // NewChannelUniqueQueue create a memory channel queue @@ -44,11 +49,19 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue if config.BatchLength == 0 { config.BatchLength = 1 } + + terminateCtx, terminateCancel := context.WithCancel(context.Background()) + shutdownCtx, shutdownCancel := context.WithCancel(terminateCtx) + queue := &ChannelUniqueQueue{ - table: map[Data]bool{}, - exemplar: exemplar, - workers: config.Workers, - name: config.Name, + table: map[Data]bool{}, + shutdownCtx: shutdownCtx, + shutdownCancel: shutdownCancel, + terminateCtx: terminateCtx, + terminateCancel: terminateCancel, + exemplar: exemplar, + workers: config.Workers, + name: config.Name, } queue.WorkerPool = NewWorkerPool(func(data ...Data) { for _, datum := range data { @@ -65,12 +78,8 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue // Run starts to run the queue func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) { - atShutdown(func() { - log.Warn("ChannelUniqueQueue: %s is not shutdownable!", q.name) - }) - atTerminate(func() { - log.Warn("ChannelUniqueQueue: %s is not terminatable!", q.name) - }) + atShutdown(q.Shutdown) + atTerminate(q.Terminate) log.Debug("ChannelUniqueQueue: %s Starting", q.name) go func() { _ = q.AddWorkers(q.workers, 0) @@ -121,6 +130,39 @@ func (q *ChannelUniqueQueue) Has(data Data) (bool, error) { return has, nil } +// Shutdown processing from this queue +func (q *ChannelUniqueQueue) Shutdown() { + log.Trace("ChannelUniqueQueue: %s Shutting down", q.name) + select { + case <-q.shutdownCtx.Done(): + return + default: + } + go func() { + log.Trace("ChannelUniqueQueue: %s Flushing", q.name) + if err := q.FlushWithContext(q.terminateCtx); err != nil { + log.Warn("ChannelUniqueQueue: %s Terminated before completed flushing", q.name) + return + } + log.Debug("ChannelUniqueQueue: %s Flushed", q.name) + }() + q.shutdownCancel() + log.Debug("ChannelUniqueQueue: %s Shutdown", q.name) +} + +// Terminate this queue and close the queue +func (q *ChannelUniqueQueue) Terminate() { + log.Trace("ChannelUniqueQueue: %s Terminating", q.name) + q.Shutdown() + select { + case <-q.terminateCtx.Done(): + return + default: + } + q.terminateCancel() + log.Debug("ChannelUniqueQueue: %s Terminated", q.name) +} + // Name returns the name of this queue func (q *ChannelUniqueQueue) Name() string { return q.name From fb4b1c9977bd3eeb2e8bf5171f6aaf362f8eaeef Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Mon, 3 May 2021 22:01:47 +0100 Subject: [PATCH 13/24] move persistable channel shutdown stuff to Shutdown Fn Signed-off-by: Andrew Thornton --- modules/queue/queue_disk_channel.go | 51 +++++++++++----------- modules/queue/unique_queue_disk_channel.go | 38 ++++++++-------- 2 files changed, 45 insertions(+), 44 deletions(-) diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index 9b96867838ac2..b4c9dc39f15e5 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -169,25 +169,6 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) { go func() { _ = 
q.channelQueue.AddWorkers(q.channelQueue.workers, 0) }() - - log.Trace("PersistableChannelQueue: %s Waiting til closed", q.delayedStarter.name) - <-q.closed - log.Trace("PersistableChannelQueue: %s Cancelling pools", q.delayedStarter.name) - q.channelQueue.cancel() - q.internal.(*LevelQueue).cancel() - log.Trace("PersistableChannelQueue: %s Waiting til done", q.delayedStarter.name) - q.channelQueue.Wait() - q.internal.(*LevelQueue).Wait() - // Redirect all remaining data in the chan to the internal channel - go func() { - log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name) - for data := range q.channelQueue.dataChan { - _ = q.internal.Push(data) - atomic.AddInt64(&q.channelQueue.numInQueue, -1) - } - log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name) - }() - log.Trace("PersistableChannelQueue: %s Done main loop", q.delayedStarter.name) } // Flush flushes the queue and blocks till the queue is empty @@ -245,16 +226,36 @@ func (q *PersistableChannelQueue) IsEmpty() bool { func (q *PersistableChannelQueue) Shutdown() { log.Trace("PersistableChannelQueue: %s Shutting down", q.delayedStarter.name) q.lock.Lock() - defer q.lock.Unlock() + select { case <-q.closed: + q.lock.Unlock() + return default: - if q.internal != nil { - q.internal.(*LevelQueue).Shutdown() - } - close(q.closed) - log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name) } + if q.internal != nil { + q.internal.(*LevelQueue).Shutdown() + } + close(q.closed) + q.lock.Unlock() + + log.Trace("PersistableChannelQueue: %s Cancelling pools", q.delayedStarter.name) + q.channelQueue.cancel() + q.internal.(*LevelQueue).cancel() + log.Trace("PersistableChannelQueue: %s Waiting til done", q.delayedStarter.name) + q.channelQueue.Wait() + q.internal.(*LevelQueue).Wait() + // Redirect all remaining data in the chan to the internal channel + go func() { + log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name) + for data := range q.channelQueue.dataChan { + _ = q.internal.Push(data) + atomic.AddInt64(&q.channelQueue.numInQueue, -1) + } + log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name) + }() + + log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name) } // Terminate this queue and close the queue diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go index bb36a3702e22d..a8b70883eff6d 100644 --- a/modules/queue/unique_queue_disk_channel.go +++ b/modules/queue/unique_queue_disk_channel.go @@ -199,24 +199,6 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func()) go func() { _ = q.ChannelUniqueQueue.AddWorkers(q.workers, 0) }() - - log.Trace("PersistableChannelUniqueQueue: %s Waiting til closed", q.delayedStarter.name) - <-q.closed - log.Trace("PersistableChannelUniqueQueue: %s Cancelling pools", q.delayedStarter.name) - q.internal.(*LevelUniqueQueue).cancel() - q.ChannelUniqueQueue.cancel() - log.Trace("PersistableChannelUniqueQueue: %s Waiting til done", q.delayedStarter.name) - q.ChannelUniqueQueue.Wait() - q.internal.(*LevelUniqueQueue).Wait() - // Redirect all remaining data in the chan to the internal channel - go func() { - log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name) - for data := range q.ChannelUniqueQueue.dataChan { - _ = q.internal.Push(data) - } - log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting 
remaining data", q.delayedStarter.name) - }() - log.Trace("PersistableChannelUniqueQueue: %s Done main loop", q.delayedStarter.name) } // Flush flushes the queue @@ -228,15 +210,33 @@ func (q *PersistableChannelUniqueQueue) Flush(timeout time.Duration) error { func (q *PersistableChannelUniqueQueue) Shutdown() { log.Trace("PersistableChannelUniqueQueue: %s Shutting down", q.delayedStarter.name) q.lock.Lock() - defer q.lock.Unlock() select { case <-q.closed: + q.lock.Unlock() + return default: if q.internal != nil { q.internal.(*LevelUniqueQueue).Shutdown() } close(q.closed) + q.lock.Unlock() } + + log.Trace("PersistableChannelUniqueQueue: %s Cancelling pools", q.delayedStarter.name) + q.internal.(*LevelUniqueQueue).cancel() + q.ChannelUniqueQueue.cancel() + log.Trace("PersistableChannelUniqueQueue: %s Waiting til done", q.delayedStarter.name) + q.ChannelUniqueQueue.Wait() + q.internal.(*LevelUniqueQueue).Wait() + // Redirect all remaining data in the chan to the internal channel + go func() { + log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name) + for data := range q.ChannelUniqueQueue.dataChan { + _ = q.internal.Push(data) + } + log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name) + }() + log.Debug("PersistableChannelUniqueQueue: %s Shutdown", q.delayedStarter.name) } From b0bbd0d5d2f42d7640a13c7176776be24eb87c7d Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Mon, 3 May 2021 22:14:11 +0100 Subject: [PATCH 14/24] Ensure that UPCQ has the correct config Signed-off-by: Andrew Thornton --- modules/queue/unique_queue_disk_channel.go | 33 ++++++++++++++-------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go index a8b70883eff6d..63a1f2902980c 100644 --- a/modules/queue/unique_queue_disk_channel.go +++ b/modules/queue/unique_queue_disk_channel.go @@ -5,6 +5,7 @@ package queue import ( + "context" "sync" "time" @@ -35,7 +36,7 @@ type PersistableChannelUniqueQueueConfiguration struct { // task cannot be processed twice or more at the same time. Uniqueness is // only guaranteed whilst the task is waiting in the queue. type PersistableChannelUniqueQueue struct { - *ChannelUniqueQueue + channelQueue *ChannelUniqueQueue delayedStarter lock sync.Mutex closed chan struct{} @@ -84,8 +85,8 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac } queue := &PersistableChannelUniqueQueue{ - ChannelUniqueQueue: channelUniqueQueue.(*ChannelUniqueQueue), - closed: make(chan struct{}), + channelQueue: channelUniqueQueue.(*ChannelUniqueQueue), + closed: make(chan struct{}), } levelQueue, err := NewLevelUniqueQueue(func(data ...Data) { @@ -137,14 +138,14 @@ func (q *PersistableChannelUniqueQueue) PushFunc(data Data, fn func() error) err case <-q.closed: return q.internal.(UniqueQueue).PushFunc(data, fn) default: - return q.ChannelUniqueQueue.PushFunc(data, fn) + return q.channelQueue.PushFunc(data, fn) } } // Has will test if the queue has the data func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) { // This is more difficult... 
- has, err := q.ChannelUniqueQueue.Has(data) + has, err := q.channelQueue.Has(data) if err != nil || has { return has, err } @@ -169,7 +170,7 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func()) log.Error("Unable push to channelled queue: %v", err) } } - }, q.exemplar) + }, q.channelQueue.exemplar) q.lock.Unlock() if err != nil { log.Fatal("Unable to create internal queue for %s Error: %v", q.Name(), err) @@ -197,13 +198,23 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func()) } go func() { - _ = q.ChannelUniqueQueue.AddWorkers(q.workers, 0) + _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0) }() } // Flush flushes the queue func (q *PersistableChannelUniqueQueue) Flush(timeout time.Duration) error { - return q.ChannelUniqueQueue.Flush(timeout) + return q.channelQueue.Flush(timeout) +} + +// Flush flushes the queue +func (q *PersistableChannelUniqueQueue) FlushWithContext(ctx context.Context) error { + return q.channelQueue.FlushWithContext(ctx) +} + +// IsEmpty checks if a queue is empty +func (q *PersistableChannelUniqueQueue) IsEmpty() bool { + return q.channelQueue.IsEmpty() } // Shutdown processing this queue @@ -224,14 +235,14 @@ func (q *PersistableChannelUniqueQueue) Shutdown() { log.Trace("PersistableChannelUniqueQueue: %s Cancelling pools", q.delayedStarter.name) q.internal.(*LevelUniqueQueue).cancel() - q.ChannelUniqueQueue.cancel() + q.channelQueue.cancel() log.Trace("PersistableChannelUniqueQueue: %s Waiting til done", q.delayedStarter.name) - q.ChannelUniqueQueue.Wait() + q.channelQueue.Wait() q.internal.(*LevelUniqueQueue).Wait() // Redirect all remaining data in the chan to the internal channel go func() { log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name) - for data := range q.ChannelUniqueQueue.dataChan { + for data := range q.channelQueue.dataChan { _ = q.internal.Push(data) } log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name) From 6b95a890635694bf8ab2939aa4387dfe91fdc0e3 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Mon, 3 May 2021 22:29:13 +0100 Subject: [PATCH 15/24] placate lint Signed-off-by: Andrew Thornton --- modules/queue/unique_queue_disk_channel.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go index 63a1f2902980c..43745d21ba281 100644 --- a/modules/queue/unique_queue_disk_channel.go +++ b/modules/queue/unique_queue_disk_channel.go @@ -207,7 +207,7 @@ func (q *PersistableChannelUniqueQueue) Flush(timeout time.Duration) error { return q.channelQueue.Flush(timeout) } -// Flush flushes the queue +// FlushWithContext flushes the queue func (q *PersistableChannelUniqueQueue) FlushWithContext(ctx context.Context) error { return q.channelQueue.FlushWithContext(ctx) } From dae99ea4068b2565505ec428da7c5526ed738c8e Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Tue, 4 May 2021 21:04:35 +0100 Subject: [PATCH 16/24] pass down context Signed-off-by: Andrew Thornton --- modules/queue/bytefifo.go | 18 ++++++++++-------- modules/queue/queue_bytefifo.go | 18 +++++++++++------- modules/queue/queue_disk.go | 8 +++++--- modules/queue/queue_disk_channel.go | 2 +- modules/queue/queue_redis.go | 20 ++++++++------------ modules/queue/unique_queue_disk.go | 10 ++++++---- modules/queue/unique_queue_disk_channel.go | 2 +- modules/queue/unique_queue_redis.go | 21 +++++++++------------ 8 files changed, 51 
insertions(+), 48 deletions(-) diff --git a/modules/queue/bytefifo.go b/modules/queue/bytefifo.go index 94478e6f05c4b..3a10c8e1259c6 100644 --- a/modules/queue/bytefifo.go +++ b/modules/queue/bytefifo.go @@ -4,14 +4,16 @@ package queue +import "context" + // ByteFIFO defines a FIFO that takes a byte array type ByteFIFO interface { // Len returns the length of the fifo - Len() int64 + Len(ctx context.Context) int64 // PushFunc pushes data to the end of the fifo and calls the callback if it is added - PushFunc(data []byte, fn func() error) error + PushFunc(ctx context.Context, data []byte, fn func() error) error // Pop pops data from the start of the fifo - Pop() ([]byte, error) + Pop(ctx context.Context) ([]byte, error) // Close this fifo Close() error } @@ -20,7 +22,7 @@ type ByteFIFO interface { type UniqueByteFIFO interface { ByteFIFO // Has returns whether the fifo contains this data - Has(data []byte) (bool, error) + Has(ctx context.Context, data []byte) (bool, error) } var _ ByteFIFO = &DummyByteFIFO{} @@ -29,12 +31,12 @@ var _ ByteFIFO = &DummyByteFIFO{} type DummyByteFIFO struct{} // PushFunc returns nil -func (*DummyByteFIFO) PushFunc(data []byte, fn func() error) error { +func (*DummyByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error { return nil } // Pop returns nil -func (*DummyByteFIFO) Pop() ([]byte, error) { +func (*DummyByteFIFO) Pop(ctx context.Context) ([]byte, error) { return []byte{}, nil } @@ -44,7 +46,7 @@ func (*DummyByteFIFO) Close() error { } // Len is always 0 -func (*DummyByteFIFO) Len() int64 { +func (*DummyByteFIFO) Len(ctx context.Context) int64 { return 0 } @@ -56,6 +58,6 @@ type DummyUniqueByteFIFO struct { } // Has always returns false -func (*DummyUniqueByteFIFO) Has([]byte) (bool, error) { +func (*DummyUniqueByteFIFO) Has(ctx context.Context, data []byte) (bool, error) { return false, nil } diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go index 818180723fa38..554a49863e1b0 100644 --- a/modules/queue/queue_bytefifo.go +++ b/modules/queue/queue_bytefifo.go @@ -96,7 +96,7 @@ func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error { } }() } - return q.byteFIFO.PushFunc(bs, fn) + return q.byteFIFO.PushFunc(q.terminateCtx, bs, fn) } // IsEmpty checks if the queue is empty @@ -106,7 +106,7 @@ func (q *ByteFIFOQueue) IsEmpty() bool { if !q.WorkerPool.IsEmpty() { return false } - return q.byteFIFO.Len() == 0 + return q.byteFIFO.Len(q.terminateCtx) == 0 } // Run runs the bytefifo queue @@ -140,16 +140,20 @@ func (q *ByteFIFOQueue) readToChan() { return default: q.lock.Lock() - bs, err := q.byteFIFO.Pop() + bs, err := q.byteFIFO.Pop(q.shutdownCtx) if err != nil { q.lock.Unlock() + if err == context.Canceled { + q.cancel() + return + } log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err) time.Sleep(time.Millisecond * 100) continue } if len(bs) == 0 { - if q.waitOnEmpty && q.byteFIFO.Len() == 0 { + if q.waitOnEmpty && q.byteFIFO.Len(q.shutdownCtx) == 0 { q.lock.Unlock() log.Trace("%s: %s Waiting on Empty", q.typ, q.name) select { @@ -205,10 +209,10 @@ func (q *ByteFIFOQueue) Terminate() { return default: } - q.terminateCancel() if log.IsDebug() { - log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len()) + log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len(q.terminateCtx)) } + q.terminateCancel() if err := q.byteFIFO.Close(); err != nil { log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err) } @@ 
-263,5 +267,5 @@ func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) { if err != nil { return false, err } - return q.byteFIFO.(UniqueByteFIFO).Has(bs) + return q.byteFIFO.(UniqueByteFIFO).Has(q.terminateCtx, bs) } diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go index 98127b5bf132d..911233a5d9a01 100644 --- a/modules/queue/queue_disk.go +++ b/modules/queue/queue_disk.go @@ -5,6 +5,8 @@ package queue import ( + "context" + "code.gitea.io/gitea/modules/nosql" "gitea.com/lunny/levelqueue" @@ -83,7 +85,7 @@ func NewLevelQueueByteFIFO(connection, prefix string) (*LevelQueueByteFIFO, erro } // PushFunc will push data into the fifo -func (fifo *LevelQueueByteFIFO) PushFunc(data []byte, fn func() error) error { +func (fifo *LevelQueueByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error { if fn != nil { if err := fn(); err != nil { return err @@ -93,7 +95,7 @@ func (fifo *LevelQueueByteFIFO) PushFunc(data []byte, fn func() error) error { } // Pop pops data from the start of the fifo -func (fifo *LevelQueueByteFIFO) Pop() ([]byte, error) { +func (fifo *LevelQueueByteFIFO) Pop(ctx context.Context) ([]byte, error) { data, err := fifo.internal.RPop() if err != nil && err != levelqueue.ErrNotFound { return nil, err @@ -109,7 +111,7 @@ func (fifo *LevelQueueByteFIFO) Close() error { } // Len returns the length of the fifo -func (fifo *LevelQueueByteFIFO) Len() int64 { +func (fifo *LevelQueueByteFIFO) Len(ctx context.Context) int64 { return fifo.internal.Len() } diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index b4c9dc39f15e5..47d27615c4115 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -150,7 +150,7 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) { atShutdown(q.Shutdown) atTerminate(q.Terminate) - if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len() != 0 { + if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len(lq.shutdownCtx) != 0 { // Just run the level queue - we shut it down once it's flushed go q.internal.Run(func(_ func()) {}, func(_ func()) {}) go func() { diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go index af2cc30335b78..a5fb866dc1e11 100644 --- a/modules/queue/queue_redis.go +++ b/modules/queue/queue_redis.go @@ -6,7 +6,6 @@ package queue import ( "context" - "fmt" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" @@ -47,8 +46,6 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) return nil, err } - byteFIFO.ctx = graceful.NewChannelContext(byteFIFOQueue.IsTerminated(), fmt.Errorf("queue has been terminated")) - queue := &RedisQueue{ ByteFIFOQueue: byteFIFOQueue, } @@ -73,8 +70,8 @@ var _ ByteFIFO = &RedisByteFIFO{} // RedisByteFIFO represents a ByteFIFO formed from a redisClient type RedisByteFIFO struct { - ctx context.Context - client redisClient + client redisClient + queueName string } @@ -89,7 +86,6 @@ func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error) fifo := &RedisByteFIFO{ queueName: config.QueueName, } - fifo.ctx = graceful.GetManager().TerminateContext() fifo.client = nosql.GetManager().GetRedisClient(config.ConnectionString) if err := fifo.client.Ping(graceful.GetManager().ShutdownContext()).Err(); err != nil { return nil, err @@ -98,18 +94,18 @@ func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error) } // PushFunc pushes data to the end of the fifo and calls the 
callback if it is added -func (fifo *RedisByteFIFO) PushFunc(data []byte, fn func() error) error { +func (fifo *RedisByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error { if fn != nil { if err := fn(); err != nil { return err } } - return fifo.client.RPush(fifo.ctx, fifo.queueName, data).Err() + return fifo.client.RPush(ctx, fifo.queueName, data).Err() } // Pop pops data from the start of the fifo -func (fifo *RedisByteFIFO) Pop() ([]byte, error) { - data, err := fifo.client.LPop(fifo.ctx, fifo.queueName).Bytes() +func (fifo *RedisByteFIFO) Pop(ctx context.Context) ([]byte, error) { + data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes() if err == nil || err == redis.Nil { return data, nil } @@ -122,8 +118,8 @@ func (fifo *RedisByteFIFO) Close() error { } // Len returns the length of the fifo -func (fifo *RedisByteFIFO) Len() int64 { - val, err := fifo.client.LLen(fifo.ctx, fifo.queueName).Result() +func (fifo *RedisByteFIFO) Len(ctx context.Context) int64 { + val, err := fifo.client.LLen(ctx, fifo.queueName).Result() if err != nil { log.Error("Error whilst getting length of redis queue %s: Error: %v", fifo.queueName, err) return -1 diff --git a/modules/queue/unique_queue_disk.go b/modules/queue/unique_queue_disk.go index e914133678031..bb0eb7d950c59 100644 --- a/modules/queue/unique_queue_disk.go +++ b/modules/queue/unique_queue_disk.go @@ -5,6 +5,8 @@ package queue import ( + "context" + "code.gitea.io/gitea/modules/nosql" "gitea.com/lunny/levelqueue" @@ -87,12 +89,12 @@ func NewLevelUniqueQueueByteFIFO(connection, prefix string) (*LevelUniqueQueueBy } // PushFunc pushes data to the end of the fifo and calls the callback if it is added -func (fifo *LevelUniqueQueueByteFIFO) PushFunc(data []byte, fn func() error) error { +func (fifo *LevelUniqueQueueByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error { return fifo.internal.LPushFunc(data, fn) } // Pop pops data from the start of the fifo -func (fifo *LevelUniqueQueueByteFIFO) Pop() ([]byte, error) { +func (fifo *LevelUniqueQueueByteFIFO) Pop(ctx context.Context) ([]byte, error) { data, err := fifo.internal.RPop() if err != nil && err != levelqueue.ErrNotFound { return nil, err @@ -101,12 +103,12 @@ func (fifo *LevelUniqueQueueByteFIFO) Pop() ([]byte, error) { } // Len returns the length of the fifo -func (fifo *LevelUniqueQueueByteFIFO) Len() int64 { +func (fifo *LevelUniqueQueueByteFIFO) Len(ctx context.Context) int64 { return fifo.internal.Len() } // Has returns whether the fifo contains this data -func (fifo *LevelUniqueQueueByteFIFO) Has(data []byte) (bool, error) { +func (fifo *LevelUniqueQueueByteFIFO) Has(ctx context.Context, data []byte) (bool, error) { return fifo.internal.Has(data) } diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go index 43745d21ba281..d45f8f9fc1f92 100644 --- a/modules/queue/unique_queue_disk_channel.go +++ b/modules/queue/unique_queue_disk_channel.go @@ -182,7 +182,7 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func()) atShutdown(q.Shutdown) atTerminate(q.Terminate) - if luq, ok := q.internal.(*LevelUniqueQueue); ok && luq.ByteFIFOUniqueQueue.byteFIFO.Len() != 0 { + if luq, ok := q.internal.(*LevelUniqueQueue); ok && luq.ByteFIFOUniqueQueue.byteFIFO.Len(luq.shutdownCtx) != 0 { // Just run the level queue - we shut it down once it's flushed go q.internal.Run(func(_ func()) {}, func(_ func()) {}) go func() { diff --git a/modules/queue/unique_queue_redis.go 
b/modules/queue/unique_queue_redis.go index 20a50cc1f235f..7474c096655d3 100644 --- a/modules/queue/unique_queue_redis.go +++ b/modules/queue/unique_queue_redis.go @@ -5,9 +5,8 @@ package queue import ( - "fmt" + "context" - "code.gitea.io/gitea/modules/graceful" "github.com/go-redis/redis/v8" ) @@ -51,8 +50,6 @@ func NewRedisUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, return nil, err } - byteFIFO.ctx = graceful.NewChannelContext(byteFIFOQueue.IsTerminated(), fmt.Errorf("queue has been terminated")) - queue := &RedisUniqueQueue{ ByteFIFOUniqueQueue: byteFIFOQueue, } @@ -92,8 +89,8 @@ func NewRedisUniqueByteFIFO(config RedisUniqueByteFIFOConfiguration) (*RedisUniq } // PushFunc pushes data to the end of the fifo and calls the callback if it is added -func (fifo *RedisUniqueByteFIFO) PushFunc(data []byte, fn func() error) error { - added, err := fifo.client.SAdd(fifo.ctx, fifo.setName, data).Result() +func (fifo *RedisUniqueByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error { + added, err := fifo.client.SAdd(ctx, fifo.setName, data).Result() if err != nil { return err } @@ -105,12 +102,12 @@ func (fifo *RedisUniqueByteFIFO) PushFunc(data []byte, fn func() error) error { return err } } - return fifo.client.RPush(fifo.ctx, fifo.queueName, data).Err() + return fifo.client.RPush(ctx, fifo.queueName, data).Err() } // Pop pops data from the start of the fifo -func (fifo *RedisUniqueByteFIFO) Pop() ([]byte, error) { - data, err := fifo.client.LPop(fifo.ctx, fifo.queueName).Bytes() +func (fifo *RedisUniqueByteFIFO) Pop(ctx context.Context) ([]byte, error) { + data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes() if err != nil && err != redis.Nil { return data, err } @@ -119,13 +116,13 @@ func (fifo *RedisUniqueByteFIFO) Pop() ([]byte, error) { return data, nil } - err = fifo.client.SRem(fifo.ctx, fifo.setName, data).Err() + err = fifo.client.SRem(ctx, fifo.setName, data).Err() return data, err } // Has returns whether the fifo contains this data -func (fifo *RedisUniqueByteFIFO) Has(data []byte) (bool, error) { - return fifo.client.SIsMember(fifo.ctx, fifo.setName, data).Result() +func (fifo *RedisUniqueByteFIFO) Has(ctx context.Context, data []byte) (bool, error) { + return fifo.client.SIsMember(ctx, fifo.setName, data).Result() } func init() { From 0f9aea82690d24461bc256a0946a610f14a3e02e Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Wed, 5 May 2021 20:50:18 +0100 Subject: [PATCH 17/24] fix test Signed-off-by: Andrew Thornton --- modules/queue/queue_disk_channel.go | 8 +++-- modules/queue/queue_disk_channel_test.go | 40 +++++++++++++++++------- 2 files changed, 34 insertions(+), 14 deletions(-) diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index 47d27615c4115..ac3588d8255aa 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -154,11 +154,13 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) { // Just run the level queue - we shut it down once it's flushed go q.internal.Run(func(_ func()) {}, func(_ func()) {}) go func() { - _ = q.internal.Flush(0) + for !q.IsEmpty() { + _ = q.internal.Flush(0) + <-time.After(100 * time.Millisecond) + } log.Debug("LevelQueue: %s flushed so shutting down", q.internal.(*LevelQueue).Name()) q.internal.(*LevelQueue).Shutdown() GetManager().Remove(q.internal.(*LevelQueue).qid) - }() } else { log.Debug("PersistableChannelQueue: %s Skipping running the empty level queue", q.delayedStarter.name) @@ -233,6 
+235,7 @@ func (q *PersistableChannelQueue) Shutdown() { return default: } + q.channelQueue.Shutdown() if q.internal != nil { q.internal.(*LevelQueue).Shutdown() } @@ -264,6 +267,7 @@ func (q *PersistableChannelQueue) Terminate() { q.Shutdown() q.lock.Lock() defer q.lock.Unlock() + q.channelQueue.Terminate() if q.internal != nil { q.internal.(*LevelQueue).Terminate() } diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go index 72e00d6982eae..561f98ca907b6 100644 --- a/modules/queue/queue_disk_channel_test.go +++ b/modules/queue/queue_disk_channel_test.go @@ -7,7 +7,6 @@ package queue import ( "io/ioutil" "testing" - "time" "code.gitea.io/gitea/modules/util" "github.com/stretchr/testify/assert" @@ -31,11 +30,13 @@ func TestPersistableChannelQueue(t *testing.T) { defer util.RemoveAll(tmpDir) queue, err := NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{ - DataDir: tmpDir, - BatchLength: 2, - QueueLength: 20, - Workers: 1, - MaxWorkers: 10, + DataDir: tmpDir, + BatchLength: 2, + QueueLength: 20, + Workers: 1, + BoostWorkers: 0, + MaxWorkers: 10, + Name: "first", }, &testData{}) assert.NoError(t, err) @@ -63,13 +64,18 @@ func TestPersistableChannelQueue(t *testing.T) { assert.Equal(t, test2.TestString, result2.TestString) assert.Equal(t, test2.TestInt, result2.TestInt) + // test1 is a testData not a *testData so will be rejected err = queue.Push(test1) assert.Error(t, err) + // Now shutdown the queue for _, callback := range queueShutdown { callback() } - time.Sleep(200 * time.Millisecond) + + // Wait til it is closed + <-queue.(*PersistableChannelQueue).closed + err = queue.Push(&test1) assert.NoError(t, err) err = queue.Push(&test2) @@ -79,17 +85,27 @@ func TestPersistableChannelQueue(t *testing.T) { assert.Fail(t, "Handler processing should have stopped") default: } + + // terminate the queue for _, callback := range queueTerminate { callback() } + select { + case <-handleChan: + assert.Fail(t, "Handler processing should have stopped") + default: + } + // Reopen queue queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{ - DataDir: tmpDir, - BatchLength: 2, - QueueLength: 20, - Workers: 1, - MaxWorkers: 10, + DataDir: tmpDir, + BatchLength: 2, + QueueLength: 20, + Workers: 1, + BoostWorkers: 0, + MaxWorkers: 10, + Name: "second", }, &testData{}) assert.NoError(t, err) From 795000b26a9df9b4ee251757b6f57e56745b2e66 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Wed, 5 May 2021 22:16:55 +0100 Subject: [PATCH 18/24] handle shutdown during the flushing Signed-off-by: Andrew Thornton --- modules/queue/queue_disk_channel.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index ac3588d8255aa..af81f69bd6555 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -156,7 +156,12 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) { go func() { for !q.IsEmpty() { _ = q.internal.Flush(0) - <-time.After(100 * time.Millisecond) + select { + case <-time.After(100 * time.Millisecond): + case <-q.internal.(*LevelQueue).shutdownCtx.Done(): + log.Warn("LevelQueue: %s shut down before completely flushed", q.internal.(*LevelQueue).Name()) + return + } } log.Debug("LevelQueue: %s flushed so shutting down", q.internal.(*LevelQueue).Name()) q.internal.(*LevelQueue).Shutdown() From a5809e98b282284be9e398bf522fe80728d59ae3 Mon Sep 17 00:00:00 2001 From: 
Andrew Thornton Date: Fri, 7 May 2021 20:04:18 +0100 Subject: [PATCH 19/24] reduce risk of race between zeroBoost and addWorkers Signed-off-by: Andrew Thornton --- modules/queue/queue_bytefifo.go | 4 +--- modules/queue/queue_channel.go | 4 +--- modules/queue/queue_disk_channel.go | 4 +--- modules/queue/unique_queue_channel.go | 4 +--- modules/queue/unique_queue_disk_channel.go | 4 +--- 5 files changed, 5 insertions(+), 15 deletions(-) diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go index 554a49863e1b0..b0c879fb216b0 100644 --- a/modules/queue/queue_bytefifo.go +++ b/modules/queue/queue_bytefifo.go @@ -115,9 +115,7 @@ func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(func())) { atTerminate(q.Terminate) log.Debug("%s: %s Starting", q.typ, q.name) - go func() { - _ = q.AddWorkers(q.workers, 0) - }() + _ = q.AddWorkers(q.workers, 0) log.Trace("%s: %s Now running", q.typ, q.name) q.readToChan() diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index 40744cb68f445..fa6ed972d27b9 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -69,9 +69,7 @@ func (q *ChannelQueue) Run(atShutdown, atTerminate func(func())) { atShutdown(q.Shutdown) atTerminate(q.Terminate) log.Debug("ChannelQueue: %s Starting", q.name) - go func() { - _ = q.AddWorkers(q.workers, 0) - }() + _ = q.AddWorkers(q.workers, 0) } // Push will push data into the queue diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index af81f69bd6555..a2636d5f08687 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -135,6 +135,7 @@ func (q *PersistableChannelQueue) Push(data Data) error { // Run starts to run the queue func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) { log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name) + _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0) q.lock.Lock() if q.internal == nil { @@ -173,9 +174,6 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) { GetManager().Remove(q.internal.(*LevelQueue).qid) } - go func() { - _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0) - }() } // Flush flushes the queue and blocks till the queue is empty diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go index ff9e448b7e875..67d99758f2e1b 100644 --- a/modules/queue/unique_queue_channel.go +++ b/modules/queue/unique_queue_channel.go @@ -81,9 +81,7 @@ func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) { atShutdown(q.Shutdown) atTerminate(q.Terminate) log.Debug("ChannelUniqueQueue: %s Starting", q.name) - go func() { - _ = q.AddWorkers(q.workers, 0) - }() + _ = q.AddWorkers(q.workers, 0) } // Push will push data into the queue if the data is not already in the queue diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go index d45f8f9fc1f92..e7c33c1b7d2d2 100644 --- a/modules/queue/unique_queue_disk_channel.go +++ b/modules/queue/unique_queue_disk_channel.go @@ -181,6 +181,7 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func()) } atShutdown(q.Shutdown) atTerminate(q.Terminate) + _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0) if luq, ok := q.internal.(*LevelUniqueQueue); ok && luq.ByteFIFOUniqueQueue.byteFIFO.Len(luq.shutdownCtx) != 0 { // Just run the level queue - we shut it down once it's flushed @@ -197,9 +198,6 @@ func (q 
*PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func()) GetManager().Remove(q.internal.(*LevelUniqueQueue).qid) } - go func() { - _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0) - }() } // Flush flushes the queue From b14af1fb2a0e2c04222ee2c56b8da3cb3aa325bc Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 8 May 2021 01:23:58 +0100 Subject: [PATCH 20/24] update comment as per 6543 Signed-off-by: Andrew Thornton --- modules/graceful/manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/graceful/manager.go b/modules/graceful/manager.go index b7ae0e8c137d4..bcbd8216136a2 100644 --- a/modules/graceful/manager.go +++ b/modules/graceful/manager.go @@ -337,7 +337,7 @@ func (g *Manager) Err() error { return g.done.Err() } -// Value allows the manager to be viewed as a context.Context done at Terminate, it has no values +// Value allows the manager to be viewed as a context.Context done at Terminate func (g *Manager) Value(key interface{}) interface{} { return g.done.Value(key) } From 24ca154a63305f3013460d1c2ff58a5d03f1479b Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 8 May 2021 15:24:32 +0100 Subject: [PATCH 21/24] prevent double shutdown Signed-off-by: Andrew Thornton --- modules/queue/queue_channel.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index fa6ed972d27b9..83c5eaffee97e 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -83,12 +83,15 @@ func (q *ChannelQueue) Push(data Data) error { // Shutdown processing from this queue func (q *ChannelQueue) Shutdown() { - log.Trace("ChannelQueue: %s Shutting down", q.name) + q.lock.Lock() + defer q.lock.Unlock() select { case <-q.shutdownCtx.Done(): + log.Trace("ChannelQueue: %s Already Shutting down", q.name) return default: } + log.Trace("ChannelQueue: %s Shutting down", q.name) go func() { log.Trace("ChannelQueue: %s Flushing", q.name) if err := q.FlushWithContext(q.terminateCtx); err != nil { From e876fec25a362b6dc2d3ab0555242ad913ecde9b Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 8 May 2021 15:35:10 +0100 Subject: [PATCH 22/24] rename contexts and their cancel fns Signed-off-by: Andrew Thornton --- modules/graceful/context.go | 6 +- modules/graceful/manager.go | 26 ++++----- modules/graceful/manager_unix.go | 24 ++++---- modules/graceful/manager_windows.go | 10 ++-- modules/queue/manager.go | 6 +- modules/queue/queue_bytefifo.go | 82 +++++++++++++-------------- modules/queue/queue_channel.go | 38 ++++++------- modules/queue/unique_queue_channel.go | 42 +++++++------- modules/queue/workerpool.go | 16 +++--- 9 files changed, 125 insertions(+), 125 deletions(-) diff --git a/modules/graceful/context.go b/modules/graceful/context.go index 8cebd407a85f4..9d955329a42b9 100644 --- a/modules/graceful/context.go +++ b/modules/graceful/context.go @@ -55,19 +55,19 @@ func (ctx *ChannelContext) Value(key interface{}) interface{} { // Callers using this context should ensure that they are registered as a running server // in order that they are waited for. func (g *Manager) ShutdownContext() context.Context { - return g.shutdown + return g.shutdownCtx } // HammerContext returns a context.Context that is Done at hammer // Callers using this context should ensure that they are registered as a running server // in order that they are waited for. 
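// Illustrative sketch (assumed names, not part of the patches above): the
// "prevent double shutdown" change guards ChannelQueue.Shutdown by taking the
// lock and doing a non-blocking check of the shutdown context before
// cancelling, so a second call returns immediately. A minimal standalone
// version of that idempotent-shutdown guard:
package main

import (
	"context"
	"fmt"
	"sync"
)

type queue struct {
	lock              sync.Mutex
	shutdownCtx       context.Context
	shutdownCtxCancel context.CancelFunc
}

// Shutdown is safe to call multiple times: only the first call cancels.
func (q *queue) Shutdown() {
	q.lock.Lock()
	defer q.lock.Unlock()
	select {
	case <-q.shutdownCtx.Done():
		fmt.Println("already shutting down")
		return
	default:
	}
	fmt.Println("shutting down")
	q.shutdownCtxCancel()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	q := &queue{shutdownCtx: ctx, shutdownCtxCancel: cancel}
	q.Shutdown()
	q.Shutdown() // second call hits the Done() case and returns
}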
func (g *Manager) HammerContext() context.Context { - return g.hammer + return g.hammerCtx } // TerminateContext returns a context.Context that is Done at terminate // Callers using this context should ensure that they are registered as a terminating server // in order that they are waited for. func (g *Manager) TerminateContext() context.Context { - return g.terminate + return g.terminateCtx } diff --git a/modules/graceful/manager.go b/modules/graceful/manager.go index bcbd8216136a2..8c3b95c4aa74d 100644 --- a/modules/graceful/manager.go +++ b/modules/graceful/manager.go @@ -195,7 +195,7 @@ func (g *Manager) doShutdown() { return } g.lock.Lock() - g.shutdownCancel() + g.shutdownCtxCancel() for _, fn := range g.toRunAtShutdown { go fn() } @@ -212,7 +212,7 @@ func (g *Manager) doShutdown() { g.doTerminate() g.WaitForTerminate() g.lock.Lock() - g.doneCancel() + g.doneCtxCancel() g.lock.Unlock() }() } @@ -221,10 +221,10 @@ func (g *Manager) doHammerTime(d time.Duration) { time.Sleep(d) g.lock.Lock() select { - case <-g.hammer.Done(): + case <-g.hammerCtx.Done(): default: log.Warn("Setting Hammer condition") - g.hammerCancel() + g.hammerCtxCancel() for _, fn := range g.toRunAtHammer { go fn() } @@ -238,10 +238,10 @@ func (g *Manager) doTerminate() { } g.lock.Lock() select { - case <-g.terminate.Done(): + case <-g.terminateCtx.Done(): default: log.Warn("Terminating") - g.terminateCancel() + g.terminateCtxCancel() for _, fn := range g.toRunAtTerminate { go fn() } @@ -257,7 +257,7 @@ func (g *Manager) IsChild() bool { // IsShutdown returns a channel which will be closed at shutdown. // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate func (g *Manager) IsShutdown() <-chan struct{} { - return g.shutdown.Done() + return g.shutdownCtx.Done() } // IsHammer returns a channel which will be closed at hammer @@ -265,14 +265,14 @@ func (g *Manager) IsShutdown() <-chan struct{} { // Servers running within the running server wait group should respond to IsHammer // if not shutdown already func (g *Manager) IsHammer() <-chan struct{} { - return g.hammer.Done() + return g.hammerCtx.Done() } // IsTerminate returns a channel which will be closed at terminate // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate // IsTerminate will only close once all running servers have stopped func (g *Manager) IsTerminate() <-chan struct{} { - return g.terminate.Done() + return g.terminateCtx.Done() } // ServerDone declares a running server done and subtracts one from the @@ -329,20 +329,20 @@ func (g *Manager) InformCleanup() { // Done allows the manager to be viewed as a context.Context, it returns a channel that is closed when the server is finished terminating func (g *Manager) Done() <-chan struct{} { - return g.done.Done() + return g.doneCtx.Done() } // Err allows the manager to be viewed as a context.Context done at Terminate func (g *Manager) Err() error { - return g.done.Err() + return g.doneCtx.Err() } // Value allows the manager to be viewed as a context.Context done at Terminate func (g *Manager) Value(key interface{}) interface{} { - return g.done.Value(key) + return g.doneCtx.Value(key) } // Deadline returns nil as there is no fixed Deadline for the manager, it allows the manager to be viewed as a context.Context func (g *Manager) Deadline() (deadline time.Time, ok bool) { - return g.done.Deadline() + return g.doneCtx.Deadline() } diff --git a/modules/graceful/manager_unix.go b/modules/graceful/manager_unix.go index 60d4f54280de7..20d9b3905c4fb 100644 --- 
a/modules/graceful/manager_unix.go +++ b/modules/graceful/manager_unix.go @@ -25,14 +25,14 @@ type Manager struct { forked bool lock *sync.RWMutex state state - shutdown context.Context - hammer context.Context - terminate context.Context - done context.Context - shutdownCancel context.CancelFunc - hammerCancel context.CancelFunc - terminateCancel context.CancelFunc - doneCancel context.CancelFunc + shutdownCtx context.Context + hammerCtx context.Context + terminateCtx context.Context + doneCtx context.Context + shutdownCtxCancel context.CancelFunc + hammerCtxCancel context.CancelFunc + terminateCtxCancel context.CancelFunc + doneCtxCancel context.CancelFunc runningServerWaitGroup sync.WaitGroup createServerWaitGroup sync.WaitGroup terminateWaitGroup sync.WaitGroup @@ -54,10 +54,10 @@ func newGracefulManager(ctx context.Context) *Manager { func (g *Manager) start(ctx context.Context) { // Make contexts - g.terminate, g.terminateCancel = context.WithCancel(ctx) - g.shutdown, g.shutdownCancel = context.WithCancel(ctx) - g.hammer, g.hammerCancel = context.WithCancel(ctx) - g.done, g.doneCancel = context.WithCancel(ctx) + g.terminateCtx, g.terminateCtxCancel = context.WithCancel(ctx) + g.shutdownCtx, g.shutdownCtxCancel = context.WithCancel(ctx) + g.hammerCtx, g.hammerCtxCancel = context.WithCancel(ctx) + g.doneCtx, g.doneCtxCancel = context.WithCancel(ctx) // Set the running state & handle signals g.setState(stateRunning) diff --git a/modules/graceful/manager_windows.go b/modules/graceful/manager_windows.go index ad7770c7a6e0e..8b860aa63f9e8 100644 --- a/modules/graceful/manager_windows.go +++ b/modules/graceful/manager_windows.go @@ -67,10 +67,10 @@ func newGracefulManager(ctx context.Context) *Manager { func (g *Manager) start() { // Make contexts - g.terminate, g.terminateCancel = context.WithCancel(g.ctx) - g.shutdown, g.shutdownCancel = context.WithCancel(g.ctx) - g.hammer, g.hammerCancel = context.WithCancel(g.ctx) - g.done, g.doneCancel = context.WithCancel(g.ctx) + g.terminateCtx, g.terminateCtxCancel = context.WithCancel(g.ctx) + g.shutdownCtx, g.shutdownCtxCancel = context.WithCancel(g.ctx) + g.hammerCtx, g.hammerCtxCancel = context.WithCancel(g.ctx) + g.doneCtx, g.doneCtxCancel = context.WithCancel(g.ctx) // Make channels g.shutdownRequested = make(chan struct{}) @@ -181,7 +181,7 @@ hammerLoop: default: log.Debug("Unexpected control request: %v", change.Cmd) } - case <-g.hammer.Done(): + case <-g.hammerCtx.Done(): break hammerLoop } } diff --git a/modules/queue/manager.go b/modules/queue/manager.go index da0fc606e6e16..eef80af037995 100644 --- a/modules/queue/manager.go +++ b/modules/queue/manager.go @@ -187,14 +187,14 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error if flushable, ok := mq.Managed.(Flushable); ok { log.Debug("Flushing (flushable) queue: %s", mq.Name) go func(q *ManagedQueue) { - localCtx, localCancel := context.WithCancel(ctx) - pid := q.RegisterWorkers(1, start, hasTimeout, end, localCancel, true) + localCtx, localCtxCancel := context.WithCancel(ctx) + pid := q.RegisterWorkers(1, start, hasTimeout, end, localCtxCancel, true) err := flushable.FlushWithContext(localCtx) if err != nil && err != ctx.Err() { cancel() } q.CancelWorkers(pid) - localCancel() + localCtxCancel() wg.Done() }(mq) } else { diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go index b0c879fb216b0..f273ddc7bb179 100644 --- a/modules/queue/queue_bytefifo.go +++ b/modules/queue/queue_bytefifo.go @@ -27,18 +27,18 @@ var _ Queue = 
&ByteFIFOQueue{} // ByteFIFOQueue is a Queue formed from a ByteFIFO and WorkerPool type ByteFIFOQueue struct { *WorkerPool - byteFIFO ByteFIFO - typ Type - shutdownCtx context.Context - shutdownCancel context.CancelFunc - terminateCtx context.Context - terminateCancel context.CancelFunc - exemplar interface{} - workers int - name string - lock sync.Mutex - waitOnEmpty bool - pushed chan struct{} + byteFIFO ByteFIFO + typ Type + shutdownCtx context.Context + shutdownCtxCancel context.CancelFunc + terminateCtx context.Context + terminateCtxCancel context.CancelFunc + exemplar interface{} + workers int + name string + lock sync.Mutex + waitOnEmpty bool + pushed chan struct{} } // NewByteFIFOQueue creates a new ByteFIFOQueue @@ -49,22 +49,22 @@ func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exem } config := configInterface.(ByteFIFOQueueConfiguration) - terminateCtx, terminateCancel := context.WithCancel(context.Background()) - shutdownCtx, shutdownCancel := context.WithCancel(terminateCtx) + terminateCtx, terminateCtxCancel := context.WithCancel(context.Background()) + shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) return &ByteFIFOQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), - byteFIFO: byteFIFO, - typ: typ, - shutdownCtx: shutdownCtx, - shutdownCancel: shutdownCancel, - terminateCtx: terminateCtx, - terminateCancel: terminateCancel, - exemplar: exemplar, - workers: config.Workers, - name: config.Name, - waitOnEmpty: config.WaitOnEmpty, - pushed: make(chan struct{}, 1), + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + byteFIFO: byteFIFO, + typ: typ, + shutdownCtx: shutdownCtx, + shutdownCtxCancel: shutdownCtxCancel, + terminateCtx: terminateCtx, + terminateCtxCancel: terminateCtxCancel, + exemplar: exemplar, + workers: config.Workers, + name: config.Name, + waitOnEmpty: config.WaitOnEmpty, + pushed: make(chan struct{}, 1), }, nil } @@ -126,7 +126,7 @@ func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(func())) { log.Trace("%s: %s Waiting til cleaned", q.typ, q.name) q.CleanUp(q.terminateCtx) - q.terminateCancel() + q.terminateCtxCancel() } func (q *ByteFIFOQueue) readToChan() { @@ -189,7 +189,7 @@ func (q *ByteFIFOQueue) Shutdown() { return default: } - q.shutdownCancel() + q.shutdownCtxCancel() log.Debug("%s: %s Shutdown", q.typ, q.name) } @@ -210,7 +210,7 @@ func (q *ByteFIFOQueue) Terminate() { if log.IsDebug() { log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len(q.terminateCtx)) } - q.terminateCancel() + q.terminateCtxCancel() if err := q.byteFIFO.Close(); err != nil { log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err) } @@ -236,21 +236,21 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun return nil, err } config := configInterface.(ByteFIFOQueueConfiguration) - terminateCtx, terminateCancel := context.WithCancel(context.Background()) - shutdownCtx, shutdownCancel := context.WithCancel(terminateCtx) + terminateCtx, terminateCtxCancel := context.WithCancel(context.Background()) + shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) return &ByteFIFOUniqueQueue{ ByteFIFOQueue: ByteFIFOQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), - byteFIFO: byteFIFO, - typ: typ, - shutdownCtx: shutdownCtx, - shutdownCancel: shutdownCancel, - terminateCtx: terminateCtx, - terminateCancel: terminateCancel, - exemplar: exemplar, - workers: config.Workers, - 
name: config.Name, + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + byteFIFO: byteFIFO, + typ: typ, + shutdownCtx: shutdownCtx, + shutdownCtxCancel: shutdownCtxCancel, + terminateCtx: terminateCtx, + terminateCtxCancel: terminateCtxCancel, + exemplar: exemplar, + workers: config.Workers, + name: config.Name, }, }, nil } diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index 83c5eaffee97e..4df64b69ee5ee 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -27,13 +27,13 @@ type ChannelQueueConfiguration struct { // It is basically a very thin wrapper around a WorkerPool type ChannelQueue struct { *WorkerPool - shutdownCtx context.Context - shutdownCancel context.CancelFunc - terminateCtx context.Context - terminateCancel context.CancelFunc - exemplar interface{} - workers int - name string + shutdownCtx context.Context + shutdownCtxCancel context.CancelFunc + terminateCtx context.Context + terminateCtxCancel context.CancelFunc + exemplar interface{} + workers int + name string } // NewChannelQueue creates a memory channel queue @@ -47,18 +47,18 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro config.BatchLength = 1 } - terminateCtx, terminateCancel := context.WithCancel(context.Background()) - shutdownCtx, shutdownCancel := context.WithCancel(terminateCtx) + terminateCtx, terminateCtxCancel := context.WithCancel(context.Background()) + shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) queue := &ChannelQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), - shutdownCtx: shutdownCtx, - shutdownCancel: shutdownCancel, - terminateCtx: terminateCtx, - terminateCancel: terminateCancel, - exemplar: exemplar, - workers: config.Workers, - name: config.Name, + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + shutdownCtx: shutdownCtx, + shutdownCtxCancel: shutdownCtxCancel, + terminateCtx: terminateCtx, + terminateCtxCancel: terminateCtxCancel, + exemplar: exemplar, + workers: config.Workers, + name: config.Name, } queue.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar) return queue, nil @@ -100,7 +100,7 @@ func (q *ChannelQueue) Shutdown() { } log.Debug("ChannelQueue: %s Flushed", q.name) }() - q.shutdownCancel() + q.shutdownCtxCancel() log.Debug("ChannelQueue: %s Shutdown", q.name) } @@ -113,7 +113,7 @@ func (q *ChannelQueue) Terminate() { return default: } - q.terminateCancel() + q.terminateCtxCancel() log.Debug("ChannelQueue: %s Terminated", q.name) } diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go index 67d99758f2e1b..5bec67c4d355c 100644 --- a/modules/queue/unique_queue_channel.go +++ b/modules/queue/unique_queue_channel.go @@ -28,15 +28,15 @@ type ChannelUniqueQueueConfiguration ChannelQueueConfiguration // only guaranteed whilst the task is waiting in the queue. 
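// Illustrative sketch (not part of the patches above): the queue constructors
// derive shutdownCtx from terminateCtx, so cancelling terminate implicitly
// marks shutdown done as well, while shutdown can still be cancelled on its
// own first. A minimal demonstration of that layering:
package main

import (
	"context"
	"fmt"
)

func main() {
	terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
	shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
	defer shutdownCtxCancel() // normally called first, at shutdown

	// Cancelling the parent (terminate) also completes the child (shutdown).
	terminateCtxCancel()
	fmt.Println("shutdown done:", shutdownCtx.Err() != nil)
	fmt.Println("terminate done:", terminateCtx.Err() != nil)
}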
type ChannelUniqueQueue struct { *WorkerPool - lock sync.Mutex - table map[Data]bool - shutdownCtx context.Context - shutdownCancel context.CancelFunc - terminateCtx context.Context - terminateCancel context.CancelFunc - exemplar interface{} - workers int - name string + lock sync.Mutex + table map[Data]bool + shutdownCtx context.Context + shutdownCtxCancel context.CancelFunc + terminateCtx context.Context + terminateCtxCancel context.CancelFunc + exemplar interface{} + workers int + name string } // NewChannelUniqueQueue create a memory channel queue @@ -50,18 +50,18 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue config.BatchLength = 1 } - terminateCtx, terminateCancel := context.WithCancel(context.Background()) - shutdownCtx, shutdownCancel := context.WithCancel(terminateCtx) + terminateCtx, terminateCtxCancel := context.WithCancel(context.Background()) + shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) queue := &ChannelUniqueQueue{ - table: map[Data]bool{}, - shutdownCtx: shutdownCtx, - shutdownCancel: shutdownCancel, - terminateCtx: terminateCtx, - terminateCancel: terminateCancel, - exemplar: exemplar, - workers: config.Workers, - name: config.Name, + table: map[Data]bool{}, + shutdownCtx: shutdownCtx, + shutdownCtxCancel: shutdownCtxCancel, + terminateCtx: terminateCtx, + terminateCtxCancel: terminateCtxCancel, + exemplar: exemplar, + workers: config.Workers, + name: config.Name, } queue.WorkerPool = NewWorkerPool(func(data ...Data) { for _, datum := range data { @@ -144,7 +144,7 @@ func (q *ChannelUniqueQueue) Shutdown() { } log.Debug("ChannelUniqueQueue: %s Flushed", q.name) }() - q.shutdownCancel() + q.shutdownCtxCancel() log.Debug("ChannelUniqueQueue: %s Shutdown", q.name) } @@ -157,7 +157,7 @@ func (q *ChannelUniqueQueue) Terminate() { return default: } - q.terminateCancel() + q.terminateCtxCancel() log.Debug("ChannelUniqueQueue: %s Terminated", q.name) } diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 1b5a936826b3f..0176e2e0b2d20 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -21,7 +21,7 @@ import ( type WorkerPool struct { lock sync.Mutex baseCtx context.Context - cancel context.CancelFunc + baseCtxCancel context.CancelFunc cond *sync.Cond qid int64 maxNumberOfWorkers int @@ -52,7 +52,7 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo dataChan := make(chan Data, config.QueueLength) pool := &WorkerPool{ baseCtx: ctx, - cancel: cancel, + baseCtxCancel: cancel, batchLength: config.BatchLength, dataChan: dataChan, handle: handle, @@ -128,7 +128,7 @@ func (p *WorkerPool) pushBoost(data Data) { return } p.blockTimeout *= 2 - ctx, cancel := context.WithCancel(p.baseCtx) + boostCtx, boostCtxCancel := context.WithCancel(p.baseCtx) mq := GetManager().GetManagedQueue(p.qid) boost := p.boostWorkers if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 { @@ -138,24 +138,24 @@ func (p *WorkerPool) pushBoost(data Data) { log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout) start := time.Now() - pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false) + pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), boostCtxCancel, false) go func() { - <-ctx.Done() + <-boostCtx.Done() mq.RemoveWorkers(pid) - cancel() + boostCtxCancel() }() } else { 
log.Warn("WorkerPool: %d Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout) } go func() { <-time.After(p.boostTimeout) - cancel() + boostCtxCancel() p.lock.Lock() p.blockTimeout /= 2 p.lock.Unlock() }() p.lock.Unlock() - p.addWorkers(ctx, cancel, boost) + p.addWorkers(boostCtx, boostCtxCancel, boost) p.dataChan <- data } } From 6a60b2af0c51ef3a45b3672b28a183c6739408b8 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 8 May 2021 17:38:16 +0100 Subject: [PATCH 23/24] few missed commits Signed-off-by: Andrew Thornton --- modules/queue/queue_bytefifo.go | 4 ++-- modules/queue/queue_disk_channel.go | 4 ++-- modules/queue/unique_queue_disk_channel.go | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go index f273ddc7bb179..52cad7143aab5 100644 --- a/modules/queue/queue_bytefifo.go +++ b/modules/queue/queue_bytefifo.go @@ -134,7 +134,7 @@ func (q *ByteFIFOQueue) readToChan() { select { case <-q.shutdownCtx.Done(): // tell the pool to shutdown. - q.cancel() + q.baseCtxCancel() return default: q.lock.Lock() @@ -142,7 +142,7 @@ func (q *ByteFIFOQueue) readToChan() { if err != nil { q.lock.Unlock() if err == context.Canceled { - q.cancel() + q.baseCtxCancel() return } log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err) diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index a2636d5f08687..c3a1c5781ef09 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -246,8 +246,8 @@ func (q *PersistableChannelQueue) Shutdown() { q.lock.Unlock() log.Trace("PersistableChannelQueue: %s Cancelling pools", q.delayedStarter.name) - q.channelQueue.cancel() - q.internal.(*LevelQueue).cancel() + q.channelQueue.baseCtxCancel() + q.internal.(*LevelQueue).baseCtxCancel() log.Trace("PersistableChannelQueue: %s Waiting til done", q.delayedStarter.name) q.channelQueue.Wait() q.internal.(*LevelQueue).Wait() diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go index e7c33c1b7d2d2..65a3941519954 100644 --- a/modules/queue/unique_queue_disk_channel.go +++ b/modules/queue/unique_queue_disk_channel.go @@ -232,8 +232,8 @@ func (q *PersistableChannelUniqueQueue) Shutdown() { } log.Trace("PersistableChannelUniqueQueue: %s Cancelling pools", q.delayedStarter.name) - q.internal.(*LevelUniqueQueue).cancel() - q.channelQueue.cancel() + q.internal.(*LevelUniqueQueue).baseCtxCancel() + q.channelQueue.baseCtxCancel() log.Trace("PersistableChannelUniqueQueue: %s Waiting til done", q.delayedStarter.name) q.channelQueue.Wait() q.internal.(*LevelUniqueQueue).Wait() From cff032f0bacb8ee3a24c5f7bbeb5f2fb7e0068b1 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Sat, 8 May 2021 20:18:05 +0100 Subject: [PATCH 24/24] fix-windows Signed-off-by: Andrew Thornton --- modules/graceful/manager_windows.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/modules/graceful/manager_windows.go b/modules/graceful/manager_windows.go index 8b860aa63f9e8..51f29778ba7af 100644 --- a/modules/graceful/manager_windows.go +++ b/modules/graceful/manager_windows.go @@ -36,14 +36,14 @@ type Manager struct { isChild bool lock *sync.RWMutex state state - shutdown context.Context - hammer context.Context - terminate context.Context - done context.Context - shutdownCancel context.CancelFunc - hammerCancel context.CancelFunc - 
terminateCancel context.CancelFunc - doneCancel context.CancelFunc + shutdownCtx context.Context + hammerCtx context.Context + terminateCtx context.Context + doneCtx context.Context + shutdownCtxCancel context.CancelFunc + hammerCtxCancel context.CancelFunc + terminateCtxCancel context.CancelFunc + doneCtxCancel context.CancelFunc runningServerWaitGroup sync.WaitGroup createServerWaitGroup sync.WaitGroup terminateWaitGroup sync.WaitGroup
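// Illustrative sketch (hypothetical manager type, not part of the patches
// above): after the rename the graceful Manager on both unix and windows
// stores four contexts and their cancel functions under the *Ctx/*CtxCancel
// names, each derived from the base context handed to start(). A condensed
// sketch of that wiring:
package main

import (
	"context"
	"fmt"
)

type manager struct {
	terminateCtx, shutdownCtx, hammerCtx, doneCtx                         context.Context
	terminateCtxCancel, shutdownCtxCancel, hammerCtxCancel, doneCtxCancel context.CancelFunc
}

func newManager(ctx context.Context) *manager {
	m := &manager{}
	m.terminateCtx, m.terminateCtxCancel = context.WithCancel(ctx)
	m.shutdownCtx, m.shutdownCtxCancel = context.WithCancel(ctx)
	m.hammerCtx, m.hammerCtxCancel = context.WithCancel(ctx)
	m.doneCtx, m.doneCtxCancel = context.WithCancel(ctx)
	return m
}

func main() {
	m := newManager(context.Background())
	m.shutdownCtxCancel()
	fmt.Println("shutdown:", m.shutdownCtx.Err() != nil, "terminate:", m.terminateCtx.Err() != nil)
}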