diff --git a/CHANGELOG.md b/CHANGELOG.md
index 31ccb682fa1..a2790502eda 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -49,6 +49,7 @@
 * [BUGFIX] Ingester: fix issue where runtime limits erroneously override default limits. #4246
 * [BUGFIX] Ruler: fix startup in single-binary mode when the new `ruler_storage` is used. #4252
 * [BUGFIX] Querier: fix queries failing with "at least 1 healthy replica required, could only find 0" error right after scaling up store-gateways until they're ACTIVE in the ring. #4263
+* [BUGFIX] Memberlist: fix to setting the default configuration value for `-memberlist.retransmit-factor` when not provided. This should improve propagation delay of the ring state (including, but not limited to, tombstones). Note that if the configuration is already explicitly given, this fix has no effect. #4269
 
 ## Blocksconvert
 
diff --git a/integration/integration_memberlist_single_binary_test.go b/integration/integration_memberlist_single_binary_test.go
index f37de6de181..53c1a945fd0 100644
--- a/integration/integration_memberlist_single_binary_test.go
+++ b/integration/integration_memberlist_single_binary_test.go
@@ -5,12 +5,14 @@ package integration
 import (
 	"crypto/x509"
 	"crypto/x509/pkix"
+	"fmt"
 	"os"
 	"path/filepath"
 	"testing"
 	"time"
 
 	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
 
 	"github.com/cortexproject/cortex/integration/ca"
 	"github.com/cortexproject/cortex/integration/e2e"
@@ -109,16 +111,16 @@ func testSingleBinaryEnv(t *testing.T, tlsEnabled bool) {
 
 func newSingleBinary(name string, servername string, join string) *e2ecortex.CortexService {
 	flags := map[string]string{
-		"-ingester.final-sleep":          "0s",
-		"-ingester.join-after":           "0s", // join quickly
-		"-ingester.min-ready-duration":   "0s",
-		"-ingester.concurrent-flushes":   "10",
-		"-ingester.max-transfer-retries": "0", // disable
-		"-ingester.num-tokens":           "512",
-		"-ingester.observe-period":       "5s", // to avoid conflicts in tokens
-		"-ring.store":                    "memberlist",
-		"-memberlist.bind-port":          "8000",
-		"-memberlist.pullpush-interval":  "3s", // speed up state convergence to make test faster and avoid flakiness
+		"-ingester.final-sleep":              "0s",
+		"-ingester.join-after":               "0s", // join quickly
+		"-ingester.min-ready-duration":       "0s",
+		"-ingester.concurrent-flushes":       "10",
+		"-ingester.max-transfer-retries":     "0", // disable
+		"-ingester.num-tokens":               "512",
+		"-ingester.observe-period":           "5s", // to avoid conflicts in tokens
+		"-ring.store":                        "memberlist",
+		"-memberlist.bind-port":              "8000",
+		"-memberlist.left-ingesters-timeout": "600s", // effectively disable
 	}
 
 	if join != "" {
@@ -145,3 +147,80 @@ func newSingleBinary(name string, servername string, join string) *e2ecortex.Cor
 	serv.SetBackoff(backOff)
 	return serv
 }
+
+func TestSingleBinaryWithMemberlistScaling(t *testing.T) {
+	s, err := e2e.NewScenario(networkName)
+	require.NoError(t, err)
+	defer s.Close()
+
+	dynamo := e2edb.NewDynamoDB()
+	require.NoError(t, s.StartAndWaitReady(dynamo))
+	require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))
+
+	// Scale up instances. These numbers seem enough to reliably reproduce some unwanted
+	// consequences of slow propagation, such as missing tombstones.
+
+	maxCortex := 20
+	minCortex := 3
+	instances := make([]*e2ecortex.CortexService, 0)
+
+	for i := 0; i < maxCortex; i++ {
+		name := fmt.Sprintf("cortex-%d", i+1)
+		join := ""
+		if i > 0 {
+			join = e2e.NetworkContainerHostPort(networkName, "cortex-1", 8000)
+		}
+		c := newSingleBinary(name, "", join)
+		require.NoError(t, s.StartAndWaitReady(c))
+		instances = append(instances, c)
+	}
+
+	// Sanity check the ring membership and give each instance time to see every other instance.
+
+	for _, c := range instances {
+		require.NoError(t, c.WaitSumMetrics(e2e.Equals(float64(maxCortex)), "cortex_ring_members"))
+		require.NoError(t, c.WaitSumMetrics(e2e.Equals(0), "memberlist_client_kv_store_value_tombstones"))
+	}
+
+	// Scale down as fast as possible but cleanly, in order to send out tombstones.
+
+	stop := errgroup.Group{}
+	for len(instances) > minCortex {
+		i := len(instances) - 1
+		c := instances[i]
+		instances = instances[:i]
+		stop.Go(func() error { return s.Stop(c) })
+	}
+	require.NoError(t, stop.Wait())
+
+	// If all is working as expected, then tombstones should have propagated easily within this time period.
+	// The logging is mildly spammy, but it has proven extremely useful for debugging convergence cases.
+	// We don't use WaitSumMetrics [over all instances] here so we can log the per-instance metrics.
+
+	expectedRingMembers := float64(minCortex)
+	expectedTombstones := float64(maxCortex - minCortex)
+
+	require.Eventually(t, func() bool {
+		ok := true
+		for _, c := range instances {
+			metrics, err := c.SumMetrics([]string{
+				"cortex_ring_members", "memberlist_client_kv_store_value_tombstones",
+			})
+			require.NoError(t, err)
+			t.Logf("%s: cortex_ring_members=%f memberlist_client_kv_store_value_tombstones=%f\n",
+				c.Name(), metrics[0], metrics[1])
+
+			// Don't short circuit the check, so we log the state for all instances.
+			if metrics[0] != expectedRingMembers {
+				ok = false
+			}
+			if metrics[1] != expectedTombstones {
+				ok = false
+			}
+
+		}
+		return ok
+	}, 30*time.Second, 2*time.Second,
+		"expected all instances to have %f ring members and %f tombstones",
+		expectedRingMembers, expectedTombstones)
+}
diff --git a/pkg/ring/kv/memberlist/memberlist_client.go b/pkg/ring/kv/memberlist/memberlist_client.go
index 8187911ac97..e2d9ede504d 100644
--- a/pkg/ring/kv/memberlist/memberlist_client.go
+++ b/pkg/ring/kv/memberlist/memberlist_client.go
@@ -415,7 +415,7 @@ func (m *KV) starting(_ context.Context) error {
 	m.memberlist = list
 	m.broadcasts = &memberlist.TransmitLimitedQueue{
 		NumNodes:       list.NumMembers,
-		RetransmitMult: m.cfg.RetransmitMult,
+		RetransmitMult: mlCfg.RetransmitMult,
 	}
 
 	m.initWG.Done()
diff --git a/pkg/ring/kv/memberlist/memberlist_client_test.go b/pkg/ring/kv/memberlist/memberlist_client_test.go
index 87a2b57ec6f..22e598197f1 100644
--- a/pkg/ring/kv/memberlist/memberlist_client_test.go
+++ b/pkg/ring/kv/memberlist/memberlist_client_test.go
@@ -561,9 +561,6 @@ func TestMultipleClients(t *testing.T) {
 	})
 	cancel() // make linter happy
 
-	// Let clients exchange messages for a while
-	close(stop)
-
 	t.Logf("Ring updates observed: %d", updates)
 
 	if updates < members {
@@ -615,6 +612,9 @@ func TestMultipleClients(t *testing.T) {
 			}
 		}
 	}
+
+	// We cannot shutdown the KV until now in order for Get() to work reliably.
+	close(stop)
 }
 
 func TestJoinMembersWithRetryBackoff(t *testing.T) {
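
Background for reviewers, not part of the patch: the memberlist_client.go change works because hashicorp/memberlist's DefaultLANConfig() already carries a non-zero RetransmitMult, so the broadcast queue must be built from that (defaulted) config rather than from the raw flag value, which is zero when -memberlist.retransmit-factor is not provided. The standalone sketch below illustrates that pattern under stated assumptions; userRetransmitMult is a hypothetical stand-in for the parsed flag and is not a name from the Cortex codebase.

package main

import (
	"fmt"

	"github.com/hashicorp/memberlist"
)

func main() {
	// Hypothetical stand-in for the parsed -memberlist.retransmit-factor flag;
	// zero means the operator did not set it.
	userRetransmitMult := 0

	// DefaultLANConfig() pre-fills RetransmitMult with the library default,
	// so an unset flag does not end up as zero in the effective config.
	mlCfg := memberlist.DefaultLANConfig()
	if userRetransmitMult != 0 {
		mlCfg.RetransmitMult = userRetransmitMult
	}

	// Building the broadcast queue from the defaulted memberlist config keeps
	// retransmissions working; the patch achieves the same by reading
	// mlCfg.RetransmitMult instead of the raw Cortex config value.
	queue := &memberlist.TransmitLimitedQueue{
		NumNodes:       func() int { return 3 }, // the real code uses list.NumMembers
		RetransmitMult: mlCfg.RetransmitMult,
	}

	fmt.Println("effective RetransmitMult:", queue.RetransmitMult)
}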