From 1f6268e9e0b9b8c5a85ee83f8d693334ac521be2 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 2 Feb 2018 09:59:40 +0000 Subject: [PATCH 01/12] Unify the replication calculation between query and push. --- pkg/distributor/distributor.go | 52 +++++++++---------------- pkg/distributor/replication_strategy.go | 43 ++++++++++++++++++++ 2 files changed, 62 insertions(+), 33 deletions(-) create mode 100644 pkg/distributor/replication_strategy.go diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 9e2efae5325..f2a53dc59b7 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -285,25 +285,11 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie samplesByIngester := map[*ring.IngesterDesc][]*sampleTracker{} for i := range samples { // We need a response from a quorum of ingesters, which is n/2 + 1. - minSuccess := (len(ingesters[i]) / 2) + 1 - samples[i].minSuccess = minSuccess - samples[i].maxFailures = len(ingesters[i]) - minSuccess - - // Skip those that have not heartbeated in a while. NB these are still - // included in the calculation of minSuccess, so if too many failed ingesters - // will cause the whole write to fail. - liveIngesters := make([]*ring.IngesterDesc, 0, len(ingesters[i])) - for _, ingester := range ingesters[i] { - if d.ring.IsHealthy(ingester) { - liveIngesters = append(liveIngesters, ingester) - } - } - - // This is just a shortcut - if there are not minSuccess available ingesters, - // after filtering out dead ones, don't even bother trying. - if len(liveIngesters) < minSuccess { - return nil, fmt.Errorf("wanted at least %d live ingesters to process write, had %d", - minSuccess, len(liveIngesters)) + var err error + var liveIngesters []*ring.IngesterDesc + samples[i].minSuccess, samples[i].maxFailures, liveIngesters, err = d.replicationStrategy(ingesters[i]) + if err != nil { + return nil, err } for _, liveIngester := range liveIngesters { @@ -444,39 +430,39 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers . // Query implements Querier. func (d *Distributor) queryIngesters(ctx context.Context, ingesters []*ring.IngesterDesc, replicationFactor int, req *client.QueryRequest) (model.Matrix, error) { // We need a response from a quorum of ingesters, where maxErrs is n/2, where n is the replicationFactor - maxErrs := replicationFactor / 2 - minSuccess := len(ingesters) - maxErrs - if len(ingesters) < minSuccess { - return nil, fmt.Errorf("could only find %d ingesters for query. Need at least %d", len(ingesters), minSuccess) + minSuccess, maxErrors, ingesters, err := d.replicationStrategy(ingesters) + if err != nil { + return nil, err } // Fetch samples from multiple ingesters - var numErrs int32 - errReceived := make(chan error) + errs := make(chan error, len(ingesters)) results := make(chan model.Matrix, len(ingesters)) - for _, ing := range ingesters { go func(ing *ring.IngesterDesc) { result, err := d.queryIngester(ctx, ing, req) if err != nil { - if atomic.AddInt32(&numErrs, 1) == int32(maxErrs+1) { - errReceived <- err - } + errs <- err } else { results <- result } }(ing) } - // Only wait for minSuccess ingesters (or an error), and accumulate the samples + // Only wait for minSuccessful responses (or maxErrors), and accumulate the samples // by fingerprint, merging them into any existing samples. fpToSampleStream := map[model.Fingerprint]*model.SampleStream{} - for i := 0; i < minSuccess; i++ { + var numErrs, numSuccess int + for numSuccess < minSuccess { select { - case err := <-errReceived: - return nil, err + case err := <-errs: + numErrs++ + if numErrs > maxErrors { + return nil, err + } case result := <-results: + numSuccess++ for _, ss := range result { fp := ss.Metric.Fingerprint() mss, ok := fpToSampleStream[fp] diff --git a/pkg/distributor/replication_strategy.go b/pkg/distributor/replication_strategy.go new file mode 100644 index 00000000000..3e360dc1464 --- /dev/null +++ b/pkg/distributor/replication_strategy.go @@ -0,0 +1,43 @@ +package distributor + +import ( + "fmt" + + "github.com/weaveworks/cortex/pkg/ring" +) + +func (d *Distributor) replicationStrategy(ingesters []*ring.IngesterDesc) ( + minSuccess, maxFailure int, liveIngesters []*ring.IngesterDesc, err error, +) { + if len(ingesters) < d.cfg.ReplicationFactor { + err = fmt.Errorf("at least %d ingesters required, could only find %d ", + d.cfg.ReplicationFactor, len(ingesters)) + return + } + + minSuccess = (len(ingesters) / 2) + 1 + maxFailure = len(ingesters) - minSuccess + if maxFailure < 0 { + maxFailure = 0 + } + + // Skip those that have not heartbeated in a while. NB these are still + // included in the calculation of minSuccess, so if too many failed ingesters + // will cause the whole write to fail. + liveIngesters = make([]*ring.IngesterDesc, 0, len(ingesters)) + for _, ingester := range ingesters { + if d.ring.IsHealthy(ingester) { + liveIngesters = append(liveIngesters, ingester) + } + } + + // This is just a shortcut - if there are not minSuccess available ingesters, + // after filtering out dead ones, don't even bother trying. + if len(liveIngesters) < minSuccess { + err = fmt.Errorf("at least %d live ingesters required, could only find %d", + minSuccess, len(liveIngesters)) + return + } + + return +} From 6fbf4b24ccbbd67ee36a0fb663007f03420ee6a7 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 2 Feb 2018 10:07:28 +0000 Subject: [PATCH 02/12] Replace 'circle' with 'ring'. --- pkg/ring/ring.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index 41f189dabce..5f923d531bd 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -49,7 +49,7 @@ func (x uint32s) Less(i, j int) bool { return x[i] < x[j] } func (x uint32s) Swap(i, j int) { x[i], x[j] = x[j], x[i] } // ErrEmptyRing is the error returned when trying to get an element when nothing has been added to hash. -var ErrEmptyRing = errors.New("empty circle") +var ErrEmptyRing = errors.New("empty ring") // Config for a Ring type Config struct { @@ -67,7 +67,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.HeartbeatTimeout, "ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.") } -// Ring holds the information about the members of the consistent hash circle. +// Ring holds the information about the members of the consistent hash ring. type Ring struct { KVClient KVClient done chan struct{} @@ -219,7 +219,7 @@ func (r *Ring) IsHealthy(ingester *IngesterDesc) bool { return time.Now().Sub(time.Unix(ingester.Timestamp, 0)) <= r.heartbeatTimeout } -// GetAll returns all available ingesters in the circle. +// GetAll returns all available ingesters in the ring. func (r *Ring) GetAll() []*IngesterDesc { r.mtx.RLock() defer r.mtx.RUnlock() From bed8eb2c852994a37e0f44677b26b177e4ead2b4 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 5 Feb 2018 19:23:21 +0000 Subject: [PATCH 03/12] Deal with ingesters coming / going better. --- pkg/distributor/distributor.go | 1 - pkg/distributor/replication_strategy.go | 16 +-- pkg/distributor/replication_strategy_test.go | 109 +++++++++++++++++++ pkg/ring/ring.go | 14 ++- 4 files changed, 126 insertions(+), 14 deletions(-) create mode 100644 pkg/distributor/replication_strategy_test.go diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index f2a53dc59b7..6500491df9a 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -284,7 +284,6 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie samplesByIngester := map[*ring.IngesterDesc][]*sampleTracker{} for i := range samples { - // We need a response from a quorum of ingesters, which is n/2 + 1. var err error var liveIngesters []*ring.IngesterDesc samples[i].minSuccess, samples[i].maxFailures, liveIngesters, err = d.replicationStrategy(ingesters[i]) diff --git a/pkg/distributor/replication_strategy.go b/pkg/distributor/replication_strategy.go index 3e360dc1464..aec9e4f8633 100644 --- a/pkg/distributor/replication_strategy.go +++ b/pkg/distributor/replication_strategy.go @@ -9,14 +9,16 @@ import ( func (d *Distributor) replicationStrategy(ingesters []*ring.IngesterDesc) ( minSuccess, maxFailure int, liveIngesters []*ring.IngesterDesc, err error, ) { - if len(ingesters) < d.cfg.ReplicationFactor { - err = fmt.Errorf("at least %d ingesters required, could only find %d ", - d.cfg.ReplicationFactor, len(ingesters)) - return + // We need a response from a quorum of ingesters, which is n/2 + 1. In the + // case of a node joining/leaving, the actual replica set might be bigger + // than the replication factor, so we need to account for this. + // See comming in ring/ring.go:getInternal. + replicationFactor := d.cfg.ReplicationFactor + if len(ingesters) > replicationFactor { + replicationFactor = len(ingesters) } - - minSuccess = (len(ingesters) / 2) + 1 - maxFailure = len(ingesters) - minSuccess + minSuccess = (replicationFactor / 2) + 1 + maxFailure = replicationFactor - minSuccess if maxFailure < 0 { maxFailure = 0 } diff --git a/pkg/distributor/replication_strategy_test.go b/pkg/distributor/replication_strategy_test.go new file mode 100644 index 00000000000..fcd14c0e83b --- /dev/null +++ b/pkg/distributor/replication_strategy_test.go @@ -0,0 +1,109 @@ +package distributor + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/weaveworks/cortex/pkg/ring" +) + +func TestReplicationStrategy(t *testing.T) { + for i, tc := range []struct { + RF, LiveIngesters, DeadIngesters int + ExpectedMinSuccess, ExpectedMaxFailure int + ExpectedError string + }{ + // Ensure it works for a single ingester, for local testing. + { + RF: 1, + LiveIngesters: 1, + ExpectedMinSuccess: 1, + }, + + { + RF: 1, + DeadIngesters: 1, + ExpectedMinSuccess: 1, + ExpectedError: "at least 1 live ingesters required, could only find 0", + }, + + // Ensure it works for the default production config. + { + RF: 3, + LiveIngesters: 3, + ExpectedMinSuccess: 2, + ExpectedMaxFailure: 1, + }, + + { + RF: 3, + LiveIngesters: 2, + DeadIngesters: 1, + ExpectedMinSuccess: 2, + ExpectedMaxFailure: 1, + }, + + { + RF: 3, + LiveIngesters: 1, + DeadIngesters: 2, + ExpectedMinSuccess: 2, + ExpectedMaxFailure: 1, + ExpectedError: "at least 2 live ingesters required, could only find 1", + }, + + // Ensure it works when adding / removing nodes. + + // A node is joining or leaving, replica set expands. + { + RF: 3, + LiveIngesters: 4, + ExpectedMinSuccess: 3, + ExpectedMaxFailure: 1, + }, + + { + RF: 3, + LiveIngesters: 2, + DeadIngesters: 2, + ExpectedMinSuccess: 3, + ExpectedMaxFailure: 1, + ExpectedError: "at least 3 live ingesters required, could only find 2", + }, + } { + ingesters := []*ring.IngesterDesc{} + for i := 0; i < tc.LiveIngesters; i++ { + ingesters = append(ingesters, &ring.IngesterDesc{ + Timestamp: time.Now().Unix(), + }) + } + for i := 0; i < tc.DeadIngesters; i++ { + ingesters = append(ingesters, &ring.IngesterDesc{}) + } + + r, err := ring.New(ring.Config{ + Mock: ring.NewInMemoryKVClient(), + HeartbeatTimeout: 100 * time.Second, + }) + require.NoError(t, err) + + d := Distributor{ + cfg: Config{ + ReplicationFactor: tc.RF, + }, + ring: r, + } + + t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) { + minSuccess, maxFailure, _, err := d.replicationStrategy(ingesters) + assert.Equal(t, tc.ExpectedMinSuccess, minSuccess) + assert.Equal(t, tc.ExpectedMaxFailure, maxFailure) + if tc.ExpectedError != "" { + assert.EqualError(t, err, tc.ExpectedError) + } + }) + } +} diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index 5f923d531bd..eba5ba2ffa5 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -195,18 +195,17 @@ func (r *Ring) getInternal(key uint32, n int, op Operation) ([]*IngesterDesc, er distinctHosts[token.Ingester] = struct{}{} ingester := r.ringDesc.Ingesters[token.Ingester] - // Ingesters that are not ACTIVE do not count to the replication limit. We do - // not want to Write to them because they are about to go away, but we do - // want to write the extra replica somewhere. So we increase the size of the - // set of replicas for the key. This means we have to also increase the + // We do not want to Write to Ingesters that are not ACTIVE, because they are + // about to go away, but we do want to write the extra replica somewhere. + // So we increase the size of the set of replicas for the key. + // This means we have to also increase the // size of the replica set for read, but we can read from Leaving ingesters, // so don't skip it in this case. + // NB ingester will be filterer later (by distributor/replication_strategy). if op == Write && ingester.State != ACTIVE { n++ - continue } else if op == Read && (ingester.State != ACTIVE && ingester.State != LEAVING) { n++ - continue } ingesters = append(ingesters, ingester) @@ -216,6 +215,9 @@ func (r *Ring) getInternal(key uint32, n int, op Operation) ([]*IngesterDesc, er // IsHealthy checks whether an ingester appears to be alive and heartbeating func (r *Ring) IsHealthy(ingester *IngesterDesc) bool { + if ingester.State != ACTIVE { + return false + } return time.Now().Sub(time.Unix(ingester.Timestamp, 0)) <= r.heartbeatTimeout } From cdfd837a944a2a9f91aa9d0868c1051f9d3ecfec Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 5 Feb 2018 20:08:22 +0000 Subject: [PATCH 04/12] Fix comment. --- pkg/distributor/replication_strategy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/distributor/replication_strategy.go b/pkg/distributor/replication_strategy.go index aec9e4f8633..5bcab757644 100644 --- a/pkg/distributor/replication_strategy.go +++ b/pkg/distributor/replication_strategy.go @@ -12,7 +12,7 @@ func (d *Distributor) replicationStrategy(ingesters []*ring.IngesterDesc) ( // We need a response from a quorum of ingesters, which is n/2 + 1. In the // case of a node joining/leaving, the actual replica set might be bigger // than the replication factor, so we need to account for this. - // See comming in ring/ring.go:getInternal. + // See comment in ring/ring.go:getInternal. replicationFactor := d.cfg.ReplicationFactor if len(ingesters) > replicationFactor { replicationFactor = len(ingesters) From aa94d7d08312ff9c03e15a5bbc34b5ccdb0cfa4f Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 6 Feb 2018 11:15:47 +0000 Subject: [PATCH 05/12] Actually, all the replication calculations really belong in ring. --- pkg/distributor/distributor.go | 89 +++++++------- pkg/distributor/distributor_test.go | 29 +++-- .../replication_strategy.go | 30 +++-- .../replication_strategy_test.go | 25 ++-- pkg/ring/ring.go | 109 +++++++++++------- pkg/ring/ring_test.go | 5 +- 6 files changed, 159 insertions(+), 128 deletions(-) rename pkg/{distributor => ring}/replication_strategy.go (59%) rename pkg/{distributor => ring}/replication_strategy_test.go (80%) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 6500491df9a..d139e46532e 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -72,7 +72,6 @@ type Config struct { BillingConfig billing.Config IngesterClientConfig ingester_client.Config - ReplicationFactor int RemoteTimeout time.Duration ClientCleanupPeriod time.Duration IngestionRateLimit float64 @@ -88,7 +87,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { flag.BoolVar(&cfg.EnableBilling, "distributor.enable-billing", false, "Report number of ingested samples to billing system.") cfg.BillingConfig.RegisterFlags(f) cfg.IngesterClientConfig.RegisterFlags(f) - flag.IntVar(&cfg.ReplicationFactor, "distributor.replication-factor", 3, "The number of ingesters to write to and read from.") flag.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.") flag.DurationVar(&cfg.ClientCleanupPeriod, "distributor.client-cleanup-period", 15*time.Second, "How frequently to clean up clients for ingesters that have gone away.") flag.Float64Var(&cfg.IngestionRateLimit, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.") @@ -98,9 +96,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { // New constructs a new Distributor func New(cfg Config, ring ring.ReadRing) (*Distributor, error) { - if 0 > cfg.ReplicationFactor { - return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor) - } if cfg.ingesterClientFactory == nil { cfg.ingesterClientFactory = ingester_client.MakeIngesterClient } @@ -189,7 +184,13 @@ func (d *Distributor) Stop() { func (d *Distributor) removeStaleIngesterClients() { ingesters := map[string]struct{}{} - for _, ing := range d.ring.GetAll() { + replicationSet, err := d.ring.GetAll() + if err != nil { + level.Error(util.Logger).Log("msg", "error removing stale ingester clients", "err", err) + return + } + + for _, ing := range replicationSet.Ingesters { ingesters[ing.Addr] = struct{}{} } @@ -270,30 +271,17 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie return nil, errIngestionRateLimitExceeded } - var ingesters [][]*ring.IngesterDesc - if err := instrument.TimeRequestHistogram(ctx, "Distributor.Push[ring-lookup]", d.sendDuration, func(context.Context) error { - var err error - ingesters, err = d.ring.BatchGet(keys, d.cfg.ReplicationFactor, ring.Write) - if err != nil { - return err - } - return nil - }); err != nil { + replicationSets, err := d.ring.BatchGet(keys, ring.Write) + if err != nil { return nil, err } samplesByIngester := map[*ring.IngesterDesc][]*sampleTracker{} - for i := range samples { - var err error - var liveIngesters []*ring.IngesterDesc - samples[i].minSuccess, samples[i].maxFailures, liveIngesters, err = d.replicationStrategy(ingesters[i]) - if err != nil { - return nil, err - } - - for _, liveIngester := range liveIngesters { - sampleForIngester := samplesByIngester[liveIngester] - samplesByIngester[liveIngester] = append(sampleForIngester, &samples[i]) + for i, replicationSet := range replicationSets { + samples[i].minSuccess = len(replicationSet.Ingesters) - replicationSet.MaxErrors + samples[i].maxFailures = replicationSet.MaxErrors + for _, ingester := range replicationSet.Ingesters { + samplesByIngester[ingester] = append(samplesByIngester[ingester], &samples[i]) } } @@ -410,34 +398,28 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers . } // Get ingesters by metricName if one exists, otherwise get all ingesters - var ingesters []*ring.IngesterDesc + var replicationSet ring.ReplicationSet if ok && metricNameMatcher.Type == labels.MatchEqual { - ingesters, err = d.ring.Get(tokenFor(userID, []byte(metricNameMatcher.Value)), d.cfg.ReplicationFactor, ring.Read) - if err != nil { - return promql.ErrStorage(err) - } + replicationSet, err = d.ring.Get(tokenFor(userID, []byte(metricNameMatcher.Value)), ring.Read) } else { - ingesters = d.ring.GetAll() + replicationSet, err = d.ring.GetAll() + } + if err != nil { + return promql.ErrStorage(err) } - matrix, err = d.queryIngesters(ctx, ingesters, d.cfg.ReplicationFactor, req) + matrix, err = d.queryIngesters(ctx, replicationSet, req) return promql.ErrStorage(err) }) return matrix, err } // Query implements Querier. -func (d *Distributor) queryIngesters(ctx context.Context, ingesters []*ring.IngesterDesc, replicationFactor int, req *client.QueryRequest) (model.Matrix, error) { - // We need a response from a quorum of ingesters, where maxErrs is n/2, where n is the replicationFactor - minSuccess, maxErrors, ingesters, err := d.replicationStrategy(ingesters) - if err != nil { - return nil, err - } - +func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.ReplicationSet, req *client.QueryRequest) (model.Matrix, error) { // Fetch samples from multiple ingesters - errs := make(chan error, len(ingesters)) - results := make(chan model.Matrix, len(ingesters)) - for _, ing := range ingesters { + errs := make(chan error, len(replicationSet.Ingesters)) + results := make(chan model.Matrix, len(replicationSet.Ingesters)) + for _, ing := range replicationSet.Ingesters { go func(ing *ring.IngesterDesc) { result, err := d.queryIngester(ctx, ing, req) if err != nil { @@ -451,12 +433,13 @@ func (d *Distributor) queryIngesters(ctx context.Context, ingesters []*ring.Inge // Only wait for minSuccessful responses (or maxErrors), and accumulate the samples // by fingerprint, merging them into any existing samples. fpToSampleStream := map[model.Fingerprint]*model.SampleStream{} + minSuccess := len(replicationSet.Ingesters) - replicationSet.MaxErrors var numErrs, numSuccess int for numSuccess < minSuccess { select { case err := <-errs: numErrs++ - if numErrs > maxErrors { + if numErrs > replicationSet.MaxErrors { return nil, err } @@ -501,9 +484,13 @@ func (d *Distributor) queryIngester(ctx context.Context, ing *ring.IngesterDesc, // forAllIngesters runs f, in parallel, for all ingesters func (d *Distributor) forAllIngesters(f func(client.IngesterClient) (interface{}, error)) ([]interface{}, error) { + replicationSet, err := d.ring.GetAll() + if err != nil { + return nil, err + } + resps, errs := make(chan interface{}), make(chan error) - ingesters := d.ring.GetAll() - for _, ingester := range ingesters { + for _, ingester := range replicationSet.Ingesters { go func(ingester *ring.IngesterDesc) { client, err := d.ingesterPool.GetClientFor(ingester.Addr) if err != nil { @@ -522,7 +509,7 @@ func (d *Distributor) forAllIngesters(f func(client.IngesterClient) (interface{} var lastErr error result, numErrs := []interface{}{}, 0 - for range ingesters { + for range replicationSet.Ingesters { select { case resp := <-resps: result = append(result, resp) @@ -530,9 +517,11 @@ func (d *Distributor) forAllIngesters(f func(client.IngesterClient) (interface{} numErrs++ } } - if numErrs > d.cfg.ReplicationFactor/2 { + + if numErrs > replicationSet.MaxErrors { return nil, lastErr } + return result, nil } @@ -609,8 +598,8 @@ func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error) { totalStats.NumSeries += resp.(*client.UserStatsResponse).NumSeries } - totalStats.IngestionRate /= float64(d.cfg.ReplicationFactor) - totalStats.NumSeries /= uint64(d.cfg.ReplicationFactor) + totalStats.IngestionRate /= float64(d.ring.ReplicationFactor()) + totalStats.NumSeries /= uint64(d.ring.ReplicationFactor()) return totalStats, nil } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index bb46ea75e1d..84a0fccd08d 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -24,24 +24,33 @@ type mockRing struct { heartbeatTimeout time.Duration } -func (r mockRing) Get(key uint32, n int, op ring.Operation) ([]*ring.IngesterDesc, error) { - return r.ingesters[:n], nil +func (r mockRing) Get(key uint32, op ring.Operation) (ring.ReplicationSet, error) { + return ring.ReplicationSet{ + Ingesters: r.ingesters[:3], + MaxErrors: 1, + }, nil } -func (r mockRing) BatchGet(keys []uint32, n int, op ring.Operation) ([][]*ring.IngesterDesc, error) { - result := [][]*ring.IngesterDesc{} +func (r mockRing) BatchGet(keys []uint32, op ring.Operation) ([]ring.ReplicationSet, error) { + result := []ring.ReplicationSet{} for i := 0; i < len(keys); i++ { - result = append(result, r.ingesters[:n]) + result = append(result, ring.ReplicationSet{ + Ingesters: r.ingesters[:3], + MaxErrors: 1, + }) } return result, nil } -func (r mockRing) GetAll() []*ring.IngesterDesc { - return r.ingesters +func (r mockRing) GetAll() (ring.ReplicationSet, error) { + return ring.ReplicationSet{ + Ingesters: r.ingesters, + MaxErrors: 1, + }, nil } -func (r mockRing) IsHealthy(ingester *ring.IngesterDesc) bool { - return time.Now().Sub(time.Unix(ingester.Timestamp, 0)) <= r.heartbeatTimeout +func (r mockRing) ReplicationFactor() int { + return 3 } type mockIngester struct { @@ -164,7 +173,6 @@ func TestDistributorPush(t *testing.T) { } d, err := New(Config{ - ReplicationFactor: 3, RemoteTimeout: 1 * time.Minute, ClientCleanupPeriod: 1 * time.Minute, IngestionRateLimit: 10000, @@ -304,7 +312,6 @@ func TestDistributorQuery(t *testing.T) { } d, err := New(Config{ - ReplicationFactor: 3, RemoteTimeout: 1 * time.Minute, ClientCleanupPeriod: 1 * time.Minute, IngestionRateLimit: 10000, diff --git a/pkg/distributor/replication_strategy.go b/pkg/ring/replication_strategy.go similarity index 59% rename from pkg/distributor/replication_strategy.go rename to pkg/ring/replication_strategy.go index 5bcab757644..6dfddce8b91 100644 --- a/pkg/distributor/replication_strategy.go +++ b/pkg/ring/replication_strategy.go @@ -1,19 +1,18 @@ -package distributor +package ring import ( "fmt" - - "github.com/weaveworks/cortex/pkg/ring" + "time" ) -func (d *Distributor) replicationStrategy(ingesters []*ring.IngesterDesc) ( - minSuccess, maxFailure int, liveIngesters []*ring.IngesterDesc, err error, +func (r *Ring) replicationStrategy(ingesters []*IngesterDesc) ( + minSuccess, maxFailure int, liveIngesters []*IngesterDesc, err error, ) { // We need a response from a quorum of ingesters, which is n/2 + 1. In the // case of a node joining/leaving, the actual replica set might be bigger // than the replication factor, so we need to account for this. - // See comment in ring/ring.go:getInternal. - replicationFactor := d.cfg.ReplicationFactor + // See comment in ring.go:getInternal. + replicationFactor := r.cfg.ReplicationFactor if len(ingesters) > replicationFactor { replicationFactor = len(ingesters) } @@ -26,9 +25,9 @@ func (d *Distributor) replicationStrategy(ingesters []*ring.IngesterDesc) ( // Skip those that have not heartbeated in a while. NB these are still // included in the calculation of minSuccess, so if too many failed ingesters // will cause the whole write to fail. - liveIngesters = make([]*ring.IngesterDesc, 0, len(ingesters)) + liveIngesters = make([]*IngesterDesc, 0, len(ingesters)) for _, ingester := range ingesters { - if d.ring.IsHealthy(ingester) { + if r.IsHealthy(ingester) { liveIngesters = append(liveIngesters, ingester) } } @@ -43,3 +42,16 @@ func (d *Distributor) replicationStrategy(ingesters []*ring.IngesterDesc) ( return } + +// IsHealthy checks whether an ingester appears to be alive and heartbeating +func (r *Ring) IsHealthy(ingester *IngesterDesc) bool { + if ingester.State != ACTIVE { + return false + } + return time.Now().Sub(time.Unix(ingester.Timestamp, 0)) <= r.cfg.HeartbeatTimeout +} + +// ReplicationFactor of the ring. +func (r *Ring) ReplicationFactor() int { + return r.cfg.ReplicationFactor +} diff --git a/pkg/distributor/replication_strategy_test.go b/pkg/ring/replication_strategy_test.go similarity index 80% rename from pkg/distributor/replication_strategy_test.go rename to pkg/ring/replication_strategy_test.go index fcd14c0e83b..dff76496f2f 100644 --- a/pkg/distributor/replication_strategy_test.go +++ b/pkg/ring/replication_strategy_test.go @@ -1,4 +1,4 @@ -package distributor +package ring import ( "fmt" @@ -7,7 +7,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/weaveworks/cortex/pkg/ring" ) func TestReplicationStrategy(t *testing.T) { @@ -74,31 +73,25 @@ func TestReplicationStrategy(t *testing.T) { ExpectedError: "at least 3 live ingesters required, could only find 2", }, } { - ingesters := []*ring.IngesterDesc{} + ingesters := []*IngesterDesc{} for i := 0; i < tc.LiveIngesters; i++ { - ingesters = append(ingesters, &ring.IngesterDesc{ + ingesters = append(ingesters, &IngesterDesc{ Timestamp: time.Now().Unix(), }) } for i := 0; i < tc.DeadIngesters; i++ { - ingesters = append(ingesters, &ring.IngesterDesc{}) + ingesters = append(ingesters, &IngesterDesc{}) } - r, err := ring.New(ring.Config{ - Mock: ring.NewInMemoryKVClient(), - HeartbeatTimeout: 100 * time.Second, + r, err := New(Config{ + Mock: NewInMemoryKVClient(), + HeartbeatTimeout: 100 * time.Second, + ReplicationFactor: tc.RF, }) require.NoError(t, err) - d := Distributor{ - cfg: Config{ - ReplicationFactor: tc.RF, - }, - ring: r, - } - t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) { - minSuccess, maxFailure, _, err := d.replicationStrategy(ingesters) + minSuccess, maxFailure, _, err := r.replicationStrategy(ingesters) assert.Equal(t, tc.ExpectedMinSuccess, minSuccess) assert.Equal(t, tc.ExpectedMaxFailure, maxFailure) if tc.ExpectedError != "" { diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index eba5ba2ffa5..f1704483b12 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -6,6 +6,7 @@ import ( "context" "errors" "flag" + "fmt" "math" "sort" "sync" @@ -27,10 +28,17 @@ const ( type ReadRing interface { prometheus.Collector - Get(key uint32, n int, op Operation) ([]*IngesterDesc, error) - BatchGet(keys []uint32, n int, op Operation) ([][]*IngesterDesc, error) - GetAll() []*IngesterDesc - IsHealthy(*IngesterDesc) bool + Get(key uint32, op Operation) (ReplicationSet, error) + BatchGet(keys []uint32, op Operation) ([]ReplicationSet, error) + GetAll() (ReplicationSet, error) + ReplicationFactor() int +} + +// ReplicationSet describes the ingesters to talk to for a given key, and how +// many errors to tolerate. +type ReplicationSet struct { + Ingesters []*IngesterDesc + MaxErrors int } // Operation can be Read or Write @@ -54,9 +62,10 @@ var ErrEmptyRing = errors.New("empty ring") // Config for a Ring type Config struct { ConsulConfig - store string - HeartbeatTimeout time.Duration - Mock KVClient + store string + HeartbeatTimeout time.Duration + ReplicationFactor int + Mock KVClient } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -65,14 +74,15 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.store, "ring.store", "consul", "Backend storage to use for the ring (consul, inmemory).") f.DurationVar(&cfg.HeartbeatTimeout, "ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.") + f.IntVar(&cfg.ReplicationFactor, "distributor.replication-factor", 3, "The number of ingesters to write to and read from.") } // Ring holds the information about the members of the consistent hash ring. type Ring struct { - KVClient KVClient - done chan struct{} - quit context.CancelFunc - heartbeatTimeout time.Duration + cfg Config + KVClient KVClient + done chan struct{} + quit context.CancelFunc mtx sync.RWMutex ringDesc *Desc @@ -84,6 +94,10 @@ type Ring struct { // New creates a new Ring func New(cfg Config) (*Ring, error) { + if 0 > cfg.ReplicationFactor { + return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor) + } + store := cfg.Mock if store == nil { var err error @@ -101,10 +115,10 @@ func New(cfg Config) (*Ring, error) { } r := &Ring{ - KVClient: store, - heartbeatTimeout: cfg.HeartbeatTimeout, - done: make(chan struct{}), - ringDesc: &Desc{}, + cfg: cfg, + KVClient: store, + done: make(chan struct{}), + ringDesc: &Desc{}, ingesterOwnershipDesc: prometheus.NewDesc( "cortex_ring_ingester_ownership_percent", "The percent ownership of the ring by ingester", @@ -150,38 +164,41 @@ func (r *Ring) loop(ctx context.Context) { } // Get returns n (or more) ingesters which form the replicas for the given key. -func (r *Ring) Get(key uint32, n int, op Operation) ([]*IngesterDesc, error) { +func (r *Ring) Get(key uint32, op Operation) (ReplicationSet, error) { r.mtx.RLock() defer r.mtx.RUnlock() - return r.getInternal(key, n, op) + return r.getInternal(key, op) } -// BatchGet returns n (or more) ingesters which form the replicas for the given key. -// The order of the result matches the order of the input. -func (r *Ring) BatchGet(keys []uint32, n int, op Operation) ([][]*IngesterDesc, error) { +// BatchGet returns ReplicationFactor (or more) ingesters which form the replicas +// for the given key. The order of the result matches the order of the input. +func (r *Ring) BatchGet(keys []uint32, op Operation) ([]ReplicationSet, error) { r.mtx.RLock() defer r.mtx.RUnlock() - result := make([][]*IngesterDesc, len(keys), len(keys)) + result := make([]ReplicationSet, len(keys), len(keys)) for i, key := range keys { - ingesters, err := r.getInternal(key, n, op) + rs, err := r.getInternal(key, op) if err != nil { return nil, err } - result[i] = ingesters + result[i] = rs } return result, nil } -func (r *Ring) getInternal(key uint32, n int, op Operation) ([]*IngesterDesc, error) { +func (r *Ring) getInternal(key uint32, op Operation) (ReplicationSet, error) { if r.ringDesc == nil || len(r.ringDesc.Tokens) == 0 { - return nil, ErrEmptyRing + return ReplicationSet{}, ErrEmptyRing } - ingesters := make([]*IngesterDesc, 0, n) - distinctHosts := map[string]struct{}{} - start := r.search(key) - iterations := 0 + var ( + n = r.cfg.ReplicationFactor + ingesters = make([]*IngesterDesc, 0, n) + distinctHosts = map[string]struct{}{} + start = r.search(key) + iterations = 0 + ) for i := start; len(distinctHosts) < n && iterations < len(r.ringDesc.Tokens); i++ { iterations++ // Wrap i around in the ring. @@ -210,34 +227,46 @@ func (r *Ring) getInternal(key uint32, n int, op Operation) ([]*IngesterDesc, er ingesters = append(ingesters, ingester) } - return ingesters, nil -} -// IsHealthy checks whether an ingester appears to be alive and heartbeating -func (r *Ring) IsHealthy(ingester *IngesterDesc) bool { - if ingester.State != ACTIVE { - return false + _, maxFailure, liveIngesters, err := r.replicationStrategy(ingesters) + if err != nil { + return ReplicationSet{}, err } - return time.Now().Sub(time.Unix(ingester.Timestamp, 0)) <= r.heartbeatTimeout + + return ReplicationSet{ + Ingesters: liveIngesters, + MaxErrors: maxFailure, + }, nil } // GetAll returns all available ingesters in the ring. -func (r *Ring) GetAll() []*IngesterDesc { +func (r *Ring) GetAll() (ReplicationSet, error) { r.mtx.RLock() defer r.mtx.RUnlock() - if r.ringDesc == nil { - return nil + if r.ringDesc == nil || len(r.ringDesc.Tokens) == 0 { + return ReplicationSet{}, ErrEmptyRing } ingesters := make([]*IngesterDesc, 0, len(r.ringDesc.Ingesters)) + maxErrors := r.cfg.ReplicationFactor / 2 + for _, ingester := range r.ringDesc.Ingesters { if !r.IsHealthy(ingester) { + maxErrors-- continue } ingesters = append(ingesters, ingester) } - return ingesters + + if maxErrors < 0 { + return ReplicationSet{}, fmt.Errorf("too many failed ingesters") + } + + return ReplicationSet{ + Ingesters: ingesters, + MaxErrors: maxErrors, + }, nil } func (r *Ring) search(key uint32) int { diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go index b9a32a49dec..e90f3db7201 100644 --- a/pkg/ring/ring_test.go +++ b/pkg/ring/ring_test.go @@ -28,7 +28,8 @@ func BenchmarkRing(b *testing.B) { consul.PutBytes(ConsulKey, ringBytes) r, err := New(Config{ - Mock: consul, + Mock: consul, + ReplicationFactor: 3, }) if err != nil { b.Fatal(err) @@ -38,6 +39,6 @@ func BenchmarkRing(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { keys := GenerateTokens(100, nil) - r.BatchGet(keys, 3, Write) + r.BatchGet(keys, Write) } } From 333f67a797d71ccbd1353b64cd9bdd9934e09e06 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 6 Feb 2018 11:36:49 +0000 Subject: [PATCH 06/12] Now we're only propagating MaxError, need to make sure replicationStrategy is up to date. --- pkg/ring/replication_strategy.go | 12 +++--- pkg/ring/replication_strategy_test.go | 55 ++++++++++++++------------- pkg/ring/ring.go | 2 +- 3 files changed, 34 insertions(+), 35 deletions(-) diff --git a/pkg/ring/replication_strategy.go b/pkg/ring/replication_strategy.go index 6dfddce8b91..668c2c36f49 100644 --- a/pkg/ring/replication_strategy.go +++ b/pkg/ring/replication_strategy.go @@ -6,7 +6,7 @@ import ( ) func (r *Ring) replicationStrategy(ingesters []*IngesterDesc) ( - minSuccess, maxFailure int, liveIngesters []*IngesterDesc, err error, + liveIngesters []*IngesterDesc, maxFailure int, err error, ) { // We need a response from a quorum of ingesters, which is n/2 + 1. In the // case of a node joining/leaving, the actual replica set might be bigger @@ -16,11 +16,8 @@ func (r *Ring) replicationStrategy(ingesters []*IngesterDesc) ( if len(ingesters) > replicationFactor { replicationFactor = len(ingesters) } - minSuccess = (replicationFactor / 2) + 1 + minSuccess := (replicationFactor / 2) + 1 maxFailure = replicationFactor - minSuccess - if maxFailure < 0 { - maxFailure = 0 - } // Skip those that have not heartbeated in a while. NB these are still // included in the calculation of minSuccess, so if too many failed ingesters @@ -29,12 +26,13 @@ func (r *Ring) replicationStrategy(ingesters []*IngesterDesc) ( for _, ingester := range ingesters { if r.IsHealthy(ingester) { liveIngesters = append(liveIngesters, ingester) + } else { + maxFailure-- } } - // This is just a shortcut - if there are not minSuccess available ingesters, // after filtering out dead ones, don't even bother trying. - if len(liveIngesters) < minSuccess { + if maxFailure < 0 || len(liveIngesters) < minSuccess { err = fmt.Errorf("at least %d live ingesters required, could only find %d", minSuccess, len(liveIngesters)) return diff --git a/pkg/ring/replication_strategy_test.go b/pkg/ring/replication_strategy_test.go index dff76496f2f..37e2a22d57b 100644 --- a/pkg/ring/replication_strategy_test.go +++ b/pkg/ring/replication_strategy_test.go @@ -11,29 +11,27 @@ import ( func TestReplicationStrategy(t *testing.T) { for i, tc := range []struct { - RF, LiveIngesters, DeadIngesters int - ExpectedMinSuccess, ExpectedMaxFailure int - ExpectedError string + RF, LiveIngesters, DeadIngesters int + ExpectedMaxFailure int + ExpectedError string }{ // Ensure it works for a single ingester, for local testing. { RF: 1, LiveIngesters: 1, - ExpectedMinSuccess: 1, + ExpectedMaxFailure: 0, }, { - RF: 1, - DeadIngesters: 1, - ExpectedMinSuccess: 1, - ExpectedError: "at least 1 live ingesters required, could only find 0", + RF: 1, + DeadIngesters: 1, + ExpectedError: "at least 1 live ingesters required, could only find 0", }, // Ensure it works for the default production config. { RF: 3, LiveIngesters: 3, - ExpectedMinSuccess: 2, ExpectedMaxFailure: 1, }, @@ -41,17 +39,14 @@ func TestReplicationStrategy(t *testing.T) { RF: 3, LiveIngesters: 2, DeadIngesters: 1, - ExpectedMinSuccess: 2, - ExpectedMaxFailure: 1, + ExpectedMaxFailure: 0, }, { - RF: 3, - LiveIngesters: 1, - DeadIngesters: 2, - ExpectedMinSuccess: 2, - ExpectedMaxFailure: 1, - ExpectedError: "at least 2 live ingesters required, could only find 1", + RF: 3, + LiveIngesters: 1, + DeadIngesters: 2, + ExpectedError: "at least 2 live ingesters required, could only find 1", }, // Ensure it works when adding / removing nodes. @@ -60,17 +55,21 @@ func TestReplicationStrategy(t *testing.T) { { RF: 3, LiveIngesters: 4, - ExpectedMinSuccess: 3, ExpectedMaxFailure: 1, }, { RF: 3, - LiveIngesters: 2, - DeadIngesters: 2, - ExpectedMinSuccess: 3, - ExpectedMaxFailure: 1, - ExpectedError: "at least 3 live ingesters required, could only find 2", + LiveIngesters: 3, + DeadIngesters: 1, + ExpectedMaxFailure: 0, + }, + + { + RF: 3, + LiveIngesters: 2, + DeadIngesters: 2, + ExpectedError: "at least 3 live ingesters required, could only find 2", }, } { ingesters := []*IngesterDesc{} @@ -91,10 +90,12 @@ func TestReplicationStrategy(t *testing.T) { require.NoError(t, err) t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) { - minSuccess, maxFailure, _, err := r.replicationStrategy(ingesters) - assert.Equal(t, tc.ExpectedMinSuccess, minSuccess) - assert.Equal(t, tc.ExpectedMaxFailure, maxFailure) - if tc.ExpectedError != "" { + liveIngesters, maxFailure, err := r.replicationStrategy(ingesters) + if tc.ExpectedError == "" { + assert.NoError(t, err) + assert.Equal(t, tc.LiveIngesters, len(liveIngesters)) + assert.Equal(t, tc.ExpectedMaxFailure, maxFailure) + } else { assert.EqualError(t, err, tc.ExpectedError) } }) diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index f1704483b12..7d32a210889 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -228,7 +228,7 @@ func (r *Ring) getInternal(key uint32, op Operation) (ReplicationSet, error) { ingesters = append(ingesters, ingester) } - _, maxFailure, liveIngesters, err := r.replicationStrategy(ingesters) + liveIngesters, maxFailure, err := r.replicationStrategy(ingesters) if err != nil { return ReplicationSet{}, err } From cf2905037ad8994f1f16694d559a9f13bf6dfc8d Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 6 Feb 2018 15:09:16 +0000 Subject: [PATCH 07/12] Review feedback. --- pkg/distributor/distributor_test.go | 9 +++------ pkg/ring/replication_strategy.go | 9 +++++++-- pkg/ring/ring.go | 9 ++++----- 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 84a0fccd08d..69d9d270f22 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -20,8 +20,7 @@ import ( // mockRing doesn't do any consistent hashing, just returns same ingesters for every query. type mockRing struct { prometheus.Counter - ingesters []*ring.IngesterDesc - heartbeatTimeout time.Duration + ingesters []*ring.IngesterDesc } func (r mockRing) Get(key uint32, op ring.Operation) (ring.ReplicationSet, error) { @@ -168,8 +167,7 @@ func TestDistributorPush(t *testing.T) { Counter: prometheus.NewCounter(prometheus.CounterOpts{ Name: "foo", }), - ingesters: ingesterDescs, - heartbeatTimeout: 1 * time.Minute, + ingesters: ingesterDescs, } d, err := New(Config{ @@ -307,8 +305,7 @@ func TestDistributorQuery(t *testing.T) { Counter: prometheus.NewCounter(prometheus.CounterOpts{ Name: "foo", }), - ingesters: ingesterDescs, - heartbeatTimeout: 1 * time.Minute, + ingesters: ingesterDescs, } d, err := New(Config{ diff --git a/pkg/ring/replication_strategy.go b/pkg/ring/replication_strategy.go index 668c2c36f49..6528f28bc78 100644 --- a/pkg/ring/replication_strategy.go +++ b/pkg/ring/replication_strategy.go @@ -5,13 +5,17 @@ import ( "time" ) +// replicationStrategy decides, given the set of ingesters eligable for a key, +// which ingesters you will try and write to and how many failures you will +// tolerate. +// - Filters out dead ingesters so the one doesn't even try to write to them. +// - Checks there is enough ingesters for an operation to succeed. func (r *Ring) replicationStrategy(ingesters []*IngesterDesc) ( liveIngesters []*IngesterDesc, maxFailure int, err error, ) { // We need a response from a quorum of ingesters, which is n/2 + 1. In the // case of a node joining/leaving, the actual replica set might be bigger - // than the replication factor, so we need to account for this. - // See comment in ring.go:getInternal. + // than the replication factor, so use the bigger or the two. replicationFactor := r.cfg.ReplicationFactor if len(ingesters) > replicationFactor { replicationFactor = len(ingesters) @@ -30,6 +34,7 @@ func (r *Ring) replicationStrategy(ingesters []*IngesterDesc) ( maxFailure-- } } + // This is just a shortcut - if there are not minSuccess available ingesters, // after filtering out dead ones, don't even bother trying. if maxFailure < 0 || len(liveIngesters) < minSuccess { diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index 7d32a210889..c660c766bf9 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -212,13 +212,12 @@ func (r *Ring) getInternal(key uint32, op Operation) (ReplicationSet, error) { distinctHosts[token.Ingester] = struct{}{} ingester := r.ringDesc.Ingesters[token.Ingester] - // We do not want to Write to Ingesters that are not ACTIVE, because they are - // about to go away, but we do want to write the extra replica somewhere. - // So we increase the size of the set of replicas for the key. - // This means we have to also increase the + // We do not want to Write to Ingesters that are not ACTIVE, but we do want + // to write the extra replica somewhere. So we increase the size of the set + // of replicas for the key. This means we have to also increase the // size of the replica set for read, but we can read from Leaving ingesters, // so don't skip it in this case. - // NB ingester will be filterer later (by distributor/replication_strategy). + // NB dead ingester will be filtered later (by replication_strategy.go). if op == Write && ingester.State != ACTIVE { n++ } else if op == Read && (ingester.State != ACTIVE && ingester.State != LEAVING) { From 23323c49bd54a64f9433c2638096202f02d166c8 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 6 Feb 2018 15:31:24 +0000 Subject: [PATCH 08/12] Typo --- pkg/ring/replication_strategy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ring/replication_strategy.go b/pkg/ring/replication_strategy.go index 6528f28bc78..e53f62ca5b3 100644 --- a/pkg/ring/replication_strategy.go +++ b/pkg/ring/replication_strategy.go @@ -5,7 +5,7 @@ import ( "time" ) -// replicationStrategy decides, given the set of ingesters eligable for a key, +// replicationStrategy decides, given the set of ingesters eligible for a key, // which ingesters you will try and write to and how many failures you will // tolerate. // - Filters out dead ingesters so the one doesn't even try to write to them. From 85e441cda5543720ba6b6c986f3222f20a5bc605 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 20 Feb 2018 12:33:20 +0000 Subject: [PATCH 09/12] Fix nits --- pkg/ring/ring.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index c660c766bf9..ef54855761a 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -94,7 +94,7 @@ type Ring struct { // New creates a new Ring func New(cfg Config) (*Ring, error) { - if 0 > cfg.ReplicationFactor { + if cfg.ReplicationFactor <= 0 { return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor) } @@ -171,7 +171,7 @@ func (r *Ring) Get(key uint32, op Operation) (ReplicationSet, error) { } // BatchGet returns ReplicationFactor (or more) ingesters which form the replicas -// for the given key. The order of the result matches the order of the input. +// for the given keys. The order of the result matches the order of the input. func (r *Ring) BatchGet(keys []uint32, op Operation) ([]ReplicationSet, error) { r.mtx.RLock() defer r.mtx.RUnlock() From fb52bbb2d0d6020d771a3862d5f1faad606221fc Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 20 Feb 2018 12:44:08 +0000 Subject: [PATCH 10/12] Reintroduce old behaviour, of reading from LEAVING ingesters. --- pkg/ring/http.go | 2 +- pkg/ring/replication_strategy.go | 10 ++++++---- pkg/ring/replication_strategy_test.go | 3 ++- pkg/ring/ring.go | 6 +++--- 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/pkg/ring/http.go b/pkg/ring/http.go index e3b067a0988..71cb5c820c6 100644 --- a/pkg/ring/http.go +++ b/pkg/ring/http.go @@ -102,7 +102,7 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { ing := r.ringDesc.Ingesters[id] timestamp := time.Unix(ing.Timestamp, 0) state := ing.State.String() - if !r.IsHealthy(ing) { + if !r.IsHealthy(ing, Write) { state = unhealthy } diff --git a/pkg/ring/replication_strategy.go b/pkg/ring/replication_strategy.go index e53f62ca5b3..2e0c8d5f4c3 100644 --- a/pkg/ring/replication_strategy.go +++ b/pkg/ring/replication_strategy.go @@ -10,7 +10,7 @@ import ( // tolerate. // - Filters out dead ingesters so the one doesn't even try to write to them. // - Checks there is enough ingesters for an operation to succeed. -func (r *Ring) replicationStrategy(ingesters []*IngesterDesc) ( +func (r *Ring) replicationStrategy(ingesters []*IngesterDesc, op Operation) ( liveIngesters []*IngesterDesc, maxFailure int, err error, ) { // We need a response from a quorum of ingesters, which is n/2 + 1. In the @@ -28,7 +28,7 @@ func (r *Ring) replicationStrategy(ingesters []*IngesterDesc) ( // will cause the whole write to fail. liveIngesters = make([]*IngesterDesc, 0, len(ingesters)) for _, ingester := range ingesters { - if r.IsHealthy(ingester) { + if r.IsHealthy(ingester, op) { liveIngesters = append(liveIngesters, ingester) } else { maxFailure-- @@ -47,8 +47,10 @@ func (r *Ring) replicationStrategy(ingesters []*IngesterDesc) ( } // IsHealthy checks whether an ingester appears to be alive and heartbeating -func (r *Ring) IsHealthy(ingester *IngesterDesc) bool { - if ingester.State != ACTIVE { +func (r *Ring) IsHealthy(ingester *IngesterDesc, op Operation) bool { + if op == Write && ingester.State != ACTIVE { + return false + } else if op == Read && ingester.State == JOINING { return false } return time.Now().Sub(time.Unix(ingester.Timestamp, 0)) <= r.cfg.HeartbeatTimeout diff --git a/pkg/ring/replication_strategy_test.go b/pkg/ring/replication_strategy_test.go index 37e2a22d57b..f81f10583ed 100644 --- a/pkg/ring/replication_strategy_test.go +++ b/pkg/ring/replication_strategy_test.go @@ -12,6 +12,7 @@ import ( func TestReplicationStrategy(t *testing.T) { for i, tc := range []struct { RF, LiveIngesters, DeadIngesters int + op Operation // Will default to READ ExpectedMaxFailure int ExpectedError string }{ @@ -90,7 +91,7 @@ func TestReplicationStrategy(t *testing.T) { require.NoError(t, err) t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) { - liveIngesters, maxFailure, err := r.replicationStrategy(ingesters) + liveIngesters, maxFailure, err := r.replicationStrategy(ingesters, tc.op) if tc.ExpectedError == "" { assert.NoError(t, err) assert.Equal(t, tc.LiveIngesters, len(liveIngesters)) diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index ef54855761a..52c7940ecdd 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -227,7 +227,7 @@ func (r *Ring) getInternal(key uint32, op Operation) (ReplicationSet, error) { ingesters = append(ingesters, ingester) } - liveIngesters, maxFailure, err := r.replicationStrategy(ingesters) + liveIngesters, maxFailure, err := r.replicationStrategy(ingesters, op) if err != nil { return ReplicationSet{}, err } @@ -251,7 +251,7 @@ func (r *Ring) GetAll() (ReplicationSet, error) { maxErrors := r.cfg.ReplicationFactor / 2 for _, ingester := range r.ringDesc.Ingesters { - if !r.IsHealthy(ingester) { + if !r.IsHealthy(ingester, Read) { maxErrors-- continue } @@ -325,7 +325,7 @@ func (r *Ring) Collect(ch chan<- prometheus.Metric) { JOINING.String(): 0, } for _, ingester := range r.ringDesc.Ingesters { - if !r.IsHealthy(ingester) { + if !r.IsHealthy(ingester, Write) { byState[unhealthy]++ } else { byState[ingester.State.String()]++ From 16112e45e190c681958b4cf7f04a724927b0a032 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 23 Mar 2018 17:36:00 +0000 Subject: [PATCH 11/12] Rebase fix-up --- pkg/distributor/distributor.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index d139e46532e..3deebc36d13 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -619,8 +619,11 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error) { req := &client.UserStatsRequest{} ctx = user.InjectOrgID(ctx, "1") // fake: ingester insists on having an org ID // Not using d.forAllIngesters(), so we can fail after first error. - ingesters := d.ring.GetAll() - for _, ingester := range ingesters { + replicationSet, err := d.ring.GetAll() + if err != nil { + return nil, err + } + for _, ingester := range replicationSet.Ingesters { client, err := d.ingesterPool.GetClientFor(ingester.Addr) if err != nil { return nil, err From ace81ba5b1373eaf15a712ad085dd1a54e44b5fa Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 23 Mar 2018 17:24:49 +0000 Subject: [PATCH 12/12] Don't exclude some ingesters from metrics and http admin page --- pkg/ring/http.go | 2 +- pkg/ring/ring.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/ring/http.go b/pkg/ring/http.go index 71cb5c820c6..c0746d6ad24 100644 --- a/pkg/ring/http.go +++ b/pkg/ring/http.go @@ -102,7 +102,7 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { ing := r.ringDesc.Ingesters[id] timestamp := time.Unix(ing.Timestamp, 0) state := ing.State.String() - if !r.IsHealthy(ing, Write) { + if !r.IsHealthy(ing, Reporting) { state = unhealthy } diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index 52c7940ecdd..bd9c08f22b6 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -48,6 +48,7 @@ type Operation int const ( Read Operation = iota Write + Reporting // Special value for inquiring about health ) type uint32s []uint32 @@ -325,7 +326,7 @@ func (r *Ring) Collect(ch chan<- prometheus.Metric) { JOINING.String(): 0, } for _, ingester := range r.ringDesc.Ingesters { - if !r.IsHealthy(ingester, Write) { + if !r.IsHealthy(ingester, Reporting) { byState[unhealthy]++ } else { byState[ingester.State.String()]++