Skip to content

Commit 2238fd7

Browse files
committed
Add connection lifetime to sentinel client
1 parent 3c7632f commit 2238fd7

File tree

2 files changed

+203
-2
lines changed

2 files changed

+203
-2
lines changed

sentinel.go

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ func newSentinelClient(opt *ClientOption, connFn connFn, retryer retryHandler) (
2525
sentinels: list.New(),
2626
retry: !opt.DisableRetry,
2727
retryHandler: retryer,
28+
hasLftm: opt.ConnLifetime > 0,
2829
replica: opt.ReplicaOnly,
2930
}
3031

@@ -55,6 +56,7 @@ type sentinelClient struct {
5556
stop uint32
5657
cmd Builder
5758
retry bool
59+
hasLftm bool
5860
replica bool
5961
}
6062

@@ -66,6 +68,9 @@ func (c *sentinelClient) Do(ctx context.Context, cmd Completed) (resp RedisResul
6668
attempts := 1
6769
retry:
6870
resp = c.mConn.Load().(conn).Do(ctx, cmd)
71+
if resp.Error() == errConnExpired {
72+
goto retry
73+
}
6974
if c.retry && cmd.IsReadOnly() && c.isRetryable(resp.Error(), ctx) {
7075
shouldRetry := c.retryHandler.WaitOrSkipRetry(
7176
ctx, attempts, cmd, resp.Error(),
@@ -88,7 +93,24 @@ func (c *sentinelClient) DoMulti(ctx context.Context, multi ...Completed) []Redi
8893

8994
attempts := 1
9095
retry:
91-
resps := c.mConn.Load().(conn).DoMulti(ctx, multi...)
96+
cc := c.mConn.Load().(conn)
97+
resps := cc.DoMulti(ctx, multi...)
98+
if c.hasLftm {
99+
var ml []Completed
100+
recover:
101+
ml = ml[:0]
102+
for i, resp := range resps.s {
103+
if resp.Error() == errConnExpired {
104+
ml = multi[i:]
105+
break
106+
}
107+
}
108+
if len(ml) > 0 {
109+
rs := cc.DoMulti(ctx, ml...).s
110+
resps.s = append(resps.s[:len(resps.s)-len(rs)], rs...)
111+
goto recover
112+
}
113+
}
92114
if c.retry && allReadOnly(multi) {
93115
for i, resp := range resps.s {
94116
if c.isRetryable(resp.Error(), ctx) {
@@ -115,6 +137,9 @@ func (c *sentinelClient) DoCache(ctx context.Context, cmd Cacheable, ttl time.Du
115137
attempts := 1
116138
retry:
117139
resp = c.mConn.Load().(conn).DoCache(ctx, cmd, ttl)
140+
if resp.Error() == errConnExpired {
141+
goto retry
142+
}
118143
if c.retry && c.isRetryable(resp.Error(), ctx) {
119144
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, Completed(cmd), resp.Error())
120145
if shouldRetry {
@@ -135,7 +160,24 @@ func (c *sentinelClient) DoMultiCache(ctx context.Context, multi ...CacheableTTL
135160
}
136161
attempts := 1
137162
retry:
138-
resps := c.mConn.Load().(conn).DoMultiCache(ctx, multi...)
163+
cc := c.mConn.Load().(conn)
164+
resps := cc.DoMultiCache(ctx, multi...)
165+
if c.hasLftm {
166+
var ml []CacheableTTL
167+
recover:
168+
ml = ml[:0]
169+
for i, resp := range resps.s {
170+
if resp.Error() == errConnExpired {
171+
ml = multi[i:]
172+
break
173+
}
174+
}
175+
if len(ml) > 0 {
176+
rs := cc.DoMultiCache(ctx, ml...).s
177+
resps.s = append(resps.s[:len(resps.s)-len(rs)], rs...)
178+
goto recover
179+
}
180+
}
139181
if c.retry {
140182
for i, resp := range resps.s {
141183
if c.isRetryable(resp.Error(), ctx) {
@@ -162,6 +204,9 @@ func (c *sentinelClient) Receive(ctx context.Context, subscribe Completed, fn fu
162204
attempts := 1
163205
retry:
164206
err = c.mConn.Load().(conn).Receive(ctx, subscribe, fn)
207+
if err == errConnExpired {
208+
goto retry
209+
}
165210
if c.retry {
166211
if _, ok := err.(*RedisError); !ok && c.isRetryable(err, ctx) {
167212
shouldRetry := c.retryHandler.WaitOrSkipRetry(

sentinel_test.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1749,3 +1749,159 @@ func TestSentinelClientLoadingRetry(t *testing.T) {
17491749
}
17501750
})
17511751
}
1752+
1753+
func TestSentinelClientConnLifetime(t *testing.T) {
1754+
defer ShouldNotLeaked(SetupLeakDetection())
1755+
1756+
setup := func() (*sentinelClient, *mockConn, *mockConn) {
1757+
s0 := &mockConn{
1758+
DoFn: func(cmd Completed) RedisResult { return RedisResult{} },
1759+
DoMultiFn: func(multi ...Completed) *redisresults {
1760+
return &redisresults{s: []RedisResult{
1761+
{val: slicemsg('*', []RedisMessage{})},
1762+
{val: slicemsg('*', []RedisMessage{
1763+
strmsg('+', ""), strmsg('+', "1"),
1764+
})},
1765+
}}
1766+
},
1767+
}
1768+
m1 := &mockConn{
1769+
DoFn: func(cmd Completed) RedisResult {
1770+
if cmd == cmds.RoleCmd {
1771+
return RedisResult{val: slicemsg('*', []RedisMessage{strmsg('+', "master")})}
1772+
}
1773+
return RedisResult{}
1774+
},
1775+
AddrFn: func() string { return ":1" },
1776+
}
1777+
client, err := newSentinelClient(
1778+
&ClientOption{InitAddress: []string{":0"}, ConnLifetime: 1 * time.Second},
1779+
func(dst string, opt *ClientOption) conn {
1780+
if dst == ":0" {
1781+
return s0
1782+
}
1783+
if dst == ":1" {
1784+
return m1
1785+
}
1786+
return nil
1787+
},
1788+
newRetryer(defaultRetryDelayFn),
1789+
)
1790+
if err != nil {
1791+
t.Fatalf("unexpected err %v", err)
1792+
}
1793+
return client, s0, m1
1794+
}
1795+
1796+
t.Run("Do ConnLifetime", func(t *testing.T) {
1797+
client, _, m := setup()
1798+
var attempts int64
1799+
m.DoFn = func(cmd Completed) RedisResult {
1800+
if atomic.AddInt64(&attempts, 1) == 1 {
1801+
return newErrResult(errConnExpired)
1802+
}
1803+
return newResult(strmsg('+', "OK"), nil)
1804+
}
1805+
if v, err := client.Do(context.Background(), client.B().Get().Key("Do").Build()).ToString(); err != nil || v != "OK" {
1806+
t.Fatalf("unexpected response %v %v", v, err)
1807+
}
1808+
})
1809+
1810+
t.Run("DoCache ConnLifetime", func(t *testing.T) {
1811+
client, _, m := setup()
1812+
var attempts int64
1813+
m.DoCacheFn = func(cmd Cacheable, ttl time.Duration) RedisResult {
1814+
if atomic.AddInt64(&attempts, 1) == 1 {
1815+
return newErrResult(errConnExpired)
1816+
}
1817+
return newResult(strmsg('+', "OK"), nil)
1818+
}
1819+
if v, err := client.DoCache(context.Background(), client.B().Get().Key("Do").Cache(), 0).ToString(); err != nil || v != "OK" {
1820+
t.Fatalf("unexpected response %v %v", v, err)
1821+
}
1822+
})
1823+
1824+
t.Run("DoMulti ConnLifetime - at the head of processing", func(t *testing.T) {
1825+
client, _, m := setup()
1826+
var attempts int64
1827+
m.DoMultiFn = func(multi ...Completed) *redisresults {
1828+
if atomic.AddInt64(&attempts, 1) == 1 {
1829+
return &redisresults{s: []RedisResult{newErrResult(errConnExpired)}}
1830+
}
1831+
return &redisresults{s: []RedisResult{newResult(strmsg('+', "OK"), nil)}}
1832+
}
1833+
if v, err := client.DoMulti(context.Background(), client.B().Get().Key("Do").Build())[0].ToString(); err != nil || v != "OK" {
1834+
t.Fatalf("unexpected response %v %v", v, err)
1835+
}
1836+
})
1837+
1838+
t.Run("DoMulti ConnLifetime - in the middle of processing", func(t *testing.T) {
1839+
client, _, m := setup()
1840+
var attempts int64
1841+
m.DoMultiFn = func(multi ...Completed) *redisresults {
1842+
if atomic.AddInt64(&attempts, 1) == 1 {
1843+
return &redisresults{s: []RedisResult{newResult(strmsg('+', "OK"), nil), newErrResult(errConnExpired)}}
1844+
}
1845+
return &redisresults{s: []RedisResult{newResult(strmsg('+', "OK"), nil)}}
1846+
}
1847+
resps := client.DoMulti(context.Background(), client.B().Get().Key("Do").Build(), client.B().Get().Key("Do").Build())
1848+
if len(resps) != 2 {
1849+
t.Errorf("unexpected response length %v", len(resps))
1850+
}
1851+
for _, resp := range resps {
1852+
if v, err := resp.ToString(); err != nil || v != "OK" {
1853+
t.Fatalf("unexpected response %v %v", v, err)
1854+
}
1855+
}
1856+
})
1857+
1858+
t.Run("DoMultiCache ConnLifetime - at the head of processing", func(t *testing.T) {
1859+
client, _, m := setup()
1860+
var attempts int64
1861+
m.DoMultiCacheFn = func(multi ...CacheableTTL) *redisresults {
1862+
if atomic.AddInt64(&attempts, 1) == 1 {
1863+
return &redisresults{s: []RedisResult{newErrResult(errConnExpired)}}
1864+
}
1865+
return &redisresults{s: []RedisResult{newResult(strmsg('+', "OK"), nil)}}
1866+
}
1867+
if v, err := client.DoMultiCache(context.Background(), CT(client.B().Get().Key("Do").Cache(), 0))[0].ToString(); err != nil || v != "OK" {
1868+
t.Fatalf("unexpected response %v %v", v, err)
1869+
}
1870+
})
1871+
1872+
t.Run("DoMultiCache ConnLifetime - in the middle of processing", func(t *testing.T) {
1873+
client, _, m := setup()
1874+
var attempts int64
1875+
m.DoMultiCacheFn = func(multi ...CacheableTTL) *redisresults {
1876+
if atomic.AddInt64(&attempts, 1) == 1 {
1877+
return &redisresults{s: []RedisResult{newResult(strmsg('+', "OK"), nil), newErrResult(errConnExpired)}}
1878+
}
1879+
return &redisresults{s: []RedisResult{newResult(strmsg('+', "OK"), nil)}}
1880+
}
1881+
resps := client.DoMultiCache(context.Background(), CT(client.B().Get().Key("Do").Cache(), 0), CT(client.B().Get().Key("Do").Cache(), 0))
1882+
if len(resps) != 2 {
1883+
t.Errorf("unexpected response length %v", len(resps))
1884+
}
1885+
for _, resp := range resps {
1886+
if v, err := resp.ToString(); err != nil || v != "OK" {
1887+
t.Fatalf("unexpected response %v %v", v, err)
1888+
}
1889+
}
1890+
})
1891+
1892+
t.Run("Receive ConnLifetime", func(t *testing.T) {
1893+
client, _, m := setup()
1894+
c := client.B().Subscribe().Channel("ch").Build()
1895+
hdl := func(message PubSubMessage) {}
1896+
var attempts int64
1897+
m.ReceiveFn = func(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
1898+
if atomic.AddInt64(&attempts, 1) == 1 {
1899+
return errConnExpired
1900+
}
1901+
return nil
1902+
}
1903+
if err := client.Receive(context.Background(), c, hdl); err != nil {
1904+
t.Fatalf("unexpected response %v", err)
1905+
}
1906+
})
1907+
}

0 commit comments

Comments
 (0)