Skip to content

Commit 1cc1ba9

Browse files
committed
Use master / another slave node when current node is failing
1 parent aa5f492 commit 1cc1ba9

File tree

1 file changed

+47
-49
lines changed

1 file changed

+47
-49
lines changed

cluster.go

Lines changed: 47 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ type clusterNode struct {
155155

156156
latency uint32 // atomic
157157
generation uint32 // atomic
158-
loading uint32 // atomic
158+
failing uint32 // atomic
159159
}
160160

161161
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
@@ -203,21 +203,21 @@ func (n *clusterNode) Latency() time.Duration {
203203
return time.Duration(latency) * time.Microsecond
204204
}
205205

206-
func (n *clusterNode) MarkAsLoading() {
207-
atomic.StoreUint32(&n.loading, uint32(time.Now().Unix()))
206+
func (n *clusterNode) MarkAsFailing() {
207+
atomic.StoreUint32(&n.failing, uint32(time.Now().Unix()))
208208
}
209209

210-
func (n *clusterNode) Loading() bool {
211-
const minute = int64(time.Minute / time.Second)
210+
func (n *clusterNode) Failing() bool {
211+
const timeout = 15 // 15 seconds
212212

213-
loading := atomic.LoadUint32(&n.loading)
214-
if loading == 0 {
213+
failing := atomic.LoadUint32(&n.failing)
214+
if failing == 0 {
215215
return false
216216
}
217-
if time.Now().Unix()-int64(loading) < minute {
217+
if time.Now().Unix()-int64(failing) < timeout {
218218
return true
219219
}
220-
atomic.StoreUint32(&n.loading, 0)
220+
atomic.StoreUint32(&n.failing, 0)
221221
return false
222222
}
223223

@@ -522,7 +522,7 @@ func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
522522
case 1:
523523
return nodes[0], nil
524524
case 2:
525-
if slave := nodes[1]; !slave.Loading() {
525+
if slave := nodes[1]; !slave.Failing() {
526526
return slave, nil
527527
}
528528
return nodes[0], nil
@@ -531,7 +531,7 @@ func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
531531
for i := 0; i < 10; i++ {
532532
n := rand.Intn(len(nodes)-1) + 1
533533
slave = nodes[n]
534-
if !slave.Loading() {
534+
if !slave.Failing() {
535535
return slave, nil
536536
}
537537
}
@@ -551,7 +551,7 @@ func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
551551

552552
var node *clusterNode
553553
for _, n := range nodes {
554-
if n.Loading() {
554+
if n.Failing() {
555555
continue
556556
}
557557
if node == nil || node.Latency()-n.Latency() > threshold {
@@ -561,10 +561,13 @@ func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
561561
return node, nil
562562
}
563563

564-
func (c *clusterState) slotRandomNode(slot int) *clusterNode {
564+
func (c *clusterState) slotRandomNode(slot int) (*clusterNode, error) {
565565
nodes := c.slotNodes(slot)
566+
if len(nodes) == 0 {
567+
return c.nodes.Random()
568+
}
566569
n := rand.Intn(len(nodes))
567-
return nodes[n]
570+
return nodes[n], nil
568571
}
569572

570573
func (c *clusterState) slotNodes(slot int) []*clusterNode {
@@ -742,23 +745,26 @@ func (c *ClusterClient) ProcessContext(ctx context.Context, cmd Cmder) error {
742745
}
743746

744747
func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
748+
cmdInfo := c.cmdInfo(cmd.Name())
749+
slot := c.cmdSlot(cmd)
750+
745751
var node *clusterNode
746752
var ask bool
747753
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
754+
var err error
755+
748756
if attempt > 0 {
749757
time.Sleep(c.retryBackoff(attempt))
750758
}
751759

752760
if node == nil {
753-
var err error
754-
_, node, err = c.cmdSlotAndNode(cmd)
761+
node, err = c.cmdNode(cmdInfo, slot)
755762
if err != nil {
756763
cmd.setErr(err)
757764
break
758765
}
759766
}
760767

761-
var err error
762768
if ask {
763769
pipe := node.Client.Pipeline()
764770
_ = pipe.Process(NewCmd("ASKING"))
@@ -780,7 +786,7 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
780786

781787
// If slave is loading - pick another node.
782788
if c.opt.ReadOnly && internal.IsLoadingError(err) {
783-
node.MarkAsLoading()
789+
node.MarkAsFailing()
784790
node = nil
785791
continue
786792
}
@@ -807,11 +813,9 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
807813
continue
808814
}
809815

810-
// Second try random node.
811-
node, err = c.nodes.Random()
812-
if err != nil {
813-
break
814-
}
816+
// Second try another node.
817+
node.MarkAsFailing()
818+
node = nil
815819
continue
816820
}
817821

@@ -1100,17 +1104,20 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder, cmdsMap *cmdsMap) error {
11001104

11011105
cmdsAreReadOnly := c.opt.ReadOnly && c.cmdsAreReadOnly(cmds)
11021106
for _, cmd := range cmds {
1107+
slot := c.cmdSlot(cmd)
1108+
11031109
var node *clusterNode
11041110
var err error
11051111
if cmdsAreReadOnly {
1106-
_, node, err = c.cmdSlotAndNode(cmd)
1112+
cmdInfo := c.cmdInfo(cmd.Name())
1113+
node, err = c.cmdNode(cmdInfo, slot)
11071114
} else {
1108-
slot := c.cmdSlot(cmd)
11091115
node, err = state.slotMasterNode(slot)
11101116
}
11111117
if err != nil {
11121118
return err
11131119
}
1120+
11141121
cmdsMap.mu.Lock()
11151122
cmdsMap.m[node] = append(cmdsMap.m[node], cmd)
11161123
cmdsMap.mu.Unlock()
@@ -1162,7 +1169,7 @@ func (c *ClusterClient) pipelineReadCmds(
11621169
}
11631170

11641171
if c.opt.ReadOnly && internal.IsLoadingError(err) {
1165-
node.MarkAsLoading()
1172+
node.MarkAsFailing()
11661173
} else if internal.IsRedisError(err) {
11671174
continue
11681175
}
@@ -1529,14 +1536,6 @@ func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
15291536
return info
15301537
}
15311538

1532-
func cmdSlot(cmd Cmder, pos int) int {
1533-
if pos == 0 {
1534-
return hashtag.RandomSlot()
1535-
}
1536-
firstKey := cmd.stringArg(pos)
1537-
return hashtag.Slot(firstKey)
1538-
}
1539-
15401539
func (c *ClusterClient) cmdSlot(cmd Cmder) int {
15411540
args := cmd.Args()
15421541
if args[0] == "cluster" && args[1] == "getkeysinslot" {
@@ -1547,32 +1546,31 @@ func (c *ClusterClient) cmdSlot(cmd Cmder) int {
15471546
return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
15481547
}
15491548

1550-
func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) {
1549+
func cmdSlot(cmd Cmder, pos int) int {
1550+
if pos == 0 {
1551+
return hashtag.RandomSlot()
1552+
}
1553+
firstKey := cmd.stringArg(pos)
1554+
return hashtag.Slot(firstKey)
1555+
}
1556+
1557+
func (c *ClusterClient) cmdNode(cmdInfo *CommandInfo, slot int) (*clusterNode, error) {
15511558
state, err := c.state.Get()
15521559
if err != nil {
1553-
return 0, nil, err
1560+
return nil, err
15541561
}
15551562

1556-
cmdInfo := c.cmdInfo(cmd.Name())
1557-
slot := c.cmdSlot(cmd)
1558-
15591563
if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly {
15601564
if c.opt.RouteByLatency {
1561-
node, err := state.slotClosestNode(slot)
1562-
return slot, node, err
1565+
return state.slotClosestNode(slot)
15631566
}
1564-
15651567
if c.opt.RouteRandomly {
1566-
node := state.slotRandomNode(slot)
1567-
return slot, node, nil
1568+
return state.slotRandomNode(slot)
15681569
}
1569-
1570-
node, err := state.slotSlaveNode(slot)
1571-
return slot, node, err
1570+
return state.slotSlaveNode(slot)
15721571
}
15731572

1574-
node, err := state.slotMasterNode(slot)
1575-
return slot, node, err
1573+
return state.slotMasterNode(slot)
15761574
}
15771575

15781576
func (c *ClusterClient) slotMasterNode(slot int) (*clusterNode, error) {

0 commit comments

Comments
 (0)