diff --git a/pkg/alertmanager/multitenant.go b/pkg/alertmanager/multitenant.go
index 58a24afcf1e..9b99ddff306 100644
--- a/pkg/alertmanager/multitenant.go
+++ b/pkg/alertmanager/multitenant.go
@@ -28,11 +28,13 @@ import (
 	"github.com/weaveworks/cortex/pkg/util"
 )
 
-const (
+var backoffConfig = util.BackoffConfig{
 	// Backoff for loading initial configuration set.
-	minBackoff = 100 * time.Millisecond
-	maxBackoff = 2 * time.Second
+	MinBackoff: 100 * time.Millisecond,
+	MaxBackoff: 2 * time.Second,
+}
 
+const (
 	// If a config sets the Slack URL to this, it will be rewritten to
 	// a URL derived from Config.AutoSlackRoot
 	autoSlackURL = "internal://monitor"
@@ -331,7 +333,7 @@ func (am *MultitenantAlertmanager) Stop() {
 // Load the full set of configurations from the server, retrying with backoff
 // until we can get them.
 func (am *MultitenantAlertmanager) loadAllConfigs() map[string]configs.View {
-	backoff := minBackoff
+	backoff := util.NewBackoff(backoffConfig, nil)
 	for {
 		cfgs, err := am.poll()
 		if err == nil {
@@ -339,11 +341,7 @@ func (am *MultitenantAlertmanager) loadAllConfigs() map[string]configs.View {
 			return cfgs
 		}
 		level.Warn(util.Logger).Log("msg", "MultitenantAlertmanager: error fetching all configurations, backing off", "err", err)
-		time.Sleep(backoff)
-		backoff *= 2
-		if backoff > maxBackoff {
-			backoff = maxBackoff
-		}
+		backoff.Wait()
 	}
 }
 
diff --git a/pkg/chunk/aws_storage_client.go b/pkg/chunk/aws_storage_client.go
index f42fd75cc9f..5a1132af09e 100644
--- a/pkg/chunk/aws_storage_client.go
+++ b/pkg/chunk/aws_storage_client.go
@@ -8,6 +8,7 @@ import (
 	"io/ioutil"
 	"net/url"
 	"strings"
+	"time"
 
 	"github.com/go-kit/kit/log/level"
 	ot "github.com/opentracing/opentracing-go"
@@ -43,6 +44,14 @@ const (
 	dynamoDBMaxReadBatchSize = 100
 )
 
+var backoffConfig = util.BackoffConfig{
+	// Backoff for dynamoDB requests, to match AWS lib - see:
+	// https://github.com/aws/aws-sdk-go/blob/master/service/dynamodb/customizations.go
+	MinBackoff: 50 * time.Millisecond,
+	MaxBackoff: 50 * time.Second,
+	MaxRetries: 20,
+}
+
 var (
 	dynamoRequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
 		Namespace: "cortex",
@@ -182,12 +191,12 @@ func (a awsStorageClient) BatchWrite(ctx context.Context, input WriteBatch) erro
 	outstanding := input.(dynamoDBWriteBatch)
 	unprocessed := dynamoDBWriteBatch{}
 
-	backoff := resetBackoff()
+	backoff := util.NewBackoff(backoffConfig, ctx.Done())
 	defer func() {
-		dynamoQueryRetryCount.WithLabelValues("BatchWrite").Observe(float64(backoff.numRetries))
+		dynamoQueryRetryCount.WithLabelValues("BatchWrite").Observe(float64(backoff.NumRetries()))
 	}()
 
-	for outstanding.Len()+unprocessed.Len() > 0 && !backoff.finished() {
+	for outstanding.Len()+unprocessed.Len() > 0 && backoff.Ongoing() {
 		requests := dynamoDBWriteBatch{}
 		requests.TakeReqs(outstanding, dynamoDBMaxWriteBatchSize)
 		requests.TakeReqs(unprocessed, dynamoDBMaxWriteBatchSize)
@@ -216,7 +225,7 @@ func (a awsStorageClient) BatchWrite(ctx context.Context, input WriteBatch) erro
 			// so back off and retry all.
 			if awsErr, ok := err.(awserr.Error); ok && ((awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException) || request.Retryable()) {
 				unprocessed.TakeReqs(requests, -1)
-				backoff.backoff()
+				backoff.Wait()
 				continue
 			}
 
@@ -229,15 +238,15 @@ func (a awsStorageClient) BatchWrite(ctx context.Context, input WriteBatch) erro
 			unprocessed.TakeReqs(unprocessedItems, -1)
 			// I am unclear why we don't count here; perhaps the idea is
 			// that while we are making _some_ progress we should carry on.
-			backoff.backoffWithoutCounting()
+			backoff.WaitWithoutCounting()
 			continue
 		}
 
-		backoff = resetBackoff()
+		backoff.Reset()
 	}
 
 	if valuesLeft := outstanding.Len() + unprocessed.Len(); valuesLeft > 0 {
-		return fmt.Errorf("failed to write chunk after %d retries, %d values remaining", backoff.numRetries, valuesLeft)
+		return fmt.Errorf("failed to write chunk after %d retries, %d values remaining", backoff.NumRetries(), valuesLeft)
 	}
 	return nil
 }
@@ -310,13 +319,13 @@ func (a awsStorageClient) QueryPages(ctx context.Context, query IndexQuery, call
 }
 
 func (a awsStorageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput, page dynamoDBRequest) (dynamoDBReadResponse, error) {
-	backoff := resetBackoff()
+	backoff := util.NewBackoff(backoffConfig, ctx.Done())
 	defer func() {
-		dynamoQueryRetryCount.WithLabelValues("queryPage").Observe(float64(backoff.numRetries))
+		dynamoQueryRetryCount.WithLabelValues("queryPage").Observe(float64(backoff.NumRetries()))
 	}()
 
 	var err error
-	for !backoff.finished() {
+	for backoff.Ongoing() {
 		err = instrument.TimeRequestHistogram(ctx, "DynamoDB.QueryPages", dynamoRequestDuration, func(_ context.Context) error {
 			return page.Send()
 		})
@@ -330,9 +339,9 @@ func (a awsStorageClient) queryPage(ctx context.Context, input *dynamodb.QueryIn
 		recordDynamoError(*input.TableName, err, "DynamoDB.QueryPages")
 		if awsErr, ok := err.(awserr.Error); ok && ((awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException) || page.Retryable()) {
 			if awsErr.Code() != dynamodb.ErrCodeProvisionedThroughputExceededException {
-				level.Warn(util.Logger).Log("msg", "DynamoDB error", "retry", backoff.numRetries, "table", *input.TableName, "err", err)
+				level.Warn(util.Logger).Log("msg", "DynamoDB error", "retry", backoff.NumRetries(), "table", *input.TableName, "err", err)
 			}
-			backoff.backoff()
+			backoff.Wait()
 			continue
 		}
 		return nil, fmt.Errorf("QueryPage error: table=%v, err=%v", *input.TableName, err)
@@ -548,12 +557,12 @@ func (a awsStorageClient) getDynamoDBChunks(ctx context.Context, chunks []Chunk)
 	result := []Chunk{}
 	unprocessed := dynamoDBReadRequest{}
 
-	backoff := resetBackoff()
+	backoff := util.NewBackoff(backoffConfig, ctx.Done())
 	defer func() {
-		dynamoQueryRetryCount.WithLabelValues("getDynamoDBChunks").Observe(float64(backoff.numRetries))
+		dynamoQueryRetryCount.WithLabelValues("getDynamoDBChunks").Observe(float64(backoff.NumRetries()))
 	}()
 
-	for outstanding.Len()+unprocessed.Len() > 0 && !backoff.finished() {
+	for outstanding.Len()+unprocessed.Len() > 0 && backoff.Ongoing() {
 		requests := dynamoDBReadRequest{}
 		requests.TakeReqs(outstanding, dynamoDBMaxReadBatchSize)
 		requests.TakeReqs(unprocessed, dynamoDBMaxReadBatchSize)
@@ -582,7 +591,7 @@ func (a awsStorageClient) getDynamoDBChunks(ctx context.Context, chunks []Chunk)
 			// so back off and retry all.
 			if awsErr, ok := err.(awserr.Error); ok && ((awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException) || request.Retryable()) {
 				unprocessed.TakeReqs(requests, -1)
-				backoff.backoff()
+				backoff.Wait()
 				continue
 			}
 
@@ -601,16 +610,16 @@ func (a awsStorageClient) getDynamoDBChunks(ctx context.Context, chunks []Chunk)
 			unprocessed.TakeReqs(unprocessedKeys, -1)
 			// I am unclear why we don't count here; perhaps the idea is
 			// that while we are making _some_ progress we should carry on.
-			backoff.backoffWithoutCounting()
+			backoff.WaitWithoutCounting()
 			continue
 		}
 
-		backoff = resetBackoff()
+		backoff.Reset()
 	}
 
 	if valuesLeft := outstanding.Len() + unprocessed.Len(); valuesLeft > 0 {
 		// Return the chunks we did fetch, because partial results may be useful
-		return result, fmt.Errorf("failed to query chunks after %d retries, %d values remaining", backoff.numRetries, valuesLeft)
+		return result, fmt.Errorf("failed to query chunks after %d retries, %d values remaining", backoff.NumRetries(), valuesLeft)
 	}
 	return result, nil
 }
diff --git a/pkg/chunk/backoff.go b/pkg/chunk/backoff.go
deleted file mode 100644
index 7b5735c940a..00000000000
--- a/pkg/chunk/backoff.go
+++ /dev/null
@@ -1,44 +0,0 @@
-package chunk
-
-import (
-	"math/rand"
-	"time"
-)
-
-const (
-	// Backoff for dynamoDB requests, to match AWS lib - see:
-	// https://github.com/aws/aws-sdk-go/blob/master/service/dynamodb/customizations.go
-	minBackoff = 50 * time.Millisecond
-	maxBackoff = 50 * time.Second
-	maxRetries = 20
-)
-
-type backoff struct {
-	numRetries int
-	duration   time.Duration
-}
-
-func resetBackoff() backoff {
-	return backoff{numRetries: 0, duration: minBackoff}
-}
-
-func (b backoff) finished() bool {
-	return b.numRetries >= maxRetries
-}
-
-func (b *backoff) backoff() {
-	b.numRetries++
-	b.backoffWithoutCounting()
-}
-
-func (b *backoff) backoffWithoutCounting() {
-	if !b.finished() {
-		time.Sleep(b.duration)
-	}
-	// Based on the "Decorrelated Jitter" approach from https://www.awsarchitectureblog.com/2015/03/backoff.html
-	// sleep = min(cap, random_between(base, sleep * 3))
-	b.duration = minBackoff + time.Duration(rand.Int63n(int64((b.duration*3)-minBackoff)))
-	if b.duration > maxBackoff {
-		b.duration = maxBackoff
-	}
-}
diff --git a/pkg/chunk/dynamodb_table_client.go b/pkg/chunk/dynamodb_table_client.go
index 1ac793ec356..27455ce331a 100644
--- a/pkg/chunk/dynamodb_table_client.go
+++ b/pkg/chunk/dynamodb_table_client.go
@@ -66,12 +66,12 @@ func (d dynamoTableClient) backoffAndRetry(ctx context.Context, fn func(context.
 		d.limiter.Wait(ctx)
 	}
 
-	backoff := resetBackoff()
-	for !backoff.finished() {
+	backoff := util.NewBackoff(backoffConfig, ctx.Done())
+	for backoff.Ongoing() {
 		if err := fn(ctx); err != nil {
 			if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "ThrottlingException" {
-				level.Warn(util.WithContext(ctx, util.Logger)).Log("msg", "got error, backing off and retrying", "err", err, "retry", backoff.numRetries)
-				backoff.backoff()
+				level.Warn(util.WithContext(ctx, util.Logger)).Log("msg", "got error, backing off and retrying", "err", err, "retry", backoff.NumRetries())
+				backoff.Wait()
 				continue
 			} else {
 				return err
@@ -79,7 +79,7 @@ func (d dynamoTableClient) backoffAndRetry(ctx context.Context, fn func(context.
 		}
 		return nil
 	}
-	return fmt.Errorf("retried %d times, failing", backoff.numRetries)
+	return fmt.Errorf("retried %d times, failing", backoff.NumRetries())
 }
 
 func (d dynamoTableClient) ListTables(ctx context.Context) ([]string, error) {
diff --git a/pkg/ring/consul_client.go b/pkg/ring/consul_client.go
index e1bb9be59c5..32b7ae7c1a7 100644
--- a/pkg/ring/consul_client.go
+++ b/pkg/ring/consul_client.go
@@ -11,6 +11,7 @@ import (
 	"github.com/golang/snappy"
 	consul "github.com/hashicorp/consul/api"
 	cleanhttp "github.com/hashicorp/go-cleanhttp"
+
 	"github.com/weaveworks/cortex/pkg/util"
 )
 
@@ -189,36 +190,9 @@ func (c *consulClient) CAS(key string, f CASCallback) error {
 	return fmt.Errorf("failed to CAS %s", key)
 }
 
-const (
-	initialBackoff = 1 * time.Second
-	maxBackoff     = 1 * time.Minute
-)
-
-type backoff struct {
-	done    <-chan struct{}
-	backoff time.Duration
-}
-
-func newBackoff(done <-chan struct{}) *backoff {
-	return &backoff{
-		done:    done,
-		backoff: initialBackoff,
-	}
-}
-
-func (b *backoff) reset() {
-	b.backoff = initialBackoff
-}
-
-func (b *backoff) wait() {
-	select {
-	case <-b.done:
-	case <-time.After(b.backoff):
-		b.backoff = b.backoff * 2
-		if b.backoff > maxBackoff {
-			b.backoff = maxBackoff
-		}
-	}
+var backoffConfig = util.BackoffConfig{
+	MinBackoff: 1 * time.Second,
+	MaxBackoff: 1 * time.Minute,
 }
 
 func isClosed(done <-chan struct{}) bool {
@@ -238,7 +212,7 @@ func isClosed(done <-chan struct{}) bool {
 // the done channel is closed.
 func (c *consulClient) WatchPrefix(prefix string, done <-chan struct{}, f func(string, interface{}) bool) {
 	var (
-		backoff = newBackoff(done)
+		backoff = util.NewBackoff(backoffConfig, done)
 		index   = uint64(0)
 	)
 	for {
@@ -252,10 +226,10 @@ func (c *consulClient) WatchPrefix(prefix string, done <-chan struct{}, f func(s
 		})
 		if err != nil {
 			level.Error(util.Logger).Log("msg", "error getting path", "prefix", prefix, "err", err)
-			backoff.wait()
+			backoff.Wait()
 			continue
 		}
-		backoff.reset()
+		backoff.Reset()
 
 		// Skip if the index is the same as last time, because the key value is
 		// guaranteed to be the same as last time
@@ -285,7 +259,7 @@ func (c *consulClient) WatchPrefix(prefix string, done <-chan struct{}, f func(s
 // the done channel is closed.
 func (c *consulClient) WatchKey(key string, done <-chan struct{}, f func(interface{}) bool) {
 	var (
-		backoff = newBackoff(done)
+		backoff = util.NewBackoff(backoffConfig, done)
 		index   = uint64(0)
 	)
 	for {
@@ -299,10 +273,10 @@ func (c *consulClient) WatchKey(key string, done <-chan struct{}, f func(interfa
 		})
 		if err != nil || kvp == nil {
 			level.Error(util.Logger).Log("msg", "error getting path", "key", key, "err", err)
-			backoff.wait()
+			backoff.Wait()
 			continue
 		}
-		backoff.reset()
+		backoff.Reset()
 
 		// Skip if the index is the same as last time, because the key value is
 		// guaranteed to be the same as last time
diff --git a/pkg/ruler/scheduler.go b/pkg/ruler/scheduler.go
index cb9f787649a..36f100f9c6c 100644
--- a/pkg/ruler/scheduler.go
+++ b/pkg/ruler/scheduler.go
@@ -18,11 +18,13 @@ import (
 	"github.com/weaveworks/cortex/pkg/util"
 )
 
-const (
+var backoffConfig = util.BackoffConfig{
 	// Backoff for loading initial configuration set.
-	minBackoff = 100 * time.Millisecond
-	maxBackoff = 2 * time.Second
+	MinBackoff: 100 * time.Millisecond,
+	MaxBackoff: 2 * time.Second,
+}
 
+const (
 	timeLogFormat = "2006-01-02T15:04:05"
 )
 
@@ -136,7 +138,7 @@ func (s *scheduler) Stop() {
 // Load the full set of configurations from the server, retrying with backoff
 // until we can get them.
 func (s *scheduler) loadAllConfigs() map[string]configs.View {
-	backoff := minBackoff
+	backoff := util.NewBackoff(backoffConfig, nil)
 	for {
 		cfgs, err := s.poll()
 		if err == nil {
@@ -144,11 +146,7 @@ func (s *scheduler) loadAllConfigs() map[string]configs.View {
 			return cfgs
 		}
 		level.Warn(util.Logger).Log("msg", "scheduler: error fetching all configurations, backing off", "err", err)
-		time.Sleep(backoff)
-		backoff *= 2
-		if backoff > maxBackoff {
-			backoff = maxBackoff
-		}
+		backoff.Wait()
 	}
 }
 
diff --git a/pkg/util/backoff.go b/pkg/util/backoff.go
new file mode 100644
index 00000000000..e088e8b06b0
--- /dev/null
+++ b/pkg/util/backoff.go
@@ -0,0 +1,73 @@
+package util
+
+import (
+	"math/rand"
+	"time"
+)
+
+// BackoffConfig configures a Backoff
+type BackoffConfig struct {
+	MinBackoff time.Duration // start backoff at this level
+	MaxBackoff time.Duration // increase exponentially to this level
+	MaxRetries int           // give up after this many; zero means infinite retries
+}
+
+// Backoff implements exponential backoff with randomized wait times
+type Backoff struct {
+	cfg        BackoffConfig
+	done       <-chan struct{}
+	numRetries int
+	cancelled  bool
+	duration   time.Duration
+}
+
+// NewBackoff creates a Backoff object. Pass a 'done' channel that can be closed to terminate the operation.
+func NewBackoff(cfg BackoffConfig, done <-chan struct{}) *Backoff {
+	return &Backoff{
+		cfg:      cfg,
+		done:     done,
+		duration: cfg.MinBackoff,
+	}
+}
+
+// Reset the Backoff back to its initial condition
+func (b *Backoff) Reset() {
+	b.numRetries = 0
+	b.cancelled = false
+	b.duration = b.cfg.MinBackoff
+}
+
+// Ongoing returns true if caller should keep going
+func (b *Backoff) Ongoing() bool {
+	return !b.cancelled && (b.cfg.MaxRetries == 0 || b.numRetries < b.cfg.MaxRetries)
+}
+
+// NumRetries returns the number of retries so far
+func (b *Backoff) NumRetries() int {
+	return b.numRetries
+}
+
+// Wait sleeps for the backoff time then increases the retry count and backoff time
+// Returns immediately if done channel is closed
+func (b *Backoff) Wait() {
+	b.numRetries++
+	b.WaitWithoutCounting()
+}
+
+// WaitWithoutCounting sleeps for the backoff time then increases backoff time
+// Returns immediately if done channel is closed
+func (b *Backoff) WaitWithoutCounting() {
+	if b.Ongoing() {
+		select {
+		case <-b.done:
+			b.cancelled = true
+		case <-time.After(b.duration):
+		}
+	}
+	// Based on the "Decorrelated Jitter" approach from https://www.awsarchitectureblog.com/2015/03/backoff.html
+	// sleep = min(cap, random_between(base, sleep * 3))
+	b.duration = b.cfg.MinBackoff + time.Duration(rand.Int63n(int64((b.duration*3)-b.cfg.MinBackoff)))
+	if b.duration > b.cfg.MaxBackoff {
+		b.duration = b.cfg.MaxBackoff
+	}
+}
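
For reviewers, a minimal usage sketch of the new util.Backoff API introduced above. The retryThing helper and the specific config values are made up for illustration and are not part of this change; the types and methods match pkg/util/backoff.go.

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/weaveworks/cortex/pkg/util"
)

// retryThing is a hypothetical operation that fails its first few attempts.
func retryThing(attempt int) error {
	if attempt < 3 {
		return errors.New("not ready yet")
	}
	return nil
}

func main() {
	cfg := util.BackoffConfig{
		MinBackoff: 100 * time.Millisecond,
		MaxBackoff: 2 * time.Second,
		MaxRetries: 10, // zero means retry forever
	}
	done := make(chan struct{}) // close(done) to abort the retry loop early
	backoff := util.NewBackoff(cfg, done)

	for backoff.Ongoing() {
		if err := retryThing(backoff.NumRetries()); err == nil {
			fmt.Println("succeeded after", backoff.NumRetries(), "retries")
			return
		}
		// Wait counts a retry, sleeps for the current backoff duration
		// (starting at MinBackoff), then picks the next duration using
		// decorrelated jitter capped at MaxBackoff. It returns early
		// if the done channel is closed.
		backoff.Wait()
	}
	fmt.Println("gave up after", backoff.NumRetries(), "retries")
}

Note that Ongoing() also returns false once the done channel has fired during a Wait(), so cancellation and retry exhaustion are both handled by the same loop condition.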