Skip to content

Commit a4cccca

Browse files
committed
Unify the replication calculation between query and push.
1 parent b292164 commit a4cccca

File tree

2 files changed

+62
-33
lines changed

2 files changed

+62
-33
lines changed

pkg/distributor/distributor.go

Lines changed: 19 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -315,25 +315,11 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
315315
samplesByIngester := map[*ring.IngesterDesc][]*sampleTracker{}
316316
for i := range samples {
317317
// We need a response from a quorum of ingesters, which is n/2 + 1.
318-
minSuccess := (len(ingesters[i]) / 2) + 1
319-
samples[i].minSuccess = minSuccess
320-
samples[i].maxFailures = len(ingesters[i]) - minSuccess
321-
322-
// Skip those that have not heartbeated in a while. NB these are still
323-
// included in the calculation of minSuccess, so if too many failed ingesters
324-
// will cause the whole write to fail.
325-
liveIngesters := make([]*ring.IngesterDesc, 0, len(ingesters[i]))
326-
for _, ingester := range ingesters[i] {
327-
if d.ring.IsHealthy(ingester) {
328-
liveIngesters = append(liveIngesters, ingester)
329-
}
330-
}
331-
332-
// This is just a shortcut - if there are not minSuccess available ingesters,
333-
// after filtering out dead ones, don't even bother trying.
334-
if len(liveIngesters) < minSuccess {
335-
return nil, fmt.Errorf("wanted at least %d live ingesters to process write, had %d",
336-
minSuccess, len(liveIngesters))
318+
var err error
319+
var liveIngesters []*ring.IngesterDesc
320+
samples[i].minSuccess, samples[i].maxFailures, liveIngesters, err = d.replicationStrategy(ingesters[i])
321+
if err != nil {
322+
return nil, err
337323
}
338324

339325
for _, liveIngester := range liveIngesters {
@@ -467,39 +453,39 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers .
467453
// Query implements Querier.
468454
func (d *Distributor) queryIngesters(ctx context.Context, ingesters []*ring.IngesterDesc, replicationFactor int, req *client.QueryRequest) (model.Matrix, error) {
469455
// We need a response from a quorum of ingesters, where maxErrs is n/2, where n is the replicationFactor
470-
maxErrs := replicationFactor / 2
471-
minSuccess := len(ingesters) - maxErrs
472-
if len(ingesters) < minSuccess {
473-
return nil, fmt.Errorf("could only find %d ingesters for query. Need at least %d", len(ingesters), minSuccess)
456+
minSuccess, maxErrors, ingesters, err := d.replicationStrategy(ingesters)
457+
if err != nil {
458+
return nil, err
474459
}
475460

476461
// Fetch samples from multiple ingesters
477-
var numErrs int32
478-
errReceived := make(chan error)
462+
errs := make(chan error, len(ingesters))
479463
results := make(chan model.Matrix, len(ingesters))
480-
481464
for _, ing := range ingesters {
482465
go func(ing *ring.IngesterDesc) {
483466
result, err := d.queryIngester(ctx, ing, req)
484467
if err != nil {
485-
if atomic.AddInt32(&numErrs, 1) == int32(maxErrs+1) {
486-
errReceived <- err
487-
}
468+
errs <- err
488469
} else {
489470
results <- result
490471
}
491472
}(ing)
492473
}
493474

494-
// Only wait for minSuccess ingesters (or an error), and accumulate the samples
475+
// Only wait for minSuccessful responses (or maxErrors), and accumulate the samples
495476
// by fingerprint, merging them into any existing samples.
496477
fpToSampleStream := map[model.Fingerprint]*model.SampleStream{}
497-
for i := 0; i < minSuccess; i++ {
478+
var numErrs, numSuccess int
479+
for numSuccess < minSuccess {
498480
select {
499-
case err := <-errReceived:
500-
return nil, err
481+
case err := <-errs:
482+
numErrs++
483+
if numErrs > maxErrors {
484+
return nil, err
485+
}
501486

502487
case result := <-results:
488+
numSuccess++
503489
for _, ss := range result {
504490
fp := ss.Metric.Fingerprint()
505491
mss, ok := fpToSampleStream[fp]
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package distributor
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/weaveworks/cortex/pkg/ring"
7+
)
8+
9+
func (d *Distributor) replicationStrategy(ingesters []*ring.IngesterDesc) (
10+
minSuccess, maxFailure int, liveIngesters []*ring.IngesterDesc, err error,
11+
) {
12+
if len(ingesters) < d.cfg.ReplicationFactor {
13+
err = fmt.Errorf("at least %d ingesters required, could only find %d ",
14+
d.cfg.ReplicationFactor, len(ingesters))
15+
return
16+
}
17+
18+
minSuccess = (len(ingesters) / 2) + 1
19+
maxFailure = len(ingesters) - minSuccess
20+
if maxFailure < 0 {
21+
maxFailure = 0
22+
}
23+
24+
// Skip those that have not heartbeated in a while. NB these are still
25+
// included in the calculation of minSuccess, so if too many failed ingesters
26+
// will cause the whole write to fail.
27+
liveIngesters = make([]*ring.IngesterDesc, 0, len(ingesters))
28+
for _, ingester := range ingesters {
29+
if d.ring.IsHealthy(ingester) {
30+
liveIngesters = append(liveIngesters, ingester)
31+
}
32+
}
33+
34+
// This is just a shortcut - if there are not minSuccess available ingesters,
35+
// after filtering out dead ones, don't even bother trying.
36+
if len(liveIngesters) < minSuccess {
37+
err = fmt.Errorf("at least %d live ingesters required, could only find %d",
38+
minSuccess, len(liveIngesters))
39+
return
40+
}
41+
42+
return
43+
}

0 commit comments

Comments
 (0)