diff --git a/CHANGELOG.md b/CHANGELOG.md index 11b0e54e9..9ba11e011 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,14 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. ### Fixed +- Mode type description in the connection_pool subpackage (#208) +- Missed Role type constants in the connection_pool subpackage (#208) +- ConnectionPool does not close UnknownRole connections (#208) +- Segmentation faults in ConnectionPool requests after disconnect (#208) +- Addresses in ConnectionPool may be changed from an external code (#208) +- ConnectionPool recreates connections too often (#208) +- A connection is still opened after ConnectionPool.Close() (#208) + ## [1.8.0] - 2022-08-17 ### Added diff --git a/connection_pool/connection_pool.go b/connection_pool/connection_pool.go index 1b080fc1f..e0c862386 100644 --- a/connection_pool/connection_pool.go +++ b/connection_pool/connection_pool.go @@ -14,7 +14,7 @@ import ( "errors" "fmt" "log" - "sync/atomic" + "sync" "time" "github.com/tarantool/go-tarantool" @@ -25,11 +25,11 @@ var ( ErrWrongCheckTimeout = errors.New("wrong check timeout, must be greater than 0") ErrNoConnection = errors.New("no active connections") ErrTooManyArgs = errors.New("too many arguments") - ErrIncorrectResponse = errors.New("Incorrect response format") - ErrIncorrectStatus = errors.New("Incorrect instance status: status should be `running`") - ErrNoRwInstance = errors.New("Can't find rw instance in pool") - ErrNoRoInstance = errors.New("Can't find ro instance in pool") - ErrNoHealthyInstance = errors.New("Can't find healthy instance in pool") + ErrIncorrectResponse = errors.New("incorrect response format") + ErrIncorrectStatus = errors.New("incorrect instance status: status should be `running`") + ErrNoRwInstance = errors.New("can't find rw instance in pool") + ErrNoRoInstance = errors.New("can't find ro instance in pool") + ErrNoHealthyInstance = errors.New("can't find healthy instance in pool") ) /* @@ -69,12 +69,13 @@ type ConnectionPool struct { connOpts tarantool.Opts opts OptsPool - notify chan tarantool.ConnEvent - state State - control chan struct{} - roPool *RoundRobinStrategy - rwPool *RoundRobinStrategy - anyPool *RoundRobinStrategy + notify chan tarantool.ConnEvent + state state + control chan struct{} + roPool *RoundRobinStrategy + rwPool *RoundRobinStrategy + anyPool *RoundRobinStrategy + poolsMutex sync.RWMutex } // ConnectWithOpts creates pool for instances with addresses addrs @@ -96,22 +97,33 @@ func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts OptsPool) (co anyPool := NewEmptyRoundRobin(size) connPool = &ConnectionPool{ - addrs: addrs, + addrs: make([]string, 0, len(addrs)), connOpts: connOpts, opts: opts, notify: notify, + state: unknownState, control: make(chan struct{}), rwPool: rwPool, roPool: roPool, anyPool: anyPool, } + m := make(map[string]bool) + for _, addr := range addrs { + if _, ok := m[addr]; !ok { + m[addr] = true + connPool.addrs = append(connPool.addrs, addr) + } + } + somebodyAlive := connPool.fillPools() if !somebodyAlive { - connPool.Close() + connPool.state.set(closedState) + connPool.closeImpl() return nil, ErrNoConnection } + connPool.state.set(connectedState) go connPool.checker() return connPool, nil @@ -127,16 +139,26 @@ func Connect(addrs []string, connOpts tarantool.Opts) (connPool *ConnectionPool, // ConnectedNow gets connected status of pool. func (connPool *ConnectionPool) ConnectedNow(mode Mode) (bool, error) { - if connPool.getState() != connConnected { + connPool.poolsMutex.RLock() + defer connPool.poolsMutex.RUnlock() + + if connPool.state.get() != connectedState { return false, nil } - - conn, err := connPool.getNextConnection(mode) - if err != nil || conn == nil { - return false, err + switch mode { + case ANY: + return !connPool.anyPool.IsEmpty(), nil + case RW: + return !connPool.rwPool.IsEmpty(), nil + case RO: + return !connPool.roPool.IsEmpty(), nil + case PreferRW: + fallthrough + case PreferRO: + return !connPool.rwPool.IsEmpty() || !connPool.roPool.IsEmpty(), nil + default: + return false, ErrNoHealthyInstance } - - return conn.ConnectedNow(), nil } // ConfiguredTimeout gets timeout of current connection. @@ -149,28 +171,53 @@ func (connPool *ConnectionPool) ConfiguredTimeout(mode Mode) (time.Duration, err return conn.ConfiguredTimeout(), nil } -// Close closes connections in pool. -func (connPool *ConnectionPool) Close() []error { +func (connPool *ConnectionPool) closeImpl() []error { close(connPool.control) - connPool.state = connClosed - rwErrs := connPool.rwPool.CloseConns() - roErrs := connPool.roPool.CloseConns() + errs := make([]error, 0, len(connPool.addrs)) - allErrs := append(rwErrs, roErrs...) + for _, addr := range connPool.addrs { + if conn := connPool.anyPool.DeleteConnByAddr(addr); conn != nil { + if err := conn.Close(); err != nil { + errs = append(errs, err) + } + } + connPool.rwPool.DeleteConnByAddr(addr) + connPool.roPool.DeleteConnByAddr(addr) + } - return allErrs + return errs +} + +// Close closes connections in pool. +func (connPool *ConnectionPool) Close() []error { + if connPool.state.cas(connectedState, closedState) { + connPool.poolsMutex.Lock() + defer connPool.poolsMutex.Unlock() + + return connPool.closeImpl() + } + return nil } // GetAddrs gets addresses of connections in pool. func (connPool *ConnectionPool) GetAddrs() []string { - return connPool.addrs + cpy := make([]string, len(connPool.addrs)) + copy(cpy, connPool.addrs) + return cpy } // GetPoolInfo gets information of connections (connected status, ro/rw role). func (connPool *ConnectionPool) GetPoolInfo() map[string]*ConnectionInfo { info := make(map[string]*ConnectionInfo) + connPool.poolsMutex.RLock() + defer connPool.poolsMutex.RUnlock() + + if connPool.state.get() != connectedState { + return info + } + for _, addr := range connPool.addrs { conn, role := connPool.getConnectionFromPool(addr) if conn != nil { @@ -558,6 +605,15 @@ func (connPool *ConnectionPool) NewStream(userMode Mode) (*tarantool.Stream, err return conn.NewStream() } +// NewPrepared passes a sql statement to Tarantool for preparation synchronously. +func (connPool *ConnectionPool) NewPrepared(expr string, userMode Mode) (*tarantool.Prepared, error) { + conn, err := connPool.getNextConnection(userMode) + if err != nil { + return nil, err + } + return conn.NewPrepared(expr) +} + // // private // @@ -565,71 +621,56 @@ func (connPool *ConnectionPool) NewStream(userMode Mode) (*tarantool.Stream, err func (connPool *ConnectionPool) getConnectionRole(conn *tarantool.Connection) (Role, error) { resp, err := conn.Call17("box.info", []interface{}{}) if err != nil { - return unknown, err + return UnknownRole, err } if resp == nil { - return unknown, ErrIncorrectResponse + return UnknownRole, ErrIncorrectResponse } if len(resp.Data) < 1 { - return unknown, ErrIncorrectResponse + return UnknownRole, ErrIncorrectResponse } instanceStatus, ok := resp.Data[0].(map[interface{}]interface{})["status"] if !ok { - return unknown, ErrIncorrectResponse + return UnknownRole, ErrIncorrectResponse } if instanceStatus != "running" { - return unknown, ErrIncorrectStatus - } - - resp, err = conn.Call17("box.info", []interface{}{}) - if err != nil { - return unknown, err - } - if resp == nil { - return unknown, ErrIncorrectResponse - } - if len(resp.Data) < 1 { - return unknown, ErrIncorrectResponse + return UnknownRole, ErrIncorrectStatus } replicaRole, ok := resp.Data[0].(map[interface{}]interface{})["ro"] if !ok { - return unknown, ErrIncorrectResponse + return UnknownRole, ErrIncorrectResponse } switch replicaRole { case false: - return master, nil + return MasterRole, nil case true: - return replica, nil + return ReplicaRole, nil } - return unknown, nil + return UnknownRole, nil } func (connPool *ConnectionPool) getConnectionFromPool(addr string) (*tarantool.Connection, Role) { - conn := connPool.rwPool.GetConnByAddr(addr) - if conn != nil { - return conn, master + if conn := connPool.rwPool.GetConnByAddr(addr); conn != nil { + return conn, MasterRole } - conn = connPool.roPool.GetConnByAddr(addr) - if conn != nil { - return conn, replica + if conn := connPool.roPool.GetConnByAddr(addr); conn != nil { + return conn, ReplicaRole } - return connPool.anyPool.GetConnByAddr(addr), unknown + return connPool.anyPool.GetConnByAddr(addr), UnknownRole } func (connPool *ConnectionPool) deleteConnectionFromPool(addr string) { _ = connPool.anyPool.DeleteConnByAddr(addr) conn := connPool.rwPool.DeleteConnByAddr(addr) - if conn != nil { - return + if conn == nil { + connPool.roPool.DeleteConnByAddr(addr) } - - connPool.roPool.DeleteConnByAddr(addr) } func (connPool *ConnectionPool) setConnectionToPool(addr string, conn *tarantool.Connection) error { @@ -641,9 +682,9 @@ func (connPool *ConnectionPool) setConnectionToPool(addr string, conn *tarantool connPool.anyPool.AddConn(addr, conn) switch role { - case master: + case MasterRole: connPool.rwPool.AddConn(addr, conn) - case replica: + case ReplicaRole: connPool.roPool.AddConn(addr, conn) } @@ -676,45 +717,30 @@ func (connPool *ConnectionPool) refreshConnection(addr string) { } func (connPool *ConnectionPool) checker() { - timer := time.NewTicker(connPool.opts.CheckTimeout) defer timer.Stop() - for connPool.getState() != connClosed { + for connPool.state.get() != closedState { select { case <-connPool.control: return case e := <-connPool.notify: - if connPool.getState() == connClosed { - return - } - if e.Conn.ClosedNow() { - addr := e.Conn.Addr() - if conn, _ := connPool.getConnectionFromPool(addr); conn == nil { - continue - } - conn, _ := tarantool.Connect(addr, connPool.connOpts) - if conn != nil { - err := connPool.setConnectionToPool(addr, conn) - if err != nil { - conn.Close() - log.Printf("tarantool: storing connection to %s failed: %s\n", addr, err.Error()) - } - } else { - connPool.deleteConnectionFromPool(addr) - } + connPool.poolsMutex.Lock() + if connPool.state.get() == connectedState && e.Conn.ClosedNow() { + connPool.deleteConnectionFromPool(e.Conn.Addr()) } + connPool.poolsMutex.Unlock() case <-timer.C: - for _, addr := range connPool.addrs { - if connPool.getState() == connClosed { - return + connPool.poolsMutex.Lock() + if connPool.state.get() == connectedState { + for _, addr := range connPool.addrs { + // Reopen connection + // Relocate connection between subpools + // if ro/rw was updated + connPool.refreshConnection(addr) } - - // Reopen connection - // Relocate connection between subpools - // if ro/rw was updated - connPool.refreshConnection(addr) } + connPool.poolsMutex.Unlock() } } } @@ -722,6 +748,8 @@ func (connPool *ConnectionPool) checker() { func (connPool *ConnectionPool) fillPools() bool { somebodyAlive := false + // It is called before checker() goroutine and before closeImpl() may be + // called so we don't expect concurrency issues here. for _, addr := range connPool.addrs { conn, err := tarantool.Connect(addr, connPool.connOpts) if err != nil { @@ -740,57 +768,38 @@ func (connPool *ConnectionPool) fillPools() bool { return somebodyAlive } -func (connPool *ConnectionPool) getState() uint32 { - return atomic.LoadUint32((*uint32)(&connPool.state)) -} - func (connPool *ConnectionPool) getNextConnection(mode Mode) (*tarantool.Connection, error) { switch mode { case ANY: - if connPool.anyPool.IsEmpty() { - return nil, ErrNoHealthyInstance + if next := connPool.anyPool.GetNextConnection(); next != nil { + return next, nil } - - return connPool.anyPool.GetNextConnection(), nil - case RW: - if connPool.rwPool.IsEmpty() { - return nil, ErrNoRwInstance + if next := connPool.rwPool.GetNextConnection(); next != nil { + return next, nil } - - return connPool.rwPool.GetNextConnection(), nil - + return nil, ErrNoRwInstance case RO: - if connPool.roPool.IsEmpty() { - return nil, ErrNoRoInstance + if next := connPool.roPool.GetNextConnection(); next != nil { + return next, nil } - - return connPool.roPool.GetNextConnection(), nil - + return nil, ErrNoRoInstance case PreferRW: - if !connPool.rwPool.IsEmpty() { - return connPool.rwPool.GetNextConnection(), nil + if next := connPool.rwPool.GetNextConnection(); next != nil { + return next, nil } - - if !connPool.roPool.IsEmpty() { - return connPool.roPool.GetNextConnection(), nil + if next := connPool.roPool.GetNextConnection(); next != nil { + return next, nil } - - return nil, ErrNoHealthyInstance - case PreferRO: - if !connPool.roPool.IsEmpty() { - return connPool.roPool.GetNextConnection(), nil + if next := connPool.roPool.GetNextConnection(); next != nil { + return next, nil } - - if !connPool.rwPool.IsEmpty() { - return connPool.rwPool.GetNextConnection(), nil + if next := connPool.rwPool.GetNextConnection(); next != nil { + return next, nil } - - return nil, ErrNoHealthyInstance } - return nil, ErrNoHealthyInstance } @@ -812,12 +821,3 @@ func newErrorFuture(err error) *tarantool.Future { fut.SetError(err) return fut } - -// NewPrepared passes a sql statement to Tarantool for preparation synchronously. -func (connPool *ConnectionPool) NewPrepared(expr string, userMode Mode) (*tarantool.Prepared, error) { - conn, err := connPool.getNextConnection(userMode) - if err != nil { - return nil, err - } - return conn.NewPrepared(expr) -} diff --git a/connection_pool/connection_pool_test.go b/connection_pool/connection_pool_test.go index 4dc3ac12c..326604b51 100644 --- a/connection_pool/connection_pool_test.go +++ b/connection_pool/connection_pool_test.go @@ -80,6 +80,31 @@ func TestConnSuccessfully(t *testing.T) { require.Nil(t, err) } +func TestConnSuccessfullyDuplicates(t *testing.T) { + server := servers[0] + connPool, err := connection_pool.Connect([]string{server, server, server, server}, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + defer connPool.Close() + + args := test_helpers.CheckStatusesArgs{ + ConnPool: connPool, + Mode: connection_pool.ANY, + Servers: []string{server}, + ExpectedPoolStatus: true, + ExpectedStatuses: map[string]bool{ + server: true, + }, + } + + err = test_helpers.CheckPoolStatuses(args) + require.Nil(t, err) + + addrs := connPool.GetAddrs() + require.Equalf(t, []string{server}, addrs, "should be only one address") +} + func TestReconnect(t *testing.T) { server := servers[0] @@ -208,6 +233,61 @@ func TestClose(t *testing.T) { require.Nil(t, err) } +func TestRequestOnClosed(t *testing.T) { + server1 := servers[0] + server2 := servers[1] + + connPool, err := connection_pool.Connect([]string{server1, server2}, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + defer connPool.Close() + + test_helpers.StopTarantoolWithCleanup(instances[0]) + test_helpers.StopTarantoolWithCleanup(instances[1]) + + args := test_helpers.CheckStatusesArgs{ + ConnPool: connPool, + Mode: connection_pool.ANY, + Servers: []string{server1, server2}, + ExpectedPoolStatus: false, + ExpectedStatuses: map[string]bool{ + server1: false, + server2: false, + }, + } + err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args, defaultCountRetry, defaultTimeoutRetry) + require.Nil(t, err) + + _, err = connPool.Ping(connection_pool.ANY) + require.NotNilf(t, err, "err is nil after Ping") + + err = test_helpers.RestartTarantool(&instances[0]) + require.Nilf(t, err, "failed to restart tarantool") + + err = test_helpers.RestartTarantool(&instances[1]) + require.Nilf(t, err, "failed to restart tarantool") +} + +func TestGetPoolInfo(t *testing.T) { + server1 := servers[0] + server2 := servers[1] + + srvs := []string{server1, server2} + expected := []string{server1, server2} + connPool, err := connection_pool.Connect(srvs, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + defer connPool.Close() + + srvs[0] = "x" + connPool.GetAddrs()[1] = "y" + for i, addr := range connPool.GetAddrs() { + require.Equal(t, expected[i], addr) + } +} + func TestCall17(t *testing.T) { roles := []bool{false, true, false, false, true} @@ -431,7 +511,7 @@ func TestRoundRobinStrategy_NoReplica(t *testing.T) { // RO _, err = connPool.Eval("return box.cfg.listen", []interface{}{}, connection_pool.RO) require.NotNilf(t, err, "expected to fail after Eval, but error is nil") - require.Equal(t, "Can't find ro instance in pool", err.Error()) + require.Equal(t, "can't find ro instance in pool", err.Error()) // ANY args := test_helpers.ListenOnInstanceArgs{ @@ -502,7 +582,7 @@ func TestRoundRobinStrategy_NoMaster(t *testing.T) { // RW _, err = connPool.Eval("return box.cfg.listen", []interface{}{}, connection_pool.RW) require.NotNilf(t, err, "expected to fail after Eval, but error is nil") - require.Equal(t, "Can't find rw instance in pool", err.Error()) + require.Equal(t, "can't find rw instance in pool", err.Error()) // ANY args := test_helpers.ListenOnInstanceArgs{ @@ -1295,7 +1375,7 @@ func TestNewPrepared(t *testing.T) { stmt, err := connPool.NewPrepared("SELECT NAME0, NAME1 FROM SQL_TEST WHERE NAME0=:id AND NAME1=:name;", connection_pool.RO) require.Nilf(t, err, "fail to prepare statement: %v", err) - if connPool.GetPoolInfo()[stmt.Conn.Addr()].ConnRole != connection_pool.RO { + if connPool.GetPoolInfo()[stmt.Conn.Addr()].ConnRole != connection_pool.ReplicaRole { t.Errorf("wrong role for the statement's connection") } diff --git a/connection_pool/const.go b/connection_pool/const.go index 334806437..d77a55044 100644 --- a/connection_pool/const.go +++ b/connection_pool/const.go @@ -1,21 +1,7 @@ package connection_pool -type Mode uint32 -type Role uint32 -type State uint32 - /* -Mode parameter: - -- ANY (use any instance) - the request can be executed on any instance (master or replica). - -- RW (writeable instance (master)) - the request can only be executed on master. - -- RO (read only instance (replica)) - the request can only be executed on replica. - -- PREFER_RO (prefer read only instance (replica)) - if there is one, otherwise fallback to a writeable one (master). - -- PREFER_RW (prefer write only instance (master)) - if there is one, otherwise fallback to a read only one (replica). +Default mode for each request table: Request Default mode ---------- -------------- @@ -30,23 +16,21 @@ Mode parameter: | select | ANY | | get | ANY | */ -const ( - ANY = iota - RW - RO - PreferRW - PreferRO -) +type Mode uint32 -// master/replica role const ( - unknown = iota - master - replica + ANY Mode = iota // The request can be executed on any instance (master or replica). + RW // The request can only be executed on master. + RO // The request can only be executed on replica. + PreferRW // If there is one, otherwise fallback to a writeable one (master). + PreferRO // If there is one, otherwise fallback to a read only one (replica). ) -// pool state +// Role describes a role of an instance by its mode. +type Role uint32 + const ( - connConnected = iota - connClosed + UnknownRole Role = iota // A connection pool failed to discover mode of the instance. + MasterRole // The instance is read-write mode. + ReplicaRole // The instance is in read-only mode. ) diff --git a/connection_pool/example_test.go b/connection_pool/example_test.go index 7f6c34700..65eff2cca 100644 --- a/connection_pool/example_test.go +++ b/connection_pool/example_test.go @@ -254,7 +254,7 @@ func ExampleConnectionPool_SelectAsync_err() { fmt.Println("Future", 0, "Error", err) // Output: - // Future 0 Error Can't find rw instance in pool + // Future 0 Error can't find rw instance in pool } func ExampleConnectionPool_Ping() { diff --git a/connection_pool/round_robin.go b/connection_pool/round_robin.go index 7f3f0d098..b83d877d9 100644 --- a/connection_pool/round_robin.go +++ b/connection_pool/round_robin.go @@ -2,17 +2,16 @@ package connection_pool import ( "sync" - "sync/atomic" "github.com/tarantool/go-tarantool" ) type RoundRobinStrategy struct { conns []*tarantool.Connection - indexByAddr map[string]int + indexByAddr map[string]uint mutex sync.RWMutex - size int - current uint64 + size uint + current uint } func (r *RoundRobinStrategy) GetConnByAddr(addr string) *tarantool.Connection { @@ -46,8 +45,10 @@ func (r *RoundRobinStrategy) DeleteConnByAddr(addr string) *tarantool.Connection r.conns = append(r.conns[:index], r.conns[index+1:]...) r.size -= 1 - for index, conn := range r.conns { - r.indexByAddr[conn.Addr()] = index + for k, v := range r.indexByAddr { + if v > index { + r.indexByAddr[k] = v - 1 + } } return conn @@ -60,46 +61,22 @@ func (r *RoundRobinStrategy) IsEmpty() bool { return r.size == 0 } -func (r *RoundRobinStrategy) CloseConns() []error { - r.mutex.Lock() - defer r.mutex.Unlock() - - errs := make([]error, len(r.conns)) - - for i, conn := range r.conns { - errs[i] = conn.Close() - } - - return errs -} - func (r *RoundRobinStrategy) GetNextConnection() *tarantool.Connection { r.mutex.RLock() defer r.mutex.RUnlock() - // We want to iterate through the elements in a circular order - // so the first element in cycle is connections[next] - // and the last one is connections[next + length]. - next := r.nextIndex() - cycleLen := len(r.conns) + next - for i := next; i < cycleLen; i++ { - idx := i % len(r.conns) - if r.conns[idx].ConnectedNow() { - if i != next { - atomic.StoreUint64(&r.current, uint64(idx)) - } - return r.conns[idx] - } + if r.size == 0 { + return nil } - - return nil + return r.conns[r.nextIndex()] } func NewEmptyRoundRobin(size int) *RoundRobinStrategy { return &RoundRobinStrategy{ conns: make([]*tarantool.Connection, 0, size), - indexByAddr: make(map[string]int), + indexByAddr: make(map[string]uint), size: 0, + current: 0, } } @@ -107,11 +84,17 @@ func (r *RoundRobinStrategy) AddConn(addr string, conn *tarantool.Connection) { r.mutex.Lock() defer r.mutex.Unlock() - r.conns = append(r.conns, conn) - r.indexByAddr[addr] = r.size - r.size += 1 + if idx, ok := r.indexByAddr[addr]; ok { + r.conns[idx] = conn + } else { + r.conns = append(r.conns, conn) + r.indexByAddr[addr] = r.size + r.size += 1 + } } -func (r *RoundRobinStrategy) nextIndex() int { - return int(atomic.AddUint64(&r.current, uint64(1)) % uint64(len(r.conns))) +func (r *RoundRobinStrategy) nextIndex() uint { + ret := r.current % r.size + r.current++ + return ret } diff --git a/connection_pool/round_robin_test.go b/connection_pool/round_robin_test.go new file mode 100644 index 000000000..6b54ecfd8 --- /dev/null +++ b/connection_pool/round_robin_test.go @@ -0,0 +1,71 @@ +package connection_pool_test + +import ( + "testing" + + "github.com/tarantool/go-tarantool" + . "github.com/tarantool/go-tarantool/connection_pool" +) + +const ( + validAddr1 = "x" + validAddr2 = "y" +) + +func TestRoundRobinAddDelete(t *testing.T) { + rr := NewEmptyRoundRobin(10) + + addrs := []string{validAddr1, validAddr2} + conns := []*tarantool.Connection{&tarantool.Connection{}, &tarantool.Connection{}} + + for i, addr := range addrs { + rr.AddConn(addr, conns[i]) + } + + for i, addr := range addrs { + if conn := rr.DeleteConnByAddr(addr); conn != conns[i] { + t.Errorf("Unexpected connection on address %s", addr) + } + } + if !rr.IsEmpty() { + t.Errorf("RoundRobin does not empty") + } +} + +func TestRoundRobinAddDuplicateDelete(t *testing.T) { + rr := NewEmptyRoundRobin(10) + + conn1 := &tarantool.Connection{} + conn2 := &tarantool.Connection{} + + rr.AddConn(validAddr1, conn1) + rr.AddConn(validAddr1, conn2) + + if rr.DeleteConnByAddr(validAddr1) != conn2 { + t.Errorf("Unexpected deleted connection") + } + if !rr.IsEmpty() { + t.Errorf("RoundRobin does not empty") + } + if rr.DeleteConnByAddr(validAddr1) != nil { + t.Errorf("Unexpected value after second deletion") + } +} + +func TestRoundRobinGetNextConnection(t *testing.T) { + rr := NewEmptyRoundRobin(10) + + addrs := []string{validAddr1, validAddr2} + conns := []*tarantool.Connection{&tarantool.Connection{}, &tarantool.Connection{}} + + for i, addr := range addrs { + rr.AddConn(addr, conns[i]) + } + + expectedConns := []*tarantool.Connection{conns[0], conns[1], conns[0], conns[1]} + for i, expected := range expectedConns { + if rr.GetNextConnection() != expected { + t.Errorf("Unexpected connection on %d call", i) + } + } +} diff --git a/connection_pool/state.go b/connection_pool/state.go new file mode 100644 index 000000000..a9d20392e --- /dev/null +++ b/connection_pool/state.go @@ -0,0 +1,26 @@ +package connection_pool + +import ( + "sync/atomic" +) + +// pool state +type state uint32 + +const ( + unknownState state = iota + connectedState + closedState +) + +func (s *state) set(news state) { + atomic.StoreUint32((*uint32)(s), uint32(news)) +} + +func (s *state) cas(olds, news state) bool { + return atomic.CompareAndSwapUint32((*uint32)(s), uint32(olds), uint32(news)) +} + +func (s *state) get() state { + return state(atomic.LoadUint32((*uint32)(s))) +}