1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -49,6 +49,7 @@
 * [BUGFIX] Ingester: fix issue where runtime limits erroneously override default limits. #4246
 * [BUGFIX] Ruler: fix startup in single-binary mode when the new `ruler_storage` is used. #4252
 * [BUGFIX] Querier: fix queries failing with "at least 1 healthy replica required, could only find 0" error right after scaling up store-gateways until they're ACTIVE in the ring. #4263
+* [BUGFIX] Memberlist: fix setting the default configuration value for `-memberlist.retransmit-factor` when it is not provided. This should improve propagation delay of the ring state (including, but not limited to, tombstones). Note that if the configuration is already explicitly given, this fix has no effect. #4269
 
 ## Blocksconvert
 
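The entry above concerns a common configuration pitfall: reading a tunable from the raw user-facing config, where an unset flag is simply zero, instead of from the resolved library config after defaults are applied. A minimal Go sketch of that pattern follows; the kvConfig type and resolvedRetransmitMult function are illustrative stand-ins, not the actual Cortex code, and 4 mirrors the default RetransmitMult in hashicorp/memberlist's DefaultLANConfig.

package main

import "fmt"

// kvConfig stands in for the user-facing memberlist KV configuration;
// RetransmitMult is 0 whenever -memberlist.retransmit-factor is not provided.
type kvConfig struct {
	RetransmitMult int
}

// resolvedRetransmitMult mimics building the library config: start from the
// library default and only override it when the user set a non-zero value.
func resolvedRetransmitMult(cfg kvConfig) int {
	mult := 4 // memberlist's DefaultLANConfig default
	if cfg.RetransmitMult != 0 {
		mult = cfg.RetransmitMult
	}
	// The bug was equivalent to returning cfg.RetransmitMult here: with the
	// flag unset that yields 0, effectively disabling gossip retransmission.
	return mult
}

func main() {
	fmt.Println(resolvedRetransmitMult(kvConfig{}))                  // 4: default applied
	fmt.Println(resolvedRetransmitMult(kvConfig{RetransmitMult: 2})) // 2: explicit flag wins
}
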
99 changes: 89 additions & 10 deletions integration/integration_memberlist_single_binary_test.go
@@ -5,12 +5,14 @@ package integration
 import (
 	"crypto/x509"
 	"crypto/x509/pkix"
+	"fmt"
 	"os"
 	"path/filepath"
 	"testing"
 	"time"
 
 	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
 
 	"github.com/cortexproject/cortex/integration/ca"
 	"github.com/cortexproject/cortex/integration/e2e"
@@ -109,16 +111,16 @@ func testSingleBinaryEnv(t *testing.T, tlsEnabled bool) {

 func newSingleBinary(name string, servername string, join string) *e2ecortex.CortexService {
 	flags := map[string]string{
-		"-ingester.final-sleep": "0s",
-		"-ingester.join-after": "0s", // join quickly
-		"-ingester.min-ready-duration": "0s",
-		"-ingester.concurrent-flushes": "10",
-		"-ingester.max-transfer-retries": "0", // disable
-		"-ingester.num-tokens": "512",
-		"-ingester.observe-period": "5s", // to avoid conflicts in tokens
-		"-ring.store": "memberlist",
-		"-memberlist.bind-port": "8000",
-		"-memberlist.pullpush-interval": "3s", // speed up state convergence to make test faster and avoid flakiness
+		"-ingester.final-sleep": "0s",
+		"-ingester.join-after": "0s", // join quickly
+		"-ingester.min-ready-duration": "0s",
+		"-ingester.concurrent-flushes": "10",
+		"-ingester.max-transfer-retries": "0", // disable
+		"-ingester.num-tokens": "512",
+		"-ingester.observe-period": "5s", // to avoid conflicts in tokens
+		"-ring.store": "memberlist",
+		"-memberlist.bind-port": "8000",
+		"-memberlist.left-ingesters-timeout": "600s", // effectively disable
 	}
 
 	if join != "" {
@@ -145,3 +147,80 @@ func newSingleBinary(name string, servername string, join string) *e2ecortex.CortexService {
 	serv.SetBackoff(backOff)
 	return serv
 }
+
+func TestSingleBinaryWithMemberlistScaling(t *testing.T) {
+	s, err := e2e.NewScenario(networkName)
+	require.NoError(t, err)
+	defer s.Close()
+
+	dynamo := e2edb.NewDynamoDB()
+	require.NoError(t, s.StartAndWaitReady(dynamo))
+	require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))
+
+	// Scale up instances. These numbers seem enough to reliably reproduce some unwanted
+	// consequences of slow propagation, such as missing tombstones.
+
+	maxCortex := 20
+	minCortex := 3
+	instances := make([]*e2ecortex.CortexService, 0)
+
+	for i := 0; i < maxCortex; i++ {
+		name := fmt.Sprintf("cortex-%d", i+1)
+		join := ""
+		if i > 0 {
+			join = e2e.NetworkContainerHostPort(networkName, "cortex-1", 8000)
+		}
+		c := newSingleBinary(name, "", join)
+		require.NoError(t, s.StartAndWaitReady(c))
+		instances = append(instances, c)
+	}
+
+	// Sanity check the ring membership and give each instance time to see every other instance.
+
+	for _, c := range instances {
+		require.NoError(t, c.WaitSumMetrics(e2e.Equals(float64(maxCortex)), "cortex_ring_members"))
+		require.NoError(t, c.WaitSumMetrics(e2e.Equals(0), "memberlist_client_kv_store_value_tombstones"))
+	}
+
+	// Scale down as fast as possible but cleanly, in order to send out tombstones.
+
+	stop := errgroup.Group{}
+	for len(instances) > minCortex {
+		i := len(instances) - 1
+		c := instances[i]
+		instances = instances[:i]
+		stop.Go(func() error { return s.Stop(c) })
+	}
+	require.NoError(t, stop.Wait())
+
+	// If all is working as expected, then tombstones should have propagated easily within this time period.
+	// The logging is mildly spammy, but it has proven extremely useful for debugging convergence cases.
+	// We don't use WaitSumMetrics [over all instances] here so we can log the per-instance metrics.
+
+	expectedRingMembers := float64(minCortex)
+	expectedTombstones := float64(maxCortex - minCortex)
+
+	require.Eventually(t, func() bool {
+		ok := true
+		for _, c := range instances {
+			metrics, err := c.SumMetrics([]string{
+				"cortex_ring_members", "memberlist_client_kv_store_value_tombstones",
+			})
+			require.NoError(t, err)
+			t.Logf("%s: cortex_ring_members=%f memberlist_client_kv_store_value_tombstones=%f\n",
+				c.Name(), metrics[0], metrics[1])
+
+			// Don't short circuit the check, so we log the state for all instances.
+			if metrics[0] != expectedRingMembers {
+				ok = false
+			}
+			if metrics[1] != expectedTombstones {
+				ok = false
+			}
+
+		}
+		return ok
+	}, 30*time.Second, 2*time.Second,
+		"expected all instances to have %f ring members and %f tombstones",
+		expectedRingMembers, expectedTombstones)
+}
2 changes: 1 addition & 1 deletion pkg/ring/kv/memberlist/memberlist_client.go
@@ -415,7 +415,7 @@ func (m *KV) starting(_ context.Context) error {
 	m.memberlist = list
 	m.broadcasts = &memberlist.TransmitLimitedQueue{
 		NumNodes:       list.NumMembers,
-		RetransmitMult: m.cfg.RetransmitMult,
+		RetransmitMult: mlCfg.RetransmitMult,
 	}
 	m.initWG.Done()
 
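Why a one-line change has this much impact: RetransmitMult bounds how many times TransmitLimitedQueue re-gossips each broadcast, scaling with cluster size, so zeroing it throttles ring-state propagation. The sketch below approximates that relationship; the formula is a paraphrase of memberlist's behaviour, not a verbatim copy of the library code.

package main

import (
	"fmt"
	"math"
)

// retransmitLimit approximates how hashicorp/memberlist derives the per-broadcast
// transmission bound from RetransmitMult and cluster size:
// roughly RetransmitMult * ceil(log10(numNodes+1)).
func retransmitLimit(retransmitMult, numNodes int) int {
	return retransmitMult * int(math.Ceil(math.Log10(float64(numNodes+1))))
}

func main() {
	// Before the fix, an unset -memberlist.retransmit-factor left the queue's
	// RetransmitMult at 0, so the bound was 0 for any cluster size and ring
	// updates (tombstones included) fell back to the slower push/pull sync.
	fmt.Println(retransmitLimit(0, 20)) // 0
	// With the default of 4 restored, a 20-node cluster gets
	// 4*ceil(log10(21)) = 8 transmissions per broadcast.
	fmt.Println(retransmitLimit(4, 20)) // 8
}
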
6 changes: 3 additions & 3 deletions pkg/ring/kv/memberlist/memberlist_client_test.go
@@ -561,9 +561,6 @@ func TestMultipleClients(t *testing.T) {
 	})
 	cancel() // make linter happy
 
-	// Let clients exchange messages for a while
-	close(stop)
-
 	t.Logf("Ring updates observed: %d", updates)
 
 	if updates < members {
@@ -615,6 +612,9 @@
 			}
 		}
 	}
+
+	// We cannot shut down the KV until now in order for Get() to work reliably.
+	close(stop)
 }
 
 func TestJoinMembersWithRetryBackoff(t *testing.T) {
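The moved close(stop) is a goroutine-lifetime issue: while the stop channel stays open, the test's clients keep exchanging messages, and the final Get() assertions depend on that traffic still flowing. A generic sketch of the pattern under that assumption (none of these names come from the test):

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	stop := make(chan struct{})
	var wg sync.WaitGroup

	// Background worker that keeps state fresh until stop is closed,
	// standing in for the test's message-exchanging clients.
	var mu sync.Mutex
	state := 0
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-stop:
				return
			default:
				mu.Lock()
				state++
				mu.Unlock()
				time.Sleep(10 * time.Millisecond)
			}
		}
	}()

	// Assertions run while the worker is still alive...
	time.Sleep(50 * time.Millisecond)
	mu.Lock()
	fmt.Println("observed state:", state)
	mu.Unlock()

	// ...and only then do we shut it down, mirroring the moved close(stop).
	close(stop)
	wg.Wait()
}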