
Commit 7d6de67

tomwilkie authored and bboreham committed
Unify the replication calculation between query and push. (#681)
* Unify the replication calculation between query and push.
* Replace 'circle' with 'ring'.
* Deal with ingesters coming / going better.
1 parent 9ce397a commit 7d6de67
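The change passes a ring.ReplicationSet from the ring into the distributor in place of the old per-call replication-factor arguments. The type itself lives in pkg/ring and does not appear in the hunks on this page; the following is a minimal sketch inferred only from how the diff uses its fields (the real type may differ):

    // Sketch only: not part of this commit's hunks; the two fields below are
    // inferred from how the distributor uses the value in the diff.
    type ReplicationSet struct {
        Ingesters []*IngesterDesc // ingesters to contact for a key (the whole ring for GetAll)
        MaxErrors int             // how many of those contacts may fail before the operation fails
    }

Both the write and read paths then derive their quorum the same way: minSuccess = len(Ingesters) - MaxErrors.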

7 files changed: +324 -145 lines


pkg/distributor/distributor.go

Lines changed: 53 additions & 76 deletions
@@ -72,7 +72,6 @@ type Config struct {
     BillingConfig billing.Config
     IngesterClientConfig ingester_client.Config
 
-    ReplicationFactor int
     RemoteTimeout time.Duration
     ClientCleanupPeriod time.Duration
     IngestionRateLimit float64
@@ -88,7 +87,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
     flag.BoolVar(&cfg.EnableBilling, "distributor.enable-billing", false, "Report number of ingested samples to billing system.")
     cfg.BillingConfig.RegisterFlags(f)
     cfg.IngesterClientConfig.RegisterFlags(f)
-    flag.IntVar(&cfg.ReplicationFactor, "distributor.replication-factor", 3, "The number of ingesters to write to and read from.")
     flag.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
     flag.DurationVar(&cfg.ClientCleanupPeriod, "distributor.client-cleanup-period", 15*time.Second, "How frequently to clean up clients for ingesters that have gone away.")
     flag.Float64Var(&cfg.IngestionRateLimit, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.")
@@ -98,9 +96,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 
 // New constructs a new Distributor
 func New(cfg Config, ring ring.ReadRing) (*Distributor, error) {
-    if 0 > cfg.ReplicationFactor {
-        return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor)
-    }
     if cfg.ingesterClientFactory == nil {
         cfg.ingesterClientFactory = ingester_client.MakeIngesterClient
     }
@@ -189,7 +184,13 @@ func (d *Distributor) Stop() {
 
 func (d *Distributor) removeStaleIngesterClients() {
     ingesters := map[string]struct{}{}
-    for _, ing := range d.ring.GetAll() {
+    replicationSet, err := d.ring.GetAll()
+    if err != nil {
+        level.Error(util.Logger).Log("msg", "error removing stale ingester clients", "err", err)
+        return
+    }
+
+    for _, ing := range replicationSet.Ingesters {
         ingesters[ing.Addr] = struct{}{}
     }
 
@@ -270,45 +271,17 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
         return nil, errIngestionRateLimitExceeded
     }
 
-    var ingesters [][]*ring.IngesterDesc
-    if err := instrument.TimeRequestHistogram(ctx, "Distributor.Push[ring-lookup]", d.sendDuration, func(context.Context) error {
-        var err error
-        ingesters, err = d.ring.BatchGet(keys, d.cfg.ReplicationFactor, ring.Write)
-        if err != nil {
-            return err
-        }
-        return nil
-    }); err != nil {
+    replicationSets, err := d.ring.BatchGet(keys, ring.Write)
+    if err != nil {
         return nil, err
     }
 
     samplesByIngester := map[*ring.IngesterDesc][]*sampleTracker{}
-    for i := range samples {
-        // We need a response from a quorum of ingesters, which is n/2 + 1.
-        minSuccess := (len(ingesters[i]) / 2) + 1
-        samples[i].minSuccess = minSuccess
-        samples[i].maxFailures = len(ingesters[i]) - minSuccess
-
-        // Skip those that have not heartbeated in a while. NB these are still
-        // included in the calculation of minSuccess, so if too many failed ingesters
-        // will cause the whole write to fail.
-        liveIngesters := make([]*ring.IngesterDesc, 0, len(ingesters[i]))
-        for _, ingester := range ingesters[i] {
-            if d.ring.IsHealthy(ingester) {
-                liveIngesters = append(liveIngesters, ingester)
-            }
-        }
-
-        // This is just a shortcut - if there are not minSuccess available ingesters,
-        // after filtering out dead ones, don't even bother trying.
-        if len(liveIngesters) < minSuccess {
-            return nil, fmt.Errorf("wanted at least %d live ingesters to process write, had %d",
-                minSuccess, len(liveIngesters))
-        }
-
-        for _, liveIngester := range liveIngesters {
-            sampleForIngester := samplesByIngester[liveIngester]
-            samplesByIngester[liveIngester] = append(sampleForIngester, &samples[i])
+    for i, replicationSet := range replicationSets {
+        samples[i].minSuccess = len(replicationSet.Ingesters) - replicationSet.MaxErrors
+        samples[i].maxFailures = replicationSet.MaxErrors
+        for _, ingester := range replicationSet.Ingesters {
+            samplesByIngester[ingester] = append(samplesByIngester[ingester], &samples[i])
         }
     }
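To make the new write-path arithmetic concrete, here is a tiny self-contained sketch; the numbers are assumed defaults, not taken from this diff. With a replication factor of 3 and every ingester healthy, BatchGet hands back three ingesters and MaxErrors = 1 per key:

    package main

    import "fmt"

    func main() {
        // Assumed: replication factor 3, every ingester healthy.
        numIngesters, maxErrors := 3, 1

        minSuccess := numIngesters - maxErrors // what Push stores in samples[i].minSuccess
        maxFailures := maxErrors               // what Push stores in samples[i].maxFailures

        // 2 1: two acknowledgements complete the write, a second error fails it.
        fmt.Println(minSuccess, maxFailures)
    }

The liveness filtering and the "don't even bother trying" shortcut that used to live in this loop now happen inside the ring (see pkg/ring/replication_strategy.go below).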

@@ -425,58 +398,53 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers .
         }
 
         // Get ingesters by metricName if one exists, otherwise get all ingesters
-        var ingesters []*ring.IngesterDesc
+        var replicationSet ring.ReplicationSet
         if ok && metricNameMatcher.Type == labels.MatchEqual {
-            ingesters, err = d.ring.Get(tokenFor(userID, []byte(metricNameMatcher.Value)), d.cfg.ReplicationFactor, ring.Read)
-            if err != nil {
-                return promql.ErrStorage(err)
-            }
+            replicationSet, err = d.ring.Get(tokenFor(userID, []byte(metricNameMatcher.Value)), ring.Read)
         } else {
-            ingesters = d.ring.GetAll()
+            replicationSet, err = d.ring.GetAll()
+        }
+        if err != nil {
+            return promql.ErrStorage(err)
         }
 
-        matrix, err = d.queryIngesters(ctx, ingesters, d.cfg.ReplicationFactor, req)
+        matrix, err = d.queryIngesters(ctx, replicationSet, req)
         return promql.ErrStorage(err)
     })
     return matrix, err
 }
 
 // Query implements Querier.
-func (d *Distributor) queryIngesters(ctx context.Context, ingesters []*ring.IngesterDesc, replicationFactor int, req *client.QueryRequest) (model.Matrix, error) {
-    // We need a response from a quorum of ingesters, where maxErrs is n/2, where n is the replicationFactor
-    maxErrs := replicationFactor / 2
-    minSuccess := len(ingesters) - maxErrs
-    if len(ingesters) < minSuccess {
-        return nil, fmt.Errorf("could only find %d ingesters for query. Need at least %d", len(ingesters), minSuccess)
-    }
-
+func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.ReplicationSet, req *client.QueryRequest) (model.Matrix, error) {
     // Fetch samples from multiple ingesters
-    var numErrs int32
-    errReceived := make(chan error)
-    results := make(chan model.Matrix, len(ingesters))
-
-    for _, ing := range ingesters {
+    errs := make(chan error, len(replicationSet.Ingesters))
+    results := make(chan model.Matrix, len(replicationSet.Ingesters))
+    for _, ing := range replicationSet.Ingesters {
         go func(ing *ring.IngesterDesc) {
             result, err := d.queryIngester(ctx, ing, req)
             if err != nil {
-                if atomic.AddInt32(&numErrs, 1) == int32(maxErrs+1) {
-                    errReceived <- err
-                }
+                errs <- err
             } else {
                 results <- result
             }
         }(ing)
     }
 
-    // Only wait for minSuccess ingesters (or an error), and accumulate the samples
+    // Only wait for minSuccessful responses (or maxErrors), and accumulate the samples
     // by fingerprint, merging them into any existing samples.
     fpToSampleStream := map[model.Fingerprint]*model.SampleStream{}
-    for i := 0; i < minSuccess; i++ {
+    minSuccess := len(replicationSet.Ingesters) - replicationSet.MaxErrors
+    var numErrs, numSuccess int
+    for numSuccess < minSuccess {
         select {
-        case err := <-errReceived:
-            return nil, err
+        case err := <-errs:
+            numErrs++
+            if numErrs > replicationSet.MaxErrors {
+                return nil, err
+            }
 
         case result := <-results:
+            numSuccess++
             for _, ss := range result {
                 fp := ss.Metric.Fingerprint()
                 mss, ok := fpToSampleStream[fp]
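The rewritten wait loop keeps draining the errs and results channels until it has minSuccess results, and only aborts once the error count exceeds MaxErrors. Here is a small self-contained sketch of the same pattern, with strings standing in for ingester responses (the names and values are illustrative, not the distributor's actual types):

    package main

    import (
        "errors"
        "fmt"
    )

    func main() {
        // Assumed: three ingesters are queried and MaxErrors is 1; one of them fails.
        outcomes := []error{nil, errors.New("ingester down"), nil}
        maxErrors := 1

        errs := make(chan error, len(outcomes))
        results := make(chan string, len(outcomes))
        for i, failure := range outcomes {
            go func(i int, failure error) {
                if failure != nil {
                    errs <- failure
                } else {
                    results <- fmt.Sprintf("result from ingester %d", i)
                }
            }(i, failure)
        }

        // Same loop shape as queryIngesters: stop at minSuccess results,
        // give up once errors exceed maxErrors.
        minSuccess := len(outcomes) - maxErrors
        var numErrs, numSuccess int
        for numSuccess < minSuccess {
            select {
            case err := <-errs:
                numErrs++
                if numErrs > maxErrors {
                    fmt.Println("query failed:", err)
                    return
                }
            case res := <-results:
                numSuccess++
                fmt.Println("merged", res)
            }
        }
        fmt.Println("query satisfied with", numSuccess, "of", len(outcomes), "responses")
    }

With three ingesters and MaxErrors = 1, the loop exits after two merged results even if one ingester is down; a second failure returns the error instead.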
@@ -516,9 +484,13 @@ func (d *Distributor) queryIngester(ctx context.Context, ing *ring.IngesterDesc,
 
 // forAllIngesters runs f, in parallel, for all ingesters
 func (d *Distributor) forAllIngesters(f func(client.IngesterClient) (interface{}, error)) ([]interface{}, error) {
+    replicationSet, err := d.ring.GetAll()
+    if err != nil {
+        return nil, err
+    }
+
     resps, errs := make(chan interface{}), make(chan error)
-    ingesters := d.ring.GetAll()
-    for _, ingester := range ingesters {
+    for _, ingester := range replicationSet.Ingesters {
         go func(ingester *ring.IngesterDesc) {
             client, err := d.ingesterPool.GetClientFor(ingester.Addr)
             if err != nil {
@@ -537,17 +509,19 @@ func (d *Distributor) forAllIngesters(f func(client.IngesterClient) (interface{}
 
     var lastErr error
     result, numErrs := []interface{}{}, 0
-    for range ingesters {
+    for range replicationSet.Ingesters {
         select {
         case resp := <-resps:
            result = append(result, resp)
         case lastErr = <-errs:
            numErrs++
         }
     }
-    if numErrs > d.cfg.ReplicationFactor/2 {
+
+    if numErrs > replicationSet.MaxErrors {
         return nil, lastErr
     }
+
     return result, nil
 }
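Note the contrast with queryIngesters above: forAllIngesters waits for a response (or error) from every ingester in the ring and only afterwards fails the call if more than replicationSet.MaxErrors of them errored, whereas the query path returns as soon as it has enough successes.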

@@ -624,8 +598,8 @@ func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error) {
         totalStats.NumSeries += resp.(*client.UserStatsResponse).NumSeries
     }
 
-    totalStats.IngestionRate /= float64(d.cfg.ReplicationFactor)
-    totalStats.NumSeries /= uint64(d.cfg.ReplicationFactor)
+    totalStats.IngestionRate /= float64(d.ring.ReplicationFactor())
+    totalStats.NumSeries /= uint64(d.ring.ReplicationFactor())
 
     return totalStats, nil
 }
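The division is what de-duplicates the totals: each series lives on ReplicationFactor ingesters (3 by default), so summing per-ingester stats counts everything roughly that many times, and dividing by d.ring.ReplicationFactor() brings the figures back to an approximate per-cluster view.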
@@ -645,8 +619,11 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error) {
     req := &client.UserStatsRequest{}
     ctx = user.InjectOrgID(ctx, "1") // fake: ingester insists on having an org ID
     // Not using d.forAllIngesters(), so we can fail after first error.
-    ingesters := d.ring.GetAll()
-    for _, ingester := range ingesters {
+    replicationSet, err := d.ring.GetAll()
+    if err != nil {
+        return nil, err
+    }
+    for _, ingester := range replicationSet.Ingesters {
         client, err := d.ingesterPool.GetClientFor(ingester.Addr)
         if err != nil {
             return nil, err

pkg/distributor/distributor_test.go

Lines changed: 21 additions & 17 deletions
@@ -20,28 +20,36 @@ import (
 // mockRing doesn't do any consistent hashing, just returns same ingesters for every query.
 type mockRing struct {
     prometheus.Counter
-    ingesters []*ring.IngesterDesc
-    heartbeatTimeout time.Duration
+    ingesters []*ring.IngesterDesc
 }
 
-func (r mockRing) Get(key uint32, n int, op ring.Operation) ([]*ring.IngesterDesc, error) {
-    return r.ingesters[:n], nil
+func (r mockRing) Get(key uint32, op ring.Operation) (ring.ReplicationSet, error) {
+    return ring.ReplicationSet{
+        Ingesters: r.ingesters[:3],
+        MaxErrors: 1,
+    }, nil
 }
 
-func (r mockRing) BatchGet(keys []uint32, n int, op ring.Operation) ([][]*ring.IngesterDesc, error) {
-    result := [][]*ring.IngesterDesc{}
+func (r mockRing) BatchGet(keys []uint32, op ring.Operation) ([]ring.ReplicationSet, error) {
+    result := []ring.ReplicationSet{}
     for i := 0; i < len(keys); i++ {
-        result = append(result, r.ingesters[:n])
+        result = append(result, ring.ReplicationSet{
+            Ingesters: r.ingesters[:3],
+            MaxErrors: 1,
+        })
     }
     return result, nil
 }
 
-func (r mockRing) GetAll() []*ring.IngesterDesc {
-    return r.ingesters
+func (r mockRing) GetAll() (ring.ReplicationSet, error) {
+    return ring.ReplicationSet{
+        Ingesters: r.ingesters,
+        MaxErrors: 1,
+    }, nil
 }
 
-func (r mockRing) IsHealthy(ingester *ring.IngesterDesc) bool {
-    return time.Now().Sub(time.Unix(ingester.Timestamp, 0)) <= r.heartbeatTimeout
+func (r mockRing) ReplicationFactor() int {
+    return 3
 }
 
 type mockIngester struct {
@@ -159,12 +167,10 @@ func TestDistributorPush(t *testing.T) {
         Counter: prometheus.NewCounter(prometheus.CounterOpts{
             Name: "foo",
         }),
-        ingesters: ingesterDescs,
-        heartbeatTimeout: 1 * time.Minute,
+        ingesters: ingesterDescs,
     }
 
     d, err := New(Config{
-        ReplicationFactor: 3,
         RemoteTimeout: 1 * time.Minute,
         ClientCleanupPeriod: 1 * time.Minute,
         IngestionRateLimit: 10000,
@@ -299,12 +305,10 @@ func TestDistributorQuery(t *testing.T) {
         Counter: prometheus.NewCounter(prometheus.CounterOpts{
             Name: "foo",
         }),
-        ingesters: ingesterDescs,
-        heartbeatTimeout: 1 * time.Minute,
+        ingesters: ingesterDescs,
     }
 
     d, err := New(Config{
-        ReplicationFactor: 3,
        RemoteTimeout: 1 * time.Minute,
        ClientCleanupPeriod: 1 * time.Minute,
        IngestionRateLimit: 10000,

pkg/ring/http.go

Lines changed: 1 addition & 1 deletion
@@ -102,7 +102,7 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
     ing := r.ringDesc.Ingesters[id]
     timestamp := time.Unix(ing.Timestamp, 0)
     state := ing.State.String()
-    if !r.IsHealthy(ing) {
+    if !r.IsHealthy(ing, Reporting) {
         state = unhealthy
     }
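Reporting is presumably an Operation value defined in a ring file not shown on this page; given the IsHealthy implementation added in pkg/ring/replication_strategy.go below, any operation other than Read or Write skips the state checks, so the status page marks an ingester unhealthy purely on heartbeat age.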

pkg/ring/replication_strategy.go

Lines changed: 62 additions & 0 deletions
@@ -0,0 +1,62 @@
+package ring
+
+import (
+    "fmt"
+    "time"
+)
+
+// replicationStrategy decides, given the set of ingesters eligible for a key,
+// which ingesters you will try and write to and how many failures you will
+// tolerate.
+// - Filters out dead ingesters so the one doesn't even try to write to them.
+// - Checks there is enough ingesters for an operation to succeed.
+func (r *Ring) replicationStrategy(ingesters []*IngesterDesc, op Operation) (
+    liveIngesters []*IngesterDesc, maxFailure int, err error,
+) {
+    // We need a response from a quorum of ingesters, which is n/2 + 1. In the
+    // case of a node joining/leaving, the actual replica set might be bigger
+    // than the replication factor, so use the bigger or the two.
+    replicationFactor := r.cfg.ReplicationFactor
+    if len(ingesters) > replicationFactor {
+        replicationFactor = len(ingesters)
+    }
+    minSuccess := (replicationFactor / 2) + 1
+    maxFailure = replicationFactor - minSuccess
+
+    // Skip those that have not heartbeated in a while. NB these are still
+    // included in the calculation of minSuccess, so if too many failed ingesters
+    // will cause the whole write to fail.
+    liveIngesters = make([]*IngesterDesc, 0, len(ingesters))
+    for _, ingester := range ingesters {
+        if r.IsHealthy(ingester, op) {
+            liveIngesters = append(liveIngesters, ingester)
+        } else {
+            maxFailure--
+        }
+    }
+
+    // This is just a shortcut - if there are not minSuccess available ingesters,
+    // after filtering out dead ones, don't even bother trying.
+    if maxFailure < 0 || len(liveIngesters) < minSuccess {
+        err = fmt.Errorf("at least %d live ingesters required, could only find %d",
+            minSuccess, len(liveIngesters))
+        return
+    }
+
+    return
+}
+
+// IsHealthy checks whether an ingester appears to be alive and heartbeating
+func (r *Ring) IsHealthy(ingester *IngesterDesc, op Operation) bool {
+    if op == Write && ingester.State != ACTIVE {
+        return false
+    } else if op == Read && ingester.State == JOINING {
+        return false
+    }
+    return time.Now().Sub(time.Unix(ingester.Timestamp, 0)) <= r.cfg.HeartbeatTimeout
+}
+
+// ReplicationFactor of the ring.
+func (r *Ring) ReplicationFactor() int {
+    return r.cfg.ReplicationFactor
+}
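To see how the new strategy "deals with ingesters coming / going", here is a hypothetical walk-through that re-implements the same arithmetic outside the ring (quorum is a stand-in helper, not part of the package):

    package main

    import "fmt"

    // quorum mirrors the arithmetic in replicationStrategy (sketch, not the ring's API).
    func quorum(configuredRF, ingestersForKey, deadIngesters int) (minSuccess, maxFailure int, ok bool) {
        rf := configuredRF
        if ingestersForKey > rf { // during a join/leave the replica set can be bigger than RF
            rf = ingestersForKey
        }
        minSuccess = (rf / 2) + 1
        maxFailure = rf - minSuccess
        maxFailure -= deadIngesters // each dead ingester uses up one allowed failure
        live := ingestersForKey - deadIngesters
        ok = maxFailure >= 0 && live >= minSuccess
        return
    }

    func main() {
        // Steady state: RF 3, 3 ingesters, all alive -> need 2 acks, tolerate 1 failure.
        fmt.Println(quorum(3, 3, 0)) // 2 1 true

        // An ingester is joining: 4 ingesters hold the key -> need 3 acks, still tolerate 1 failure.
        fmt.Println(quorum(3, 4, 0)) // 3 1 true

        // Same hand-off, but one ingester has stopped heartbeating -> no failures left to spend.
        fmt.Println(quorum(3, 4, 1)) // 3 0 true

        // Two dead ingesters -> the operation is refused up front.
        fmt.Println(quorum(3, 4, 2)) // 3 -1 false
    }

The key point is that during a hand-off the replica set can temporarily exceed the configured factor, so the quorum is taken over the larger set, and every unhealthy ingester consumes one of the allowed failures instead of silently shrinking the set.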
