Skip to content

Resent client pool when sentinel switches master #588

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 29, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 26 additions & 45 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,47 +140,6 @@ func (p *ConnPool) lastDialError() error {
return p._lastDialError.Load().(error)
}

func (p *ConnPool) PopFree() *Conn {
select {
case p.queue <- struct{}{}:
default:
timer := timers.Get().(*time.Timer)
timer.Reset(p.opt.PoolTimeout)

select {
case p.queue <- struct{}{}:
if !timer.Stop() {
<-timer.C
}
timers.Put(timer)
case <-timer.C:
timers.Put(timer)
atomic.AddUint32(&p.stats.Timeouts, 1)
return nil
}
}

p.freeConnsMu.Lock()
cn := p.popFree()
p.freeConnsMu.Unlock()

if cn == nil {
<-p.queue
}
return cn
}

func (p *ConnPool) popFree() *Conn {
if len(p.freeConns) == 0 {
return nil
}

idx := len(p.freeConns) - 1
cn := p.freeConns[idx]
p.freeConns = p.freeConns[:idx]
return cn
}

// Get returns existed connection from the pool or creates a new one.
func (p *ConnPool) Get() (*Conn, bool, error) {
if p.closed() {
Expand Down Expand Up @@ -235,6 +194,17 @@ func (p *ConnPool) Get() (*Conn, bool, error) {
return newcn, true, nil
}

func (p *ConnPool) popFree() *Conn {
if len(p.freeConns) == 0 {
return nil
}

idx := len(p.freeConns) - 1
cn := p.freeConns[idx]
p.freeConns = p.freeConns[:idx]
return cn
}

func (p *ConnPool) Put(cn *Conn) error {
if data := cn.Rd.PeekBuffered(); data != nil {
internal.Logf("connection has unread data: %q", data)
Expand Down Expand Up @@ -303,17 +273,28 @@ func (p *ConnPool) closed() bool {
return atomic.LoadUint32(&p._closed) == 1
}

func (p *ConnPool) Filter(fn func(*Conn) bool) error {
var firstErr error
p.connsMu.Lock()
for _, cn := range p.conns {
if fn(cn) {
if err := p.closeConn(cn); err != nil && firstErr == nil {
firstErr = err
}
}
}
p.connsMu.Unlock()
return firstErr
}

func (p *ConnPool) Close() error {
if !atomic.CompareAndSwapUint32(&p._closed, 0, 1) {
return ErrClosed
}

p.connsMu.Lock()
var firstErr error
p.connsMu.Lock()
for _, cn := range p.conns {
if cn == nil {
continue
}
if err := p.closeConn(cn); err != nil && firstErr == nil {
firstErr = err
}
Expand Down
26 changes: 0 additions & 26 deletions internal/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,30 +238,4 @@ var _ = Describe("race", func() {
}
})
})

It("does not happen on Get and PopFree", func() {
connPool = pool.NewConnPool(
&pool.Options{
Dialer: dummyDialer,
PoolSize: 10,
PoolTimeout: time.Minute,
IdleTimeout: time.Second,
IdleCheckFrequency: time.Millisecond,
})

perform(C, func(id int) {
for i := 0; i < N; i++ {
cn, _, err := connPool.Get()
Expect(err).NotTo(HaveOccurred())
if err == nil {
Expect(connPool.Put(cn)).NotTo(HaveOccurred())
}

cn = connPool.PopFree()
if cn != nil {
Expect(connPool.Put(cn)).NotTo(HaveOccurred())
}
}
})
})
})
4 changes: 4 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ var cluster = &clusterScenario{
clients: make(map[string]*redis.Client, 6),
}

func init() {
//redis.SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile))
}

var _ = BeforeSuite(func() {
var err error

Expand Down
113 changes: 56 additions & 57 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,54 +19,53 @@ import (
type PubSub struct {
base baseClient

mu sync.Mutex
cn *pool.Conn
closed bool

subMu sync.Mutex
mu sync.Mutex
cn *pool.Conn
channels []string
patterns []string
closed bool

cmd *Cmd
}

func (c *PubSub) conn() (*pool.Conn, bool, error) {
func (c *PubSub) conn() (*pool.Conn, error) {
c.mu.Lock()
defer c.mu.Unlock()
cn, err := c._conn()
c.mu.Unlock()
return cn, err
}

func (c *PubSub) _conn() (*pool.Conn, error) {
if c.closed {
return nil, false, pool.ErrClosed
return nil, pool.ErrClosed
}

if c.cn != nil {
return c.cn, false, nil
return c.cn, nil
}

cn, err := c.base.connPool.NewConn()
if err != nil {
return nil, false, err
return nil, err
}

if !cn.Inited {
if err := c.base.initConn(cn); err != nil {
_ = c.base.connPool.CloseConn(cn)
return nil, false, err
return nil, err
}
}

if err := c.resubscribe(cn); err != nil {
_ = c.base.connPool.CloseConn(cn)
return nil, false, err
return nil, err
}

c.cn = cn
return cn, true, nil
return cn, nil
}

func (c *PubSub) resubscribe(cn *pool.Conn) error {
c.subMu.Lock()
defer c.subMu.Unlock()

var firstErr error
if len(c.channels) > 0 {
if err := c._subscribe(cn, "subscribe", c.channels...); err != nil && firstErr == nil {
Expand All @@ -81,6 +80,18 @@ func (c *PubSub) resubscribe(cn *pool.Conn) error {
return firstErr
}

func (c *PubSub) _subscribe(cn *pool.Conn, redisCmd string, channels ...string) error {
args := make([]interface{}, 1+len(channels))
args[0] = redisCmd
for i, channel := range channels {
args[1+i] = channel
}
cmd := NewSliceCmd(args...)

cn.SetWriteTimeout(c.base.opt.WriteTimeout)
return writeCmd(cn, cmd)
}

func (c *PubSub) putConn(cn *pool.Conn, err error) {
if !internal.IsBadConn(err, true) {
return
Expand Down Expand Up @@ -114,67 +125,55 @@ func (c *PubSub) Close() error {
return nil
}

func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
cn, isNew, err := c.conn()
if err != nil {
return err
}

if isNew {
return nil
}

err = c._subscribe(cn, redisCmd, channels...)
c.putConn(cn, err)
return err
}

func (c *PubSub) _subscribe(cn *pool.Conn, redisCmd string, channels ...string) error {
args := make([]interface{}, 1+len(channels))
args[0] = redisCmd
for i, channel := range channels {
args[1+i] = channel
}
cmd := NewSliceCmd(args...)

cn.SetWriteTimeout(c.base.opt.WriteTimeout)
return writeCmd(cn, cmd)
}

// Subscribes the client to the specified channels. It returns
// empty subscription if there are no channels.
func (c *PubSub) Subscribe(channels ...string) error {
c.subMu.Lock()
c.mu.Lock()
err := c.subscribe("subscribe", channels...)
c.channels = appendIfNotExists(c.channels, channels...)
c.subMu.Unlock()
return c.subscribe("subscribe", channels...)
c.mu.Unlock()
return err
}

// Subscribes the client to the given patterns. It returns
// empty subscription if there are no patterns.
func (c *PubSub) PSubscribe(patterns ...string) error {
c.subMu.Lock()
c.mu.Lock()
err := c.subscribe("psubscribe", patterns...)
c.patterns = appendIfNotExists(c.patterns, patterns...)
c.subMu.Unlock()
return c.subscribe("psubscribe", patterns...)
c.mu.Unlock()
return err
}

// Unsubscribes the client from the given channels, or from all of
// them if none is given.
func (c *PubSub) Unsubscribe(channels ...string) error {
c.subMu.Lock()
c.mu.Lock()
err := c.subscribe("unsubscribe", channels...)
c.channels = remove(c.channels, channels...)
c.subMu.Unlock()
return c.subscribe("unsubscribe", channels...)
c.mu.Unlock()
return err
}

// Unsubscribes the client from the given patterns, or from all of
// them if none is given.
func (c *PubSub) PUnsubscribe(patterns ...string) error {
c.subMu.Lock()
c.mu.Lock()
err := c.subscribe("punsubscribe", patterns...)
c.patterns = remove(c.patterns, patterns...)
c.subMu.Unlock()
return c.subscribe("punsubscribe", patterns...)
c.mu.Unlock()
return err
}

func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
cn, err := c._conn()
if err != nil {
return err
}

err = c._subscribe(cn, redisCmd, channels...)
c.putConn(cn, err)
return err
}

func (c *PubSub) Ping(payload ...string) error {
Expand All @@ -184,7 +183,7 @@ func (c *PubSub) Ping(payload ...string) error {
}
cmd := NewCmd(args...)

cn, _, err := c.conn()
cn, err := c.conn()
if err != nil {
return err
}
Expand Down Expand Up @@ -277,7 +276,7 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
c.cmd = NewCmd()
}

cn, _, err := c.conn()
cn, err := c.conn()
if err != nil {
return nil, err
}
Expand Down
Loading