Skip to content

Use master / another slave node when current node is failing #1056

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged 2 commits on Jun 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 47 additions & 49 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ type clusterNode struct {

latency uint32 // atomic
generation uint32 // atomic
loading uint32 // atomic
failing uint32 // atomic
}

func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
Expand Down Expand Up @@ -203,21 +203,21 @@ func (n *clusterNode) Latency() time.Duration {
return time.Duration(latency) * time.Microsecond
}

func (n *clusterNode) MarkAsLoading() {
atomic.StoreUint32(&n.loading, uint32(time.Now().Unix()))
func (n *clusterNode) MarkAsFailing() {
atomic.StoreUint32(&n.failing, uint32(time.Now().Unix()))
}

func (n *clusterNode) Loading() bool {
const minute = int64(time.Minute / time.Second)
func (n *clusterNode) Failing() bool {
const timeout = 15 // 15 seconds

loading := atomic.LoadUint32(&n.loading)
if loading == 0 {
failing := atomic.LoadUint32(&n.failing)
if failing == 0 {
return false
}
if time.Now().Unix()-int64(loading) < minute {
if time.Now().Unix()-int64(failing) < timeout {
return true
}
atomic.StoreUint32(&n.loading, 0)
atomic.StoreUint32(&n.failing, 0)
return false
}

Expand Down Expand Up @@ -522,7 +522,7 @@ func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
case 1:
return nodes[0], nil
case 2:
if slave := nodes[1]; !slave.Loading() {
if slave := nodes[1]; !slave.Failing() {
return slave, nil
}
return nodes[0], nil
Expand All @@ -531,7 +531,7 @@ func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
for i := 0; i < 10; i++ {
n := rand.Intn(len(nodes)-1) + 1
slave = nodes[n]
if !slave.Loading() {
if !slave.Failing() {
return slave, nil
}
}
Expand All @@ -551,7 +551,7 @@ func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {

var node *clusterNode
for _, n := range nodes {
if n.Loading() {
if n.Failing() {
continue
}
if node == nil || node.Latency()-n.Latency() > threshold {
Expand All @@ -561,10 +561,13 @@ func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
return node, nil
}

func (c *clusterState) slotRandomNode(slot int) *clusterNode {
func (c *clusterState) slotRandomNode(slot int) (*clusterNode, error) {
nodes := c.slotNodes(slot)
if len(nodes) == 0 {
return c.nodes.Random()
}
n := rand.Intn(len(nodes))
return nodes[n]
return nodes[n], nil
}

func (c *clusterState) slotNodes(slot int) []*clusterNode {
Expand Down Expand Up @@ -742,23 +745,26 @@ func (c *ClusterClient) ProcessContext(ctx context.Context, cmd Cmder) error {
}

func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
cmdInfo := c.cmdInfo(cmd.Name())
slot := c.cmdSlot(cmd)

var node *clusterNode
var ask bool
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
var err error

if attempt > 0 {
time.Sleep(c.retryBackoff(attempt))
}

if node == nil {
var err error
_, node, err = c.cmdSlotAndNode(cmd)
node, err = c.cmdNode(cmdInfo, slot)
if err != nil {
cmd.setErr(err)
break
}
}

var err error
if ask {
pipe := node.Client.Pipeline()
_ = pipe.Process(NewCmd("ASKING"))
Expand All @@ -780,7 +786,7 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {

// If slave is loading - pick another node.
if c.opt.ReadOnly && internal.IsLoadingError(err) {
node.MarkAsLoading()
node.MarkAsFailing()
node = nil
continue
}
Expand All @@ -807,11 +813,9 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
continue
}

// Second try random node.
node, err = c.nodes.Random()
if err != nil {
break
}
// Second try another node.
node.MarkAsFailing()
node = nil
continue
}

Expand Down Expand Up @@ -1100,17 +1104,20 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder, cmdsMap *cmdsMap) error {

cmdsAreReadOnly := c.opt.ReadOnly && c.cmdsAreReadOnly(cmds)
for _, cmd := range cmds {
slot := c.cmdSlot(cmd)

var node *clusterNode
var err error
if cmdsAreReadOnly {
_, node, err = c.cmdSlotAndNode(cmd)
cmdInfo := c.cmdInfo(cmd.Name())
node, err = c.cmdNode(cmdInfo, slot)
} else {
slot := c.cmdSlot(cmd)
node, err = state.slotMasterNode(slot)
}
if err != nil {
return err
}

cmdsMap.mu.Lock()
cmdsMap.m[node] = append(cmdsMap.m[node], cmd)
cmdsMap.mu.Unlock()
Expand Down Expand Up @@ -1162,7 +1169,7 @@ func (c *ClusterClient) pipelineReadCmds(
}

if c.opt.ReadOnly && internal.IsLoadingError(err) {
node.MarkAsLoading()
node.MarkAsFailing()
} else if internal.IsRedisError(err) {
continue
}
Expand Down Expand Up @@ -1529,14 +1536,6 @@ func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
return info
}

func cmdSlot(cmd Cmder, pos int) int {
if pos == 0 {
return hashtag.RandomSlot()
}
firstKey := cmd.stringArg(pos)
return hashtag.Slot(firstKey)
}

func (c *ClusterClient) cmdSlot(cmd Cmder) int {
args := cmd.Args()
if args[0] == "cluster" && args[1] == "getkeysinslot" {
Expand All @@ -1547,32 +1546,31 @@ func (c *ClusterClient) cmdSlot(cmd Cmder) int {
return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
}

func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) {
func cmdSlot(cmd Cmder, pos int) int {
if pos == 0 {
return hashtag.RandomSlot()
}
firstKey := cmd.stringArg(pos)
return hashtag.Slot(firstKey)
}

func (c *ClusterClient) cmdNode(cmdInfo *CommandInfo, slot int) (*clusterNode, error) {
state, err := c.state.Get()
if err != nil {
return 0, nil, err
return nil, err
}

cmdInfo := c.cmdInfo(cmd.Name())
slot := c.cmdSlot(cmd)

if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly {
if c.opt.RouteByLatency {
node, err := state.slotClosestNode(slot)
return slot, node, err
return state.slotClosestNode(slot)
}

if c.opt.RouteRandomly {
node := state.slotRandomNode(slot)
return slot, node, nil
return state.slotRandomNode(slot)
}

node, err := state.slotSlaveNode(slot)
return slot, node, err
return state.slotSlaveNode(slot)
}

node, err := state.slotMasterNode(slot)
return slot, node, err
return state.slotMasterNode(slot)
}

func (c *ClusterClient) slotMasterNode(slot int) (*clusterNode, error) {
Expand Down
2 changes: 2 additions & 0 deletions ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,8 @@ var _ = Describe("Ring watch", func() {
})

ring.ForEachShard(func(cl *redis.Client) error {
defer GinkgoRecover()

pool := cl.Pool()
Expect(pool.Len()).To(BeNumerically("<=", 10))
Expect(pool.IdleLen()).To(BeNumerically("<=", 10))
Expand Down