diff --git a/CHANGELOG.md b/CHANGELOG.md index e46dd9069ac..25f9f84159f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,13 @@ # Changelog ## master / unreleased -* [FEATURE] Compactor: Added `-compactor.block-files-concurrency` allowing to configure number of go routines for download/upload block files during compaction. #4784 * [ENHANCEMENT] Querier/Ruler: Retry store-gateway in case of unexpected failure, instead of failing the query. #4532 * [ENHANCEMENT] Ring: DoBatch prioritize 4xx errors when failing. #4783 +* [FEATURE] Compactor: Added `-compactor.block-files-concurrency` allowing to configure number of go routines for download/upload block files during compaction. #4784 * [FEATURE] Compactor: Added `-compactor.blocks-fetch-concurrency` allowing to configure number of go routines for blocks during compaction. #4787 +* [BUGFIX] Memberlist: Attempt to join the cluster once, without retrying, when starting the service. #4804 + + ## 1.13.0 2022-07-14 diff --git a/pkg/ring/kv/memberlist/memberlist_client.go b/pkg/ring/kv/memberlist/memberlist_client.go index 66ffa731fe0..db38617abef 100644 --- a/pkg/ring/kv/memberlist/memberlist_client.go +++ b/pkg/ring/kv/memberlist/memberlist_client.go @@ -411,7 +411,7 @@ func (m *KV) buildMemberlistConfig() (*memberlist.Config, error) { return mlCfg, nil } -func (m *KV) starting(_ context.Context) error { +func (m *KV) starting(ctx context.Context) error { mlCfg, err := m.buildMemberlistConfig() if err != nil { return err @@ -438,6 +438,15 @@ func (m *KV) starting(_ context.Context) error { } m.initWG.Done() + if len(m.cfg.JoinMembers) > 0 { + // Lookup SRV records for given addresses to discover members. 
+ members := m.discoverMembers(ctx, m.cfg.JoinMembers) + + err := m.joinMembersOnStarting(members) + if err != nil { + level.Warn(m.logger).Log("msg", "failed to join memberlist cluster on startup", "err", err) + } + } return nil } @@ -450,7 +459,7 @@ func (m *KV) running(ctx context.Context) error { // Lookup SRV records for given addresses to discover members. members := m.discoverMembers(ctx, m.cfg.JoinMembers) - err := m.joinMembersOnStartup(ctx, members) + err := m.joinMembersOnRunning(ctx, members) if err != nil { level.Error(m.logger).Log("msg", "failed to join memberlist cluster", "err", err) @@ -517,7 +526,7 @@ func (m *KV) JoinMembers(members []string) (int, error) { return m.memberlist.Join(members) } -func (m *KV) joinMembersOnStartup(ctx context.Context, members []string) error { +func (m *KV) joinMembersOnRunning(ctx context.Context, members []string) error { reached, err := m.memberlist.Join(members) if err == nil { level.Info(m.logger).Log("msg", "joined memberlist cluster", "reached_nodes", reached) @@ -556,6 +565,16 @@ func (m *KV) joinMembersOnStartup(ctx context.Context, members []string) error { return lastErr } +func (m *KV) joinMembersOnStarting(members []string) error { + reached, err := m.memberlist.Join(members) + if err == nil { + level.Info(m.logger).Log("msg", "joined memberlist cluster", "reached_nodes", reached) + return nil + } + + return err +} + // Provides a dns-based member discovery to join a memberlist cluster w/o knowing members' addresses upfront. 
func (m *KV) discoverMembers(ctx context.Context, members []string) []string { if len(members) == 0 { diff --git a/pkg/ring/kv/memberlist/memberlist_client_test.go b/pkg/ring/kv/memberlist/memberlist_client_test.go index 758ee49dc3d..0a66b52c7b0 100644 --- a/pkg/ring/kv/memberlist/memberlist_client_test.go +++ b/pkg/ring/kv/memberlist/memberlist_client_test.go @@ -789,6 +789,39 @@ func TestMemberlistFailsToJoin(t *testing.T) { require.Equal(t, mkv.FailureCase(), errFailedToJoinCluster) } +func TestMemberlistJoinOnStarting(t *testing.T) { + ports, err := getFreePorts(2) + require.NoError(t, err) + + var cfg1 KVConfig + flagext.DefaultValues(&cfg1) + cfg1.TCPTransport = TCPTransportConfig{ + BindAddrs: []string{"localhost"}, + BindPort: ports[0], + } + + cfg1.RandomizeNodeName = true + cfg1.Codecs = []codec.Codec{dataCodec{}} + cfg1.AbortIfJoinFails = false + + cfg2 := cfg1 + cfg2.TCPTransport.BindPort = ports[1] + cfg2.JoinMembers = []string{fmt.Sprintf("localhost:%d", ports[0])} + cfg2.RejoinInterval = 1 * time.Second + + mkv1 := NewKV(cfg1, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + require.NoError(t, mkv1.starting(context.Background())) + + mkv2 := NewKV(cfg2, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + require.NoError(t, mkv2.starting(context.Background())) + + membersFunc := func() interface{} { + return mkv2.memberlist.NumMembers() + } + + poll(t, 5*time.Second, 2, membersFunc) +} + func getFreePorts(count int) ([]int, error) { var ports []int for i := 0; i < count; i++ {