Merged
16 changes: 7 additions & 9 deletions pkg/alertmanager/multitenant.go
@@ -28,11 +28,13 @@ import (
"github.com/weaveworks/cortex/pkg/util"
)

const (
var backoffConfig = util.BackoffConfig{
// Backoff for loading initial configuration set.
minBackoff = 100 * time.Millisecond
maxBackoff = 2 * time.Second
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 2 * time.Second,
}

const (
// If a config sets the Slack URL to this, it will be rewritten to
// a URL derived from Config.AutoSlackRoot
autoSlackURL = "internal://monitor"
@@ -331,19 +333,15 @@ func (am *MultitenantAlertmanager) Stop() {
// Load the full set of configurations from the server, retrying with backoff
// until we can get them.
func (am *MultitenantAlertmanager) loadAllConfigs() map[string]configs.View {
backoff := minBackoff
backoff := util.NewBackoff(backoffConfig, nil)
for {
cfgs, err := am.poll()
if err == nil {
level.Debug(util.Logger).Log("msg", "MultitenantAlertmanager: initial configuration load", "num_configs", len(cfgs))
return cfgs
}
level.Warn(util.Logger).Log("msg", "MultitenantAlertmanager: error fetching all configurations, backing off", "err", err)
time.Sleep(backoff)
backoff *= 2
if backoff > maxBackoff {
backoff = maxBackoff
}
backoff.Wait()
}
}

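Note: the call sites in this PR depend on a shared backoff helper in pkg/util that is not shown in this diff view. Below is a minimal sketch of what that helper might look like, inferred purely from how it is used here (NewBackoff, Wait, WaitWithoutCounting, Ongoing, NumRetries, Reset, and the BackoffConfig fields); the actual pkg/util/backoff.go added by the PR may differ in detail. Passing nil for done, as loadAllConfigs does, simply means Wait can only return via the timer.

```go
package util

import "time"

// BackoffConfig configures an exponential backoff between retries.
type BackoffConfig struct {
	MinBackoff time.Duration // initial delay
	MaxBackoff time.Duration // cap on the delay
	MaxRetries int           // zero means no limit
}

// Backoff tracks the state of one retry loop.
type Backoff struct {
	cfg        BackoffConfig
	done       <-chan struct{}
	numRetries int
	duration   time.Duration
}

// NewBackoff creates a Backoff; done may be nil or, say, ctx.Done().
func NewBackoff(cfg BackoffConfig, done <-chan struct{}) *Backoff {
	return &Backoff{cfg: cfg, done: done, duration: cfg.MinBackoff}
}

// Reset drops the delay back to MinBackoff and clears the retry count.
func (b *Backoff) Reset() {
	b.numRetries = 0
	b.duration = b.cfg.MinBackoff
}

// NumRetries reports how many counted waits have happened since the last Reset.
func (b *Backoff) NumRetries() int { return b.numRetries }

// Ongoing reports whether the retry budget is still available.
func (b *Backoff) Ongoing() bool {
	return b.cfg.MaxRetries == 0 || b.numRetries < b.cfg.MaxRetries
}

// Wait counts a retry, then sleeps for the current delay (or until done closes)
// and doubles the delay, capped at MaxBackoff.
func (b *Backoff) Wait() {
	b.numRetries++
	b.WaitWithoutCounting()
}

// WaitWithoutCounting sleeps and grows the delay without counting a retry;
// the DynamoDB callers use it when a batch made partial progress.
func (b *Backoff) WaitWithoutCounting() {
	select {
	case <-b.done:
	case <-time.After(b.duration):
	}
	b.duration *= 2
	if b.duration > b.cfg.MaxBackoff {
		b.duration = b.cfg.MaxBackoff
	}
}
```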
47 changes: 28 additions & 19 deletions pkg/chunk/aws_storage_client.go
@@ -8,6 +8,7 @@ import (
"io/ioutil"
"net/url"
"strings"
"time"

"github.com/go-kit/kit/log/level"
ot "github.com/opentracing/opentracing-go"
@@ -43,6 +44,14 @@ const (
dynamoDBMaxReadBatchSize = 100
)

var backoffConfig = util.BackoffConfig{
// Backoff for dynamoDB requests, to match AWS lib - see:
// https://github.com/aws/aws-sdk-go/blob/master/service/dynamodb/customizations.go
MinBackoff: 50 * time.Millisecond,
MaxBackoff: 50 * time.Second,
MaxRetries: 20,
}

var (
dynamoRequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
@@ -182,12 +191,12 @@ func (a awsStorageClient) BatchWrite(ctx context.Context, input WriteBatch) erro
outstanding := input.(dynamoDBWriteBatch)
unprocessed := dynamoDBWriteBatch{}

backoff := resetBackoff()
backoff := util.NewBackoff(backoffConfig, ctx.Done())
defer func() {
dynamoQueryRetryCount.WithLabelValues("BatchWrite").Observe(float64(backoff.numRetries))
dynamoQueryRetryCount.WithLabelValues("BatchWrite").Observe(float64(backoff.NumRetries()))
}()

for outstanding.Len()+unprocessed.Len() > 0 && !backoff.finished() {
for outstanding.Len()+unprocessed.Len() > 0 && backoff.Ongoing() {
requests := dynamoDBWriteBatch{}
requests.TakeReqs(outstanding, dynamoDBMaxWriteBatchSize)
requests.TakeReqs(unprocessed, dynamoDBMaxWriteBatchSize)
@@ -216,7 +225,7 @@ func (a awsStorageClient) BatchWrite(ctx context.Context, input WriteBatch) erro
// so back off and retry all.
if awsErr, ok := err.(awserr.Error); ok && ((awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException) || request.Retryable()) {
unprocessed.TakeReqs(requests, -1)
backoff.backoff()
backoff.Wait()
continue
}

@@ -229,15 +238,15 @@ func (a awsStorageClient) BatchWrite(ctx context.Context, input WriteBatch) erro
unprocessed.TakeReqs(unprocessedItems, -1)
// I am unclear why we don't count here; perhaps the idea is
// that while we are making _some_ progress we should carry on.
backoff.backoffWithoutCounting()
backoff.WaitWithoutCounting()
continue
}

backoff = resetBackoff()
backoff.Reset()
}

if valuesLeft := outstanding.Len() + unprocessed.Len(); valuesLeft > 0 {
return fmt.Errorf("failed to write chunk after %d retries, %d values remaining", backoff.numRetries, valuesLeft)
return fmt.Errorf("failed to write chunk after %d retries, %d values remaining", backoff.NumRetries(), valuesLeft)
}
return nil
}
@@ -310,13 +319,13 @@ func (a awsStorageClient) QueryPages(ctx context.Context, query IndexQuery, call
}

func (a awsStorageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput, page dynamoDBRequest) (dynamoDBReadResponse, error) {
backoff := resetBackoff()
backoff := util.NewBackoff(backoffConfig, ctx.Done())
defer func() {
dynamoQueryRetryCount.WithLabelValues("queryPage").Observe(float64(backoff.numRetries))
dynamoQueryRetryCount.WithLabelValues("queryPage").Observe(float64(backoff.NumRetries()))
}()

var err error
for !backoff.finished() {
for backoff.Ongoing() {
err = instrument.TimeRequestHistogram(ctx, "DynamoDB.QueryPages", dynamoRequestDuration, func(_ context.Context) error {
return page.Send()
})
@@ -330,9 +339,9 @@ func (a awsStorageClient) queryPage(ctx context.Context, input *dynamodb.QueryIn
recordDynamoError(*input.TableName, err, "DynamoDB.QueryPages")
if awsErr, ok := err.(awserr.Error); ok && ((awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException) || page.Retryable()) {
if awsErr.Code() != dynamodb.ErrCodeProvisionedThroughputExceededException {
level.Warn(util.Logger).Log("msg", "DynamoDB error", "retry", backoff.numRetries, "table", *input.TableName, "err", err)
level.Warn(util.Logger).Log("msg", "DynamoDB error", "retry", backoff.NumRetries(), "table", *input.TableName, "err", err)
}
backoff.backoff()
backoff.Wait()
continue
}
return nil, fmt.Errorf("QueryPage error: table=%v, err=%v", *input.TableName, err)
@@ -548,12 +557,12 @@ func (a awsStorageClient) getDynamoDBChunks(ctx context.Context, chunks []Chunk)

result := []Chunk{}
unprocessed := dynamoDBReadRequest{}
backoff := resetBackoff()
backoff := util.NewBackoff(backoffConfig, ctx.Done())
defer func() {
dynamoQueryRetryCount.WithLabelValues("getDynamoDBChunks").Observe(float64(backoff.numRetries))
dynamoQueryRetryCount.WithLabelValues("getDynamoDBChunks").Observe(float64(backoff.NumRetries()))
}()

for outstanding.Len()+unprocessed.Len() > 0 && !backoff.finished() {
for outstanding.Len()+unprocessed.Len() > 0 && backoff.Ongoing() {
requests := dynamoDBReadRequest{}
requests.TakeReqs(outstanding, dynamoDBMaxReadBatchSize)
requests.TakeReqs(unprocessed, dynamoDBMaxReadBatchSize)
@@ -582,7 +591,7 @@ func (a awsStorageClient) getDynamoDBChunks(ctx context.Context, chunks []Chunk)
// so back off and retry all.
if awsErr, ok := err.(awserr.Error); ok && ((awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException) || request.Retryable()) {
unprocessed.TakeReqs(requests, -1)
backoff.backoff()
backoff.Wait()
continue
}

@@ -601,16 +610,16 @@ func (a awsStorageClient) getDynamoDBChunks(ctx context.Context, chunks []Chunk)
unprocessed.TakeReqs(unprocessedKeys, -1)
// I am unclear why we don't count here; perhaps the idea is
// that while we are making _some_ progress we should carry on.
backoff.backoffWithoutCounting()
backoff.WaitWithoutCounting()
continue
}

backoff = resetBackoff()
backoff.Reset()
}

if valuesLeft := outstanding.Len() + unprocessed.Len(); valuesLeft > 0 {
// Return the chunks we did fetch, because partial results may be useful
return result, fmt.Errorf("failed to query chunks after %d retries, %d values remaining", backoff.numRetries, valuesLeft)
return result, fmt.Errorf("failed to query chunks after %d retries, %d values remaining", backoff.NumRetries(), valuesLeft)
}
return result, nil
}
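BatchWrite and getDynamoDBChunks above share the same loop shape; the sketch below condenses it to show when a retry is counted (Wait, no progress at all), when the code backs off without counting (WaitWithoutCounting, partial progress), and when the delay is reset. The outcome type and the attempt callback are hypothetical placeholders for illustration, not code from this PR.

```go
package example

import (
	"context"
	"fmt"

	"github.com/weaveworks/cortex/pkg/util"
)

// outcome is a hypothetical summary of one batch attempt.
type outcome int

const (
	outcomeDone      outcome = iota // batch fully processed
	outcomeThrottled                // nothing processed: throttled or retryable error
	outcomePartial                  // some items processed, the rest returned unprocessed
)

// retryBatch mirrors the loop shape of BatchWrite/getDynamoDBChunks: attempt
// reports what happened plus how many items remain outstanding.
func retryBatch(ctx context.Context, cfg util.BackoffConfig, attempt func() (outcome, int)) error {
	backoff := util.NewBackoff(cfg, ctx.Done())
	remaining := 1
	for remaining > 0 && backoff.Ongoing() {
		var out outcome
		out, remaining = attempt()
		switch out {
		case outcomeThrottled:
			backoff.Wait() // counted: no progress at all this round
		case outcomePartial:
			backoff.WaitWithoutCounting() // back off, but don't burn a retry
		case outcomeDone:
			backoff.Reset() // a clean batch resets the delay
		}
	}
	if remaining > 0 {
		return fmt.Errorf("gave up after %d retries, %d items remaining", backoff.NumRetries(), remaining)
	}
	return nil
}
```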
44 changes: 0 additions & 44 deletions pkg/chunk/backoff.go

This file was deleted.

10 changes: 5 additions & 5 deletions pkg/chunk/dynamodb_table_client.go
@@ -66,20 +66,20 @@ func (d dynamoTableClient) backoffAndRetry(ctx context.Context, fn func(context.
d.limiter.Wait(ctx)
}

backoff := resetBackoff()
for !backoff.finished() {
backoff := util.NewBackoff(backoffConfig, ctx.Done())
for backoff.Ongoing() {
if err := fn(ctx); err != nil {
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "ThrottlingException" {
level.Warn(util.WithContext(ctx, util.Logger)).Log("msg", "got error, backing off and retrying", "err", err, "retry", backoff.numRetries)
backoff.backoff()
level.Warn(util.WithContext(ctx, util.Logger)).Log("msg", "got error, backing off and retrying", "err", err, "retry", backoff.NumRetries())
backoff.Wait()
continue
} else {
return err
}
}
return nil
}
return fmt.Errorf("retried %d times, failing", backoff.numRetries)
return fmt.Errorf("retried %d times, failing", backoff.NumRetries())
}

func (d dynamoTableClient) ListTables(ctx context.Context) ([]string, error) {
46 changes: 10 additions & 36 deletions pkg/ring/consul_client.go
@@ -11,6 +11,7 @@ import (
"github.com/golang/snappy"
consul "github.com/hashicorp/consul/api"
cleanhttp "github.com/hashicorp/go-cleanhttp"

"github.com/weaveworks/cortex/pkg/util"
)

@@ -189,36 +190,9 @@ func (c *consulClient) CAS(key string, f CASCallback) error {
return fmt.Errorf("failed to CAS %s", key)
}

const (
initialBackoff = 1 * time.Second
maxBackoff = 1 * time.Minute
)

type backoff struct {
done <-chan struct{}
backoff time.Duration
}

func newBackoff(done <-chan struct{}) *backoff {
return &backoff{
done: done,
backoff: initialBackoff,
}
}

func (b *backoff) reset() {
b.backoff = initialBackoff
}

func (b *backoff) wait() {
select {
case <-b.done:
case <-time.After(b.backoff):
b.backoff = b.backoff * 2
if b.backoff > maxBackoff {
b.backoff = maxBackoff
}
}
var backoffConfig = util.BackoffConfig{
MinBackoff: 1 * time.Second,
MaxBackoff: 1 * time.Minute,
}

func isClosed(done <-chan struct{}) bool {
Expand All @@ -238,7 +212,7 @@ func isClosed(done <-chan struct{}) bool {
// the done channel is closed.
func (c *consulClient) WatchPrefix(prefix string, done <-chan struct{}, f func(string, interface{}) bool) {
var (
backoff = newBackoff(done)
backoff = util.NewBackoff(backoffConfig, done)
index = uint64(0)
)
for {
@@ -252,10 +226,10 @@ func (c *consulClient) WatchPrefix(prefix string, done <-chan struct{}, f func(s
})
if err != nil {
level.Error(util.Logger).Log("msg", "error getting path", "prefix", prefix, "err", err)
backoff.wait()
backoff.Wait()
continue
}
backoff.reset()
backoff.Reset()

// Skip if the index is the same as last time, because the key value is
// guaranteed to be the same as last time
@@ -285,7 +259,7 @@ func (c *consulClient) WatchPrefix(prefix string, done <-chan struct{}, f func(s
// the done channel is closed.
func (c *consulClient) WatchKey(key string, done <-chan struct{}, f func(interface{}) bool) {
var (
backoff = newBackoff(done)
backoff = util.NewBackoff(backoffConfig, done)
index = uint64(0)
)
for {
@@ -299,10 +273,10 @@ func (c *consulClient) WatchKey(key string, done <-chan struct{}, f func(interfa
})
Contributor:
I wonder if Ongoing should check whether done is closed, and then replace the call to isClosed here with a call to Ongoing?

Contributor Author:
From the Context docs we would check Err() != nil.

That would change the semantics; probably in an ok way, but I felt this was conflating the two mechanisms too much and we should check explicitly in the outer loop.

I have another PR under way where I'm doing that. Maybe I'll change my mind.

Contributor Author:
OK, that comment didn't make very much sense in this context. See #618 for what I meant.

Contributor:
:-) Thanks Brian. I'll try and look at that tomorrow, but I'm flying then, so may be a little delayed.

if err != nil || kvp == nil {
level.Error(util.Logger).Log("msg", "error getting path", "key", key, "err", err)
backoff.wait()
backoff.Wait()
continue
}
backoff.reset()
backoff.Reset()

// Skip if the index is the same as last time, because the key value is
// guaranteed to be the same as last time
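For reference, the reviewer's suggestion in the thread above — letting Ongoing notice a closed done channel so the explicit isClosed(done) check could go away — might look roughly like the method below, building on the util.Backoff sketch given earlier. This only illustrates the idea; the PR keeps the explicit check and the author deferred the change (see #618).

```go
// Ongoing variant sketched from the review suggestion: stop retrying as soon
// as done is closed, in addition to respecting MaxRetries. Not part of this PR.
func (b *Backoff) Ongoing() bool {
	select {
	case <-b.done:
		return false // the watch (or context) was cancelled
	default:
	}
	return b.cfg.MaxRetries == 0 || b.numRetries < b.cfg.MaxRetries
}
```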
16 changes: 7 additions & 9 deletions pkg/ruler/scheduler.go
@@ -18,11 +18,13 @@ import (
"github.com/weaveworks/cortex/pkg/util"
)

const (
var backoffConfig = util.BackoffConfig{
// Backoff for loading initial configuration set.
minBackoff = 100 * time.Millisecond
maxBackoff = 2 * time.Second
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 2 * time.Second,
}

const (
timeLogFormat = "2006-01-02T15:04:05"
)

@@ -136,19 +138,15 @@ func (s *scheduler) Stop() {
// Load the full set of configurations from the server, retrying with backoff
// until we can get them.
func (s *scheduler) loadAllConfigs() map[string]configs.View {
backoff := minBackoff
backoff := util.NewBackoff(backoffConfig, nil)
for {
cfgs, err := s.poll()
if err == nil {
level.Debug(util.Logger).Log("msg", "scheduler: initial configuration load", "num_configs", len(cfgs))
return cfgs
}
level.Warn(util.Logger).Log("msg", "scheduler: error fetching all configurations, backing off", "err", err)
time.Sleep(backoff)
backoff *= 2
if backoff > maxBackoff {
backoff = maxBackoff
}
backoff.Wait()
}
}
