Skip to content

Commit 5a79c7f

Browse files
committed
api: add context to connection create
`connection.Connect` and `pool.Connect` no longer return non-working connection objects. Those functions now accept context as their first arguments, which user may cancel in process. `connection.Connect` will block until either the working connection created (and returned), `opts.MaxReconnects` creation attempts were made (returns error) or the context is canceled by user (returns error too). Closes #136
1 parent d8df65d commit 5a79c7f

29 files changed

+788
-244
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
2828
decoded to a varbinary object (#313).
2929
- Use objects of the Decimal type instead of pointers (#238)
3030
- Use objects of the Datetime type instead of pointers (#238)
31+
- `connection.Connect` and `pool.Connect` no longer return non-working
32+
connection objects (#136). Those functions now accept context as their first
33+
arguments, which user may cancel in process.
3134

3235
### Deprecated
3336

README.md

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,13 +105,19 @@ about what it does.
105105
package tarantool
106106

107107
import (
108+
"context"
108109
"fmt"
110+
"time"
111+
109112
"github.com/tarantool/go-tarantool/v2"
110113
)
111114

112115
func main() {
113116
opts := tarantool.Opts{User: "guest"}
114-
conn, err := tarantool.Connect("127.0.0.1:3301", opts)
117+
ctx, cancel := context.WithTimeout(context.Background(),
118+
500 * time.Millisecond)
119+
defer cancel()
120+
conn, err := tarantool.Connect(ctx, "127.0.0.1:3301", opts)
115121
if err != nil {
116122
fmt.Println("Connection refused:", err)
117123
}
@@ -139,6 +145,10 @@ starting a session. There are two parameters:
139145
* a string with `host:port` format, and
140146
* the option structure that was set up earlier.
141147

148+
There will be only one attempt to connect. If multiple attempts needed,
149+
"`tarantool.Connect`" could be placed inside the loop with some timeout
150+
between each try. Example could be found in the [example_test](./example_test.go).
151+
142152
**Observation 4:** The `err` structure will be `nil` if there is no error,
143153
otherwise it will have a description which can be retrieved with `err.Error()`.
144154

@@ -167,10 +177,17 @@ The subpackage has been deleted. You could use `pool` instead.
167177

168178
#### pool package
169179

170-
The logic has not changed, but there are a few renames:
171-
172180
* The `connection_pool` subpackage has been renamed to `pool`.
173181
* The type `PoolOpts` has been renamed to `Opts`.
182+
* `pool.Connect` no longer return non-working connection objects. This function
183+
now accepts context as first argument, which user may cancel in process.
184+
* `pool.Add` now accepts context as first argument, which user may cancel in
185+
process.
186+
187+
#### connection package
188+
189+
`connection.Connect` no longer return non-working connection objects. This
190+
function now accept context as first argument, which user may cancel in process.
174191

175192
#### msgpack.v5
176193

connection.go

Lines changed: 63 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -378,13 +378,9 @@ func (opts Opts) Clone() Opts {
378378
//
379379
// Notes:
380380
//
381-
// - If opts.Reconnect is zero (default), then connection either already connected
382-
// or error is returned.
383-
//
384-
// - If opts.Reconnect is non-zero, then error will be returned only if authorization
385-
// fails. But if Tarantool is not reachable, then it will make an attempt to reconnect later
386-
// and will not finish to make attempts on authorization failures.
387-
func Connect(addr string, opts Opts) (conn *Connection, err error) {
381+
// - There will be only one attempt to connect. If multiple attempts needed,
382+
// Connect could be placed inside the loop with some timeout between each try.
383+
func Connect(ctx context.Context, addr string, opts Opts) (conn *Connection, err error) {
388384
conn = &Connection{
389385
addr: addr,
390386
requestId: 0,
@@ -432,25 +428,8 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
432428

433429
conn.cond = sync.NewCond(&conn.mutex)
434430

435-
if err = conn.createConnection(false); err != nil {
436-
ter, ok := err.(Error)
437-
if conn.opts.Reconnect <= 0 {
438-
return nil, err
439-
} else if ok && (ter.Code == iproto.ER_NO_SUCH_USER ||
440-
ter.Code == iproto.ER_CREDS_MISMATCH) {
441-
// Reported auth errors immediately.
442-
return nil, err
443-
} else {
444-
// Without SkipSchema it is useless.
445-
go func(conn *Connection) {
446-
conn.mutex.Lock()
447-
defer conn.mutex.Unlock()
448-
if err := conn.createConnection(true); err != nil {
449-
conn.closeConnection(err, true)
450-
}
451-
}(conn)
452-
err = nil
453-
}
431+
if err = conn.createConnection(ctx); err != nil {
432+
return nil, err
454433
}
455434

456435
go conn.pinger()
@@ -534,18 +513,11 @@ func (conn *Connection) cancelFuture(fut *Future, err error) {
534513
}
535514
}
536515

537-
func (conn *Connection) dial() (err error) {
516+
func (conn *Connection) dial(ctx context.Context) (err error) {
538517
opts := conn.opts
539-
dialTimeout := opts.Reconnect / 2
540-
if dialTimeout == 0 {
541-
dialTimeout = 500 * time.Millisecond
542-
} else if dialTimeout > 5*time.Second {
543-
dialTimeout = 5 * time.Second
544-
}
545518

546519
var c Conn
547-
c, err = conn.opts.Dialer.Dial(conn.addr, DialOpts{
548-
DialTimeout: dialTimeout,
520+
c, err = conn.opts.Dialer.Dial(ctx, conn.addr, DialOpts{
549521
IoTimeout: opts.Timeout,
550522
Transport: opts.Transport,
551523
Ssl: opts.Ssl,
@@ -658,34 +630,19 @@ func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32,
658630
return
659631
}
660632

661-
func (conn *Connection) createConnection(reconnect bool) (err error) {
662-
var reconnects uint
663-
for conn.c == nil && conn.state == connDisconnected {
664-
now := time.Now()
665-
err = conn.dial()
666-
if err == nil || !reconnect {
667-
if err == nil {
668-
conn.notify(Connected)
669-
}
670-
return
671-
}
672-
if conn.opts.MaxReconnects > 0 && reconnects > conn.opts.MaxReconnects {
673-
conn.opts.Logger.Report(LogLastReconnectFailed, conn, err)
674-
err = ClientError{ErrConnectionClosed, "last reconnect failed"}
675-
// mark connection as closed to avoid reopening by another goroutine
676-
return
633+
func (conn *Connection) createConnection(ctx context.Context) error {
634+
var err error
635+
if conn.c == nil && conn.state == connDisconnected {
636+
err = conn.dial(ctx)
637+
if err == nil {
638+
conn.notify(Connected)
639+
return nil
677640
}
678-
conn.opts.Logger.Report(LogReconnectFailed, conn, reconnects, err)
679-
conn.notify(ReconnectFailed)
680-
reconnects++
681-
conn.mutex.Unlock()
682-
time.Sleep(time.Until(now.Add(conn.opts.Reconnect)))
683-
conn.mutex.Lock()
684641
}
685642
if conn.state == connClosed {
686643
err = ClientError{ErrConnectionClosed, "using closed connection"}
687644
}
688-
return
645+
return err
689646
}
690647

691648
func (conn *Connection) closeConnection(neterr error, forever bool) (err error) {
@@ -727,11 +684,58 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
727684
return
728685
}
729686

687+
func (conn *Connection) getDialTimeout() time.Duration {
688+
dialTimeout := conn.opts.Reconnect / 2
689+
if dialTimeout == 0 {
690+
dialTimeout = 500 * time.Millisecond
691+
} else if dialTimeout > 5*time.Second {
692+
dialTimeout = 5 * time.Second
693+
}
694+
return dialTimeout
695+
}
696+
697+
func (conn *Connection) runReconnects() error {
698+
dialTimeout := conn.getDialTimeout()
699+
var reconnects uint
700+
var err error
701+
702+
for reconnects <= conn.opts.MaxReconnects {
703+
now := time.Now()
704+
705+
ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
706+
err = conn.createConnection(ctx)
707+
cancel()
708+
709+
if err == nil {
710+
return nil
711+
}
712+
if clientErr, ok := err.(ClientError); ok &&
713+
clientErr.Code == ErrConnectionClosed {
714+
return err
715+
}
716+
717+
conn.opts.Logger.Report(LogReconnectFailed, conn, reconnects, err)
718+
conn.notify(ReconnectFailed)
719+
if conn.opts.MaxReconnects != 0 {
720+
reconnects++
721+
}
722+
conn.mutex.Unlock()
723+
724+
time.Sleep(time.Until(now.Add(conn.opts.Reconnect)))
725+
726+
conn.mutex.Lock()
727+
}
728+
729+
conn.opts.Logger.Report(LogLastReconnectFailed, conn, err)
730+
// mark connection as closed to avoid reopening by another goroutine
731+
return ClientError{ErrConnectionClosed, "last reconnect failed"}
732+
}
733+
730734
func (conn *Connection) reconnectImpl(neterr error, c Conn) {
731735
if conn.opts.Reconnect > 0 {
732736
if c == conn.c {
733737
conn.closeConnection(neterr, false)
734-
if err := conn.createConnection(true); err != nil {
738+
if err := conn.runReconnects(); err != nil {
735739
conn.closeConnection(err, true)
736740
}
737741
}

crud/example_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package crud_test
22

33
import (
4+
"context"
45
"fmt"
56
"reflect"
67
"time"
@@ -21,7 +22,9 @@ var exampleOpts = tarantool.Opts{
2122
}
2223

2324
func exampleConnect() *tarantool.Connection {
24-
conn, err := tarantool.Connect(exampleServer, exampleOpts)
25+
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
26+
defer cancel()
27+
conn, err := tarantool.Connect(ctx, exampleServer, exampleOpts)
2528
if err != nil {
2629
panic("Connection is not established: " + err.Error())
2730
}

crud/tarantool_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package crud_test
22

33
import (
4+
"context"
45
"fmt"
56
"log"
67
"os"
@@ -108,7 +109,10 @@ var object = crud.MapObject{
108109

109110
func connect(t testing.TB) *tarantool.Connection {
110111
for i := 0; i < 10; i++ {
111-
conn, err := tarantool.Connect(server, opts)
112+
ctx, cancel := context.WithTimeout(context.Background(),
113+
500*time.Millisecond)
114+
conn, err := tarantool.Connect(ctx, server, opts)
115+
cancel()
112116
if err != nil {
113117
t.Fatalf("Failed to connect: %s", err)
114118
}

datetime/example_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package datetime_test
1010

1111
import (
12+
"context"
1213
"fmt"
1314
"time"
1415

@@ -23,7 +24,9 @@ func Example() {
2324
User: "test",
2425
Pass: "test",
2526
}
26-
conn, err := tarantool.Connect("127.0.0.1:3013", opts)
27+
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
28+
defer cancel()
29+
conn, err := tarantool.Connect(ctx, "127.0.0.1:3013", opts)
2730
if err != nil {
2831
fmt.Printf("Error in connect is %v", err)
2932
return

decimal/example_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package decimal_test
1010

1111
import (
12+
"context"
1213
"log"
1314
"time"
1415

@@ -28,7 +29,9 @@ func Example() {
2829
User: "test",
2930
Pass: "test",
3031
}
31-
client, err := tarantool.Connect(server, opts)
32+
ctx, cancel := context.WithTimeout(context.Background(), opts.Reconnect/2)
33+
client, err := tarantool.Connect(ctx, server, opts)
34+
cancel()
3235
if err != nil {
3336
log.Fatalf("Failed to connect: %s", err.Error())
3437
}

dial.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package tarantool
33
import (
44
"bufio"
55
"bytes"
6+
"context"
67
"errors"
78
"fmt"
89
"io"
@@ -56,8 +57,6 @@ type Conn interface {
5657

5758
// DialOpts is a way to configure a Dial method to create a new Conn.
5859
type DialOpts struct {
59-
// DialTimeout is a timeout for an initial network dial.
60-
DialTimeout time.Duration
6160
// IoTimeout is a timeout per a network read/write.
6261
IoTimeout time.Duration
6362
// Transport is a connect transport type.
@@ -86,7 +85,7 @@ type DialOpts struct {
8685
type Dialer interface {
8786
// Dial connects to a Tarantool instance to the address with specified
8887
// options.
89-
Dial(address string, opts DialOpts) (Conn, error)
88+
Dial(ctx context.Context, address string, opts DialOpts) (Conn, error)
9089
}
9190

9291
type tntConn struct {
@@ -104,11 +103,11 @@ type TtDialer struct {
104103

105104
// Dial connects to a Tarantool instance to the address with specified
106105
// options.
107-
func (t TtDialer) Dial(address string, opts DialOpts) (Conn, error) {
106+
func (t TtDialer) Dial(ctx context.Context, address string, opts DialOpts) (Conn, error) {
108107
var err error
109108
conn := new(tntConn)
110109

111-
if conn.net, err = dial(address, opts); err != nil {
110+
if conn.net, err = dial(ctx, address, opts); err != nil {
112111
return nil, fmt.Errorf("failed to dial: %w", err)
113112
}
114113

@@ -199,13 +198,14 @@ func (c *tntConn) ProtocolInfo() ProtocolInfo {
199198
}
200199

201200
// dial connects to a Tarantool instance.
202-
func dial(address string, opts DialOpts) (net.Conn, error) {
201+
func dial(ctx context.Context, address string, opts DialOpts) (net.Conn, error) {
203202
network, address := parseAddress(address)
204203
switch opts.Transport {
205204
case dialTransportNone:
206-
return net.DialTimeout(network, address, opts.DialTimeout)
205+
dialer := net.Dialer{}
206+
return dialer.DialContext(ctx, network, address)
207207
case dialTransportSsl:
208-
return sslDialTimeout(network, address, opts.DialTimeout, opts.Ssl)
208+
return sslDialContext(ctx, network, address, opts.Ssl)
209209
default:
210210
return nil, fmt.Errorf("unsupported transport type: %s", opts.Transport)
211211
}

0 commit comments

Comments
 (0)