From a301d2d48fb4acfabc00ffe8550d9405caef59bf Mon Sep 17 00:00:00 2001 From: Steve Simpson Date: Wed, 9 Jun 2021 14:51:07 +0200 Subject: [PATCH 1/5] Fix default memberlist configuration value for RetransmitMult. If configuration is not explicitly given for RetransmitMult (via `-memberlist.retransmit-factor`), then it is intended to be picked up from `DefaultLANConfig`. However, though the correct value was being used to configure `memberlist` itself, zero would be passed into the `TransmitLimitedQueue` used for broadcasting ring updates. This essentially means that ring updates are only ever gossiped once. Signed-off-by: Steve Simpson --- pkg/ring/kv/memberlist/memberlist_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ring/kv/memberlist/memberlist_client.go b/pkg/ring/kv/memberlist/memberlist_client.go index 8187911ac97..e2d9ede504d 100644 --- a/pkg/ring/kv/memberlist/memberlist_client.go +++ b/pkg/ring/kv/memberlist/memberlist_client.go @@ -415,7 +415,7 @@ func (m *KV) starting(_ context.Context) error { m.memberlist = list m.broadcasts = &memberlist.TransmitLimitedQueue{ NumNodes: list.NumMembers, - RetransmitMult: m.cfg.RetransmitMult, + RetransmitMult: mlCfg.RetransmitMult, } m.initWG.Done() From 0e50a40101defaa91a2b216940096f032b67786c Mon Sep 17 00:00:00 2001 From: Steve Simpson Date: Thu, 10 Jun 2021 14:46:54 +0200 Subject: [PATCH 2/5] Add simplified integration test case. 
Signed-off-by: Steve Simpson --- ...tegration_memberlist_single_binary_test.go | 95 +++++++++++++++++-- 1 file changed, 85 insertions(+), 10 deletions(-) diff --git a/integration/integration_memberlist_single_binary_test.go b/integration/integration_memberlist_single_binary_test.go index f37de6de181..5ddf937a49a 100644 --- a/integration/integration_memberlist_single_binary_test.go +++ b/integration/integration_memberlist_single_binary_test.go @@ -5,6 +5,7 @@ package integration import ( "crypto/x509" "crypto/x509/pkix" + "fmt" "os" "path/filepath" "testing" @@ -109,16 +110,16 @@ func testSingleBinaryEnv(t *testing.T, tlsEnabled bool) { func newSingleBinary(name string, servername string, join string) *e2ecortex.CortexService { flags := map[string]string{ - "-ingester.final-sleep": "0s", - "-ingester.join-after": "0s", // join quickly - "-ingester.min-ready-duration": "0s", - "-ingester.concurrent-flushes": "10", - "-ingester.max-transfer-retries": "0", // disable - "-ingester.num-tokens": "512", - "-ingester.observe-period": "5s", // to avoid conflicts in tokens - "-ring.store": "memberlist", - "-memberlist.bind-port": "8000", - "-memberlist.pullpush-interval": "3s", // speed up state convergence to make test faster and avoid flakiness + "-ingester.final-sleep": "0s", + "-ingester.join-after": "0s", // join quickly + "-ingester.min-ready-duration": "0s", + "-ingester.concurrent-flushes": "10", + "-ingester.max-transfer-retries": "0", // disable + "-ingester.num-tokens": "512", + "-ingester.observe-period": "5s", // to avoid conflicts in tokens + "-ring.store": "memberlist", + "-memberlist.bind-port": "8000", + "-memberlist.left-ingesters-timeout": "600s", // effectively disable } if join != "" { @@ -145,3 +146,77 @@ func newSingleBinary(name string, servername string, join string) *e2ecortex.Cor serv.SetBackoff(backOff) return serv } + +func TestSingleBinaryWithMemberlistScaling(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + 
defer s.Close() + + dynamo := e2edb.NewDynamoDB() + require.NoError(t, s.StartAndWaitReady(dynamo)) + require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml))) + + // Scale up instances. These numbers seem enough to reliably reproduce some unwanted + // consequences of slow propagation, such as missing tombstones. + + maxCortex := 20 + minCortex := 3 + instances := make([]*e2ecortex.CortexService, 0) + + for i := 0; i < maxCortex; i++ { + name := fmt.Sprintf("cortex-%d", i+1) + join := "" + if i > 0 { + join = fmt.Sprintf("%s-cortex-1:8000", networkName) + } + c := newSingleBinary(name, "", join) + require.NoError(t, s.StartAndWaitReady(c)) + instances = append(instances, c) + } + + // Sanity check the ring membership and give each instance time to see every other instance. + + for _, c := range instances { + require.NoError(t, c.WaitSumMetrics(e2e.Equals(float64(maxCortex)), "cortex_ring_members")) + require.NoError(t, c.WaitSumMetrics(e2e.Equals(0), "memberlist_client_kv_store_value_tombstones")) + } + + // Scale down as fast as possible but cleanly, in order to send out tombstones. + + for len(instances) > minCortex { + i := len(instances) - 1 + require.NoError(t, s.Stop(instances[i])) + instances = instances[:i] + } + + // If all is working as expected, then tombstones should have propagated easily within this time period. + // The logging is mildly spammy, but it has proven extremely useful for debugging convergence cases. + // We don't use WaitSumMetrics [over all instances] here so we can log the per-instance metrics. 
+ + expectedRingMembers := float64(minCortex) + expectedTombstones := float64(maxCortex - minCortex) + + require.Eventually(t, func() bool { + ok := true + for _, c := range instances { + metrics, err := c.SumMetrics([]string{ + "cortex_ring_members", "memberlist_client_kv_store_value_tombstones", + }) + require.NoError(t, err) + t.Logf("%s: cortex_ring_members=%f memberlist_client_kv_store_value_tombstones=%f\n", + c.Name(), metrics[0], metrics[1]) + + // Don't short circuit the check, so we log the state for all instances. + if metrics[0] != expectedRingMembers { + ok = false + } + if metrics[1] != expectedTombstones { + ok = false + } + + } + return ok + }, 30*time.Second, 2*time.Second, + "expected all instances to have %f ring members and %f tombstones", + expectedRingMembers, expectedTombstones) +} From 957dbb268bde5219f6d9ef8e45231143be656d27 Mon Sep 17 00:00:00 2001 From: Steve Simpson Date: Thu, 10 Jun 2021 15:46:22 +0200 Subject: [PATCH 3/5] Changelog. Signed-off-by: Steve Simpson --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 31ccb682fa1..a2790502eda 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,7 @@ * [BUGFIX] Ingester: fix issue where runtime limits erroneously override default limits. #4246 * [BUGFIX] Ruler: fix startup in single-binary mode when the new `ruler_storage` is used. #4252 * [BUGFIX] Querier: fix queries failing with "at least 1 healthy replica required, could only find 0" error right after scaling up store-gateways until they're ACTIVE in the ring. #4263 +* [BUGFIX] Memberlist: fix to setting the default configuration value for `-memberlist.retransmit-factor` when not provided. This should improve propagation delay of the ring state (including, but not limited to, tombstones). Note that if the configuration is already explicitly given, this fix has no effect. 
#4269 ## Blocksconvert From a08bf5f260c846ded23fc9030be69b66889a6ac8 Mon Sep 17 00:00:00 2001 From: Steve Simpson Date: Thu, 10 Jun 2021 19:57:36 +0200 Subject: [PATCH 4/5] Review comments. Signed-off-by: Steve Simpson --- integration/integration_memberlist_single_binary_test.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/integration/integration_memberlist_single_binary_test.go b/integration/integration_memberlist_single_binary_test.go index 5ddf937a49a..53c1a945fd0 100644 --- a/integration/integration_memberlist_single_binary_test.go +++ b/integration/integration_memberlist_single_binary_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" "github.com/cortexproject/cortex/integration/ca" "github.com/cortexproject/cortex/integration/e2e" @@ -167,7 +168,7 @@ func TestSingleBinaryWithMemberlistScaling(t *testing.T) { name := fmt.Sprintf("cortex-%d", i+1) join := "" if i > 0 { - join = fmt.Sprintf("%s-cortex-1:8000", networkName) + join = e2e.NetworkContainerHostPort(networkName, "cortex-1", 8000) } c := newSingleBinary(name, "", join) require.NoError(t, s.StartAndWaitReady(c)) @@ -183,11 +184,14 @@ func TestSingleBinaryWithMemberlistScaling(t *testing.T) { // Scale down as fast as possible but cleanly, in order to send out tombstones. + stop := errgroup.Group{} for len(instances) > minCortex { i := len(instances) - 1 - require.NoError(t, s.Stop(instances[i])) + c := instances[i] instances = instances[:i] + stop.Go(func() error { return s.Stop(c) }) } + require.NoError(t, stop.Wait()) // If all is working as expected, then tombstones should have propagated easily within this time period. // The logging is mildly spammy, but it has proven extremely useful for debugging convergence cases. From 4c2de95f74f23a03d978a93f6debafb6cdf77753 Mon Sep 17 00:00:00 2001 From: Steve Simpson Date: Fri, 11 Jun 2021 09:36:08 +0200 Subject: [PATCH 5/5] Fix to race condition in unit test. 
The test was shutting down the KV store then attempting to read from it. Sometimes this would work if the KV took some time to shut down, which it often will, but if it shuts down quickly, then the read will fail. Signed-off-by: Steve Simpson --- pkg/ring/kv/memberlist/memberlist_client_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/ring/kv/memberlist/memberlist_client_test.go b/pkg/ring/kv/memberlist/memberlist_client_test.go index 87a2b57ec6f..22e598197f1 100644 --- a/pkg/ring/kv/memberlist/memberlist_client_test.go +++ b/pkg/ring/kv/memberlist/memberlist_client_test.go @@ -561,9 +561,6 @@ func TestMultipleClients(t *testing.T) { }) cancel() // make linter happy - // Let clients exchange messages for a while - close(stop) - t.Logf("Ring updates observed: %d", updates) if updates < members { @@ -615,6 +612,9 @@ func TestMultipleClients(t *testing.T) { } } } + + // We cannot shutdown the KV until now in order for Get() to work reliably. + close(stop) } func TestJoinMembersWithRetryBackoff(t *testing.T) {