Skip to content

Commit c95006b

Browse files
committed
Resent client pool when sentinel switches master
1 parent 736d0f9 commit c95006b

File tree

6 files changed

+139
-146
lines changed

6 files changed

+139
-146
lines changed

internal/pool/pool.go

Lines changed: 26 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -140,47 +140,6 @@ func (p *ConnPool) lastDialError() error {
140140
return p._lastDialError.Load().(error)
141141
}
142142

143-
func (p *ConnPool) PopFree() *Conn {
144-
select {
145-
case p.queue <- struct{}{}:
146-
default:
147-
timer := timers.Get().(*time.Timer)
148-
timer.Reset(p.opt.PoolTimeout)
149-
150-
select {
151-
case p.queue <- struct{}{}:
152-
if !timer.Stop() {
153-
<-timer.C
154-
}
155-
timers.Put(timer)
156-
case <-timer.C:
157-
timers.Put(timer)
158-
atomic.AddUint32(&p.stats.Timeouts, 1)
159-
return nil
160-
}
161-
}
162-
163-
p.freeConnsMu.Lock()
164-
cn := p.popFree()
165-
p.freeConnsMu.Unlock()
166-
167-
if cn == nil {
168-
<-p.queue
169-
}
170-
return cn
171-
}
172-
173-
func (p *ConnPool) popFree() *Conn {
174-
if len(p.freeConns) == 0 {
175-
return nil
176-
}
177-
178-
idx := len(p.freeConns) - 1
179-
cn := p.freeConns[idx]
180-
p.freeConns = p.freeConns[:idx]
181-
return cn
182-
}
183-
184143
// Get returns existed connection from the pool or creates a new one.
185144
func (p *ConnPool) Get() (*Conn, bool, error) {
186145
if p.closed() {
@@ -235,6 +194,17 @@ func (p *ConnPool) Get() (*Conn, bool, error) {
235194
return newcn, true, nil
236195
}
237196

197+
func (p *ConnPool) popFree() *Conn {
198+
if len(p.freeConns) == 0 {
199+
return nil
200+
}
201+
202+
idx := len(p.freeConns) - 1
203+
cn := p.freeConns[idx]
204+
p.freeConns = p.freeConns[:idx]
205+
return cn
206+
}
207+
238208
func (p *ConnPool) Put(cn *Conn) error {
239209
if data := cn.Rd.PeekBuffered(); data != nil {
240210
internal.Logf("connection has unread data: %q", data)
@@ -303,17 +273,28 @@ func (p *ConnPool) closed() bool {
303273
return atomic.LoadUint32(&p._closed) == 1
304274
}
305275

276+
func (p *ConnPool) Filter(fn func(*Conn) bool) error {
277+
var firstErr error
278+
p.connsMu.Lock()
279+
for _, cn := range p.conns {
280+
if fn(cn) {
281+
if err := p.closeConn(cn); err != nil && firstErr == nil {
282+
firstErr = err
283+
}
284+
}
285+
}
286+
p.connsMu.Unlock()
287+
return firstErr
288+
}
289+
306290
func (p *ConnPool) Close() error {
307291
if !atomic.CompareAndSwapUint32(&p._closed, 0, 1) {
308292
return ErrClosed
309293
}
310294

311-
p.connsMu.Lock()
312295
var firstErr error
296+
p.connsMu.Lock()
313297
for _, cn := range p.conns {
314-
if cn == nil {
315-
continue
316-
}
317298
if err := p.closeConn(cn); err != nil && firstErr == nil {
318299
firstErr = err
319300
}

main_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package redis_test
33
import (
44
"errors"
55
"fmt"
6+
"log"
67
"net"
78
"os"
89
"os/exec"
@@ -50,6 +51,10 @@ var cluster = &clusterScenario{
5051
clients: make(map[string]*redis.Client, 6),
5152
}
5253

54+
func init() {
55+
redis.SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile))
56+
}
57+
5358
var _ = BeforeSuite(func() {
5459
var err error
5560

pubsub.go

Lines changed: 38 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,11 @@ import (
1919
type PubSub struct {
2020
base baseClient
2121

22-
mu sync.Mutex
23-
cn *pool.Conn
24-
closed bool
25-
26-
subMu sync.Mutex
22+
mu sync.Mutex
23+
cn *pool.Conn
2724
channels []string
2825
patterns []string
26+
closed bool
2927

3028
cmd *Cmd
3129
}
@@ -64,9 +62,6 @@ func (c *PubSub) conn() (*pool.Conn, bool, error) {
6462
}
6563

6664
func (c *PubSub) resubscribe(cn *pool.Conn) error {
67-
c.subMu.Lock()
68-
defer c.subMu.Unlock()
69-
7065
var firstErr error
7166
if len(c.channels) > 0 {
7267
if err := c._subscribe(cn, "subscribe", c.channels...); err != nil && firstErr == nil {
@@ -81,6 +76,18 @@ func (c *PubSub) resubscribe(cn *pool.Conn) error {
8176
return firstErr
8277
}
8378

79+
func (c *PubSub) _subscribe(cn *pool.Conn, redisCmd string, channels ...string) error {
80+
args := make([]interface{}, 1+len(channels))
81+
args[0] = redisCmd
82+
for i, channel := range channels {
83+
args[1+i] = channel
84+
}
85+
cmd := NewSliceCmd(args...)
86+
87+
cn.SetWriteTimeout(c.base.opt.WriteTimeout)
88+
return writeCmd(cn, cmd)
89+
}
90+
8491
func (c *PubSub) putConn(cn *pool.Conn, err error) {
8592
if !internal.IsBadConn(err, true) {
8693
return
@@ -114,69 +121,57 @@ func (c *PubSub) Close() error {
114121
return nil
115122
}
116123

117-
func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
118-
cn, isNew, err := c.conn()
119-
if err != nil {
120-
return err
121-
}
122-
123-
if isNew {
124-
return nil
125-
}
126-
127-
err = c._subscribe(cn, redisCmd, channels...)
128-
c.putConn(cn, err)
129-
return err
130-
}
131-
132-
func (c *PubSub) _subscribe(cn *pool.Conn, redisCmd string, channels ...string) error {
133-
args := make([]interface{}, 1+len(channels))
134-
args[0] = redisCmd
135-
for i, channel := range channels {
136-
args[1+i] = channel
137-
}
138-
cmd := NewSliceCmd(args...)
139-
140-
cn.SetWriteTimeout(c.base.opt.WriteTimeout)
141-
return writeCmd(cn, cmd)
142-
}
143-
144124
// Subscribes the client to the specified channels. It returns
145125
// empty subscription if there are no channels.
146126
func (c *PubSub) Subscribe(channels ...string) error {
147-
c.subMu.Lock()
127+
c.mu.Lock()
148128
c.channels = appendIfNotExists(c.channels, channels...)
149-
c.subMu.Unlock()
129+
c.mu.Unlock()
150130
return c.subscribe("subscribe", channels...)
151131
}
152132

153133
// Subscribes the client to the given patterns. It returns
154134
// empty subscription if there are no patterns.
155135
func (c *PubSub) PSubscribe(patterns ...string) error {
156-
c.subMu.Lock()
136+
c.mu.Lock()
157137
c.patterns = appendIfNotExists(c.patterns, patterns...)
158-
c.subMu.Unlock()
138+
c.mu.Unlock()
159139
return c.subscribe("psubscribe", patterns...)
160140
}
161141

162142
// Unsubscribes the client from the given channels, or from all of
163143
// them if none is given.
164144
func (c *PubSub) Unsubscribe(channels ...string) error {
165-
c.subMu.Lock()
145+
c.mu.Lock()
166146
c.channels = remove(c.channels, channels...)
167-
c.subMu.Unlock()
147+
c.mu.Unlock()
168148
return c.subscribe("unsubscribe", channels...)
169149
}
170150

171151
// Unsubscribes the client from the given patterns, or from all of
172152
// them if none is given.
173153
func (c *PubSub) PUnsubscribe(patterns ...string) error {
174-
c.subMu.Lock()
154+
c.mu.Lock()
175155
c.patterns = remove(c.patterns, patterns...)
176-
c.subMu.Unlock()
156+
c.mu.Unlock()
177157
return c.subscribe("punsubscribe", patterns...)
178158
}
179159

160+
func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
161+
cn, isNew, err := c.conn()
162+
if err != nil {
163+
return err
164+
}
165+
166+
if isNew {
167+
return nil
168+
}
169+
170+
err = c._subscribe(cn, redisCmd, channels...)
171+
c.putConn(cn, err)
172+
return err
173+
}
174+
180175
func (c *PubSub) Ping(payload ...string) error {
181176
args := []interface{}{"ping"}
182177
if len(payload) == 1 {

redis.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,10 @@ func (c *Client) pubSub() *PubSub {
387387
func (c *Client) Subscribe(channels ...string) *PubSub {
388388
pubsub := c.pubSub()
389389
if len(channels) > 0 {
390-
_ = pubsub.Subscribe(channels...)
390+
err := pubsub.Subscribe(channels...)
391+
if err != nil {
392+
panic(err)
393+
}
391394
}
392395
return pubsub
393396
}

0 commit comments

Comments
 (0)