Skip to content

Commit e8d5028

Browse files
authored
Fix flaky test. (#3313)
* Get log for failed test.
  Signed-off-by: Peter Štibraný <[email protected]>
* Make more CAS attempts, and add a bit of random delay between them to reduce CAS collisions.
  Signed-off-by: Peter Štibraný <[email protected]>
* Fix cleanup of lifecycler.
  Signed-off-by: Peter Štibraný <[email protected]>
* Fix cleanup of lifecycler.
  Signed-off-by: Peter Štibraný <[email protected]>
* Fix another cleanup function.
  Signed-off-by: Peter Štibraný <[email protected]>
* Remove racy buffering of log messages.
  Signed-off-by: Peter Štibraný <[email protected]>
1 parent 28bf0fa commit e8d5028

File tree

2 files changed

+25
-6
lines changed

2 files changed

+25
-6
lines changed

pkg/ring/kv/consul/client.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"flag"
66
"fmt"
7+
"math/rand"
78
"net/http"
89
"time"
910

@@ -41,6 +42,10 @@ type Config struct {
4142
ConsistentReads bool `yaml:"consistent_reads"`
4243
WatchKeyRateLimit float64 `yaml:"watch_rate_limit"` // Zero disables rate limit
4344
WatchKeyBurstSize int `yaml:"watch_burst_size"` // Burst when doing rate-limit, defaults to 1
45+
46+
// Used in tests only.
47+
MaxCasRetries int `yaml:"-"`
48+
CasRetryDelay time.Duration `yaml:"-"`
4449
}
4550

4651
type kv interface {
@@ -117,11 +122,22 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou
117122
}
118123

119124
func (c *Client) cas(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error {
120-
var (
121-
index = uint64(0)
125+
retries := c.cfg.MaxCasRetries
126+
if retries == 0 {
122127
retries = 10
123-
)
128+
}
129+
130+
sleepBeforeRetry := time.Duration(0)
131+
if c.cfg.CasRetryDelay > 0 {
132+
sleepBeforeRetry = time.Duration(rand.Int63n(c.cfg.CasRetryDelay.Nanoseconds()))
133+
}
134+
135+
index := uint64(0)
124136
for i := 0; i < retries; i++ {
137+
if i > 0 && sleepBeforeRetry > 0 {
138+
time.Sleep(sleepBeforeRetry)
139+
}
140+
125141
// Get with default options - don't want stale data to compare with
126142
options := &consul.QueryOptions{}
127143
kvp, _, err := c.kv.Get(key, options.WithContext(ctx))

pkg/ring/ring_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -873,7 +873,7 @@ func TestRingUpdates(t *testing.T) {
873873
require.NoError(t, err)
874874
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ring))
875875
t.Cleanup(func() {
876-
_ = services.StartAndAwaitRunning(context.Background(), ring)
876+
_ = services.StopAndAwaitTerminated(context.Background(), ring)
877877
})
878878

879879
require.Equal(t, 0, ring.IngesterCount())
@@ -944,7 +944,7 @@ func startLifecycler(t *testing.T, cfg Config, heartbeat time.Duration, lifecycl
944944
require.NoError(t, services.StartAndAwaitRunning(context.Background(), lc))
945945

946946
t.Cleanup(func() {
947-
_ = services.StartAndAwaitRunning(context.Background(), lc)
947+
_ = services.StopAndAwaitTerminated(context.Background(), lc)
948948
})
949949

950950
return lc
@@ -953,7 +953,10 @@ func startLifecycler(t *testing.T, cfg Config, heartbeat time.Duration, lifecycl
953953
// This test checks if shuffle-sharded ring can be reused, and whether it receives
954954
// updates from "main" ring.
955955
func TestShuffleShardWithCaching(t *testing.T) {
956-
inmem := consul.NewInMemoryClient(GetCodec())
956+
inmem := consul.NewInMemoryClientWithConfig(GetCodec(), consul.Config{
957+
MaxCasRetries: 20,
958+
CasRetryDelay: 500 * time.Millisecond,
959+
})
957960

958961
cfg := Config{
959962
KVStore: kv.Config{Mock: inmem},

0 commit comments

Comments
 (0)