Skip to content

Commit de1d004

Browse files
committed
pool: add a connection even on connection error
From a user's perspective, it is useful to add all target instances to the pool, even some that are not currently unavailable. This way the user don’t have to keep track of the list of actually added instances. The patch make it possible. Closes #372
1 parent b8d9914 commit de1d004

File tree

4 files changed

+91
-112
lines changed

4 files changed

+91
-112
lines changed

CHANGELOG.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
5252
connection objects (#136). This function now does not attempt to reconnect
5353
and tries to establish a connection only once. Function might be canceled
5454
via context. Context accepted as first argument.
55-
`pool.Connect` and `pool.Add` now accept context as first argument, which
56-
user may cancel in process. If `pool.Connect` is canceled in progress, an
55+
`pool.Connect` now accept context as first argument, which user may cancel
56+
in process. If `pool.Connect` is canceled in progress, an
5757
error will be returned. All created connections will be closed.
5858
- `iproto.Feature` type now used instead of `ProtocolFeature` (#337)
5959
- `iproto.IPROTO_FEATURE_` constants now used instead of local `Feature`
@@ -95,6 +95,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
9595
- Renamed `StrangerResponse` to `MockResponse` (#237)
9696
- `pool.Connect`, `pool.ConnetcWithOpts` and `pool.Add` use a new type
9797
`pool.Instance` to determinate connection options (#356)
98+
- `pool.Connect`, `pool.ConnectWithOpts` and `pool.Add` add connections to
99+
the pool even it is unable to connect to it (#372)
98100

99101
### Deprecated
100102

README.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,9 +199,10 @@ The subpackage has been deleted. You could use `pool` instead.
199199
the second argument instead of a list of addresses. Each instance is
200200
associated with a unique string name, `Dialer` and connection options which
201201
allows instances to be independently configured.
202-
* `pool.Add` now accepts context as the first argument, which user may cancel
203-
in process.
204-
* `pool.Add` now accepts `pool.Instance` as the second argument instead of
202+
* `pool.Connect`, `pool.ConnectWithOpts` and `pool.Add` add instances into
203+
the pool even it is unable to connect to it. The pool will try to connect to
204+
the instance later.
205+
* `pool.Add` now accepts `pool.Instance` as the first argument instead of
205206
an address, it allows to configure a new instance more flexible.
206207
* `pool.GetPoolInfo` has been renamed to `pool.GetInfo`. Return type has been
207208
changed to `map[string]ConnectionInfo`.

pool/connection_pool.go

Lines changed: 18 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,7 @@ import (
2424
)
2525

2626
var (
27-
ErrEmptyInstances = errors.New("instances (second argument) should not be empty")
2827
ErrWrongCheckTimeout = errors.New("wrong check timeout, must be greater than 0")
29-
ErrNoConnection = errors.New("no active connections")
3028
ErrTooManyArgs = errors.New("too many arguments")
3129
ErrIncorrectResponse = errors.New("incorrect response format")
3230
ErrIncorrectStatus = errors.New("incorrect instance status: status should be `running`")
@@ -155,9 +153,6 @@ func newEndpoint(name string, dialer tarantool.Dialer, opts tarantool.Opts) *end
155153
// opts. Instances must have unique names.
156154
func ConnectWithOpts(ctx context.Context, instances []Instance,
157155
opts Opts) (*ConnectionPool, error) {
158-
if len(instances) == 0 {
159-
return nil, ErrEmptyInstances
160-
}
161156
unique := make(map[string]bool)
162157
for _, instance := range instances {
163158
if _, ok := unique[instance.Name]; ok {
@@ -178,28 +173,23 @@ func ConnectWithOpts(ctx context.Context, instances []Instance,
178173
connPool := &ConnectionPool{
179174
ends: make(map[string]*endpoint),
180175
opts: opts,
181-
state: unknownState,
176+
state: connectedState,
182177
done: make(chan struct{}),
183178
rwPool: rwPool,
184179
roPool: roPool,
185180
anyPool: anyPool,
186181
}
187182

188-
somebodyAlive, ctxCanceled := connPool.fillPools(ctx, instances)
189-
if !somebodyAlive {
183+
canceled := connPool.fillPools(ctx, instances)
184+
if canceled {
190185
connPool.state.set(closedState)
191-
if ctxCanceled {
192-
return nil, ErrContextCanceled
193-
}
194-
return nil, ErrNoConnection
186+
return nil, ErrContextCanceled
195187
}
196188

197-
connPool.state.set(connectedState)
198-
199-
for _, s := range connPool.ends {
189+
for _, endpoint := range connPool.ends {
200190
endpointCtx, cancel := context.WithCancel(context.Background())
201-
s.cancel = cancel
202-
go connPool.controller(endpointCtx, s)
191+
endpoint.cancel = cancel
192+
go connPool.controller(endpointCtx, endpoint)
203193
}
204194

205195
return connPool, nil
@@ -252,9 +242,9 @@ func (p *ConnectionPool) ConfiguredTimeout(mode Mode) (time.Duration, error) {
252242
return conn.ConfiguredTimeout(), nil
253243
}
254244

255-
// Add adds a new instance into the pool. This function adds the instance
256-
// only after successful connection.
257-
func (p *ConnectionPool) Add(ctx context.Context, instance Instance) error {
245+
// Add adds a new instance into the pool. The pool will try to connect to the
246+
// instance later if it is unable to establish a connection.
247+
func (p *ConnectionPool) Add(instance Instance) error {
258248
e := newEndpoint(instance.Name, instance.Dialer, instance.Opts)
259249

260250
p.endsMutex.Lock()
@@ -274,14 +264,9 @@ func (p *ConnectionPool) Add(ctx context.Context, instance Instance) error {
274264
p.ends[instance.Name] = e
275265
p.endsMutex.Unlock()
276266

277-
if err := p.tryConnect(ctx, e); err != nil {
278-
p.endsMutex.Lock()
279-
delete(p.ends, instance.Name)
280-
p.endsMutex.Unlock()
281-
e.cancel()
282-
close(e.closed)
283-
return err
284-
}
267+
// The result does not matter, but we should try to connect to the instance
268+
// as fast as possible.
269+
_ = p.tryConnect(endpointCtx, e)
285270

286271
go p.controller(endpointCtx, e)
287272
return nil
@@ -1145,64 +1130,31 @@ func (p *ConnectionPool) deactivateConnections() {
11451130
}
11461131
}
11471132

1148-
func (p *ConnectionPool) processConnection(conn *tarantool.Connection,
1149-
name string, end *endpoint) bool {
1150-
role, err := p.getConnectionRole(conn)
1151-
if err != nil {
1152-
conn.Close()
1153-
log.Printf("tarantool: storing connection to %s failed: %s\n", name, err)
1154-
return false
1155-
}
1156-
1157-
if !p.handlerDiscovered(name, conn, role) {
1158-
conn.Close()
1159-
return false
1160-
}
1161-
if p.addConnection(name, conn, role) != nil {
1162-
conn.Close()
1163-
p.handlerDeactivated(name, conn, role)
1164-
return false
1165-
}
1166-
1167-
end.conn = conn
1168-
end.role = role
1169-
return true
1170-
}
1171-
1172-
func (p *ConnectionPool) fillPools(ctx context.Context,
1173-
instances []Instance) (bool, bool) {
1174-
somebodyAlive := false
1175-
ctxCanceled := false
1176-
1133+
func (p *ConnectionPool) fillPools(ctx context.Context, instances []Instance) bool {
11771134
// It is called before controller() goroutines, so we don't expect
11781135
// concurrency issues here.
11791136
for _, instance := range instances {
11801137
end := newEndpoint(instance.Name, instance.Dialer, instance.Opts)
11811138
p.ends[instance.Name] = end
1182-
connOpts := instance.Opts
1183-
connOpts.Notify = end.notify
1184-
conn, err := tarantool.Connect(ctx, instance.Dialer, connOpts)
1139+
1140+
err := p.tryConnect(ctx, end)
11851141
if err != nil {
11861142
log.Printf("tarantool: connect to %s failed: %s\n",
11871143
instance.Name, err)
11881144
select {
11891145
case <-ctx.Done():
1190-
ctxCanceled = true
1191-
11921146
p.ends[instance.Name] = nil
11931147
log.Printf("tarantool: operation was canceled")
11941148

11951149
p.deactivateConnections()
11961150

1197-
return false, ctxCanceled
1151+
return true
11981152
default:
11991153
}
1200-
} else if p.processConnection(conn, instance.Name, end) {
1201-
somebodyAlive = true
12021154
}
12031155
}
12041156

1205-
return somebodyAlive, ctxCanceled
1157+
return false
12061158
}
12071159

12081160
func (p *ConnectionPool) updateConnection(e *endpoint) {

pool/connection_pool_test.go

Lines changed: 65 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -86,22 +86,6 @@ var defaultTimeoutRetry = 500 * time.Millisecond
8686

8787
var helpInstances []test_helpers.TarantoolInstance
8888

89-
func TestConnect_error_empty_instances(t *testing.T) {
90-
ctx, cancel := test_helpers.GetPoolConnectContext()
91-
connPool, err := pool.Connect(ctx, []pool.Instance{})
92-
cancel()
93-
require.Nilf(t, connPool, "conn is not nil with incorrect param")
94-
require.ErrorIs(t, err, pool.ErrEmptyInstances)
95-
}
96-
97-
func TestConnect_error_unavailable(t *testing.T) {
98-
ctx, cancel := test_helpers.GetPoolConnectContext()
99-
connPool, err := pool.Connect(ctx, makeInstances([]string{"err1", "err2"}, connOpts))
100-
cancel()
101-
require.Nilf(t, connPool, "conn is not nil with incorrect param")
102-
require.ErrorIs(t, err, pool.ErrNoConnection)
103-
}
104-
10589
func TestConnect_error_duplicate(t *testing.T) {
10690
ctx, cancel := test_helpers.GetPoolConnectContext()
10791
connPool, err := pool.Connect(ctx, makeInstances([]string{"foo", "foo"}, connOpts))
@@ -138,13 +122,56 @@ func TestConnSuccessfully(t *testing.T) {
138122
ExpectedPoolStatus: true,
139123
ExpectedStatuses: map[string]bool{
140124
healthyServ: true,
125+
"err": false,
141126
},
142127
}
143128

144129
err = test_helpers.CheckPoolStatuses(args)
145130
require.Nil(t, err)
146131
}
147132

133+
func TestConnect_empty(t *testing.T) {
134+
cases := []struct {
135+
Name string
136+
Instances []pool.Instance
137+
}{
138+
{"nil", nil},
139+
{"empty", []pool.Instance{}},
140+
}
141+
142+
for _, tc := range cases {
143+
t.Run(tc.Name, func(t *testing.T) {
144+
ctx, cancel := test_helpers.GetPoolConnectContext()
145+
defer cancel()
146+
connPool, err := pool.Connect(ctx, tc.Instances)
147+
if connPool != nil {
148+
defer connPool.Close()
149+
}
150+
require.NoError(t, err, "failed to create a pool")
151+
require.NotNilf(t, connPool, "pool is nil after Connect")
152+
require.Lenf(t, connPool.GetInfo(), 0, "empty pool expected")
153+
})
154+
}
155+
}
156+
157+
func TestConnect_unavailable(t *testing.T) {
158+
servers := []string{"err1", "err2"}
159+
ctx, cancel := test_helpers.GetPoolConnectContext()
160+
connPool, err := pool.Connect(ctx, makeInstances([]string{"err1", "err2"}, connOpts))
161+
cancel()
162+
163+
if connPool != nil {
164+
defer connPool.Close()
165+
}
166+
167+
require.NoError(t, err, "failed to create a pool")
168+
require.NotNilf(t, connPool, "pool is nil after Connect")
169+
require.Equal(t, map[string]pool.ConnectionInfo{
170+
servers[0]: pool.ConnectionInfo{ConnectedNow: false, ConnRole: pool.UnknownRole},
171+
servers[1]: pool.ConnectionInfo{ConnectedNow: false, ConnRole: pool.UnknownRole},
172+
}, connPool.GetInfo())
173+
}
174+
148175
func TestConnErrorAfterCtxCancel(t *testing.T) {
149176
var connLongReconnectOpts = tarantool.Opts{
150177
Timeout: 5 * time.Second,
@@ -410,16 +437,14 @@ func TestDisconnectAll(t *testing.T) {
410437
func TestAdd(t *testing.T) {
411438
ctx, cancel := test_helpers.GetPoolConnectContext()
412439
defer cancel()
413-
connPool, err := pool.Connect(ctx, makeInstances(servers[:1], connOpts))
440+
connPool, err := pool.Connect(ctx, []pool.Instance{})
414441
require.Nilf(t, err, "failed to connect")
415442
require.NotNilf(t, connPool, "conn is nil after Connect")
416443

417444
defer connPool.Close()
418445

419-
for _, server := range servers[1:] {
420-
ctx, cancel := test_helpers.GetConnectContext()
421-
err = connPool.Add(ctx, makeInstance(server, connOpts))
422-
cancel()
446+
for _, server := range servers {
447+
err = connPool.Add(makeInstance(server, connOpts))
423448
require.Nil(t, err)
424449
}
425450

@@ -452,9 +477,7 @@ func TestAdd_exist(t *testing.T) {
452477

453478
defer connPool.Close()
454479

455-
ctx, cancel = test_helpers.GetConnectContext()
456-
err = connPool.Add(ctx, makeInstance(server, connOpts))
457-
cancel()
480+
err = connPool.Add(makeInstance(server, connOpts))
458481
require.Equal(t, pool.ErrExists, err)
459482

460483
args := test_helpers.CheckStatusesArgs{
@@ -484,25 +507,23 @@ func TestAdd_unreachable(t *testing.T) {
484507
defer connPool.Close()
485508

486509
unhealthyServ := "127.0.0.2:6667"
487-
ctx, cancel = test_helpers.GetConnectContext()
488-
err = connPool.Add(ctx, pool.Instance{
510+
err = connPool.Add(pool.Instance{
489511
Name: unhealthyServ,
490512
Dialer: tarantool.NetDialer{
491513
Address: unhealthyServ,
492514
},
493515
Opts: connOpts,
494516
})
495-
cancel()
496-
// The OS-dependent error so we just check for existence.
497-
require.NotNil(t, err)
517+
require.NoError(t, err)
498518

499519
args := test_helpers.CheckStatusesArgs{
500520
ConnPool: connPool,
501521
Mode: pool.ANY,
502522
Servers: servers,
503523
ExpectedPoolStatus: true,
504524
ExpectedStatuses: map[string]bool{
505-
server: true,
525+
server: true,
526+
unhealthyServ: false,
506527
},
507528
}
508529

@@ -520,9 +541,7 @@ func TestAdd_afterClose(t *testing.T) {
520541
require.NotNilf(t, connPool, "conn is nil after Connect")
521542

522543
connPool.Close()
523-
ctx, cancel = test_helpers.GetConnectContext()
524-
err = connPool.Add(ctx, makeInstance(server, connOpts))
525-
cancel()
544+
err = connPool.Add(makeInstance(server, connOpts))
526545
assert.Equal(t, err, pool.ErrClosed)
527546
}
528547

@@ -541,9 +560,7 @@ func TestAdd_Close_concurrent(t *testing.T) {
541560
go func() {
542561
defer wg.Done()
543562

544-
ctx, cancel := test_helpers.GetConnectContext()
545-
err = connPool.Add(ctx, makeInstance(serv1, connOpts))
546-
cancel()
563+
err = connPool.Add(makeInstance(serv1, connOpts))
547564
if err != nil {
548565
assert.Equal(t, pool.ErrClosed, err)
549566
}
@@ -569,9 +586,7 @@ func TestAdd_CloseGraceful_concurrent(t *testing.T) {
569586
go func() {
570587
defer wg.Done()
571588

572-
ctx, cancel := test_helpers.GetConnectContext()
573-
err = connPool.Add(ctx, makeInstance(serv1, connOpts))
574-
cancel()
589+
err = connPool.Add(makeInstance(serv1, connOpts))
575590
if err != nil {
576591
assert.Equal(t, pool.ErrClosed, err)
577592
}
@@ -1028,8 +1043,17 @@ func TestConnectionHandlerOpenError(t *testing.T) {
10281043
if err == nil {
10291044
defer connPool.Close()
10301045
}
1031-
require.NotNilf(t, err, "success to connect")
1032-
require.Equalf(t, 2, h.discovered, "unexpected discovered count")
1046+
require.NoError(t, err, "failed to connect")
1047+
require.NotNil(t, connPool, "pool expected")
1048+
require.Equal(t, map[string]pool.ConnectionInfo{
1049+
servers[0]: pool.ConnectionInfo{ConnectedNow: false, ConnRole: pool.UnknownRole},
1050+
servers[1]: pool.ConnectionInfo{ConnectedNow: false, ConnRole: pool.UnknownRole},
1051+
}, connPool.GetInfo())
1052+
connPool.Close()
1053+
1054+
// It could happen additional reconnect attempts in the background, but
1055+
// at least 2 connects on start.
1056+
require.GreaterOrEqualf(t, h.discovered, 2, "unexpected discovered count")
10331057
require.Equalf(t, 0, h.deactivated, "unexpected deactivated count")
10341058
}
10351059

0 commit comments

Comments
 (0)