Skip to content

Commit 3c7632f

Browse files
committed
Add retry logic for connection lifetime to cluster client
1 parent 34ba5cf commit 3c7632f

File tree

2 files changed

+218
-11
lines changed

2 files changed

+218
-11
lines changed

cluster.go

Lines changed: 72 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type clusterClient struct {
3535
stop uint32
3636
cmd Builder
3737
retry bool
38+
hasLftm bool
3839
}
3940

4041
// NOTE: connrole and conn must be initialized at the same time
@@ -57,6 +58,7 @@ func newClusterClient(opt *ClientOption, connFn connFn, retryer retryHandler) (*
5758
retry: !opt.DisableRetry,
5859
retryHandler: retryer,
5960
stopCh: make(chan struct{}),
61+
hasLftm: opt.ConnLifetime > 0,
6062
}
6163

6264
if opt.ReplicaOnly && opt.SendToReplicas != nil {
@@ -514,13 +516,21 @@ retry:
514516
return newErrResult(err)
515517
}
516518
resp = cc.Do(ctx, cmd)
519+
if resp.Error() == errConnExpired {
520+
goto retry
521+
}
517522
process:
518523
switch addr, mode := c.shouldRefreshRetry(resp.Error(), ctx); mode {
519524
case RedirectMove:
520-
resp = c.redirectOrNew(addr, cc, cmd.Slot(), mode).Do(ctx, cmd)
525+
ncc := c.redirectOrNew(addr, cc, cmd.Slot(), mode)
526+
resp = ncc.Do(ctx, cmd)
527+
if resp.Error() == errConnExpired {
528+
goto retry
529+
}
521530
goto process
522531
case RedirectAsk:
523-
results := c.redirectOrNew(addr, cc, cmd.Slot(), mode).DoMulti(ctx, cmds.AskingCmd, cmd)
532+
ncc := c.redirectOrNew(addr, cc, cmd.Slot(), mode)
533+
results := c.doMulti(ctx, ncc, cmds.AskingCmd, cmd)
524534
resp = results.s[1]
525535
resultsp.Put(results)
526536
goto process
@@ -754,12 +764,12 @@ func (c *clusterClient) doretry(
754764
) {
755765
clean := true
756766
if len(re.commands) != 0 {
757-
resps := cc.DoMulti(ctx, re.commands...)
767+
resps := c.doMulti(ctx, cc, re.commands...)
758768
clean = c.doresultfn(ctx, results, retries, mu, cc, re.cIndexes, re.commands, resps.s, attempts, hasInit)
759769
resultsp.Put(resps)
760770
}
761771
if len(re.cAskings) != 0 {
762-
resps := askingMulti(cc, ctx, re.cAskings)
772+
resps := c.askingMulti(cc, ctx, re.cAskings)
763773
clean = c.doresultfn(ctx, results, retries, mu, cc, re.aIndexes, re.cAskings, resps.s, attempts, hasInit) && clean
764774
resultsp.Put(resps)
765775
}
@@ -845,13 +855,19 @@ retry:
845855
return newErrResult(err)
846856
}
847857
resp = cc.DoCache(ctx, cmd, ttl)
858+
if resp.Error() == errConnExpired {
859+
goto retry
860+
}
848861
process:
849862
switch addr, mode := c.shouldRefreshRetry(resp.Error(), ctx); mode {
850863
case RedirectMove:
851864
resp = c.redirectOrNew(addr, cc, cmd.Slot(), mode).DoCache(ctx, cmd, ttl)
865+
if resp.Error() == errConnExpired {
866+
goto retry
867+
}
852868
goto process
853869
case RedirectAsk:
854-
results := askingMultiCache(c.redirectOrNew(addr, cc, cmd.Slot(), mode), ctx, []CacheableTTL{CT(cmd, ttl)})
870+
results := c.askingMultiCache(c.redirectOrNew(addr, cc, cmd.Slot(), mode), ctx, []CacheableTTL{CT(cmd, ttl)})
855871
resp = results.s[0]
856872
resultsp.Put(results)
857873
goto process
@@ -875,7 +891,49 @@ func (c *clusterClient) DoCache(ctx context.Context, cmd Cacheable, ttl time.Dur
875891
return resp
876892
}
877893

878-
func askingMulti(cc conn, ctx context.Context, multi []Completed) *redisresults {
894+
func (c *clusterClient) doMulti(ctx context.Context, cc conn, multi ...Completed) *redisresults {
895+
resps := cc.DoMulti(ctx, multi...)
896+
if c.hasLftm {
897+
var ml []Completed
898+
recover:
899+
ml = ml[:0]
900+
for i, resp := range resps.s {
901+
if resp.Error() == errConnExpired {
902+
ml = multi[i:]
903+
break
904+
}
905+
}
906+
if len(ml) > 0 {
907+
rs := cc.DoMulti(ctx, ml...).s
908+
resps.s = append(resps.s[:len(resps.s)-len(rs)], rs...)
909+
goto recover
910+
}
911+
}
912+
return resps
913+
}
914+
915+
func (c *clusterClient) doMultiCache(ctx context.Context, cc conn, multi ...CacheableTTL) *redisresults {
916+
resps := cc.DoMultiCache(ctx, multi...)
917+
if c.hasLftm {
918+
var ml []CacheableTTL
919+
recover:
920+
ml = ml[:0]
921+
for i, resp := range resps.s {
922+
if resp.Error() == errConnExpired {
923+
ml = multi[i:]
924+
break
925+
}
926+
}
927+
if len(ml) > 0 {
928+
rs := cc.DoMultiCache(ctx, ml...).s
929+
resps.s = append(resps.s[:len(resps.s)-len(rs)], rs...)
930+
goto recover
931+
}
932+
}
933+
return resps
934+
}
935+
936+
func (c *clusterClient) askingMulti(cc conn, ctx context.Context, multi []Completed) *redisresults {
879937
var inTx bool
880938
commands := make([]Completed, 0, len(multi)*2)
881939
for _, cmd := range multi {
@@ -888,7 +946,7 @@ func askingMulti(cc conn, ctx context.Context, multi []Completed) *redisresults
888946
}
889947
}
890948
results := resultsp.Get(0, len(multi))
891-
resps := cc.DoMulti(ctx, commands...)
949+
resps := c.doMulti(ctx, cc, commands...)
892950
for i, resp := range resps.s {
893951
if commands[i] != cmds.AskingCmd {
894952
results.s = append(results.s, resp)
@@ -898,14 +956,14 @@ func askingMulti(cc conn, ctx context.Context, multi []Completed) *redisresults
898956
return results
899957
}
900958

901-
func askingMultiCache(cc conn, ctx context.Context, multi []CacheableTTL) *redisresults {
959+
func (c *clusterClient) askingMultiCache(cc conn, ctx context.Context, multi []CacheableTTL) *redisresults {
902960
commands := make([]Completed, 0, len(multi)*6)
903961
for _, cmd := range multi {
904962
ck, _ := cmds.CacheKey(cmd.Cmd)
905963
commands = append(commands, cc.OptInCmd(), cmds.AskingCmd, cmds.MultiCmd, cmds.NewCompleted([]string{"PTTL", ck}), Completed(cmd.Cmd), cmds.ExecCmd)
906964
}
907965
results := resultsp.Get(0, len(multi))
908-
resps := cc.DoMulti(ctx, commands...)
966+
resps := c.doMulti(ctx, cc, commands...)
909967
for i := 5; i < len(resps.s); i += 6 {
910968
if arr, err := resps.s[i].ToArray(); err != nil {
911969
if preErr := resps.s[i-1].Error(); preErr != nil { // if {Cmd} get a RedisError
@@ -1048,12 +1106,12 @@ func (c *clusterClient) doretrycache(
10481106
) {
10491107
clean := true
10501108
if len(re.commands) != 0 {
1051-
resps := cc.DoMultiCache(ctx, re.commands...)
1109+
resps := c.doMultiCache(ctx, cc, re.commands...)
10521110
clean = c.resultcachefn(ctx, results, retries, mu, cc, re.cIndexes, re.commands, resps.s, attempts)
10531111
resultsp.Put(resps)
10541112
}
10551113
if len(re.cAskings) != 0 {
1056-
resps := askingMultiCache(cc, ctx, re.cAskings)
1114+
resps := c.askingMultiCache(cc, ctx, re.cAskings)
10571115
clean = c.resultcachefn(ctx, results, retries, mu, cc, re.aIndexes, re.cAskings, resps.s, attempts) && clean
10581116
resultsp.Put(resps)
10591117
}
@@ -1130,6 +1188,9 @@ retry:
11301188
goto ret
11311189
}
11321190
err = cc.Receive(ctx, subscribe, fn)
1191+
if err == errConnExpired {
1192+
goto retry
1193+
}
11331194
if _, mode := c.shouldRefreshRetry(err, ctx); c.retry && mode != RedirectNone {
11341195
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, subscribe, err)
11351196
if shouldRetry {

cluster_test.go

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7574,3 +7574,149 @@ func TestClusterClient_SendToOnlyPrimaryNodeWhenPrimaryNodeSelected(t *testing.T
75747574
}
75757575
})
75767576
}
7577+
7578+
func TestClusterClientConnLifetime(t *testing.T) {
7579+
defer ShouldNotLeaked(SetupLeakDetection())
7580+
7581+
setup := func() (*clusterClient, *mockConn) {
7582+
m := &mockConn{
7583+
DoFn: func(cmd Completed) RedisResult {
7584+
if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
7585+
return slotsMultiResp
7586+
}
7587+
return newResult(strmsg('+', "OK"), nil)
7588+
},
7589+
}
7590+
client, err := newClusterClient(
7591+
&ClientOption{InitAddress: []string{":0"}, ConnLifetime: 5 * time.Second},
7592+
func(dst string, opt *ClientOption) conn { return m },
7593+
newRetryer(defaultRetryDelayFn),
7594+
)
7595+
if err != nil {
7596+
t.Fatalf("unexpected error %v", err)
7597+
}
7598+
return client, m
7599+
}
7600+
7601+
t.Run("Do ConnLifetime", func(t *testing.T) {
7602+
client, m := setup()
7603+
var attempts int64
7604+
m.DoFn = func(cmd Completed) RedisResult {
7605+
if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
7606+
return slotsMultiResp
7607+
}
7608+
if atomic.AddInt64(&attempts, 1) == 1 {
7609+
return newErrResult(errConnExpired)
7610+
}
7611+
return newResult(strmsg('+', "OK"), nil)
7612+
}
7613+
c := client.B().Get().Key("Do").Build()
7614+
if v, err := client.Do(context.Background(), c).ToString(); err != nil || v != "OK" {
7615+
t.Fatalf("unexpected response %v %v", v, err)
7616+
}
7617+
})
7618+
7619+
t.Run("DoCache ConnLifetime", func(t *testing.T) {
7620+
client, m := setup()
7621+
var attempts int64
7622+
m.DoCacheFn = func(cmd Cacheable, ttl time.Duration) RedisResult {
7623+
if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
7624+
return slotsMultiResp
7625+
}
7626+
if atomic.AddInt64(&attempts, 1) == 1 {
7627+
return newErrResult(errConnExpired)
7628+
}
7629+
return newResult(strmsg('+', "OK"), nil)
7630+
}
7631+
if v, err := client.DoCache(context.Background(), client.B().Get().Key("Do").Cache(), 0).ToString(); err != nil || v != "OK" {
7632+
t.Fatalf("unexpected response %v %v", v, err)
7633+
}
7634+
})
7635+
7636+
t.Run("DoMulti ConnLifetime - at the head of processing", func(t *testing.T) {
7637+
client, m := setup()
7638+
var attempts int64
7639+
m.DoMultiFn = func(multi ...Completed) *redisresults {
7640+
if atomic.AddInt64(&attempts, 1) == 1 {
7641+
return &redisresults{s: []RedisResult{newErrResult(errConnExpired)}}
7642+
}
7643+
return &redisresults{s: []RedisResult{newResult(strmsg('+', "OK"), nil)}}
7644+
}
7645+
if v, err := client.DoMulti(context.Background(), client.B().Get().Key("Do").Build())[0].ToString(); err != nil || v != "OK" {
7646+
t.Fatalf("unexpected response %v %v", v, err)
7647+
}
7648+
})
7649+
7650+
t.Run("DoMulti ConnLifetime - in the middle of processing", func(t *testing.T) {
7651+
client, m := setup()
7652+
var attempts int64
7653+
m.DoMultiFn = func(multi ...Completed) *redisresults {
7654+
if atomic.AddInt64(&attempts, 1) == 1 {
7655+
return &redisresults{s: []RedisResult{newResult(strmsg('+', "OK"), nil), newErrResult(errConnExpired)}}
7656+
}
7657+
// recover the failure of the first call
7658+
return &redisresults{s: []RedisResult{newResult(strmsg('+', "OK"), nil)}}
7659+
}
7660+
resps := client.DoMulti(context.Background(), client.B().Get().Key("Do").Build(), client.B().Get().Key("Do").Build())
7661+
if len(resps) != 2 {
7662+
t.Errorf("unexpected response length %v", len(resps))
7663+
}
7664+
for _, resp := range resps {
7665+
if v, err := resp.ToString(); err != nil || v != "OK" {
7666+
t.Fatalf("unexpected response %v %v", v, err)
7667+
}
7668+
}
7669+
})
7670+
7671+
t.Run("DoMultiCache ConnLifetime - at the head of processing", func(t *testing.T) {
7672+
client, m := setup()
7673+
var attempts int64
7674+
m.DoMultiCacheFn = func(multi ...CacheableTTL) *redisresults {
7675+
if atomic.AddInt64(&attempts, 1) == 1 {
7676+
return &redisresults{s: []RedisResult{newErrResult(errConnExpired)}}
7677+
}
7678+
// recover the failure of the first call
7679+
return &redisresults{s: []RedisResult{newResult(strmsg('+', "OK"), nil)}}
7680+
}
7681+
if v, err := client.DoMultiCache(context.Background(), CT(client.B().Get().Key("Do").Cache(), 0))[0].ToString(); err != nil || v != "OK" {
7682+
t.Fatalf("unexpected response %v %v", v, err)
7683+
}
7684+
})
7685+
7686+
t.Run("DoMultiCache ConnLifetime in the middle of processing", func(t *testing.T) {
7687+
client, m := setup()
7688+
var attempts int64
7689+
m.DoMultiCacheFn = func(multi ...CacheableTTL) *redisresults {
7690+
if atomic.AddInt64(&attempts, 1) == 1 {
7691+
return &redisresults{s: []RedisResult{newResult(strmsg('+', "OK"), nil), newErrResult(errConnExpired)}}
7692+
}
7693+
// recover the failure of the first call
7694+
return &redisresults{s: []RedisResult{newResult(strmsg('+', "OK"), nil)}}
7695+
}
7696+
resps := client.DoMultiCache(context.Background(), CT(client.B().Get().Key("Do").Cache(), 0), CT(client.B().Get().Key("Do").Cache(), 0))
7697+
if len(resps) != 2 {
7698+
t.Errorf("unexpected response length %v", len(resps))
7699+
}
7700+
for _, resp := range resps {
7701+
if v, err := resp.ToString(); err != nil || v != "OK" {
7702+
t.Fatalf("unexpected response %v %v", v, err)
7703+
}
7704+
}
7705+
})
7706+
7707+
t.Run("Receive ConnLifetime", func(t *testing.T) {
7708+
client, m := setup()
7709+
c := client.B().Subscribe().Channel("ch").Build()
7710+
hdl := func(message PubSubMessage) {}
7711+
var attempts int64
7712+
m.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
7713+
if atomic.AddInt64(&attempts, 1) == 1 {
7714+
return errConnExpired
7715+
}
7716+
return nil
7717+
}
7718+
if err := client.Receive(context.Background(), c, hdl); err != nil {
7719+
t.Fatalf("unexpected response %v", err)
7720+
}
7721+
})
7722+
}

0 commit comments

Comments
 (0)