From 2ac4b1f1eb1264c51d7c95192c2ed2d4113289c7 Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Fri, 2 Mar 2018 14:18:46 -0700 Subject: [PATCH 1/2] Stop early-return after majority success in distributor writes --- pkg/distributor/distributor.go | 51 ++++++++++++++++++++++++++-------- 1 file changed, 40 insertions(+), 11 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 0998d62efac..344be13d110 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -77,6 +77,7 @@ type Config struct { ClientCleanupPeriod time.Duration IngestionRateLimit float64 IngestionBurstSize int + WaitForAllIngesters bool // for testing ingesterClientFactory func(addr string, cfg ingester_client.Config) (client.IngesterClient, error) @@ -92,6 +93,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { flag.DurationVar(&cfg.ClientCleanupPeriod, "distributor.client-cleanup-period", 15*time.Second, "How frequently to clean up clients for ingesters that have gone away.") flag.Float64Var(&cfg.IngestionRateLimit, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.") flag.IntVar(&cfg.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples).") + flag.BoolVar(&cfg.WaitForAllIngesters, "distributor.wait-for-all-ingesters", false, "Turning this on will no longer short circuit ingester writes once a minimum quorum is reached.") } // New constructs a new Distributor @@ -250,8 +252,10 @@ func tokenFor(userID string, name []byte) uint32 { type sampleTracker struct { labels []client.LabelPair sample client.Sample + total int minSuccess int maxFailures int + finished int32 succeeded int32 failed int32 } @@ -316,6 +320,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie // We need a response from a quorum of ingesters, which is n/2 + 1. minSuccess := (len(ingesters[i]) / 2) + 1 samples[i].minSuccess = minSuccess + samples[i].total = len(ingesters[i]) samples[i].maxFailures = len(ingesters[i]) - minSuccess // Skip those that have not heartbeated in a while. NB these are still @@ -387,20 +392,44 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester *ring.IngesterDe // The use of atomic increments here guarantees only a single sendSamples // goroutine will write to either channel. for i := range sampleTrackers { - if err != nil { - if atomic.AddInt32(&sampleTrackers[i].failed, 1) <= int32(sampleTrackers[i].maxFailures) { - continue - } + if d.cfg.WaitForAllIngesters { + waitForAll(err, sampleTrackers[i], pushTracker) + } else { + shortCircuit(err, sampleTrackers[i], pushTracker) + } + } +} + +func shortCircuit(err error, sampleTracker *sampleTracker, pushTracker *pushTracker) { + if err != nil { + if atomic.AddInt32(&sampleTracker.failed, 1) <= int32(sampleTracker.maxFailures) { + return + } + if atomic.AddInt32(&pushTracker.samplesFailed, 1) == 1 { + pushTracker.err <- err + } + } else { + if atomic.AddInt32(&sampleTracker.succeeded, 1) != int32(sampleTracker.minSuccess) { + return + } + if atomic.AddInt32(&pushTracker.samplesPending, -1) == 0 { + pushTracker.done <- struct{}{} + } + } +} + +func waitForAll(err error, sampleTracker *sampleTracker, pushTracker *pushTracker) { + if err != nil { + if atomic.AddInt32(&sampleTracker.failed, 1) > int32(sampleTracker.maxFailures) { if atomic.AddInt32(&pushTracker.samplesFailed, 1) == 1 { pushTracker.err <- err } - } else { - if atomic.AddInt32(&sampleTrackers[i].succeeded, 1) != int32(sampleTrackers[i].minSuccess) { - continue - } - if atomic.AddInt32(&pushTracker.samplesPending, -1) == 0 { - pushTracker.done <- struct{}{} - } + } + } + + if atomic.AddInt32(&sampleTracker.finished, 1) == int32(sampleTracker.total) { + if atomic.AddInt32(&pushTracker.samplesPending, -1) == 0 { + pushTracker.done <- struct{}{} } } } From 0eb2a0f424934d8006964641e88a7c73f0849f95 Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Fri, 2 Mar 2018 16:11:03 -0700 Subject: [PATCH 2/2] Refactored to share ingester error handling --- pkg/distributor/distributor.go | 28 ++++++++++------------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 344be13d110..98e60ce5905 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -392,8 +392,15 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester *ring.IngesterDe // The use of atomic increments here guarantees only a single sendSamples // goroutine will write to either channel. for i := range sampleTrackers { + if err != nil { + if atomic.AddInt32(&sampleTrackers[i].failed, 1) > int32(sampleTrackers[i].maxFailures) { + if atomic.AddInt32(&pushTracker.samplesFailed, 1) == 1 { + pushTracker.err <- err + } + } + } if d.cfg.WaitForAllIngesters { - waitForAll(err, sampleTrackers[i], pushTracker) + waitForAll(sampleTrackers[i], pushTracker) } else { shortCircuit(err, sampleTrackers[i], pushTracker) } @@ -401,14 +408,7 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester *ring.IngesterDe } func shortCircuit(err error, sampleTracker *sampleTracker, pushTracker *pushTracker) { - if err != nil { - if atomic.AddInt32(&sampleTracker.failed, 1) <= int32(sampleTracker.maxFailures) { - return - } - if atomic.AddInt32(&pushTracker.samplesFailed, 1) == 1 { - pushTracker.err <- err - } - } else { + if err == nil { if atomic.AddInt32(&sampleTracker.succeeded, 1) != int32(sampleTracker.minSuccess) { return } @@ -418,15 +418,7 @@ func shortCircuit(err error, sampleTracker *sampleTracker, pushTracker *pushTrac } } -func waitForAll(err error, sampleTracker *sampleTracker, pushTracker *pushTracker) { - if err != nil { - if atomic.AddInt32(&sampleTracker.failed, 1) > int32(sampleTracker.maxFailures) { - if atomic.AddInt32(&pushTracker.samplesFailed, 1) == 1 { - pushTracker.err <- err - } - } - } - +func waitForAll(sampleTracker *sampleTracker, pushTracker *pushTracker) { if atomic.AddInt32(&sampleTracker.finished, 1) == int32(sampleTracker.total) { if atomic.AddInt32(&pushTracker.samplesPending, -1) == 0 { pushTracker.done <- struct{}{}