From 5f31390baad69dbdcc1f66043d8b13e09711e9a9 Mon Sep 17 00:00:00 2001 From: Oleg Jukovec Date: Thu, 18 Aug 2022 09:10:51 +0300 Subject: [PATCH 01/12] doc: fix connection_pool Mode description The patch fixes PREFER_RW/PREFER_RO usage. It also make comments in Go-style. See `Const` section in `Go Doc Comments` guide [1]. 1. https://go.dev/doc/comment Part of #208 --- CHANGELOG.md | 2 ++ connection_pool/const.go | 32 ++++++++++++-------------------- 2 files changed, 14 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 11b0e54e9..fd2b8876b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. ### Fixed +- Mode type description in the connection_pool subpackage (#208) + ## [1.8.0] - 2022-08-17 ### Added diff --git a/connection_pool/const.go b/connection_pool/const.go index 334806437..c93d4ee3e 100644 --- a/connection_pool/const.go +++ b/connection_pool/const.go @@ -1,21 +1,7 @@ package connection_pool -type Mode uint32 -type Role uint32 -type State uint32 - /* -Mode parameter: - -- ANY (use any instance) - the request can be executed on any instance (master or replica). - -- RW (writeable instance (master)) - the request can only be executed on master. - -- RO (read only instance (replica)) - the request can only be executed on replica. - -- PREFER_RO (prefer read only instance (replica)) - if there is one, otherwise fallback to a writeable one (master). - -- PREFER_RW (prefer write only instance (master)) - if there is one, otherwise fallback to a read only one (replica). +Default mode for each request table: Request Default mode ---------- -------------- @@ -30,14 +16,18 @@ Mode parameter: | select | ANY | | get | ANY | */ +type Mode uint32 + const ( - ANY = iota - RW - RO - PreferRW - PreferRO + ANY Mode = iota // The request can be executed on any instance (master or replica). + RW // The request can only be executed on master. + RO // The request can only be executed on replica. + PreferRW // If there is one, otherwise fallback to a writeable one (master). + PreferRO // If there is one, otherwise fallback to a read only one (replica). ) +type Role uint32 + // master/replica role const ( unknown = iota @@ -45,6 +35,8 @@ const ( replica ) +type State uint32 + // pool state const ( connConnected = iota From 60fd8a7cf40b9384fe2978c5234706235813b2d3 Mon Sep 17 00:00:00 2001 From: Oleg Jukovec Date: Thu, 18 Aug 2022 13:10:48 +0300 Subject: [PATCH 02/12] code health: move NewPrepared to public code block Part of #208 --- connection_pool/connection_pool.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/connection_pool/connection_pool.go b/connection_pool/connection_pool.go index 1b080fc1f..fb46c8101 100644 --- a/connection_pool/connection_pool.go +++ b/connection_pool/connection_pool.go @@ -558,6 +558,15 @@ func (connPool *ConnectionPool) NewStream(userMode Mode) (*tarantool.Stream, err return conn.NewStream() } +// NewPrepared passes a sql statement to Tarantool for preparation synchronously. +func (connPool *ConnectionPool) NewPrepared(expr string, userMode Mode) (*tarantool.Prepared, error) { + conn, err := connPool.getNextConnection(userMode) + if err != nil { + return nil, err + } + return conn.NewPrepared(expr) +} + // // private // @@ -812,12 +821,3 @@ func newErrorFuture(err error) *tarantool.Future { fut.SetError(err) return fut } - -// NewPrepared passes a sql statement to Tarantool for preparation synchronously. -func (connPool *ConnectionPool) NewPrepared(expr string, userMode Mode) (*tarantool.Prepared, error) { - conn, err := connPool.getNextConnection(userMode) - if err != nil { - return nil, err - } - return conn.NewPrepared(expr) -} From 571155d33c6328aa865e2f9a2517566879db7650 Mon Sep 17 00:00:00 2001 From: Oleg Jukovec Date: Tue, 16 Aug 2022 17:30:18 +0300 Subject: [PATCH 03/12] code health: remove duplicate box.info check It's enough to get `box.info` table once in ConnectionPool.getConnectionRole(). Part of #208 --- connection_pool/connection_pool.go | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/connection_pool/connection_pool.go b/connection_pool/connection_pool.go index fb46c8101..d3b7b5d1b 100644 --- a/connection_pool/connection_pool.go +++ b/connection_pool/connection_pool.go @@ -591,17 +591,6 @@ func (connPool *ConnectionPool) getConnectionRole(conn *tarantool.Connection) (R return unknown, ErrIncorrectStatus } - resp, err = conn.Call17("box.info", []interface{}{}) - if err != nil { - return unknown, err - } - if resp == nil { - return unknown, ErrIncorrectResponse - } - if len(resp.Data) < 1 { - return unknown, ErrIncorrectResponse - } - replicaRole, ok := resp.Data[0].(map[interface{}]interface{})["ro"] if !ok { return unknown, ErrIncorrectResponse From 5404205f1bbb8c8e3444b148fbf60ce231e19fac Mon Sep 17 00:00:00 2001 From: Oleg Jukovec Date: Tue, 16 Aug 2022 17:39:21 +0300 Subject: [PATCH 04/12] code health: unify error format in connection_pool After the patch all errors in the connection_pool subpackage start with a lowercase letter [1]. 1. https://github.com/golang/go/wiki/CodeReviewComments#error-strings Part of #208 --- connection_pool/connection_pool.go | 10 +++++----- connection_pool/connection_pool_test.go | 4 ++-- connection_pool/example_test.go | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/connection_pool/connection_pool.go b/connection_pool/connection_pool.go index d3b7b5d1b..ed8cc38fb 100644 --- a/connection_pool/connection_pool.go +++ b/connection_pool/connection_pool.go @@ -25,11 +25,11 @@ var ( ErrWrongCheckTimeout = errors.New("wrong check timeout, must be greater than 0") ErrNoConnection = errors.New("no active connections") ErrTooManyArgs = errors.New("too many arguments") - ErrIncorrectResponse = errors.New("Incorrect response format") - ErrIncorrectStatus = errors.New("Incorrect instance status: status should be `running`") - ErrNoRwInstance = errors.New("Can't find rw instance in pool") - ErrNoRoInstance = errors.New("Can't find ro instance in pool") - ErrNoHealthyInstance = errors.New("Can't find healthy instance in pool") + ErrIncorrectResponse = errors.New("incorrect response format") + ErrIncorrectStatus = errors.New("incorrect instance status: status should be `running`") + ErrNoRwInstance = errors.New("can't find rw instance in pool") + ErrNoRoInstance = errors.New("can't find ro instance in pool") + ErrNoHealthyInstance = errors.New("can't find healthy instance in pool") ) /* diff --git a/connection_pool/connection_pool_test.go b/connection_pool/connection_pool_test.go index 4dc3ac12c..0e34a6b95 100644 --- a/connection_pool/connection_pool_test.go +++ b/connection_pool/connection_pool_test.go @@ -431,7 +431,7 @@ func TestRoundRobinStrategy_NoReplica(t *testing.T) { // RO _, err = connPool.Eval("return box.cfg.listen", []interface{}{}, connection_pool.RO) require.NotNilf(t, err, "expected to fail after Eval, but error is nil") - require.Equal(t, "Can't find ro instance in pool", err.Error()) + require.Equal(t, "can't find ro instance in pool", err.Error()) // ANY args := test_helpers.ListenOnInstanceArgs{ @@ -502,7 +502,7 @@ func TestRoundRobinStrategy_NoMaster(t *testing.T) { // RW _, err = connPool.Eval("return box.cfg.listen", []interface{}{}, connection_pool.RW) require.NotNilf(t, err, "expected to fail after Eval, but error is nil") - require.Equal(t, "Can't find rw instance in pool", err.Error()) + require.Equal(t, "can't find rw instance in pool", err.Error()) // ANY args := test_helpers.ListenOnInstanceArgs{ diff --git a/connection_pool/example_test.go b/connection_pool/example_test.go index 7f6c34700..65eff2cca 100644 --- a/connection_pool/example_test.go +++ b/connection_pool/example_test.go @@ -254,7 +254,7 @@ func ExampleConnectionPool_SelectAsync_err() { fmt.Println("Future", 0, "Error", err) // Output: - // Future 0 Error Can't find rw instance in pool + // Future 0 Error can't find rw instance in pool } func ExampleConnectionPool_Ping() { From eb6b87538105c29f2fa98df615fdf3b054fcd747 Mon Sep 17 00:00:00 2001 From: Oleg Jukovec Date: Thu, 18 Aug 2022 09:36:26 +0300 Subject: [PATCH 05/12] bugfix: move Role constants into the public API We return a role of an instance from `GetPoolInfo()` call. It is unexpected that Role constants are not a part of the public API. Part of #208 --- CHANGELOG.md | 1 + connection_pool/connection_pool.go | 34 ++++++++++++------------- connection_pool/connection_pool_test.go | 2 +- connection_pool/const.go | 8 +++--- 4 files changed, 22 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fd2b8876b..9ed7967b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. ### Fixed - Mode type description in the connection_pool subpackage (#208) +- Missed Role type constants in the connection_pool subpackage (#208) ## [1.8.0] - 2022-08-17 diff --git a/connection_pool/connection_pool.go b/connection_pool/connection_pool.go index ed8cc38fb..28aca1989 100644 --- a/connection_pool/connection_pool.go +++ b/connection_pool/connection_pool.go @@ -574,50 +574,48 @@ func (connPool *ConnectionPool) NewPrepared(expr string, userMode Mode) (*tarant func (connPool *ConnectionPool) getConnectionRole(conn *tarantool.Connection) (Role, error) { resp, err := conn.Call17("box.info", []interface{}{}) if err != nil { - return unknown, err + return UnknownRole, err } if resp == nil { - return unknown, ErrIncorrectResponse + return UnknownRole, ErrIncorrectResponse } if len(resp.Data) < 1 { - return unknown, ErrIncorrectResponse + return UnknownRole, ErrIncorrectResponse } instanceStatus, ok := resp.Data[0].(map[interface{}]interface{})["status"] if !ok { - return unknown, ErrIncorrectResponse + return UnknownRole, ErrIncorrectResponse } if instanceStatus != "running" { - return unknown, ErrIncorrectStatus + return UnknownRole, ErrIncorrectStatus } replicaRole, ok := resp.Data[0].(map[interface{}]interface{})["ro"] if !ok { - return unknown, ErrIncorrectResponse + return UnknownRole, ErrIncorrectResponse } switch replicaRole { case false: - return master, nil + return MasterRole, nil case true: - return replica, nil + return ReplicaRole, nil } - return unknown, nil + return UnknownRole, nil } func (connPool *ConnectionPool) getConnectionFromPool(addr string) (*tarantool.Connection, Role) { - conn := connPool.rwPool.GetConnByAddr(addr) - if conn != nil { - return conn, master + if conn := connPool.rwPool.GetConnByAddr(addr); conn != nil { + return conn, MasterRole } - conn = connPool.roPool.GetConnByAddr(addr) - if conn != nil { - return conn, replica + if conn := connPool.roPool.GetConnByAddr(addr); conn != nil { + return conn, ReplicaRole } - return connPool.anyPool.GetConnByAddr(addr), unknown + return connPool.anyPool.GetConnByAddr(addr), UnknownRole } func (connPool *ConnectionPool) deleteConnectionFromPool(addr string) { @@ -639,9 +637,9 @@ func (connPool *ConnectionPool) setConnectionToPool(addr string, conn *tarantool connPool.anyPool.AddConn(addr, conn) switch role { - case master: + case MasterRole: connPool.rwPool.AddConn(addr, conn) - case replica: + case ReplicaRole: connPool.roPool.AddConn(addr, conn) } diff --git a/connection_pool/connection_pool_test.go b/connection_pool/connection_pool_test.go index 0e34a6b95..b6380219d 100644 --- a/connection_pool/connection_pool_test.go +++ b/connection_pool/connection_pool_test.go @@ -1295,7 +1295,7 @@ func TestNewPrepared(t *testing.T) { stmt, err := connPool.NewPrepared("SELECT NAME0, NAME1 FROM SQL_TEST WHERE NAME0=:id AND NAME1=:name;", connection_pool.RO) require.Nilf(t, err, "fail to prepare statement: %v", err) - if connPool.GetPoolInfo()[stmt.Conn.Addr()].ConnRole != connection_pool.RO { + if connPool.GetPoolInfo()[stmt.Conn.Addr()].ConnRole != connection_pool.ReplicaRole { t.Errorf("wrong role for the statement's connection") } diff --git a/connection_pool/const.go b/connection_pool/const.go index c93d4ee3e..2cfcbcb00 100644 --- a/connection_pool/const.go +++ b/connection_pool/const.go @@ -26,13 +26,13 @@ const ( PreferRO // If there is one, otherwise fallback to a read only one (replica). ) +// Role describes a role of an instance by its mode. type Role uint32 -// master/replica role const ( - unknown = iota - master - replica + UnknownRole Role = iota // A connection pool failed to discover mode of the instance. + MasterRole // The instance is read-write mode. + ReplicaRole // The instance is in read-only mode. ) type State uint32 From d7b0e43f72bb4680638606ff2ba46d11f41590cd Mon Sep 17 00:00:00 2001 From: Oleg Jukovec Date: Wed, 17 Aug 2022 10:14:15 +0300 Subject: [PATCH 06/12] bugfix: close `UnknownRole` connections from a pool We add all connections into ConnectionPool.anyPool, but `MasterRole` connection only in ConnectionPool.rwPool and `ReplicaRole` connections only in ConnectionPool.roPool. As a result `UnknownRole` connections appears only in the `anyPool`. See `setConnectionToPool` implementation. So we need to close connections from the `anyPool` instead of `roPool` + `rwPool`. Part of #208 --- CHANGELOG.md | 1 + connection_pool/connection_pool.go | 15 +++++++++++---- connection_pool/round_robin.go | 13 ------------- 3 files changed, 12 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ed7967b3..6f3be2de8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. - Mode type description in the connection_pool subpackage (#208) - Missed Role type constants in the connection_pool subpackage (#208) +- ConnectionPool does not close UnknownRole connections (#208) ## [1.8.0] - 2022-08-17 diff --git a/connection_pool/connection_pool.go b/connection_pool/connection_pool.go index 28aca1989..e424fe85e 100644 --- a/connection_pool/connection_pool.go +++ b/connection_pool/connection_pool.go @@ -154,12 +154,19 @@ func (connPool *ConnectionPool) Close() []error { close(connPool.control) connPool.state = connClosed - rwErrs := connPool.rwPool.CloseConns() - roErrs := connPool.roPool.CloseConns() + errs := make([]error, 0, len(connPool.addrs)) - allErrs := append(rwErrs, roErrs...) + for _, addr := range connPool.addrs { + if conn := connPool.anyPool.DeleteConnByAddr(addr); conn != nil { + if err := conn.Close(); err != nil { + errs = append(errs, err) + } + } + connPool.rwPool.DeleteConnByAddr(addr) + connPool.roPool.DeleteConnByAddr(addr) + } - return allErrs + return errs } // GetAddrs gets addresses of connections in pool. diff --git a/connection_pool/round_robin.go b/connection_pool/round_robin.go index 7f3f0d098..63afaeb12 100644 --- a/connection_pool/round_robin.go +++ b/connection_pool/round_robin.go @@ -60,19 +60,6 @@ func (r *RoundRobinStrategy) IsEmpty() bool { return r.size == 0 } -func (r *RoundRobinStrategy) CloseConns() []error { - r.mutex.Lock() - defer r.mutex.Unlock() - - errs := make([]error, len(r.conns)) - - for i, conn := range r.conns { - errs[i] = conn.Close() - } - - return errs -} - func (r *RoundRobinStrategy) GetNextConnection() *tarantool.Connection { r.mutex.RLock() defer r.mutex.RUnlock() From 7048557e982984b82a0b7cdc568b005e880bd6a1 Mon Sep 17 00:00:00 2001 From: Oleg Jukovec Date: Thu, 18 Aug 2022 08:30:16 +0300 Subject: [PATCH 07/12] bugfix: logic of add/delete in round robin strategy * Duplicates could be added to a list of connections, which then cannot be deleted. * The delete function uses an address from a connection instead of argument value. It may lead to unexpected errors. Part of #208 --- connection_pool/round_robin.go | 16 ++++++--- connection_pool/round_robin_test.go | 55 +++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 5 deletions(-) create mode 100644 connection_pool/round_robin_test.go diff --git a/connection_pool/round_robin.go b/connection_pool/round_robin.go index 63afaeb12..9e512bfbb 100644 --- a/connection_pool/round_robin.go +++ b/connection_pool/round_robin.go @@ -46,8 +46,10 @@ func (r *RoundRobinStrategy) DeleteConnByAddr(addr string) *tarantool.Connection r.conns = append(r.conns[:index], r.conns[index+1:]...) r.size -= 1 - for index, conn := range r.conns { - r.indexByAddr[conn.Addr()] = index + for k, v := range r.indexByAddr { + if v > index { + r.indexByAddr[k] = v - 1 + } } return conn @@ -94,9 +96,13 @@ func (r *RoundRobinStrategy) AddConn(addr string, conn *tarantool.Connection) { r.mutex.Lock() defer r.mutex.Unlock() - r.conns = append(r.conns, conn) - r.indexByAddr[addr] = r.size - r.size += 1 + if idx, ok := r.indexByAddr[addr]; ok { + r.conns[idx] = conn + } else { + r.conns = append(r.conns, conn) + r.indexByAddr[addr] = r.size + r.size += 1 + } } func (r *RoundRobinStrategy) nextIndex() int { diff --git a/connection_pool/round_robin_test.go b/connection_pool/round_robin_test.go new file mode 100644 index 000000000..2ca8d19b9 --- /dev/null +++ b/connection_pool/round_robin_test.go @@ -0,0 +1,55 @@ +package connection_pool_test + +import ( + "testing" + + "github.com/tarantool/go-tarantool" + . "github.com/tarantool/go-tarantool/connection_pool" +) + +const ( + validAddr1 = "x" + validAddr2 = "y" +) + +func TestRoundRobinAddDelete(t *testing.T) { + rr := NewEmptyRoundRobin(10) + + addrs := []string{validAddr1, validAddr2} + conns := []*tarantool.Connection{&tarantool.Connection{}, &tarantool.Connection{}} + + for i, addr := range addrs { + rr.AddConn(addr, conns[i]) + } + + for i, addr := range addrs { + if conn := rr.DeleteConnByAddr(addr); conn != conns[i] { + t.Errorf("Unexpected connection on address %s", addr) + } + } + if !rr.IsEmpty() { + t.Errorf("RoundRobin does not empty") + } +} + +func TestRoundRobinAddDuplicateDelete(t *testing.T) { + rr := NewEmptyRoundRobin(10) + + conn1 := &tarantool.Connection{} + conn2 := &tarantool.Connection{} + + rr.AddConn(validAddr1, conn1) + rr.AddConn(validAddr1, conn2) + + if rr.DeleteConnByAddr(validAddr1) != conn2 { + t.Errorf("Unexpected deleted connection") + } + if !rr.IsEmpty() { + t.Errorf("RoundRobin does not empty") + } + if rr.DeleteConnByAddr(validAddr1) != nil { + t.Errorf("Unexpected value after second deletion") + } +} + + From 56e8692618f878dc8ce2b0726182ab199d0206f2 Mon Sep 17 00:00:00 2001 From: Oleg Jukovec Date: Thu, 18 Aug 2022 14:06:16 +0300 Subject: [PATCH 08/12] bugfix: prevent segfaults in connection_pool After the patch ConnectionPool.getNextConnection() does not return (nil, nil). It always return a connection or an error. The check Connection.ConnectedNow() does not have sence because the connection may be closed right after the call. The code just complicates the logic and does not protect against anything. A chain of two atomic operations IsEmpty() + GetNextConnection() wrong because it leads too a race condition. Part of #208 --- CHANGELOG.md | 1 + connection_pool/connection_pool.go | 66 +++++++++++-------------- connection_pool/connection_pool_test.go | 36 ++++++++++++++ connection_pool/round_robin.go | 34 +++++-------- connection_pool/round_robin_test.go | 16 ++++++ 5 files changed, 94 insertions(+), 59 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f3be2de8..125081630 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. - Mode type description in the connection_pool subpackage (#208) - Missed Role type constants in the connection_pool subpackage (#208) - ConnectionPool does not close UnknownRole connections (#208) +- Segmentation faults in ConnectionPool requests after disconnect (#208) ## [1.8.0] - 2022-08-17 diff --git a/connection_pool/connection_pool.go b/connection_pool/connection_pool.go index e424fe85e..0fcd88d07 100644 --- a/connection_pool/connection_pool.go +++ b/connection_pool/connection_pool.go @@ -130,13 +130,20 @@ func (connPool *ConnectionPool) ConnectedNow(mode Mode) (bool, error) { if connPool.getState() != connConnected { return false, nil } - - conn, err := connPool.getNextConnection(mode) - if err != nil || conn == nil { - return false, err + switch mode { + case ANY: + return !connPool.anyPool.IsEmpty(), nil + case RW: + return !connPool.rwPool.IsEmpty(), nil + case RO: + return !connPool.roPool.IsEmpty(), nil + case PreferRW: + fallthrough + case PreferRO: + return !connPool.rwPool.IsEmpty() || !connPool.roPool.IsEmpty(), nil + default: + return false, ErrNoHealthyInstance } - - return conn.ConnectedNow(), nil } // ConfiguredTimeout gets timeout of current connection. @@ -751,49 +758,34 @@ func (connPool *ConnectionPool) getNextConnection(mode Mode) (*tarantool.Connect switch mode { case ANY: - if connPool.anyPool.IsEmpty() { - return nil, ErrNoHealthyInstance + if next := connPool.anyPool.GetNextConnection(); next != nil { + return next, nil } - - return connPool.anyPool.GetNextConnection(), nil - case RW: - if connPool.rwPool.IsEmpty() { - return nil, ErrNoRwInstance + if next := connPool.rwPool.GetNextConnection(); next != nil { + return next, nil } - - return connPool.rwPool.GetNextConnection(), nil - + return nil, ErrNoRwInstance case RO: - if connPool.roPool.IsEmpty() { - return nil, ErrNoRoInstance + if next := connPool.roPool.GetNextConnection(); next != nil { + return next, nil } - - return connPool.roPool.GetNextConnection(), nil - + return nil, ErrNoRoInstance case PreferRW: - if !connPool.rwPool.IsEmpty() { - return connPool.rwPool.GetNextConnection(), nil + if next := connPool.rwPool.GetNextConnection(); next != nil { + return next, nil } - - if !connPool.roPool.IsEmpty() { - return connPool.roPool.GetNextConnection(), nil + if next := connPool.roPool.GetNextConnection(); next != nil { + return next, nil } - - return nil, ErrNoHealthyInstance - case PreferRO: - if !connPool.roPool.IsEmpty() { - return connPool.roPool.GetNextConnection(), nil + if next := connPool.roPool.GetNextConnection(); next != nil { + return next, nil } - - if !connPool.rwPool.IsEmpty() { - return connPool.rwPool.GetNextConnection(), nil + if next := connPool.rwPool.GetNextConnection(); next != nil { + return next, nil } - - return nil, ErrNoHealthyInstance } - return nil, ErrNoHealthyInstance } diff --git a/connection_pool/connection_pool_test.go b/connection_pool/connection_pool_test.go index b6380219d..a01bf5fd2 100644 --- a/connection_pool/connection_pool_test.go +++ b/connection_pool/connection_pool_test.go @@ -208,6 +208,42 @@ func TestClose(t *testing.T) { require.Nil(t, err) } +func TestRequestOnClosed(t *testing.T) { + server1 := servers[0] + server2 := servers[1] + + connPool, err := connection_pool.Connect([]string{server1, server2}, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + defer connPool.Close() + + test_helpers.StopTarantoolWithCleanup(instances[0]) + test_helpers.StopTarantoolWithCleanup(instances[1]) + + args := test_helpers.CheckStatusesArgs{ + ConnPool: connPool, + Mode: connection_pool.ANY, + Servers: []string{server1, server2}, + ExpectedPoolStatus: false, + ExpectedStatuses: map[string]bool{ + server1: false, + server2: false, + }, + } + err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args, defaultCountRetry, defaultTimeoutRetry) + require.Nil(t, err) + + _, err = connPool.Ping(connection_pool.ANY) + require.NotNilf(t, err, "err is nil after Ping") + + err = test_helpers.RestartTarantool(&instances[0]) + require.Nilf(t, err, "failed to restart tarantool") + + err = test_helpers.RestartTarantool(&instances[1]) + require.Nilf(t, err, "failed to restart tarantool") +} + func TestCall17(t *testing.T) { roles := []bool{false, true, false, false, true} diff --git a/connection_pool/round_robin.go b/connection_pool/round_robin.go index 9e512bfbb..b83d877d9 100644 --- a/connection_pool/round_robin.go +++ b/connection_pool/round_robin.go @@ -2,17 +2,16 @@ package connection_pool import ( "sync" - "sync/atomic" "github.com/tarantool/go-tarantool" ) type RoundRobinStrategy struct { conns []*tarantool.Connection - indexByAddr map[string]int + indexByAddr map[string]uint mutex sync.RWMutex - size int - current uint64 + size uint + current uint } func (r *RoundRobinStrategy) GetConnByAddr(addr string) *tarantool.Connection { @@ -66,29 +65,18 @@ func (r *RoundRobinStrategy) GetNextConnection() *tarantool.Connection { r.mutex.RLock() defer r.mutex.RUnlock() - // We want to iterate through the elements in a circular order - // so the first element in cycle is connections[next] - // and the last one is connections[next + length]. - next := r.nextIndex() - cycleLen := len(r.conns) + next - for i := next; i < cycleLen; i++ { - idx := i % len(r.conns) - if r.conns[idx].ConnectedNow() { - if i != next { - atomic.StoreUint64(&r.current, uint64(idx)) - } - return r.conns[idx] - } + if r.size == 0 { + return nil } - - return nil + return r.conns[r.nextIndex()] } func NewEmptyRoundRobin(size int) *RoundRobinStrategy { return &RoundRobinStrategy{ conns: make([]*tarantool.Connection, 0, size), - indexByAddr: make(map[string]int), + indexByAddr: make(map[string]uint), size: 0, + current: 0, } } @@ -105,6 +93,8 @@ func (r *RoundRobinStrategy) AddConn(addr string, conn *tarantool.Connection) { } } -func (r *RoundRobinStrategy) nextIndex() int { - return int(atomic.AddUint64(&r.current, uint64(1)) % uint64(len(r.conns))) +func (r *RoundRobinStrategy) nextIndex() uint { + ret := r.current % r.size + r.current++ + return ret } diff --git a/connection_pool/round_robin_test.go b/connection_pool/round_robin_test.go index 2ca8d19b9..6b54ecfd8 100644 --- a/connection_pool/round_robin_test.go +++ b/connection_pool/round_robin_test.go @@ -52,4 +52,20 @@ func TestRoundRobinAddDuplicateDelete(t *testing.T) { } } +func TestRoundRobinGetNextConnection(t *testing.T) { + rr := NewEmptyRoundRobin(10) + + addrs := []string{validAddr1, validAddr2} + conns := []*tarantool.Connection{&tarantool.Connection{}, &tarantool.Connection{}} + for i, addr := range addrs { + rr.AddConn(addr, conns[i]) + } + + expectedConns := []*tarantool.Connection{conns[0], conns[1], conns[0], conns[1]} + for i, expected := range expectedConns { + if rr.GetNextConnection() != expected { + t.Errorf("Unexpected connection on %d call", i) + } + } +} From 5a041bc2c11210ba01e26b8cb2f42d6cbcf8a939 Mon Sep 17 00:00:00 2001 From: Oleg Jukovec Date: Thu, 18 Aug 2022 10:01:27 +0300 Subject: [PATCH 09/12] bugfix: protect addresses from external changes We need to use copies of slices, not just pointers to them. It helps to avoid unexpected changes. Part of #208 --- CHANGELOG.md | 1 + connection_pool/connection_pool.go | 7 +++++-- connection_pool/connection_pool_test.go | 19 +++++++++++++++++++ 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 125081630..51551325d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. - Missed Role type constants in the connection_pool subpackage (#208) - ConnectionPool does not close UnknownRole connections (#208) - Segmentation faults in ConnectionPool requests after disconnect (#208) +- Addresses in ConnectionPool may be changed from an external code (#208) ## [1.8.0] - 2022-08-17 diff --git a/connection_pool/connection_pool.go b/connection_pool/connection_pool.go index 0fcd88d07..68fcf7155 100644 --- a/connection_pool/connection_pool.go +++ b/connection_pool/connection_pool.go @@ -96,7 +96,7 @@ func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts OptsPool) (co anyPool := NewEmptyRoundRobin(size) connPool = &ConnectionPool{ - addrs: addrs, + addrs: make([]string, len(addrs)), connOpts: connOpts, opts: opts, notify: notify, @@ -105,6 +105,7 @@ func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts OptsPool) (co roPool: roPool, anyPool: anyPool, } + copy(connPool.addrs, addrs) somebodyAlive := connPool.fillPools() if !somebodyAlive { @@ -178,7 +179,9 @@ func (connPool *ConnectionPool) Close() []error { // GetAddrs gets addresses of connections in pool. func (connPool *ConnectionPool) GetAddrs() []string { - return connPool.addrs + cpy := make([]string, len(connPool.addrs)) + copy(cpy, connPool.addrs) + return cpy } // GetPoolInfo gets information of connections (connected status, ro/rw role). diff --git a/connection_pool/connection_pool_test.go b/connection_pool/connection_pool_test.go index a01bf5fd2..ccf1f14ab 100644 --- a/connection_pool/connection_pool_test.go +++ b/connection_pool/connection_pool_test.go @@ -244,6 +244,25 @@ func TestRequestOnClosed(t *testing.T) { require.Nilf(t, err, "failed to restart tarantool") } +func TestGetPoolInfo(t *testing.T) { + server1 := servers[0] + server2 := servers[1] + + srvs := []string{server1, server2} + expected := []string{server1, server2} + connPool, err := connection_pool.Connect(srvs, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + defer connPool.Close() + + srvs[0] = "x" + connPool.GetAddrs()[1] = "y" + for i, addr := range connPool.GetAddrs() { + require.Equal(t, expected[i], addr) + } +} + func TestCall17(t *testing.T) { roles := []bool{false, true, false, false, true} From 605abb858e97184ff7373f429127d5dfd3cbd364 Mon Sep 17 00:00:00 2001 From: Oleg Jukovec Date: Thu, 18 Aug 2022 11:43:50 +0300 Subject: [PATCH 10/12] bugfix: do not recreate a connection immediately If we create a connection immediately after closing previous, then it can to lead to too frequent connection creation under some configurations [1] and high CPU load. It will be expected to recreate connection with OptsPool.CheckTimeout frequency. 1. https://github.com/tarantool/go-tarantool/issues/136 Part of #208 --- CHANGELOG.md | 1 + connection_pool/connection_pool.go | 15 +-------------- 2 files changed, 2 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 51551325d..b638eed29 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. - ConnectionPool does not close UnknownRole connections (#208) - Segmentation faults in ConnectionPool requests after disconnect (#208) - Addresses in ConnectionPool may be changed from an external code (#208) +- ConnectionPool recreates connections too often (#208) ## [1.8.0] - 2022-08-17 diff --git a/connection_pool/connection_pool.go b/connection_pool/connection_pool.go index 68fcf7155..83959141a 100644 --- a/connection_pool/connection_pool.go +++ b/connection_pool/connection_pool.go @@ -702,20 +702,7 @@ func (connPool *ConnectionPool) checker() { return } if e.Conn.ClosedNow() { - addr := e.Conn.Addr() - if conn, _ := connPool.getConnectionFromPool(addr); conn == nil { - continue - } - conn, _ := tarantool.Connect(addr, connPool.connOpts) - if conn != nil { - err := connPool.setConnectionToPool(addr, conn) - if err != nil { - conn.Close() - log.Printf("tarantool: storing connection to %s failed: %s\n", addr, err.Error()) - } - } else { - connPool.deleteConnectionFromPool(addr) - } + connPool.deleteConnectionFromPool(e.Conn.Addr()) } case <-timer.C: for _, addr := range connPool.addrs { From 4ea0006280883ac3867b7fdf06414cbd0cf72c4b Mon Sep 17 00:00:00 2001 From: Oleg Jukovec Date: Wed, 17 Aug 2022 13:16:01 +0300 Subject: [PATCH 11/12] bugfix: prevent recreate connection after Close() The ConnectionPool.checker() goroutine may still work some time after ConnectionPool.Close() call. It may lead to re-open connection in a concurrent closing pool. The connection still opened after the pool is closed. The patch adds RWLock to protect blocks which work with anyPool, roPool and rwPool. We don't need to protect regular requests because in the worst case, we will send a request into a closed connection. It can happen for other reasons and it seems like we can't avoid it. So it is an expected behavior. Part of #208 --- CHANGELOG.md | 1 + connection_pool/connection_pool.go | 85 ++++++++++++++++++------------ connection_pool/const.go | 8 --- connection_pool/state.go | 26 +++++++++ 4 files changed, 78 insertions(+), 42 deletions(-) create mode 100644 connection_pool/state.go diff --git a/CHANGELOG.md b/CHANGELOG.md index b638eed29..9ba11e011 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. - Segmentation faults in ConnectionPool requests after disconnect (#208) - Addresses in ConnectionPool may be changed from an external code (#208) - ConnectionPool recreates connections too often (#208) +- A connection is still opened after ConnectionPool.Close() (#208) ## [1.8.0] - 2022-08-17 diff --git a/connection_pool/connection_pool.go b/connection_pool/connection_pool.go index 83959141a..673a7d63d 100644 --- a/connection_pool/connection_pool.go +++ b/connection_pool/connection_pool.go @@ -14,7 +14,7 @@ import ( "errors" "fmt" "log" - "sync/atomic" + "sync" "time" "github.com/tarantool/go-tarantool" @@ -69,12 +69,13 @@ type ConnectionPool struct { connOpts tarantool.Opts opts OptsPool - notify chan tarantool.ConnEvent - state State - control chan struct{} - roPool *RoundRobinStrategy - rwPool *RoundRobinStrategy - anyPool *RoundRobinStrategy + notify chan tarantool.ConnEvent + state state + control chan struct{} + roPool *RoundRobinStrategy + rwPool *RoundRobinStrategy + anyPool *RoundRobinStrategy + poolsMutex sync.RWMutex } // ConnectWithOpts creates pool for instances with addresses addrs @@ -100,6 +101,7 @@ func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts OptsPool) (co connOpts: connOpts, opts: opts, notify: notify, + state: unknownState, control: make(chan struct{}), rwPool: rwPool, roPool: roPool, @@ -109,10 +111,12 @@ func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts OptsPool) (co somebodyAlive := connPool.fillPools() if !somebodyAlive { - connPool.Close() + connPool.state.set(closedState) + connPool.closeImpl() return nil, ErrNoConnection } + connPool.state.set(connectedState) go connPool.checker() return connPool, nil @@ -128,7 +132,10 @@ func Connect(addrs []string, connOpts tarantool.Opts) (connPool *ConnectionPool, // ConnectedNow gets connected status of pool. func (connPool *ConnectionPool) ConnectedNow(mode Mode) (bool, error) { - if connPool.getState() != connConnected { + connPool.poolsMutex.RLock() + defer connPool.poolsMutex.RUnlock() + + if connPool.state.get() != connectedState { return false, nil } switch mode { @@ -157,10 +164,8 @@ func (connPool *ConnectionPool) ConfiguredTimeout(mode Mode) (time.Duration, err return conn.ConfiguredTimeout(), nil } -// Close closes connections in pool. -func (connPool *ConnectionPool) Close() []error { +func (connPool *ConnectionPool) closeImpl() []error { close(connPool.control) - connPool.state = connClosed errs := make([]error, 0, len(connPool.addrs)) @@ -177,6 +182,17 @@ func (connPool *ConnectionPool) Close() []error { return errs } +// Close closes connections in pool. +func (connPool *ConnectionPool) Close() []error { + if connPool.state.cas(connectedState, closedState) { + connPool.poolsMutex.Lock() + defer connPool.poolsMutex.Unlock() + + return connPool.closeImpl() + } + return nil +} + // GetAddrs gets addresses of connections in pool. func (connPool *ConnectionPool) GetAddrs() []string { cpy := make([]string, len(connPool.addrs)) @@ -188,6 +204,13 @@ func (connPool *ConnectionPool) GetAddrs() []string { func (connPool *ConnectionPool) GetPoolInfo() map[string]*ConnectionInfo { info := make(map[string]*ConnectionInfo) + connPool.poolsMutex.RLock() + defer connPool.poolsMutex.RUnlock() + + if connPool.state.get() != connectedState { + return info + } + for _, addr := range connPool.addrs { conn, role := connPool.getConnectionFromPool(addr) if conn != nil { @@ -638,11 +661,9 @@ func (connPool *ConnectionPool) getConnectionFromPool(addr string) (*tarantool.C func (connPool *ConnectionPool) deleteConnectionFromPool(addr string) { _ = connPool.anyPool.DeleteConnByAddr(addr) conn := connPool.rwPool.DeleteConnByAddr(addr) - if conn != nil { - return + if conn == nil { + connPool.roPool.DeleteConnByAddr(addr) } - - connPool.roPool.DeleteConnByAddr(addr) } func (connPool *ConnectionPool) setConnectionToPool(addr string, conn *tarantool.Connection) error { @@ -689,32 +710,30 @@ func (connPool *ConnectionPool) refreshConnection(addr string) { } func (connPool *ConnectionPool) checker() { - timer := time.NewTicker(connPool.opts.CheckTimeout) defer timer.Stop() - for connPool.getState() != connClosed { + for connPool.state.get() != closedState { select { case <-connPool.control: return case e := <-connPool.notify: - if connPool.getState() == connClosed { - return - } - if e.Conn.ClosedNow() { + connPool.poolsMutex.Lock() + if connPool.state.get() == connectedState && e.Conn.ClosedNow() { connPool.deleteConnectionFromPool(e.Conn.Addr()) } + connPool.poolsMutex.Unlock() case <-timer.C: - for _, addr := range connPool.addrs { - if connPool.getState() == connClosed { - return + connPool.poolsMutex.Lock() + if connPool.state.get() == connectedState { + for _, addr := range connPool.addrs { + // Reopen connection + // Relocate connection between subpools + // if ro/rw was updated + connPool.refreshConnection(addr) } - - // Reopen connection - // Relocate connection between subpools - // if ro/rw was updated - connPool.refreshConnection(addr) } + connPool.poolsMutex.Unlock() } } } @@ -722,6 +741,8 @@ func (connPool *ConnectionPool) checker() { func (connPool *ConnectionPool) fillPools() bool { somebodyAlive := false + // It is called before checker() goroutine and before closeImpl() may be + // called so we don't expect concurrency issues here. for _, addr := range connPool.addrs { conn, err := tarantool.Connect(addr, connPool.connOpts) if err != nil { @@ -740,10 +761,6 @@ func (connPool *ConnectionPool) fillPools() bool { return somebodyAlive } -func (connPool *ConnectionPool) getState() uint32 { - return atomic.LoadUint32((*uint32)(&connPool.state)) -} - func (connPool *ConnectionPool) getNextConnection(mode Mode) (*tarantool.Connection, error) { switch mode { diff --git a/connection_pool/const.go b/connection_pool/const.go index 2cfcbcb00..d77a55044 100644 --- a/connection_pool/const.go +++ b/connection_pool/const.go @@ -34,11 +34,3 @@ const ( MasterRole // The instance is read-write mode. ReplicaRole // The instance is in read-only mode. ) - -type State uint32 - -// pool state -const ( - connConnected = iota - connClosed -) diff --git a/connection_pool/state.go b/connection_pool/state.go new file mode 100644 index 000000000..a9d20392e --- /dev/null +++ b/connection_pool/state.go @@ -0,0 +1,26 @@ +package connection_pool + +import ( + "sync/atomic" +) + +// pool state +type state uint32 + +const ( + unknownState state = iota + connectedState + closedState +) + +func (s *state) set(news state) { + atomic.StoreUint32((*uint32)(s), uint32(news)) +} + +func (s *state) cas(olds, news state) bool { + return atomic.CompareAndSwapUint32((*uint32)(s), uint32(olds), uint32(news)) +} + +func (s *state) get() state { + return state(atomic.LoadUint32((*uint32)(s))) +} From 6dee596a5ae0f445f3a3e153f0ef3173b79257a1 Mon Sep 17 00:00:00 2001 From: Oleg Jukovec Date: Thu, 25 Aug 2022 11:39:41 +0300 Subject: [PATCH 12/12] bugfix: prevent duplicate connections in pool An user can specify duplicate addresses for a ConnectionPool. We cannot support multiple connections to the same address due to the ConnectionPool.GetPoolInfo() implementation without breaking backward compatibility. So we need to skip duplicates. Closes #208 --- connection_pool/connection_pool.go | 11 +++++++++-- connection_pool/connection_pool_test.go | 25 +++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/connection_pool/connection_pool.go b/connection_pool/connection_pool.go index 673a7d63d..e0c862386 100644 --- a/connection_pool/connection_pool.go +++ b/connection_pool/connection_pool.go @@ -97,7 +97,7 @@ func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts OptsPool) (co anyPool := NewEmptyRoundRobin(size) connPool = &ConnectionPool{ - addrs: make([]string, len(addrs)), + addrs: make([]string, 0, len(addrs)), connOpts: connOpts, opts: opts, notify: notify, @@ -107,7 +107,14 @@ func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts OptsPool) (co roPool: roPool, anyPool: anyPool, } - copy(connPool.addrs, addrs) + + m := make(map[string]bool) + for _, addr := range addrs { + if _, ok := m[addr]; !ok { + m[addr] = true + connPool.addrs = append(connPool.addrs, addr) + } + } somebodyAlive := connPool.fillPools() if !somebodyAlive { diff --git a/connection_pool/connection_pool_test.go b/connection_pool/connection_pool_test.go index ccf1f14ab..326604b51 100644 --- a/connection_pool/connection_pool_test.go +++ b/connection_pool/connection_pool_test.go @@ -80,6 +80,31 @@ func TestConnSuccessfully(t *testing.T) { require.Nil(t, err) } +func TestConnSuccessfullyDuplicates(t *testing.T) { + server := servers[0] + connPool, err := connection_pool.Connect([]string{server, server, server, server}, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + defer connPool.Close() + + args := test_helpers.CheckStatusesArgs{ + ConnPool: connPool, + Mode: connection_pool.ANY, + Servers: []string{server}, + ExpectedPoolStatus: true, + ExpectedStatuses: map[string]bool{ + server: true, + }, + } + + err = test_helpers.CheckPoolStatuses(args) + require.Nil(t, err) + + addrs := connPool.GetAddrs() + require.Equalf(t, []string{server}, addrs, "should be only one address") +} + func TestReconnect(t *testing.T) { server := servers[0]