From 68b534bcf5b132f4c9300e3879637db6d54170af Mon Sep 17 00:00:00 2001
From: Alan Protasio
Date: Sun, 3 Oct 2021 20:11:15 -0700
Subject: [PATCH 1/4] Returning quorum error

Signed-off-by: Alan Protasio
---
 pkg/distributor/distributor_test.go | 123 +++++++++++++++++++++++++---
 pkg/ring/batch.go                   |  33 +++++++-
 2 files changed, 144 insertions(+), 12 deletions(-)

diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go
index b3790f70322..9715bc5a5f0 100644
--- a/pkg/distributor/distributor_test.go
+++ b/pkg/distributor/distributor_test.go
@@ -14,6 +14,8 @@ import (
 	"testing"
 	"time"
 
+	"google.golang.org/grpc/codes"
+
 	"github.com/go-kit/log"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/testutil"
@@ -24,6 +26,7 @@ import (
 	"github.com/stretchr/testify/require"
 	"github.com/weaveworks/common/httpgrpc"
 	"github.com/weaveworks/common/user"
+	"go.uber.org/atomic"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/health/grpc_health_v1"
 	"google.golang.org/grpc/status"
@@ -490,6 +493,106 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
 	}
 }
 
+func TestPush_QuorumError(t *testing.T) {
+	var limits validation.Limits
+	flagext.DefaultValues(&limits)
+
+	limits.IngestionRate = math.MaxFloat64
+
+	dists, ingesters, r, _ := prepare(t, prepConfig{
+		numDistributors:     1,
+		numIngesters:        3,
+		happyIngesters:      0,
+		shuffleShardSize:    3,
+		shardByAllLabels:    true,
+		shuffleShardEnabled: true,
+		limits:              &limits,
+	})
+
+	ctx := user.InjectOrgID(context.Background(), "user")
+
+	d := dists[0]
+
+	// Using 489 just to make sure we are not hitting the &limits
+	// Simulating 2 4xx and 1 5xx -> Should return 4xx
+	ingesters[0].failResp.Store(httpgrpc.Errorf(489, "Throttling"))
+	ingesters[1].failResp.Store(httpgrpc.Errorf(500, "InternalServerError"))
+	ingesters[2].failResp.Store(httpgrpc.Errorf(489, "Throttling"))
+
+	for i := 0; i < 1000; i++ {
+		request := makeWriteRequest(0, 30, 20)
+		_, err := d.Push(ctx, request)
+		status, ok := status.FromError(err)
+		require.True(t, ok)
+		require.Equal(t, codes.Code(489), status.Code())
+	}
+
+	// Simulating 2 5xx and 1 4xx -> Should return 5xx
+	ingesters[0].failResp.Store(httpgrpc.Errorf(500, "InternalServerError"))
+	ingesters[1].failResp.Store(httpgrpc.Errorf(489, "Throttling"))
+	ingesters[2].failResp.Store(httpgrpc.Errorf(500, "InternalServerError"))
+
+	for i := 0; i < 10000; i++ {
+		request := makeWriteRequest(0, 300, 200)
+		_, err := d.Push(ctx, request)
+		status, ok := status.FromError(err)
+		require.True(t, ok)
+		require.Equal(t, codes.Code(500), status.Code())
+	}
+
+	// Simulating 2 different errors and 1 success -> This case we may return any of the errors
+	ingesters[0].failResp.Store(httpgrpc.Errorf(500, "InternalServerError"))
+	ingesters[1].failResp.Store(httpgrpc.Errorf(489, "Throttling"))
+	ingesters[2].happy.Store(true)
+
+	for i := 0; i < 1000; i++ {
+		request := makeWriteRequest(0, 30, 20)
+		_, err := d.Push(ctx, request)
+		status, ok := status.FromError(err)
+		require.True(t, ok)
+		require.True(t, status.Code() == 489 || status.Code() == 500)
+	}
+
+	// Simulating 1 error -> Should return 2xx
+	ingesters[0].failResp.Store(httpgrpc.Errorf(500, "InternalServerError"))
+	ingesters[1].happy.Store(true)
+	ingesters[2].happy.Store(true)
+
+	for i := 0; i < 1000; i++ {
+		request := makeWriteRequest(0, 30, 20)
+		_, err := d.Push(ctx, request)
+		require.NoError(t, err)
+	}
+
+	// Simulating an unhealthy ingester (ingester 2)
+	ingesters[0].failResp.Store(httpgrpc.Errorf(500, "InternalServerError"))
+	ingesters[1].happy.Store(true)
+	ingesters[2].happy.Store(true)
+
+	err := r.KVClient.CAS(context.Background(), ring.IngesterRingKey, func(in interface{}) (interface{}, bool, error) {
+		r := in.(*ring.Desc)
+		ingester2 := r.Ingesters["2"]
+		ingester2.State = ring.LEFT
+		ingester2.Timestamp = time.Now().Unix()
+		r.Ingesters["2"] = ingester2
+		return in, true, nil
+	})
+
+	require.NoError(t, err)
+
+	// Give time to the ring get updated with the KV value
+	time.Sleep(5 * time.Second)
+
+	for i := 0; i < 1000; i++ {
+		request := makeWriteRequest(0, 30, 20)
+		_, err := d.Push(ctx, request)
+		require.Error(t, err)
+		status, ok := status.FromError(err)
+		require.True(t, ok)
+		require.Equal(t, codes.Code(500), status.Code())
+	}
+}
+
 func TestDistributor_PushInstanceLimits(t *testing.T) {
 	type testPush struct {
@@ -1949,7 +2052,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
 	ingesters := []mockIngester{}
 	for i := 0; i < cfg.happyIngesters; i++ {
 		ingesters = append(ingesters, mockIngester{
-			happy:      true,
+			happy:      *atomic.NewBool(true),
 			queryDelay: cfg.queryDelay,
 		})
 	}
@@ -1961,7 +2064,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
 
 		ingesters = append(ingesters, mockIngester{
 			queryDelay: cfg.queryDelay,
-			failResp:   miError,
+			failResp:   *atomic.NewError(miError),
 		})
 	}
 
@@ -2208,8 +2311,8 @@ type mockIngester struct {
 	sync.Mutex
 	client.IngesterClient
 	grpc_health_v1.HealthClient
-	happy      bool
-	failResp   error
+	happy      atomic.Bool
+	failResp   atomic.Error
 	stats      client.UsersStatsResponse
 	timeseries map[uint32]*cortexpb.PreallocTimeseries
 	metadata   map[uint32]map[cortexpb.MetricMetadata]struct{}
@@ -2247,8 +2350,8 @@ func (i *mockIngester) Push(ctx context.Context, req *cortexpb.WriteRequest, opt
 
 	i.trackCall("Push")
 
-	if !i.happy {
-		return nil, i.failResp
+	if !i.happy.Load() {
+		return nil, i.failResp.Load()
 	}
 
 	if i.timeseries == nil {
@@ -2305,7 +2408,7 @@ func (i *mockIngester) Query(ctx context.Context, req *client.QueryRequest, opts
 
 	i.trackCall("Query")
 
-	if !i.happy {
+	if !i.happy.Load() {
 		return nil, errFail
 	}
 
@@ -2331,7 +2434,7 @@ func (i *mockIngester) QueryStream(ctx context.Context, req *client.QueryRequest
 
 	i.trackCall("QueryStream")
 
-	if !i.happy {
+	if !i.happy.Load() {
 		return nil, errFail
 	}
 
@@ -2395,7 +2498,7 @@ func (i *mockIngester) MetricsForLabelMatchers(ctx context.Context, req *client.
i.trackCall("MetricsForLabelMatchers") - if !i.happy { + if !i.happy.Load() { return nil, errFail } @@ -2421,7 +2524,7 @@ func (i *mockIngester) MetricsMetadata(ctx context.Context, req *client.MetricsM i.trackCall("MetricsMetadata") - if !i.happy { + if !i.happy.Load() { return nil, errFail } diff --git a/pkg/ring/batch.go b/pkg/ring/batch.go index 1e4ee446d91..cd19214009f 100644 --- a/pkg/ring/batch.go +++ b/pkg/ring/batch.go @@ -5,6 +5,8 @@ import ( "fmt" "sync" + "google.golang.org/grpc/status" + "go.uber.org/atomic" ) @@ -25,7 +27,20 @@ type itemTracker struct { minSuccess int maxFailures int succeeded atomic.Int32 - failed atomic.Int32 + failed4xx atomic.Int32 + failed5xx atomic.Int32 + remaining atomic.Int32 + err atomic.Error +} + +func (i *itemTracker) recordError(err error) int32 { + i.err.Store(err) + + if status, ok := status.FromError(err); ok && status.Code()/100 == 4 { + return i.failed4xx.Inc() + } + + return i.failed5xx.Inc() } // DoBatch request against a set of keys in the ring, handling replication and @@ -62,6 +77,7 @@ func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callb } itemTrackers[i].minSuccess = len(replicationSet.Instances) - replicationSet.MaxErrors itemTrackers[i].maxFailures = replicationSet.MaxErrors + itemTrackers[i].remaining.Store(int32(len(replicationSet.Instances))) for _, desc := range replicationSet.Instances { curr, found := instances[desc.Addr] @@ -122,15 +138,28 @@ func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) { // The use of atomic increments here guarantees only a single sendSamples // goroutine will write to either channel. for i := range sampleTrackers { + verifyIfShouldReturn := func() { + if sampleTrackers[i].remaining.Dec() == 0 { + if b.rpcsFailed.Inc() == 1 { + b.err <- sampleTrackers[i].err.Load() + } + } + } + if err != nil { - if sampleTrackers[i].failed.Inc() <= int32(sampleTrackers[i].maxFailures) { + errCount := sampleTrackers[i].recordError(err) + + if errCount <= int32(sampleTrackers[i].maxFailures) { + verifyIfShouldReturn() continue } + if b.rpcsFailed.Inc() == 1 { b.err <- err } } else { if sampleTrackers[i].succeeded.Inc() != int32(sampleTrackers[i].minSuccess) { + verifyIfShouldReturn() continue } if b.rpcsPending.Dec() == 0 { From 9fe75528ae63b9da377d237f29cfea338862131b Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Thu, 11 Nov 2021 10:19:28 -0800 Subject: [PATCH 2/4] refactor Signed-off-by: Alan Protasio --- pkg/distributor/distributor_test.go | 16 +++++------ pkg/ring/batch.go | 41 +++++++++++++++-------------- 2 files changed, 29 insertions(+), 28 deletions(-) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 9715bc5a5f0..a2fd7b3fb44 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -513,23 +513,23 @@ func TestPush_QuorumError(t *testing.T) { d := dists[0] - // Using 489 just to make sure we are not hitting the &limits + // Using 429 just to make sure we are not hitting the &limits // Simulating 2 4xx and 1 5xx -> Should return 4xx - ingesters[0].failResp.Store(httpgrpc.Errorf(489, "Throttling")) + ingesters[0].failResp.Store(httpgrpc.Errorf(429, "Throttling")) ingesters[1].failResp.Store(httpgrpc.Errorf(500, "InternalServerError")) - ingesters[2].failResp.Store(httpgrpc.Errorf(489, "Throttling")) + ingesters[2].failResp.Store(httpgrpc.Errorf(429, "Throttling")) for i := 0; i < 1000; i++ { request := makeWriteRequest(0, 30, 20) _, err := d.Push(ctx, request) status, ok := 
status.FromError(err) require.True(t, ok) - require.Equal(t, codes.Code(489), status.Code()) + require.Equal(t, codes.Code(429), status.Code()) } // Simulating 2 5xx and 1 4xx -> Should return 5xx ingesters[0].failResp.Store(httpgrpc.Errorf(500, "InternalServerError")) - ingesters[1].failResp.Store(httpgrpc.Errorf(489, "Throttling")) + ingesters[1].failResp.Store(httpgrpc.Errorf(429, "Throttling")) ingesters[2].failResp.Store(httpgrpc.Errorf(500, "InternalServerError")) for i := 0; i < 10000; i++ { @@ -542,7 +542,7 @@ func TestPush_QuorumError(t *testing.T) { // Simulating 2 different errors and 1 success -> This case we may return any of the errors ingesters[0].failResp.Store(httpgrpc.Errorf(500, "InternalServerError")) - ingesters[1].failResp.Store(httpgrpc.Errorf(489, "Throttling")) + ingesters[1].failResp.Store(httpgrpc.Errorf(429, "Throttling")) ingesters[2].happy.Store(true) for i := 0; i < 1000; i++ { @@ -550,7 +550,7 @@ func TestPush_QuorumError(t *testing.T) { _, err := d.Push(ctx, request) status, ok := status.FromError(err) require.True(t, ok) - require.True(t, status.Code() == 489 || status.Code() == 500) + require.True(t, status.Code() == 429 || status.Code() == 500) } // Simulating 1 error -> Should return 2xx @@ -581,7 +581,7 @@ func TestPush_QuorumError(t *testing.T) { require.NoError(t, err) // Give time to the ring get updated with the KV value - time.Sleep(5 * time.Second) + time.Sleep(time.Second) for i := 0; i < 1000; i++ { request := makeWriteRequest(0, 30, 20) diff --git a/pkg/ring/batch.go b/pkg/ring/batch.go index cd19214009f..efb894830de 100644 --- a/pkg/ring/batch.go +++ b/pkg/ring/batch.go @@ -138,32 +138,33 @@ func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) { // The use of atomic increments here guarantees only a single sendSamples // goroutine will write to either channel. 
for i := range sampleTrackers { - verifyIfShouldReturn := func() { - if sampleTrackers[i].remaining.Dec() == 0 { - if b.rpcsFailed.Inc() == 1 { - b.err <- sampleTrackers[i].err.Load() - } - } - } - if err != nil { + // We count the error by error family (4xx and 5xx) errCount := sampleTrackers[i].recordError(err) - - if errCount <= int32(sampleTrackers[i].maxFailures) { - verifyIfShouldReturn() - continue - } - - if b.rpcsFailed.Inc() == 1 { - b.err <- err + // We should return an error if we reach the maxFailure (quorum) on a give error family OR + // we dont have any remaining ingesters to try + // Ex: 2xx, 4xx, 5xx -> return 5xx + // Ex: 4xx, 4xx, 2xx -> return 4xx + if errCount > int32(sampleTrackers[i].maxFailures) || sampleTrackers[i].remaining.Dec() == 0 { + if b.rpcsFailed.Inc() == 1 { + b.err <- err + } } } else { - if sampleTrackers[i].succeeded.Inc() != int32(sampleTrackers[i].minSuccess) { - verifyIfShouldReturn() + // We should return success if we succeeded calling `minSuccess` ingesters + if sampleTrackers[i].succeeded.Inc() >= int32(sampleTrackers[i].minSuccess) { + if b.rpcsPending.Dec() == 0 { + b.done <- struct{}{} + } continue } - if b.rpcsPending.Dec() == 0 { - b.done <- struct{}{} + + // If we suceeded to call this particular ingester but we dont have any remaining ingesters to try + // and we did not succeeded calling `minSuccess` ingesters we need to return the last error + if sampleTrackers[i].remaining.Dec() == 0 { + if b.rpcsFailed.Inc() == 1 { + b.err <- sampleTrackers[i].err.Load() + } } } } From 9944ca11351419eb9e056d4b5743d0a0e360be3c Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Thu, 11 Nov 2021 11:30:36 -0800 Subject: [PATCH 3/4] rebase Signed-off-by: Alan Protasio --- pkg/distributor/distributor_test.go | 66 +++++++++++++++++------------ pkg/ring/batch.go | 8 ++-- 2 files changed, 44 insertions(+), 30 deletions(-) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index a2fd7b3fb44..6675c6ab153 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -262,7 +262,7 @@ func TestDistributor_Push(t *testing.T) { limits.IngestionRate = 20 limits.IngestionBurstSize = 20 - ds, _, regs := prepare(t, prepConfig{ + ds, _, regs, _ := prepare(t, prepConfig{ numIngesters: tc.numIngesters, happyIngesters: tc.happyIngesters, numDistributors: 1, @@ -291,7 +291,7 @@ func TestDistributor_Push(t *testing.T) { } func TestDistributor_MetricsCleanup(t *testing.T) { - dists, _, regs := prepare(t, prepConfig{ + dists, _, regs, _ := prepare(t, prepConfig{ numDistributors: 1, }) d := dists[0] @@ -468,7 +468,7 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) { limits.IngestionBurstSize = testData.ingestionBurstSize // Start all expected distributors - distributors, _, _ := prepare(t, prepConfig{ + distributors, _, _, _ := prepare(t, prepConfig{ numIngesters: 3, happyIngesters: 3, numDistributors: testData.distributors, @@ -499,7 +499,7 @@ func TestPush_QuorumError(t *testing.T) { limits.IngestionRate = math.MaxFloat64 - dists, ingesters, r, _ := prepare(t, prepConfig{ + dists, ingesters, _, r := prepare(t, prepConfig{ numDistributors: 1, numIngesters: 3, happyIngesters: 0, @@ -545,7 +545,7 @@ func TestPush_QuorumError(t *testing.T) { ingesters[1].failResp.Store(httpgrpc.Errorf(429, "Throttling")) ingesters[2].happy.Store(true) - for i := 0; i < 1000; i++ { + for i := 0; i < 1; i++ { request := makeWriteRequest(0, 30, 20) _, err := d.Push(ctx, request) status, ok := status.FromError(err) @@ 
-558,7 +558,7 @@ func TestPush_QuorumError(t *testing.T) { ingesters[1].happy.Store(true) ingesters[2].happy.Store(true) - for i := 0; i < 1000; i++ { + for i := 0; i < 1; i++ { request := makeWriteRequest(0, 30, 20) _, err := d.Push(ctx, request) require.NoError(t, err) @@ -569,7 +569,19 @@ func TestPush_QuorumError(t *testing.T) { ingesters[1].happy.Store(true) ingesters[2].happy.Store(true) - err := r.KVClient.CAS(context.Background(), ring.IngesterRingKey, func(in interface{}) (interface{}, bool, error) { + // Wait group to check when the ring got updated + wg := &sync.WaitGroup{} + wg.Add(1) + + go func() { + r.KVClient.WatchKey(context.Background(), ingester.RingKey, func(i interface{}) bool { + wg.Done() + // False will terminate the watch + return false + }) + }() + + err := r.KVClient.CAS(context.Background(), ingester.RingKey, func(in interface{}) (interface{}, bool, error) { r := in.(*ring.Desc) ingester2 := r.Ingesters["2"] ingester2.State = ring.LEFT @@ -581,7 +593,7 @@ func TestPush_QuorumError(t *testing.T) { require.NoError(t, err) // Give time to the ring get updated with the KV value - time.Sleep(time.Second) + wg.Wait() for i := 0; i < 1000; i++ { request := makeWriteRequest(0, 30, 20) @@ -707,7 +719,7 @@ func TestDistributor_PushInstanceLimits(t *testing.T) { flagext.DefaultValues(limits) // Start all expected distributors - distributors, _, regs := prepare(t, prepConfig{ + distributors, _, regs, _ := prepare(t, prepConfig{ numIngesters: 3, happyIngesters: 3, numDistributors: 1, @@ -799,7 +811,7 @@ func TestDistributor_PushHAInstances(t *testing.T) { limits.AcceptHASamples = true limits.MaxLabelValueLength = 15 - ds, _, _ := prepare(t, prepConfig{ + ds, _, _, _ := prepare(t, prepConfig{ numIngesters: 3, happyIngesters: 3, numDistributors: 1, @@ -963,7 +975,7 @@ func TestDistributor_PushQuery(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - ds, ingesters, _ := prepare(t, prepConfig{ + ds, ingesters, _, _ := prepare(t, prepConfig{ numIngesters: tc.numIngesters, happyIngesters: tc.happyIngesters, numDistributors: 1, @@ -1014,7 +1026,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReac limits.MaxChunksPerQuery = maxChunksLimit // Prepare distributors. - ds, _, _ := prepare(t, prepConfig{ + ds, _, _, _ := prepare(t, prepConfig{ numIngesters: 3, happyIngesters: 3, numDistributors: 1, @@ -1070,7 +1082,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReac ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(maxSeriesLimit, 0, 0)) // Prepare distributors. - ds, _, _ := prepare(t, prepConfig{ + ds, _, _, _ := prepare(t, prepConfig{ numIngesters: 3, happyIngesters: 3, numDistributors: 1, @@ -1123,7 +1135,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIs // Prepare distributors. // Use replication factor of 2 to always read all the chunks from both ingesters, // this guarantees us to always read the same chunks and have a stable test. 
- ds, _, _ := prepare(t, prepConfig{ + ds, _, _, _ := prepare(t, prepConfig{ numIngesters: 2, happyIngesters: 2, numDistributors: 1, @@ -1245,7 +1257,7 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) { limits.DropLabels = tc.removeLabels limits.AcceptHASamples = tc.removeReplica - ds, ingesters, _ := prepare(t, prepConfig{ + ds, ingesters, _, _ := prepare(t, prepConfig{ numIngesters: 2, happyIngesters: 2, numDistributors: 1, @@ -1296,7 +1308,7 @@ func TestDistributor_Push_LabelRemoval_RemovingNameLabelWillError(t *testing.T) limits.DropLabels = tc.removeLabels limits.AcceptHASamples = tc.removeReplica - ds, _, _ := prepare(t, prepConfig{ + ds, _, _, _ := prepare(t, prepConfig{ numIngesters: 2, happyIngesters: 2, numDistributors: 1, @@ -1390,7 +1402,7 @@ func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t * for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - ds, ingesters, _ := prepare(t, prepConfig{ + ds, ingesters, _, _ := prepare(t, prepConfig{ numIngesters: 2, happyIngesters: 2, numDistributors: 1, @@ -1450,7 +1462,7 @@ func TestDistributor_Push_LabelNameValidation(t *testing.T) { for testName, tc := range tests { t.Run(testName, func(t *testing.T) { - ds, _, _ := prepare(t, prepConfig{ + ds, _, _, _ := prepare(t, prepConfig{ numIngesters: 2, happyIngesters: 2, numDistributors: 1, @@ -1512,7 +1524,7 @@ func TestDistributor_Push_ExemplarValidation(t *testing.T) { for testName, tc := range tests { t.Run(testName, func(t *testing.T) { - ds, _, _ := prepare(t, prepConfig{ + ds, _, _, _ := prepare(t, prepConfig{ numIngesters: 2, happyIngesters: 2, numDistributors: 1, @@ -1813,7 +1825,7 @@ func TestSlowQueries(t *testing.T) { expectedErr = errFail } - ds, _, _ := prepare(t, prepConfig{ + ds, _, _, _ := prepare(t, prepConfig{ numIngesters: nIngesters, happyIngesters: happy, numDistributors: 1, @@ -1922,7 +1934,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) { now := model.Now() // Create distributor - ds, ingesters, _ := prepare(t, prepConfig{ + ds, ingesters, _, _ := prepare(t, prepConfig{ numIngesters: numIngesters, happyIngesters: numIngesters, numDistributors: 1, @@ -1980,7 +1992,7 @@ func TestDistributor_MetricsMetadata(t *testing.T) { for testName, testData := range tests { t.Run(testName, func(t *testing.T) { // Create distributor - ds, ingesters, _ := prepare(t, prepConfig{ + ds, ingesters, _, _ := prepare(t, prepConfig{ numIngesters: numIngesters, happyIngesters: numIngesters, numDistributors: 1, @@ -2048,7 +2060,7 @@ type prepConfig struct { errFail error } -func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*prometheus.Registry) { +func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*prometheus.Registry, *ring.Ring) { ingesters := []mockIngester{} for i := 0; i < cfg.happyIngesters; i++ { ingesters = append(ingesters, mockIngester{ @@ -2186,7 +2198,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p t.Cleanup(func() { stopAll(distributors, ingestersRing) }) - return distributors, ingesters, registries + return distributors, ingesters, registries, ingestersRing } func stopAll(ds []*Distributor, r *ring.Ring) { @@ -2684,7 +2696,7 @@ func TestDistributorValidation(t *testing.T) { limits.RejectOldSamplesMaxAge = model.Duration(24 * time.Hour) limits.MaxLabelNamesPerSeries = 2 - ds, _, _ := prepare(t, prepConfig{ + ds, _, _, _ := prepare(t, prepConfig{ numIngesters: 3, happyIngesters: 3, numDistributors: 1, @@ -2865,7 +2877,7 
 			flagext.DefaultValues(&limits)
 			limits.MetricRelabelConfigs = tc.metricRelabelConfigs
 
-			ds, ingesters, _ := prepare(t, prepConfig{
+			ds, ingesters, _, _ := prepare(t, prepConfig{
 				numIngesters:    2,
 				happyIngesters:  2,
 				numDistributors: 1,
@@ -2916,7 +2928,7 @@ func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing
 	flagext.DefaultValues(&limits)
 	limits.MetricRelabelConfigs = metricRelabelConfigs
 
-	ds, ingesters, regs := prepare(t, prepConfig{
+	ds, ingesters, regs, _ := prepare(t, prepConfig{
 		numIngesters:    2,
 		happyIngesters:  2,
 		numDistributors: 1,
diff --git a/pkg/ring/batch.go b/pkg/ring/batch.go
index efb894830de..4895044f754 100644
--- a/pkg/ring/batch.go
+++ b/pkg/ring/batch.go
@@ -144,14 +144,15 @@ func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) {
 			// We should return an error if we reach the maxFailure (quorum) on a give error family OR
 			// we dont have any remaining ingesters to try
 			// Ex: 2xx, 4xx, 5xx -> return 5xx
-			// Ex: 4xx, 4xx, 2xx -> return 4xx
+			// Ex: 4xx, 4xx, _ -> return 4xx
+			// Ex: 5xx, _, 5xx -> return 5xx
 			if errCount > int32(sampleTrackers[i].maxFailures) || sampleTrackers[i].remaining.Dec() == 0 {
 				if b.rpcsFailed.Inc() == 1 {
 					b.err <- err
 				}
 			}
 		} else {
-			// We should return success if we succeeded calling `minSuccess` ingesters
+			// We should return success if we succeeded calling `minSuccess` ingesters.
 			if sampleTrackers[i].succeeded.Inc() >= int32(sampleTrackers[i].minSuccess) {
 				if b.rpcsPending.Dec() == 0 {
 					b.done <- struct{}{}
@@ -159,8 +160,9 @@ func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) {
 				continue
 			}
 
-			// If we suceeded to call this particular ingester but we dont have any remaining ingesters to try
+			// If we succeeded to call this particular ingester but we dont have any remaining ingesters to try
 			// and we did not succeeded calling `minSuccess` ingesters we need to return the last error
+			// Ex: 4xx, 5xx, 2xx
 			if sampleTrackers[i].remaining.Dec() == 0 {
 				if b.rpcsFailed.Inc() == 1 {
 					b.err <- sampleTrackers[i].err.Load()

From 1937b48612cc8fb513044f0802649c241f2bec37 Mon Sep 17 00:00:00 2001
From: Alan Protasio
Date: Mon, 24 Jan 2022 19:58:16 -0800
Subject: [PATCH 4/4] Comments

---
 pkg/distributor/distributor_test.go | 32 ++++++++++++++---------------
 pkg/ring/batch.go                   | 22 ++++++++++----------
 2 files changed, 26 insertions(+), 28 deletions(-)

diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go
index 6675c6ab153..03abe9bfd4d 100644
--- a/pkg/distributor/distributor_test.go
+++ b/pkg/distributor/distributor_test.go
@@ -494,6 +494,7 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
 }
 
 func TestPush_QuorumError(t *testing.T) {
+
 	var limits validation.Limits
 	flagext.DefaultValues(&limits)
 
@@ -513,13 +514,16 @@ func TestPush_QuorumError(t *testing.T) {
 
 	d := dists[0]
 
+	// We should run several write requests to make sure we don't have any race condition in the batchTracker#record code
+	numberOfWrites := 10000
+
 	// Using 429 just to make sure we are not hitting the &limits
 	// Simulating 2 4xx and 1 5xx -> Should return 4xx
 	ingesters[0].failResp.Store(httpgrpc.Errorf(429, "Throttling"))
 	ingesters[1].failResp.Store(httpgrpc.Errorf(500, "InternalServerError"))
 	ingesters[2].failResp.Store(httpgrpc.Errorf(429, "Throttling"))
 
-	for i := 0; i < 1000; i++ {
+	for i := 0; i < numberOfWrites; i++ {
 		request := makeWriteRequest(0, 30, 20)
 		_, err := d.Push(ctx, request)
 		status, ok := status.FromError(err)
@@ -532,7 +536,7 @@ func TestPush_QuorumError(t *testing.T) {
 	ingesters[1].failResp.Store(httpgrpc.Errorf(429, "Throttling"))
 	ingesters[2].failResp.Store(httpgrpc.Errorf(500, "InternalServerError"))
 
-	for i := 0; i < 10000; i++ {
+	for i := 0; i < numberOfWrites; i++ {
 		request := makeWriteRequest(0, 300, 200)
 		_, err := d.Push(ctx, request)
 		status, ok := status.FromError(err)
@@ -545,7 +549,7 @@ func TestPush_QuorumError(t *testing.T) {
 	ingesters[1].failResp.Store(httpgrpc.Errorf(429, "Throttling"))
 	ingesters[2].happy.Store(true)
 
-	for i := 0; i < 1; i++ {
+	for i := 0; i < numberOfWrites; i++ {
 		request := makeWriteRequest(0, 30, 20)
 		_, err := d.Push(ctx, request)
 		status, ok := status.FromError(err)
@@ -569,18 +573,6 @@ func TestPush_QuorumError(t *testing.T) {
 	ingesters[1].happy.Store(true)
 	ingesters[2].happy.Store(true)
 
-	// Wait group to check when the ring got updated
-	wg := &sync.WaitGroup{}
-	wg.Add(1)
-
-	go func() {
-		r.KVClient.WatchKey(context.Background(), ingester.RingKey, func(i interface{}) bool {
-			wg.Done()
-			// False will terminate the watch
-			return false
-		})
-	}()
-
 	err := r.KVClient.CAS(context.Background(), ingester.RingKey, func(in interface{}) (interface{}, bool, error) {
 		r := in.(*ring.Desc)
 		ingester2 := r.Ingesters["2"]
@@ -593,9 +585,15 @@ func TestPush_QuorumError(t *testing.T) {
 	require.NoError(t, err)
 
 	// Give time to the ring get updated with the KV value
-	wg.Wait()
+	for {
+		replicationSet, _ := r.GetAllHealthy(ring.Read)
+		if len(replicationSet.Instances) == 2 {
+			break
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
 
-	for i := 0; i < 1000; i++ {
+	for i := 0; i < numberOfWrites; i++ {
 		request := makeWriteRequest(0, 30, 20)
 		_, err := d.Push(ctx, request)
 		require.Error(t, err)
diff --git a/pkg/ring/batch.go b/pkg/ring/batch.go
index 4895044f754..26d4cb203a4 100644
--- a/pkg/ring/batch.go
+++ b/pkg/ring/batch.go
@@ -128,20 +128,19 @@ func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callb
 }
 
 func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) {
-	// If we succeed, decrement each sample's pending count by one. If we reach
-	// the required number of successful puts on this sample, then decrement the
-	// number of pending samples by one. If we successfully push all samples to
-	// min success instances, wake up the waiting rpc so it can return early.
-	// Similarly, track the number of errors, and if it exceeds maxFailures
-	// shortcut the waiting rpc.
+	// If we reach the required number of successful puts on this sample, then decrement the
+	// number of pending samples by one.
 	//
-	// The use of atomic increments here guarantees only a single sendSamples
-	// goroutine will write to either channel.
+	// The use of atomic increments here is needed as:
+	// * rpcsFailed and rpcsPending guarantee that only a single sendSamples goroutine will write to either channel
+	// * succeeded, failed4xx, failed5xx and remaining guarantee that the "return decision" is made atomically,
+	//   avoiding race conditions
 	for i := range sampleTrackers {
 		if err != nil {
-			// We count the error by error family (4xx and 5xx)
+			// Track the number of errors by error family, and if it exceeds maxFailures
+			// shortcut the waiting rpc.
 			errCount := sampleTrackers[i].recordError(err)
-			// We should return an error if we reach the maxFailure (quorum) on a give error family OR
+			// We should return an error if we reach the maxFailure (quorum) on a given error family OR
 			// we dont have any remaining ingesters to try
 			// Ex: 2xx, 4xx, 5xx -> return 5xx
 			// Ex: 4xx, 4xx, _ -> return 4xx
@@ -152,7 +151,8 @@ func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) {
 				}
 			}
 		} else {
-			// We should return success if we succeeded calling `minSuccess` ingesters.
+			// If we successfully push all samples to min success instances,
+			// wake up the waiting rpc so it can return early.
 			if sampleTrackers[i].succeeded.Inc() >= int32(sampleTrackers[i].minSuccess) {
 				if b.rpcsPending.Dec() == 0 {
 					b.done <- struct{}{}