From da687425a8938b30542b1bd4a764b68473b2019d Mon Sep 17 00:00:00 2001 From: Albert Date: Thu, 3 Jun 2021 18:42:09 -0700 Subject: [PATCH 01/11] add MaxRetries to WaitInstanceState Signed-off-by: Albert --- CHANGELOG.md | 1 + pkg/alertmanager/multitenant.go | 4 +- pkg/alertmanager/multitenant_test.go | 8 +- pkg/compactor/compactor.go | 3 +- pkg/ring/ring.go | 7 +- pkg/ring/util.go | 4 +- pkg/ring/util_test.go | 112 +++++++++++++++++++++ pkg/storegateway/gateway.go | 4 +- pkg/storegateway/gateway_test.go | 2 +- pkg/storegateway/sharding_strategy_test.go | 4 +- 10 files changed, 133 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b4a0d990659..7313504fbe2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ * [BUGFIX] Ruler: fix `/ruler/rule_groups` endpoint doesn't work when used with object store. #4182 * [BUGFIX] Ruler: Honor the evaluation delay for the `ALERTS` and `ALERTS_FOR_STATE` series. #4227 * [BUGFIX] Fixed cache fetch error on Redis Cluster. #4056 +* [CHANGE] Change WaitInstanceState to use MaxRetries parameter. ## Blocksconvert diff --git a/pkg/alertmanager/multitenant.go b/pkg/alertmanager/multitenant.go index 809ad8810b5..84f00d83625 100644 --- a/pkg/alertmanager/multitenant.go +++ b/pkg/alertmanager/multitenant.go @@ -464,7 +464,7 @@ func (am *MultitenantAlertmanager) starting(ctx context.Context) (err error) { // We wait until the instance is in the JOINING state, once it does we know that tokens are assigned to this instance and we'll be ready to perform an initial sync of configs. level.Info(am.logger).Log("waiting until alertmanager is JOINING in the ring") - if err = ring.WaitInstanceState(ctx, am.ring, am.ringLifecycler.GetInstanceID(), ring.JOINING); err != nil { + if err = ring.WaitInstanceState(ctx, am.ring, am.ringLifecycler.GetInstanceID(), ring.JOINING, 0); err != nil { return err } level.Info(am.logger).Log("msg", "alertmanager is JOINING in the ring") @@ -493,7 +493,7 @@ func (am *MultitenantAlertmanager) starting(ctx context.Context) (err error) { // Wait until the ring client detected this instance in the ACTIVE state. level.Info(am.logger).Log("msg", "waiting until alertmanager is ACTIVE in the ring") - if err := ring.WaitInstanceState(ctx, am.ring, am.ringLifecycler.GetInstanceID(), ring.ACTIVE); err != nil { + if err := ring.WaitInstanceState(ctx, am.ring, am.ringLifecycler.GetInstanceID(), ring.ACTIVE, 0); err != nil { return err } level.Info(am.logger).Log("msg", "alertmanager is ACTIVE in the ring") diff --git a/pkg/alertmanager/multitenant_test.go b/pkg/alertmanager/multitenant_test.go index 4cb05b8a2c5..e3ad46a72cf 100644 --- a/pkg/alertmanager/multitenant_test.go +++ b/pkg/alertmanager/multitenant_test.go @@ -1163,7 +1163,7 @@ func TestMultitenantAlertmanager_PerTenantSharding(t *testing.T) { // The alertmanager is ready to be tested once all instances are ACTIVE and the ring settles. 
for _, am := range instances { for _, id := range instanceIDs { - require.NoError(t, ring.WaitInstanceState(ctx, am.ring, id, ring.ACTIVE)) + require.NoError(t, ring.WaitInstanceState(ctx, am.ring, id, ring.ACTIVE, 0)) } } } @@ -1449,7 +1449,7 @@ func TestAlertmanager_ReplicasPosition(t *testing.T) { for _, am := range instances { for _, id := range instanceIDs { - require.NoError(t, ring.WaitInstanceState(ctx, am.ring, id, ring.ACTIVE)) + require.NoError(t, ring.WaitInstanceState(ctx, am.ring, id, ring.ACTIVE, 0)) } } @@ -1568,7 +1568,7 @@ func TestAlertmanager_StateReplicationWithSharding(t *testing.T) { // The alertmanager is ready to be tested once all instances are ACTIVE and the ring settles. for _, am := range instances { for _, id := range instanceIDs { - require.NoError(t, ring.WaitInstanceState(ctx, am.ring, id, ring.ACTIVE)) + require.NoError(t, ring.WaitInstanceState(ctx, am.ring, id, ring.ACTIVE, 0)) } } } @@ -1757,7 +1757,7 @@ func TestAlertmanager_StateReplicationWithSharding_InitialSyncFromPeers(t *testi // The alertmanager is ready to be tested once all instances are ACTIVE and the ring settles. for _, am := range instances { for _, id := range instanceIDs { - require.NoError(t, ring.WaitInstanceState(ctx, am.ring, id, ring.ACTIVE)) + require.NoError(t, ring.WaitInstanceState(ctx, am.ring, id, ring.ACTIVE, 0)) } } } diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 51952688cb4..f9cd7ec89d6 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -38,6 +38,7 @@ import ( const ( blocksMarkedForDeletionName = "cortex_compactor_blocks_marked_for_deletion_total" blocksMarkedForDeletionHelp = "Total number of blocks marked for deletion in compactor." + MAX_RETRIES = 600 ) var ( @@ -393,7 +394,7 @@ func (c *Compactor) starting(ctx context.Context) error { // users scanner depends on the ring (to check whether an user belongs // to this shard or not). level.Info(c.logger).Log("msg", "waiting until compactor is ACTIVE in the ring") - if err := ring.WaitInstanceState(ctx, c.ring, c.ringLifecycler.ID, ring.ACTIVE); err != nil { + if err := ring.WaitInstanceState(ctx, c.ring, c.ringLifecycler.ID, ring.ACTIVE, MAX_RETRIES); err != nil { return err } level.Info(c.logger).Log("msg", "compactor is ACTIVE in the ring") diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index 539ec65e75e..347b05312e3 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -71,6 +71,10 @@ type ReadRing interface { // and size (number of instances). ShuffleShard(identifier string, size int) ReadRing + // GetInstanceState returns the current state of an instance or an error if the + // instance does not exist in the ring. + GetInstanceState(instanceID string) (InstanceState, error) + // ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes // all instances that have been part of the identifier's shard since "now - lookbackPeriod". ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing @@ -763,8 +767,7 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur } } -// GetInstanceState returns the current state of an instance or an error if the -// instance does not exist in the ring. +// GetInstanceState implements ReadRing. 
func (r *Ring) GetInstanceState(instanceID string) (InstanceState, error) { r.mtx.RLock() defer r.mtx.RUnlock() diff --git a/pkg/ring/util.go b/pkg/ring/util.go index ac5c27388c9..7162e98d1f8 100644 --- a/pkg/ring/util.go +++ b/pkg/ring/util.go @@ -69,11 +69,11 @@ func GetInstancePort(configPort, listenPort int) int { // WaitInstanceState waits until the input instanceID is registered within the // ring matching the provided state. A timeout should be provided within the context. -func WaitInstanceState(ctx context.Context, r *Ring, instanceID string, state InstanceState) error { +func WaitInstanceState(ctx context.Context, r ReadRing, instanceID string, state InstanceState, maxRetries int) error { backoff := util.NewBackoff(ctx, util.BackoffConfig{ MinBackoff: 100 * time.Millisecond, MaxBackoff: time.Second, - MaxRetries: 0, + MaxRetries: maxRetries, }) for backoff.Ongoing() { diff --git a/pkg/ring/util_test.go b/pkg/ring/util_test.go index c3a3e037e9e..7bf97533019 100644 --- a/pkg/ring/util_test.go +++ b/pkg/ring/util_test.go @@ -6,10 +6,65 @@ import ( "testing" "time" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) +type RingMock struct { + mock.Mock +} + +func (r *RingMock) Collect(ch chan<- prometheus.Metric) {} + +func (r *RingMock) Describe(ch chan<- *prometheus.Desc) {} + +func (r *RingMock) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error) { + args := r.Called(key, op, bufDescs, bufHosts, bufZones) + return args.Get(0).(ReplicationSet), args.Error(1) +} + +func (r *RingMock) GetAllHealthy(op Operation) (ReplicationSet, error) { + args := r.Called(op) + return args.Get(0).(ReplicationSet), args.Error(1) +} + +func (r *RingMock) GetReplicationSetForOperation(op Operation) (ReplicationSet, error) { + args := r.Called(op) + return args.Get(0).(ReplicationSet), args.Error(1) +} + +func (r *RingMock) ReplicationFactor() int { + return 0 +} + +func (r *RingMock) InstancesCount() int { + return 0 +} + +func (r *RingMock) ShuffleShard(identifier string, size int) ReadRing { + args := r.Called(identifier, size) + return args.Get(0).(ReadRing) +} + +func (r *RingMock) GetInstanceState(instanceID string) (InstanceState, error) { + args := r.Called(instanceID) + return args.Get(0).(InstanceState), args.Error(1) +} + +func (r *RingMock) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing { + args := r.Called(identifier, size, lookbackPeriod, now) + return args.Get(0).(ReadRing) +} + +func (r *RingMock) HasInstance(instanceID string) bool { + return true +} + +func (r *RingMock) CleanupShuffleShardCache(identifier string) {} + func TestGenerateTokens(t *testing.T) { tokens := GenerateTokens(1000000, nil) @@ -184,3 +239,60 @@ func TestWaitRingStabilityShouldReturnErrorIfMaxWaitingIsReached(t *testing.T) { assert.InDelta(t, maxWaiting, elapsedTime, float64(2*time.Second)) } + +func TestWaitInstanceStateExitsAfterMaxRetries(t *testing.T) { + t.Parallel() + + const ( + instanceId = "test" + ) + + const ( + maxRetries = 5 + ) + + ring := &RingMock{} + ring.On("GetInstanceState", mock.Anything, mock.Anything).Return(ACTIVE, nil) + + WaitInstanceState(context.Background(), ring, instanceId, PENDING, maxRetries) + + ring.AssertNumberOfCalls(t, "GetInstanceState", maxRetries) +} + +func TestWaitInstanceStateDoesMaxRetriesOnError(t *testing.T) { + 
t.Parallel() + + const ( + instanceId = "test" + ) + + const ( + maxRetries = 5 + ) + + ring := &RingMock{} + ring.On("GetInstanceState", mock.Anything, mock.Anything).Return(PENDING, errors.New("instance not found in the ring")) + + WaitInstanceState(context.Background(), ring, instanceId, ACTIVE, maxRetries) + + ring.AssertNumberOfCalls(t, "GetInstanceState", maxRetries) +} + +func TestWaitInstanceStateExitsAfterActualStateEqualsState(t *testing.T) { + t.Parallel() + + const ( + instanceId = "test" + ) + + const ( + maxRetries = 5 + ) + + ring := &RingMock{} + ring.On("GetInstanceState", mock.Anything, mock.Anything).Return(ACTIVE, nil) + + WaitInstanceState(context.Background(), ring, instanceId, ACTIVE, maxRetries) + + ring.AssertNumberOfCalls(t, "GetInstanceState", 1) +} diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index 00a7a093fea..00be1f48042 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -227,7 +227,7 @@ func (g *StoreGateway) starting(ctx context.Context) (err error) { // make sure that when we'll run the initial sync we already know the tokens // assigned to this instance. level.Info(g.logger).Log("msg", "waiting until store-gateway is JOINING in the ring") - if err := ring.WaitInstanceState(ctx, g.ring, g.ringLifecycler.GetInstanceID(), ring.JOINING); err != nil { + if err := ring.WaitInstanceState(ctx, g.ring, g.ringLifecycler.GetInstanceID(), ring.JOINING, 0); err != nil { return err } level.Info(g.logger).Log("msg", "store-gateway is JOINING in the ring") @@ -252,7 +252,7 @@ func (g *StoreGateway) starting(ctx context.Context) (err error) { // make sure that when we'll run the loop it won't be detected as a ring // topology change. level.Info(g.logger).Log("msg", "waiting until store-gateway is ACTIVE in the ring") - if err := ring.WaitInstanceState(ctx, g.ring, g.ringLifecycler.GetInstanceID(), ring.ACTIVE); err != nil { + if err := ring.WaitInstanceState(ctx, g.ring, g.ringLifecycler.GetInstanceID(), ring.ACTIVE, 0); err != nil { return err } level.Info(g.logger).Log("msg", "store-gateway is ACTIVE in the ring") diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go index a3ba3aa8145..b7a70674c1b 100644 --- a/pkg/storegateway/gateway_test.go +++ b/pkg/storegateway/gateway_test.go @@ -354,7 +354,7 @@ func TestStoreGateway_BlocksSharding(t *testing.T) { // A gateway is ready for the test once it sees all instances ACTIVE in the ring. for _, g := range gateways { for _, instanceID := range gatewayIds { - require.NoError(t, ring.WaitInstanceState(ctx, g.ring, instanceID, ring.ACTIVE)) + require.NoError(t, ring.WaitInstanceState(ctx, g.ring, instanceID, ring.ACTIVE, 0)) } } } diff --git a/pkg/storegateway/sharding_strategy_test.go b/pkg/storegateway/sharding_strategy_test.go index b504769ad93..4240013e6ee 100644 --- a/pkg/storegateway/sharding_strategy_test.go +++ b/pkg/storegateway/sharding_strategy_test.go @@ -263,7 +263,7 @@ func TestDefaultShardingStrategy(t *testing.T) { defer services.StopAndAwaitTerminated(ctx, r) //nolint:errcheck // Wait until the ring client has synced. 
- require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE)) + require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE, 0)) for instanceAddr, expectedBlocks := range testData.expectedBlocks { filter := NewDefaultShardingStrategy(r, instanceAddr, log.NewNopLogger()) @@ -620,7 +620,7 @@ func TestShuffleShardingStrategy(t *testing.T) { defer services.StopAndAwaitTerminated(ctx, r) //nolint:errcheck // Wait until the ring client has synced. - require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE)) + require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE, 0)) // Assert on filter users. for _, expected := range testData.expectedUsers { From f78aaea0886b8c7a2cb67e9ae4329aaa92ff891d Mon Sep 17 00:00:00 2001 From: Albert Date: Thu, 3 Jun 2021 18:44:24 -0700 Subject: [PATCH 02/11] update CHANGELOG.md Signed-off-by: Albert --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7313504fbe2..e20e59425e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,7 +38,7 @@ * [BUGFIX] Ruler: fix `/ruler/rule_groups` endpoint doesn't work when used with object store. #4182 * [BUGFIX] Ruler: Honor the evaluation delay for the `ALERTS` and `ALERTS_FOR_STATE` series. #4227 * [BUGFIX] Fixed cache fetch error on Redis Cluster. #4056 -* [CHANGE] Change WaitInstanceState to use MaxRetries parameter. +* [CHANGE] Change WaitInstanceState to use MaxRetries parameter. #4262 ## Blocksconvert From 70531dfe1d70c7c985f17307ba4ed3c8d1bb079c Mon Sep 17 00:00:00 2001 From: Albert Date: Fri, 4 Jun 2021 09:26:29 -0700 Subject: [PATCH 03/11] Add timeout for waiting on compactor to become ACTIVE in the ring. Signed-off-by: Albert --- CHANGELOG.md | 2 +- pkg/alertmanager/multitenant.go | 4 +-- pkg/alertmanager/multitenant_test.go | 8 ++--- pkg/compactor/compactor.go | 11 +++--- pkg/ring/util.go | 3 +- pkg/ring/util_test.go | 41 ++++++++++++---------- pkg/storegateway/gateway.go | 4 +-- pkg/storegateway/gateway_test.go | 2 +- pkg/storegateway/sharding_strategy_test.go | 4 +-- 9 files changed, 42 insertions(+), 37 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e20e59425e3..26abfab971f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,7 +38,7 @@ * [BUGFIX] Ruler: fix `/ruler/rule_groups` endpoint doesn't work when used with object store. #4182 * [BUGFIX] Ruler: Honor the evaluation delay for the `ALERTS` and `ALERTS_FOR_STATE` series. #4227 * [BUGFIX] Fixed cache fetch error on Redis Cluster. #4056 -* [CHANGE] Change WaitInstanceState to use MaxRetries parameter. #4262 +* [CHANGE] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262 ## Blocksconvert diff --git a/pkg/alertmanager/multitenant.go b/pkg/alertmanager/multitenant.go index 84f00d83625..809ad8810b5 100644 --- a/pkg/alertmanager/multitenant.go +++ b/pkg/alertmanager/multitenant.go @@ -464,7 +464,7 @@ func (am *MultitenantAlertmanager) starting(ctx context.Context) (err error) { // We wait until the instance is in the JOINING state, once it does we know that tokens are assigned to this instance and we'll be ready to perform an initial sync of configs. 
level.Info(am.logger).Log("waiting until alertmanager is JOINING in the ring") - if err = ring.WaitInstanceState(ctx, am.ring, am.ringLifecycler.GetInstanceID(), ring.JOINING, 0); err != nil { + if err = ring.WaitInstanceState(ctx, am.ring, am.ringLifecycler.GetInstanceID(), ring.JOINING); err != nil { return err } level.Info(am.logger).Log("msg", "alertmanager is JOINING in the ring") @@ -493,7 +493,7 @@ func (am *MultitenantAlertmanager) starting(ctx context.Context) (err error) { // Wait until the ring client detected this instance in the ACTIVE state. level.Info(am.logger).Log("msg", "waiting until alertmanager is ACTIVE in the ring") - if err := ring.WaitInstanceState(ctx, am.ring, am.ringLifecycler.GetInstanceID(), ring.ACTIVE, 0); err != nil { + if err := ring.WaitInstanceState(ctx, am.ring, am.ringLifecycler.GetInstanceID(), ring.ACTIVE); err != nil { return err } level.Info(am.logger).Log("msg", "alertmanager is ACTIVE in the ring") diff --git a/pkg/alertmanager/multitenant_test.go b/pkg/alertmanager/multitenant_test.go index e3ad46a72cf..4cb05b8a2c5 100644 --- a/pkg/alertmanager/multitenant_test.go +++ b/pkg/alertmanager/multitenant_test.go @@ -1163,7 +1163,7 @@ func TestMultitenantAlertmanager_PerTenantSharding(t *testing.T) { // The alertmanager is ready to be tested once all instances are ACTIVE and the ring settles. for _, am := range instances { for _, id := range instanceIDs { - require.NoError(t, ring.WaitInstanceState(ctx, am.ring, id, ring.ACTIVE, 0)) + require.NoError(t, ring.WaitInstanceState(ctx, am.ring, id, ring.ACTIVE)) } } } @@ -1449,7 +1449,7 @@ func TestAlertmanager_ReplicasPosition(t *testing.T) { for _, am := range instances { for _, id := range instanceIDs { - require.NoError(t, ring.WaitInstanceState(ctx, am.ring, id, ring.ACTIVE, 0)) + require.NoError(t, ring.WaitInstanceState(ctx, am.ring, id, ring.ACTIVE)) } } @@ -1568,7 +1568,7 @@ func TestAlertmanager_StateReplicationWithSharding(t *testing.T) { // The alertmanager is ready to be tested once all instances are ACTIVE and the ring settles. for _, am := range instances { for _, id := range instanceIDs { - require.NoError(t, ring.WaitInstanceState(ctx, am.ring, id, ring.ACTIVE, 0)) + require.NoError(t, ring.WaitInstanceState(ctx, am.ring, id, ring.ACTIVE)) } } } @@ -1757,7 +1757,7 @@ func TestAlertmanager_StateReplicationWithSharding_InitialSyncFromPeers(t *testi // The alertmanager is ready to be tested once all instances are ACTIVE and the ring settles. for _, am := range instances { for _, id := range instanceIDs { - require.NoError(t, ring.WaitInstanceState(ctx, am.ring, id, ring.ACTIVE, 0)) + require.NoError(t, ring.WaitInstanceState(ctx, am.ring, id, ring.ACTIVE)) } } } diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index f9cd7ec89d6..3d12cfbef20 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -38,12 +38,12 @@ import ( const ( blocksMarkedForDeletionName = "cortex_compactor_blocks_marked_for_deletion_total" blocksMarkedForDeletionHelp = "Total number of blocks marked for deletion in compactor." 
- MAX_RETRIES = 600 ) var ( - errInvalidBlockRanges = "compactor block range periods should be divisible by the previous one, but %s is not divisible by %s" - RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil) + errInvalidBlockRanges = "compactor block range periods should be divisible by the previous one, but %s is not divisible by %s" + RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil) + WaitInstanceStateTimeoutDuration = time.Duration(600) DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter) compact.Grouper { return compact.NewDefaultGrouper( @@ -394,7 +394,10 @@ func (c *Compactor) starting(ctx context.Context) error { // users scanner depends on the ring (to check whether an user belongs // to this shard or not). level.Info(c.logger).Log("msg", "waiting until compactor is ACTIVE in the ring") - if err := ring.WaitInstanceState(ctx, c.ring, c.ringLifecycler.ID, ring.ACTIVE, MAX_RETRIES); err != nil { + + ctx, cancel := context.WithTimeout(ctx, WaitInstanceStateTimeoutDuration*time.Second) + defer cancel() + if err := ring.WaitInstanceState(ctx, c.ring, c.ringLifecycler.ID, ring.ACTIVE); err != nil { return err } level.Info(c.logger).Log("msg", "compactor is ACTIVE in the ring") diff --git a/pkg/ring/util.go b/pkg/ring/util.go index 7162e98d1f8..b3fa9d19bc3 100644 --- a/pkg/ring/util.go +++ b/pkg/ring/util.go @@ -69,11 +69,10 @@ func GetInstancePort(configPort, listenPort int) int { // WaitInstanceState waits until the input instanceID is registered within the // ring matching the provided state. A timeout should be provided within the context. -func WaitInstanceState(ctx context.Context, r ReadRing, instanceID string, state InstanceState, maxRetries int) error { +func WaitInstanceState(ctx context.Context, r ReadRing, instanceID string, state InstanceState) error { backoff := util.NewBackoff(ctx, util.BackoffConfig{ MinBackoff: 100 * time.Millisecond, MaxBackoff: time.Second, - MaxRetries: maxRetries, }) for backoff.Ongoing() { diff --git a/pkg/ring/util_test.go b/pkg/ring/util_test.go index 7bf97533019..e857d040cde 100644 --- a/pkg/ring/util_test.go +++ b/pkg/ring/util_test.go @@ -240,59 +240,62 @@ func TestWaitRingStabilityShouldReturnErrorIfMaxWaitingIsReached(t *testing.T) { assert.InDelta(t, maxWaiting, elapsedTime, float64(2*time.Second)) } -func TestWaitInstanceStateExitsAfterMaxRetries(t *testing.T) { +func TestWaitInstanceStateTimeout(t *testing.T) { t.Parallel() const ( - instanceId = "test" + instanceId = "test" + timeoutDuration = time.Duration(3) ) - const ( - maxRetries = 5 - ) + ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration*time.Second) + defer cancel() ring := &RingMock{} ring.On("GetInstanceState", mock.Anything, mock.Anything).Return(ACTIVE, nil) - WaitInstanceState(context.Background(), ring, instanceId, PENDING, maxRetries) + err := WaitInstanceState(ctx, ring, instanceId, PENDING) - ring.AssertNumberOfCalls(t, "GetInstanceState", maxRetries) + assert.Equal(t, context.DeadlineExceeded, err) + ring.AssertCalled(t, "GetInstanceState", instanceId) } -func TestWaitInstanceStateDoesMaxRetriesOnError(t *testing.T) { +func TestWaitInstanceStateTimeoutOnError(t *testing.T) { t.Parallel() const ( - instanceId = "test" + instanceId = "test" + timeoutDuration = time.Duration(3) ) - const ( - maxRetries = 5 - ) + ctx, cancel := 
context.WithTimeout(context.Background(), timeoutDuration*time.Second) + defer cancel() ring := &RingMock{} ring.On("GetInstanceState", mock.Anything, mock.Anything).Return(PENDING, errors.New("instance not found in the ring")) - WaitInstanceState(context.Background(), ring, instanceId, ACTIVE, maxRetries) + err := WaitInstanceState(ctx, ring, instanceId, ACTIVE) - ring.AssertNumberOfCalls(t, "GetInstanceState", maxRetries) + assert.Equal(t, context.DeadlineExceeded, err) + ring.AssertCalled(t, "GetInstanceState", instanceId) } func TestWaitInstanceStateExitsAfterActualStateEqualsState(t *testing.T) { t.Parallel() const ( - instanceId = "test" + instanceId = "test" + timeoutDuration = time.Duration(3) ) - const ( - maxRetries = 5 - ) + ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration*time.Second) + defer cancel() ring := &RingMock{} ring.On("GetInstanceState", mock.Anything, mock.Anything).Return(ACTIVE, nil) - WaitInstanceState(context.Background(), ring, instanceId, ACTIVE, maxRetries) + err := WaitInstanceState(ctx, ring, instanceId, ACTIVE) + assert.Nil(t, err) ring.AssertNumberOfCalls(t, "GetInstanceState", 1) } diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index 00be1f48042..00a7a093fea 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -227,7 +227,7 @@ func (g *StoreGateway) starting(ctx context.Context) (err error) { // make sure that when we'll run the initial sync we already know the tokens // assigned to this instance. level.Info(g.logger).Log("msg", "waiting until store-gateway is JOINING in the ring") - if err := ring.WaitInstanceState(ctx, g.ring, g.ringLifecycler.GetInstanceID(), ring.JOINING, 0); err != nil { + if err := ring.WaitInstanceState(ctx, g.ring, g.ringLifecycler.GetInstanceID(), ring.JOINING); err != nil { return err } level.Info(g.logger).Log("msg", "store-gateway is JOINING in the ring") @@ -252,7 +252,7 @@ func (g *StoreGateway) starting(ctx context.Context) (err error) { // make sure that when we'll run the loop it won't be detected as a ring // topology change. level.Info(g.logger).Log("msg", "waiting until store-gateway is ACTIVE in the ring") - if err := ring.WaitInstanceState(ctx, g.ring, g.ringLifecycler.GetInstanceID(), ring.ACTIVE, 0); err != nil { + if err := ring.WaitInstanceState(ctx, g.ring, g.ringLifecycler.GetInstanceID(), ring.ACTIVE); err != nil { return err } level.Info(g.logger).Log("msg", "store-gateway is ACTIVE in the ring") diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go index b7a70674c1b..a3ba3aa8145 100644 --- a/pkg/storegateway/gateway_test.go +++ b/pkg/storegateway/gateway_test.go @@ -354,7 +354,7 @@ func TestStoreGateway_BlocksSharding(t *testing.T) { // A gateway is ready for the test once it sees all instances ACTIVE in the ring. for _, g := range gateways { for _, instanceID := range gatewayIds { - require.NoError(t, ring.WaitInstanceState(ctx, g.ring, instanceID, ring.ACTIVE, 0)) + require.NoError(t, ring.WaitInstanceState(ctx, g.ring, instanceID, ring.ACTIVE)) } } } diff --git a/pkg/storegateway/sharding_strategy_test.go b/pkg/storegateway/sharding_strategy_test.go index 4240013e6ee..b504769ad93 100644 --- a/pkg/storegateway/sharding_strategy_test.go +++ b/pkg/storegateway/sharding_strategy_test.go @@ -263,7 +263,7 @@ func TestDefaultShardingStrategy(t *testing.T) { defer services.StopAndAwaitTerminated(ctx, r) //nolint:errcheck // Wait until the ring client has synced. 
- require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE, 0)) + require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE)) for instanceAddr, expectedBlocks := range testData.expectedBlocks { filter := NewDefaultShardingStrategy(r, instanceAddr, log.NewNopLogger()) @@ -620,7 +620,7 @@ func TestShuffleShardingStrategy(t *testing.T) { defer services.StopAndAwaitTerminated(ctx, r) //nolint:errcheck // Wait until the ring client has synced. - require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE, 0)) + require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE)) // Assert on filter users. for _, expected := range testData.expectedUsers { From 46dc87b16e02d84b9b9f1f84a078b7c56afead9b Mon Sep 17 00:00:00 2001 From: Albert Date: Fri, 4 Jun 2021 09:30:31 -0700 Subject: [PATCH 04/11] add MaxRetries variable back to WaitInstanceState Signed-off-by: Albert --- pkg/ring/util.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/ring/util.go b/pkg/ring/util.go index b3fa9d19bc3..06e053dd269 100644 --- a/pkg/ring/util.go +++ b/pkg/ring/util.go @@ -73,6 +73,7 @@ func WaitInstanceState(ctx context.Context, r ReadRing, instanceID string, state backoff := util.NewBackoff(ctx, util.BackoffConfig{ MinBackoff: 100 * time.Millisecond, MaxBackoff: time.Second, + MaxRetries: 0, }) for backoff.Ongoing() { From 768671bb3194694bde6b15c347518b4bd1e1d4e4 Mon Sep 17 00:00:00 2001 From: Albert Date: Wed, 16 Jun 2021 01:23:53 -0700 Subject: [PATCH 05/11] Fix linting issues Signed-off-by: Albert --- pkg/ring/util_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/ring/util_test.go b/pkg/ring/util_test.go index e857d040cde..e451a566450 100644 --- a/pkg/ring/util_test.go +++ b/pkg/ring/util_test.go @@ -244,7 +244,7 @@ func TestWaitInstanceStateTimeout(t *testing.T) { t.Parallel() const ( - instanceId = "test" + instanceID = "test" timeoutDuration = time.Duration(3) ) @@ -254,17 +254,17 @@ func TestWaitInstanceStateTimeout(t *testing.T) { ring := &RingMock{} ring.On("GetInstanceState", mock.Anything, mock.Anything).Return(ACTIVE, nil) - err := WaitInstanceState(ctx, ring, instanceId, PENDING) + err := WaitInstanceState(ctx, ring, instanceID, PENDING) assert.Equal(t, context.DeadlineExceeded, err) - ring.AssertCalled(t, "GetInstanceState", instanceId) + ring.AssertCalled(t, "GetInstanceState", instanceID) } func TestWaitInstanceStateTimeoutOnError(t *testing.T) { t.Parallel() const ( - instanceId = "test" + instanceID = "test" timeoutDuration = time.Duration(3) ) @@ -274,17 +274,17 @@ func TestWaitInstanceStateTimeoutOnError(t *testing.T) { ring := &RingMock{} ring.On("GetInstanceState", mock.Anything, mock.Anything).Return(PENDING, errors.New("instance not found in the ring")) - err := WaitInstanceState(ctx, ring, instanceId, ACTIVE) + err := WaitInstanceState(ctx, ring, instanceID, ACTIVE) assert.Equal(t, context.DeadlineExceeded, err) - ring.AssertCalled(t, "GetInstanceState", instanceId) + ring.AssertCalled(t, "GetInstanceState", instanceID) } func TestWaitInstanceStateExitsAfterActualStateEqualsState(t *testing.T) { t.Parallel() const ( - instanceId = "test" + instanceID = "test" timeoutDuration = time.Duration(3) ) @@ -294,7 +294,7 @@ func TestWaitInstanceStateExitsAfterActualStateEqualsState(t *testing.T) { ring := &RingMock{} ring.On("GetInstanceState", mock.Anything, mock.Anything).Return(ACTIVE, nil) - err := WaitInstanceState(ctx, ring, instanceId, ACTIVE) + err := WaitInstanceState(ctx, 
ring, instanceID, ACTIVE) assert.Nil(t, err) ring.AssertNumberOfCalls(t, "GetInstanceState", 1) From 5b894ff3e89a24a9bd416bf09fb62be34ccf1149 Mon Sep 17 00:00:00 2001 From: Albert Date: Wed, 16 Jun 2021 14:00:17 -0700 Subject: [PATCH 06/11] Remove duplicate entry from changelog Signed-off-by: Albert --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c93b7168e2e..ac7592d9e4f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,7 +49,6 @@ * [BUGFIX] Fixed cache fetch error on Redis Cluster. #4056 * [BUGFIX] Ingester: fix issue where runtime limits erroneously override default limits. #4246 * [BUGFIX] Ruler: fix startup in single-binary mode when the new `ruler_storage` is used. #4252 -* [CHANGE] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262 * [BUGFIX] Querier: fix queries failing with "at least 1 healthy replica required, could only find 0" error right after scaling up store-gateways until they're ACTIVE in the ring. #4263 * [BUGFIX] Store-gateway: when blocks sharding is enabled, do not load all blocks in each store-gateway in case of a cold startup, but load only blocks owned by the store-gateway replica. #4271 * [BUGFIX] Memberlist: fix to setting the default configuration value for `-memberlist.retransmit-factor` when not provided. This should improve propagation delay of the ring state (including, but not limited to, tombstones). Note that if the configuration is already explicitly given, this fix has no effect. #4269 From 1fc181f16f71c034d66168606f5d146255c3ccbb Mon Sep 17 00:00:00 2001 From: Albert Date: Tue, 22 Jun 2021 12:06:04 -0700 Subject: [PATCH 07/11] Address PR comments and set timeout to be configurable Signed-off-by: Albert --- docs/blocks-storage/compactor.md | 4 ++++ pkg/compactor/compactor.go | 8 +++---- pkg/compactor/compactor_ring.go | 5 +++++ pkg/compactor/compactor_test.go | 29 +++++++++++++++++++++++++ pkg/ring/kv/consul/mock.go | 36 ++++++++++++++++++++++++++++++++ pkg/ring/ring.go | 3 ++- 6 files changed, 80 insertions(+), 5 deletions(-) diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md index 213405822c8..7a936a31669 100644 --- a/docs/blocks-storage/compactor.md +++ b/docs/blocks-storage/compactor.md @@ -230,4 +230,8 @@ compactor: # Name of network interface to read address from. 
# CLI flag: -compactor.ring.instance-interface-names [instance_interface_names: | default = [eth0 en0]] + + # Timeout for compactor to become ACTIVE in the ring + # CLI flag: -compactor.ring.starting-timeout + [starting_timeout: | default = 10m] ``` diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index b7f8ab1b59b..9ae12be1876 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -41,9 +41,8 @@ const ( ) var ( - errInvalidBlockRanges = "compactor block range periods should be divisible by the previous one, but %s is not divisible by %s" - RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil) - WaitInstanceStateTimeoutDuration = time.Duration(600) + errInvalidBlockRanges = "compactor block range periods should be divisible by the previous one, but %s is not divisible by %s" + RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil) DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter) compact.Grouper { return compact.NewDefaultGrouper( @@ -395,9 +394,10 @@ func (c *Compactor) starting(ctx context.Context) error { // to this shard or not). level.Info(c.logger).Log("msg", "waiting until compactor is ACTIVE in the ring") - ctx, cancel := context.WithTimeout(ctx, WaitInstanceStateTimeoutDuration*time.Second) + ctx, cancel := context.WithTimeout(ctx, c.compactorCfg.ShardingRing.StartingTimeout) defer cancel() if err := ring.WaitInstanceState(ctx, c.ring, c.ringLifecycler.ID, ring.ACTIVE); err != nil { + level.Error(c.logger).Log("msg", "compactor failed to become ACTIVE in the ring", "err", err) return err } level.Info(c.logger).Log("msg", "compactor is ACTIVE in the ring") diff --git a/pkg/compactor/compactor_ring.go b/pkg/compactor/compactor_ring.go index 74c3a77ad3b..b29750337da 100644 --- a/pkg/compactor/compactor_ring.go +++ b/pkg/compactor/compactor_ring.go @@ -34,6 +34,8 @@ type RingConfig struct { // Injected internally ListenPort int `yaml:"-"` + + StartingTimeout time.Duration `yaml:"starting_timeout"` } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -59,6 +61,9 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.InstanceAddr, "compactor.ring.instance-addr", "", "IP address to advertise in the ring.") f.IntVar(&cfg.InstancePort, "compactor.ring.instance-port", 0, "Port to advertise in the ring (defaults to server.grpc-listen-port).") f.StringVar(&cfg.InstanceID, "compactor.ring.instance-id", hostname, "Instance ID to register in the ring.") + + // Timeout durations + f.DurationVar(&cfg.StartingTimeout, "compactor.ring.starting-timeout", 10*time.Minute, "Timeout for waiting on compactor to become ACTIVE in the ring") } // ToLifecyclerConfig returns a LifecyclerConfig based on the compactor diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 9c14cb09247..c6bca4a8ef3 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -1092,6 +1092,9 @@ func prepareConfig() Config { compactorCfg.ShardingRing.WaitStabilityMinDuration = 0 compactorCfg.ShardingRing.WaitStabilityMaxDuration = 0 + // Set lower timeout for waiting on compactor to become ACTIVE in the ring for unit tests + compactorCfg.ShardingRing.StartingTimeout = 5 * time.Second + return compactorCfg } @@ -1279,3 +1282,29 @@ func TestCompactor_DeleteLocalSyncFiles(t *testing.T) { require.NotEqual(t, 
numUsers, c1Users) require.Equal(t, numUsers, c1Users+c2Users) } + +func TestCompactor_ShouldFailCompactionOnTimeout(t *testing.T) { + t.Parallel() + + // Mock the bucket + bucketClient := &bucket.ClientMock{} + + cfg := prepareConfig() + cfg.ShardingEnabled = true + cfg.ShardingRing.InstanceID = "compactor-1" + cfg.ShardingRing.InstanceAddr = "1.2.3.4" + cfg.ShardingRing.KVStore.Mock = consul.NewBadInMemoryClient(ring.GetCodec()) + + c, _, _, logs, _ := prepare(t, cfg, bucketClient) + + // Try to start the compactor with a bad consul kv-store. The + err := services.StartAndAwaitRunning(context.Background(), c) + + // Assert that the compactor timesout + assert.Equal(t, context.DeadlineExceeded, err) + + assert.ElementsMatch(t, []string{ + `level=info component=compactor msg="waiting until compactor is ACTIVE in the ring"`, + `level=error component=compactor msg="compactor failed to become ACTIVE in the ring" err="context deadline exceeded"`, + }, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n"))) +} diff --git a/pkg/ring/kv/consul/mock.go b/pkg/ring/kv/consul/mock.go index 5d1e4557395..6c9ec788490 100644 --- a/pkg/ring/kv/consul/mock.go +++ b/pkg/ring/kv/consul/mock.go @@ -1,6 +1,7 @@ package consul import ( + "errors" "fmt" "strings" "sync" @@ -228,3 +229,38 @@ func mockedMaxWaitTime(queryWaitTime time.Duration) time.Duration { return queryWaitTime } + +type mockBadKV struct { + mockKV +} + +// NewBadInMemoryClient makes a new mock consul client. +func NewBadInMemoryClient(codec codec.Codec) *Client { + test := mockBadKV{} + test.kvps = nil + return NewBadInMemoryClientWithConfig(codec, Config{}) +} + +// NewBadInMemoryClientWithConfig makes a new mock consul client with supplied Config +// and a failing CAS method for tests. +func NewBadInMemoryClientWithConfig(codec codec.Codec, cfg Config) *Client { + m := mockBadKV{ + mockKV: mockKV{ + kvps: map[string]*consul.KVPair{}, + // Always start from 1, we NEVER want to report back index 0 in the responses. + // This is in line with Consul, and our new checks for index return value in client.go. + current: 1, + }, + } + m.cond = sync.NewCond(&m.mtx) + go m.loop() + return &Client{ + kv: &m, + codec: codec, + cfg: cfg, + } +} + +func (m *mockBadKV) CAS(p *consul.KVPair, q *consul.WriteOptions) (bool, *consul.WriteMeta, error) { + return false, nil, errors.New("CAS Error") +} diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index 347b05312e3..8d5bd0c925a 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -767,7 +767,8 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur } } -// GetInstanceState implements ReadRing. +// GetInstanceState returns the current state of an instance or an error if the +// instance does not exist in the ring. 
func (r *Ring) GetInstanceState(instanceID string) (InstanceState, error) { r.mtx.RLock() defer r.mtx.RUnlock() From b0c5ea3f84dc843070da6ecbf52e05fe363c4b66 Mon Sep 17 00:00:00 2001 From: Albert Date: Wed, 30 Jun 2021 10:28:33 -0700 Subject: [PATCH 08/11] Address PR comments and fix tests Signed-off-by: Albert --- docs/blocks-storage/compactor.md | 4 +-- docs/configuration/config-file-reference.md | 4 +++ pkg/compactor/compactor.go | 4 +-- pkg/compactor/compactor_ring.go | 8 +++-- pkg/compactor/compactor_test.go | 10 ++++-- pkg/ring/kv/consul/mock.go | 36 --------------------- 6 files changed, 20 insertions(+), 46 deletions(-) diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md index 7a936a31669..29377a9e689 100644 --- a/docs/blocks-storage/compactor.md +++ b/docs/blocks-storage/compactor.md @@ -232,6 +232,6 @@ compactor: [instance_interface_names: | default = [eth0 en0]] # Timeout for compactor to become ACTIVE in the ring - # CLI flag: -compactor.ring.starting-timeout - [starting_timeout: | default = 10m] + # CLI flag: -compactor.ring.wait-active-instance-timeout + [wait_active_instance_timeout: | default = 10m] ``` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 801dc78f46e..be592d85096 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -5197,6 +5197,10 @@ sharding_ring: # Name of network interface to read address from. # CLI flag: -compactor.ring.instance-interface-names [instance_interface_names: | default = [eth0 en0]] + + # Timeout for compactor to become ACTIVE in the ring + # CLI flag: -compactor.ring.wait-active-instance-timeout + [wait_active_instance_timeout: | default = 10m] ``` ### `store_gateway_config` diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 3f48fc68686..0af2088bd59 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -394,9 +394,9 @@ func (c *Compactor) starting(ctx context.Context) error { // to this shard or not). 
level.Info(c.logger).Log("msg", "waiting until compactor is ACTIVE in the ring") - ctx, cancel := context.WithTimeout(ctx, c.compactorCfg.ShardingRing.StartingTimeout) + ctxWithTimeout, cancel := context.WithTimeout(ctx, c.compactorCfg.ShardingRing.WaitActiveInstanceTimeout) defer cancel() - if err := ring.WaitInstanceState(ctx, c.ring, c.ringLifecycler.ID, ring.ACTIVE); err != nil { + if err := ring.WaitInstanceState(ctxWithTimeout, c.ring, c.ringLifecycler.ID, ring.ACTIVE); err != nil { level.Error(c.logger).Log("msg", "compactor failed to become ACTIVE in the ring", "err", err) return err } diff --git a/pkg/compactor/compactor_ring.go b/pkg/compactor/compactor_ring.go index b29750337da..5b55c8a8719 100644 --- a/pkg/compactor/compactor_ring.go +++ b/pkg/compactor/compactor_ring.go @@ -35,7 +35,9 @@ type RingConfig struct { // Injected internally ListenPort int `yaml:"-"` - StartingTimeout time.Duration `yaml:"starting_timeout"` + WaitActiveInstanceTimeout time.Duration `yaml:"wait_active_instance_timeout"` + + ObservePeriod time.Duration `yaml:"-"` } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -63,7 +65,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.InstanceID, "compactor.ring.instance-id", hostname, "Instance ID to register in the ring.") // Timeout durations - f.DurationVar(&cfg.StartingTimeout, "compactor.ring.starting-timeout", 10*time.Minute, "Timeout for waiting on compactor to become ACTIVE in the ring") + f.DurationVar(&cfg.WaitActiveInstanceTimeout, "compactor.ring.wait-active-instance-timeout", 10*time.Minute, "Timeout for waiting on compactor to become ACTIVE in the ring.") } // ToLifecyclerConfig returns a LifecyclerConfig based on the compactor @@ -92,7 +94,7 @@ func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig { lc.InfNames = cfg.InstanceInterfaceNames lc.UnregisterOnShutdown = true lc.HeartbeatPeriod = cfg.HeartbeatPeriod - lc.ObservePeriod = 0 + lc.ObservePeriod = cfg.ObservePeriod lc.JoinAfter = 0 lc.MinReadyDuration = 0 lc.FinalSleep = 0 diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index c6bca4a8ef3..da1a83cead3 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -1093,7 +1093,7 @@ func prepareConfig() Config { compactorCfg.ShardingRing.WaitStabilityMaxDuration = 0 // Set lower timeout for waiting on compactor to become ACTIVE in the ring for unit tests - compactorCfg.ShardingRing.StartingTimeout = 5 * time.Second + compactorCfg.ShardingRing.WaitActiveInstanceTimeout = 5 * time.Second return compactorCfg } @@ -1288,19 +1288,23 @@ func TestCompactor_ShouldFailCompactionOnTimeout(t *testing.T) { // Mock the bucket bucketClient := &bucket.ClientMock{} + bucketClient.MockIter("", []string{}, nil) cfg := prepareConfig() cfg.ShardingEnabled = true cfg.ShardingRing.InstanceID = "compactor-1" cfg.ShardingRing.InstanceAddr = "1.2.3.4" - cfg.ShardingRing.KVStore.Mock = consul.NewBadInMemoryClient(ring.GetCodec()) + cfg.ShardingRing.KVStore.Mock = consul.NewInMemoryClient(ring.GetCodec()) + + // Set ObservePeriod to longer than the timeout period to mock a timeout while waiting on ring to become ACTIVE + cfg.ShardingRing.ObservePeriod = time.Second * 10 c, _, _, logs, _ := prepare(t, cfg, bucketClient) // Try to start the compactor with a bad consul kv-store. 
The err := services.StartAndAwaitRunning(context.Background(), c) - // Assert that the compactor timesout + // Assert that the compactor timed out assert.Equal(t, context.DeadlineExceeded, err) assert.ElementsMatch(t, []string{ diff --git a/pkg/ring/kv/consul/mock.go b/pkg/ring/kv/consul/mock.go index 6c9ec788490..5d1e4557395 100644 --- a/pkg/ring/kv/consul/mock.go +++ b/pkg/ring/kv/consul/mock.go @@ -1,7 +1,6 @@ package consul import ( - "errors" "fmt" "strings" "sync" @@ -229,38 +228,3 @@ func mockedMaxWaitTime(queryWaitTime time.Duration) time.Duration { return queryWaitTime } - -type mockBadKV struct { - mockKV -} - -// NewBadInMemoryClient makes a new mock consul client. -func NewBadInMemoryClient(codec codec.Codec) *Client { - test := mockBadKV{} - test.kvps = nil - return NewBadInMemoryClientWithConfig(codec, Config{}) -} - -// NewBadInMemoryClientWithConfig makes a new mock consul client with supplied Config -// and a failing CAS method for tests. -func NewBadInMemoryClientWithConfig(codec codec.Codec, cfg Config) *Client { - m := mockBadKV{ - mockKV: mockKV{ - kvps: map[string]*consul.KVPair{}, - // Always start from 1, we NEVER want to report back index 0 in the responses. - // This is in line with Consul, and our new checks for index return value in client.go. - current: 1, - }, - } - m.cond = sync.NewCond(&m.mtx) - go m.loop() - return &Client{ - kv: &m, - codec: codec, - cfg: cfg, - } -} - -func (m *mockBadKV) CAS(p *consul.KVPair, q *consul.WriteOptions) (bool, *consul.WriteMeta, error) { - return false, nil, errors.New("CAS Error") -} From a4b27b72a8654e0374d1bb7602092c4530f537b4 Mon Sep 17 00:00:00 2001 From: Albert Date: Wed, 30 Jun 2021 10:34:35 -0700 Subject: [PATCH 09/11] Update unit tests Signed-off-by: Albert --- pkg/ring/util_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/ring/util_test.go b/pkg/ring/util_test.go index e451a566450..9a4a69c6904 100644 --- a/pkg/ring/util_test.go +++ b/pkg/ring/util_test.go @@ -245,10 +245,10 @@ func TestWaitInstanceStateTimeout(t *testing.T) { const ( instanceID = "test" - timeoutDuration = time.Duration(3) + timeoutDuration = time.Second ) - ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration) defer cancel() ring := &RingMock{} @@ -265,10 +265,10 @@ func TestWaitInstanceStateTimeoutOnError(t *testing.T) { const ( instanceID = "test" - timeoutDuration = time.Duration(3) + timeoutDuration = time.Second ) - ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration) defer cancel() ring := &RingMock{} @@ -285,10 +285,10 @@ func TestWaitInstanceStateExitsAfterActualStateEqualsState(t *testing.T) { const ( instanceID = "test" - timeoutDuration = time.Duration(3) + timeoutDuration = time.Second ) - ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration) defer cancel() ring := &RingMock{} From d168f2570fd16e67152907f431017006b34762e3 Mon Sep 17 00:00:00 2001 From: Albert Date: Thu, 1 Jul 2021 10:36:57 -0700 Subject: [PATCH 10/11] Update changelog and fix linting Signed-off-by: Albert --- CHANGELOG.md | 2 +- docs/blocks-storage/compactor.md | 2 +- docs/configuration/config-file-reference.md | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md 
index 31829f74aba..674d9de5716 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,7 +3,7 @@
 ## master / unreleased
 
 * [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit to maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storare combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260
-* [CHANGE] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262
+* [ENHANCEMENT] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262
 
 ## 1.10.0-rc.0 / 2021-06-28
diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md
index 29377a9e689..280545afabc 100644
--- a/docs/blocks-storage/compactor.md
+++ b/docs/blocks-storage/compactor.md
@@ -231,7 +231,7 @@ compactor:
   # CLI flag: -compactor.ring.instance-interface-names
   [instance_interface_names: | default = [eth0 en0]]
 
-  # Timeout for compactor to become ACTIVE in the ring
+  # Timeout for waiting on compactor to become ACTIVE in the ring.
   # CLI flag: -compactor.ring.wait-active-instance-timeout
   [wait_active_instance_timeout: | default = 10m]
 ```
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index 7639d16e03f..0267d01e885 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -5196,7 +5196,7 @@ sharding_ring:
   # CLI flag: -compactor.ring.instance-interface-names
   [instance_interface_names: | default = [eth0 en0]]
 
-  # Timeout for compactor to become ACTIVE in the ring
+  # Timeout for waiting on compactor to become ACTIVE in the ring.
   # CLI flag: -compactor.ring.wait-active-instance-timeout
   [wait_active_instance_timeout: | default = 10m]
 ```

From 858d5ad66b41d0515872217c0f4b6fcdac039ba6 Mon Sep 17 00:00:00 2001
From: Marco Pracucci
Date: Mon, 5 Jul 2021 17:17:03 +0200
Subject: [PATCH 11/11] Fixed CHANGELOG entry order

Signed-off-by: Marco Pracucci
---
 CHANGELOG.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 365e022e60b..2f825f9bf2f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,8 +3,8 @@
 ## master / unreleased
 
 * [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit to maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storare combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260
-* [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336
 * [ENHANCEMENT] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262
+* [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336
 
 ## 1.10.0-rc.0 / 2021-06-28