Skip to content

Commit 93315de

Browse files
committed
bugfix: prevent recreate connection after Close()
The ConnectionPool.checker() goroutine may still work some time after ConnectionPool.Close() call. It may lead to re-open connection in a concurrent closing pool. The connection still opened after the pool is closed. The patch adds RWLock to protect blocks which work with anyPool, roPool and rwPool. We don't need to protect regular requests because in the worst case, we will send a request into a closed connection. It can happen for other reasons and it seems like we can't avoid it. So it seems like an expected behavior. Closes #208
1 parent 656c1b7 commit 93315de

File tree

4 files changed

+79
-42
lines changed

4 files changed

+79
-42
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
2020
- Segmentation faults in ConnectionPool requests after disconnect (#208)
2121
- Addresses in ConnectionPool may be changed from an external code (#208)
2222
- ConnectionPool recreates connections too often (#208)
23+
- A connection is still opened after ConnectionPool.Close() (#208)
2324

2425
## [1.8.0] - 2022-08-17
2526

connection_pool/connection_pool.go

Lines changed: 52 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
"errors"
1515
"fmt"
1616
"log"
17-
"sync/atomic"
17+
"sync"
1818
"time"
1919

2020
"github.com/tarantool/go-tarantool"
@@ -69,12 +69,13 @@ type ConnectionPool struct {
6969
connOpts tarantool.Opts
7070
opts OptsPool
7171

72-
notify chan tarantool.ConnEvent
73-
state State
74-
control chan struct{}
75-
roPool *RoundRobinStrategy
76-
rwPool *RoundRobinStrategy
77-
anyPool *RoundRobinStrategy
72+
notify chan tarantool.ConnEvent
73+
state state
74+
control chan struct{}
75+
roPool *RoundRobinStrategy
76+
rwPool *RoundRobinStrategy
77+
anyPool *RoundRobinStrategy
78+
poolsMutex sync.RWMutex
7879
}
7980

8081
// ConnectWithOpts creates pool for instances with addresses addrs
@@ -100,6 +101,7 @@ func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts OptsPool) (co
100101
connOpts: connOpts,
101102
opts: opts,
102103
notify: notify,
104+
state: unknownState,
103105
control: make(chan struct{}),
104106
rwPool: rwPool,
105107
roPool: roPool,
@@ -109,10 +111,12 @@ func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts OptsPool) (co
109111

110112
somebodyAlive := connPool.fillPools()
111113
if !somebodyAlive {
112-
connPool.Close()
114+
connPool.state.set(closedState)
115+
connPool.closeImpl()
113116
return nil, ErrNoConnection
114117
}
115118

119+
connPool.state.set(connectedState)
116120
go connPool.checker()
117121

118122
return connPool, nil
@@ -128,7 +132,10 @@ func Connect(addrs []string, connOpts tarantool.Opts) (connPool *ConnectionPool,
128132

129133
// ConnectedNow gets connected status of pool.
130134
func (connPool *ConnectionPool) ConnectedNow(mode Mode) (bool, error) {
131-
if connPool.getState() != connConnected {
135+
connPool.poolsMutex.RLock()
136+
defer connPool.poolsMutex.RUnlock()
137+
138+
if connPool.state.get() != connectedState {
132139
return false, nil
133140
}
134141
switch mode {
@@ -157,10 +164,8 @@ func (connPool *ConnectionPool) ConfiguredTimeout(mode Mode) (time.Duration, err
157164
return conn.ConfiguredTimeout(), nil
158165
}
159166

160-
// Close closes connections in pool.
161-
func (connPool *ConnectionPool) Close() []error {
167+
func (connPool *ConnectionPool) closeImpl() []error {
162168
close(connPool.control)
163-
connPool.state = connClosed
164169

165170
errs := make([]error, 0, len(connPool.addrs))
166171

@@ -177,17 +182,36 @@ func (connPool *ConnectionPool) Close() []error {
177182
return errs
178183
}
179184

185+
// Close closes connections in pool.
186+
func (connPool *ConnectionPool) Close() []error {
187+
if connPool.state.cas(connectedState, closedState) {
188+
connPool.poolsMutex.Lock()
189+
defer connPool.poolsMutex.Unlock()
190+
191+
return connPool.closeImpl()
192+
}
193+
return []error{}
194+
}
195+
180196
// GetAddrs gets addresses of connections in pool.
181197
func (connPool *ConnectionPool) GetAddrs() []string {
182198
cpy := make([]string, len(connPool.addrs))
183199
copy(cpy, connPool.addrs)
200+
184201
return cpy
185202
}
186203

187204
// GetPoolInfo gets information of connections (connected status, ro/rw role).
188205
func (connPool *ConnectionPool) GetPoolInfo() map[string]*ConnectionInfo {
189206
info := make(map[string]*ConnectionInfo)
190207

208+
connPool.poolsMutex.RLock()
209+
defer connPool.poolsMutex.RUnlock()
210+
211+
if connPool.state.get() != connectedState {
212+
return info
213+
}
214+
191215
for _, addr := range connPool.addrs {
192216
conn, role := connPool.getConnectionFromPool(addr)
193217
if conn != nil {
@@ -638,11 +662,9 @@ func (connPool *ConnectionPool) getConnectionFromPool(addr string) (*tarantool.C
638662
func (connPool *ConnectionPool) deleteConnectionFromPool(addr string) {
639663
_ = connPool.anyPool.DeleteConnByAddr(addr)
640664
conn := connPool.rwPool.DeleteConnByAddr(addr)
641-
if conn != nil {
642-
return
665+
if conn == nil {
666+
connPool.roPool.DeleteConnByAddr(addr)
643667
}
644-
645-
connPool.roPool.DeleteConnByAddr(addr)
646668
}
647669

648670
func (connPool *ConnectionPool) setConnectionToPool(addr string, conn *tarantool.Connection) error {
@@ -689,39 +711,39 @@ func (connPool *ConnectionPool) refreshConnection(addr string) {
689711
}
690712

691713
func (connPool *ConnectionPool) checker() {
692-
693714
timer := time.NewTicker(connPool.opts.CheckTimeout)
694715
defer timer.Stop()
695716

696-
for connPool.getState() != connClosed {
717+
for connPool.state.get() != closedState {
697718
select {
698719
case <-connPool.control:
699720
return
700721
case e := <-connPool.notify:
701-
if connPool.getState() == connClosed {
702-
return
703-
}
704-
if e.Conn.ClosedNow() {
722+
connPool.poolsMutex.Lock()
723+
if connPool.state.get() == connectedState && e.Conn.ClosedNow() {
705724
connPool.deleteConnectionFromPool(e.Conn.Addr())
706725
}
726+
connPool.poolsMutex.Unlock()
707727
case <-timer.C:
708-
for _, addr := range connPool.addrs {
709-
if connPool.getState() == connClosed {
710-
return
728+
connPool.poolsMutex.Lock()
729+
if connPool.state.get() == connectedState {
730+
for _, addr := range connPool.addrs {
731+
// Reopen connection
732+
// Relocate connection between subpools
733+
// if ro/rw was updated
734+
connPool.refreshConnection(addr)
711735
}
712-
713-
// Reopen connection
714-
// Relocate connection between subpools
715-
// if ro/rw was updated
716-
connPool.refreshConnection(addr)
717736
}
737+
connPool.poolsMutex.Unlock()
718738
}
719739
}
720740
}
721741

722742
func (connPool *ConnectionPool) fillPools() bool {
723743
somebodyAlive := false
724744

745+
// It is called before checker() goroutine and before closeImpl() may be
746+
// called so we don't expect concurrency issues here.
725747
for _, addr := range connPool.addrs {
726748
conn, err := tarantool.Connect(addr, connPool.connOpts)
727749
if err != nil {
@@ -740,10 +762,6 @@ func (connPool *ConnectionPool) fillPools() bool {
740762
return somebodyAlive
741763
}
742764

743-
func (connPool *ConnectionPool) getState() uint32 {
744-
return atomic.LoadUint32((*uint32)(&connPool.state))
745-
}
746-
747765
func (connPool *ConnectionPool) getNextConnection(mode Mode) (*tarantool.Connection, error) {
748766

749767
switch mode {

connection_pool/const.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,3 @@ const (
3434
MasterRole // The instance is read-write mode.
3535
ReplicaRole // The instance is in read-only mode.
3636
)
37-
38-
type State uint32
39-
40-
// pool state
41-
const (
42-
connConnected = iota
43-
connClosed
44-
)

connection_pool/state.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package connection_pool
2+
3+
import (
4+
"sync/atomic"
5+
)
6+
7+
// pool state
8+
type state uint32
9+
10+
const (
11+
unknownState state = iota
12+
connectedState
13+
closedState
14+
)
15+
16+
func (s *state) set(news state) {
17+
atomic.StoreUint32((*uint32)(s), uint32(news))
18+
}
19+
20+
func (s *state) cas(olds, news state) bool {
21+
return atomic.CompareAndSwapUint32((*uint32)(s), uint32(olds), uint32(news))
22+
}
23+
24+
func (s *state) get() state {
25+
return state(atomic.LoadUint32((*uint32)(s)))
26+
}

0 commit comments

Comments
 (0)