Skip to content

Commit 425f2fc

Browse files
committed
fix: use slot node id to detect node re-configuration
1 parent 5072031 commit 425f2fc

File tree

1 file changed

+41
-19
lines changed

1 file changed

+41
-19
lines changed

cluster.go

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -170,17 +170,19 @@ func (opt *ClusterOptions) clientOptions() *Options {
170170
//------------------------------------------------------------------------------
171171

172172
type clusterNode struct {
173+
id string
173174
Client *Client
174175

175176
latency uint32 // atomic
176177
generation uint32 // atomic
177178
failing uint32 // atomic
178179
}
179180

180-
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
181+
func newClusterNode(clOpt *ClusterOptions, id, addr string) *clusterNode {
181182
opt := clOpt.clientOptions()
182183
opt.Addr = addr
183184
node := clusterNode{
185+
id: id,
184186
Client: clOpt.NewClient(opt),
185187
}
186188

@@ -352,33 +354,51 @@ func (c *clusterNodes) GC(generation uint32) {
352354
}
353355
}
354356

355-
func (c *clusterNodes) Get(addr string) (*clusterNode, error) {
357+
func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
358+
return c.GetOrCreateWithID(addr, "")
359+
}
360+
361+
func (c *clusterNodes) GetOrCreateWithID(addr, id string) (*clusterNode, error) {
356362
node, err := c.get(addr)
357363
if err != nil {
358364
return nil, err
359365
}
360-
if node != nil {
366+
if node != nil && (id == "" || node.id == id) {
361367
return node, nil
362368
}
363369

364370
c.mu.Lock()
365-
defer c.mu.Unlock()
371+
node, oldNode, err := c.getOrCreate(addr, id)
372+
c.mu.Unlock()
366373

374+
if err != nil {
375+
return nil, err
376+
}
377+
if oldNode != nil {
378+
_ = oldNode.Client.Close()
379+
}
380+
return node, nil
381+
}
382+
383+
func (c *clusterNodes) getOrCreate(addr, id string) (node, oldNode *clusterNode, _ error) {
367384
if c.closed {
368-
return nil, pool.ErrClosed
385+
return nil, nil, pool.ErrClosed
369386
}
370387

371-
node, ok := c.nodes[addr]
388+
oldNode, ok := c.nodes[addr]
372389
if ok {
373-
return node, nil
390+
// The id is changed when node is re-configured, for example, IP addr is changed.
391+
if id == "" || oldNode.id == id {
392+
return oldNode, nil, nil
393+
}
394+
} else {
395+
c.addrs = appendIfNotExists(c.addrs, addr)
374396
}
375397

376-
node = newClusterNode(c.opt, addr)
377-
378-
c.addrs = appendIfNotExists(c.addrs, addr)
398+
node = newClusterNode(c.opt, id, addr)
379399
c.nodes[addr] = node
380400

381-
return node, nil
401+
return node, oldNode, nil
382402
}
383403

384404
func (c *clusterNodes) get(addr string) (*clusterNode, error) {
@@ -416,7 +436,7 @@ func (c *clusterNodes) Random() (*clusterNode, error) {
416436
}
417437

418438
n := rand.Intn(len(addrs))
419-
return c.Get(addrs[n])
439+
return c.GetOrCreate(addrs[n])
420440
}
421441

422442
//------------------------------------------------------------------------------
@@ -474,7 +494,7 @@ func newClusterState(
474494
addr = replaceLoopbackHost(addr, originHost)
475495
}
476496

477-
node, err := c.nodes.Get(addr)
497+
node, err := c.nodes.GetOrCreateWithID(addr, slotNode.ID)
478498
if err != nil {
479499
return nil, err
480500
}
@@ -824,8 +844,10 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
824844
var addr string
825845
moved, ask, addr = isMovedError(lastErr)
826846
if moved || ask {
847+
c.state.LazyReload()
848+
827849
var err error
828-
node, err = c.nodes.Get(addr)
850+
node, err = c.nodes.GetOrCreate(addr)
829851
if err != nil {
830852
return err
831853
}
@@ -1022,7 +1044,7 @@ func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {
10221044
for _, idx := range rand.Perm(len(addrs)) {
10231045
addr := addrs[idx]
10241046

1025-
node, err := c.nodes.Get(addr)
1047+
node, err := c.nodes.GetOrCreate(addr)
10261048
if err != nil {
10271049
if firstErr == nil {
10281050
firstErr = err
@@ -1236,7 +1258,7 @@ func (c *ClusterClient) checkMovedErr(
12361258
return false
12371259
}
12381260

1239-
node, err := c.nodes.Get(addr)
1261+
node, err := c.nodes.GetOrCreate(addr)
12401262
if err != nil {
12411263
return false
12421264
}
@@ -1422,7 +1444,7 @@ func (c *ClusterClient) cmdsMoved(
14221444
addr string,
14231445
failedCmds *cmdsMap,
14241446
) error {
1425-
node, err := c.nodes.Get(addr)
1447+
node, err := c.nodes.GetOrCreate(addr)
14261448
if err != nil {
14271449
return err
14281450
}
@@ -1477,7 +1499,7 @@ func (c *ClusterClient) Watch(ctx context.Context, fn func(*Tx) error, keys ...s
14771499

14781500
moved, ask, addr := isMovedError(err)
14791501
if moved || ask {
1480-
node, err = c.nodes.Get(addr)
1502+
node, err = c.nodes.GetOrCreate(addr)
14811503
if err != nil {
14821504
return err
14831505
}
@@ -1589,7 +1611,7 @@ func (c *ClusterClient) cmdsInfo(ctx context.Context) (map[string]*CommandInfo,
15891611
for _, idx := range perm {
15901612
addr := addrs[idx]
15911613

1592-
node, err := c.nodes.Get(addr)
1614+
node, err := c.nodes.GetOrCreate(addr)
15931615
if err != nil {
15941616
if firstErr == nil {
15951617
firstErr = err

0 commit comments

Comments
 (0)