Skip to content

Commit c85bfae

Browse files
committed
api: add context to connection create
`connection.Connect` and `pool.Connect` no longer return non-working connection objects. Those functions now accept context as their first arguments, which user may cancel in process. `connection.Connect` will block until either the working connection created (and returned), `opts.MaxReconnects` creation attempts were made (returns error) or the context is canceled by user (returns error too). Closes #136
1 parent d8df65d commit c85bfae

30 files changed

+885
-275
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
2828
decoded to a varbinary object (#313).
2929
- Use objects of the Decimal type instead of pointers (#238)
3030
- Use objects of the Datetime type instead of pointers (#238)
31+
- `connection.Connect` no longer return non-working
32+
connection objects (#136). This function now does not attempt to reconnect
33+
and tries to establish a connection only once based on the context object.
34+
Context accepted as first argument, and user may cancel it in process.
3135

3236
### Deprecated
3337

README.md

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,13 +105,19 @@ about what it does.
105105
package tarantool
106106

107107
import (
108+
"context"
108109
"fmt"
110+
"time"
111+
109112
"github.com/tarantool/go-tarantool/v2"
110113
)
111114

112115
func main() {
113116
opts := tarantool.Opts{User: "guest"}
114-
conn, err := tarantool.Connect("127.0.0.1:3301", opts)
117+
ctx, cancel := context.WithTimeout(context.Background(),
118+
500 * time.Millisecond)
119+
defer cancel()
120+
conn, err := tarantool.Connect(ctx, "127.0.0.1:3301", opts)
115121
if err != nil {
116122
fmt.Println("Connection refused:", err)
117123
}
@@ -134,11 +140,17 @@ username. The structure may also contain other settings, see more in
134140
[documentation][godoc-opts-url] for the "`Opts`" structure.
135141

136142
**Observation 3:** The line containing "`tarantool.Connect`" is essential for
137-
starting a session. There are two parameters:
143+
starting a session. There are three parameters:
138144

139-
* a string with `host:port` format, and
145+
* a context,
146+
* a string with `host:port` format,
140147
* the option structure that was set up earlier.
141148

149+
There will be only one attempt to connect. If multiple attempts needed,
150+
"`tarantool.Connect`" could be placed inside the loop with some timeout
151+
between each try. Example could be found in the [example_test](./example_test.go),
152+
name - `ExampleConnect_reconnects`.
153+
142154
**Observation 4:** The `err` structure will be `nil` if there is no error,
143155
otherwise it will have a description which can be retrieved with `err.Error()`.
144156

@@ -167,10 +179,12 @@ The subpackage has been deleted. You could use `pool` instead.
167179

168180
#### pool package
169181

170-
The logic has not changed, but there are a few renames:
171-
172182
* The `connection_pool` subpackage has been renamed to `pool`.
173183
* The type `PoolOpts` has been renamed to `Opts`.
184+
* `pool.Connect` now accepts context as first argument, which user may cancel
185+
in process.
186+
* `pool.Add` now accepts context as first argument, which user may cancel in
187+
process.
174188

175189
#### msgpack.v5
176190

@@ -212,6 +226,13 @@ IPROTO constants have been moved to a separate package [go-iproto](https://githu
212226

213227
* The method `Code() uint32` replaced by the `Type() iproto.Type`.
214228

229+
#### Connect function
230+
231+
`connection.Connect` no longer return non-working connection objects. This function
232+
now does not attempt to reconnect and tries to establish a connection only once
233+
based on the context object. Context accepted as first argument, and user may
234+
cancel it in process.
235+
215236
## Contributing
216237

217238
See [the contributing guide](CONTRIBUTING.md) for detailed instructions on how

connection.go

Lines changed: 61 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -375,16 +375,7 @@ func (opts Opts) Clone() Opts {
375375
// - Unix socket, first '/' or '.' indicates Unix socket
376376
// (unix:///abs/path/tnt.sock, unix:path/tnt.sock, /abs/path/tnt.sock,
377377
// ./rel/path/tnt.sock, unix/:path/tnt.sock)
378-
//
379-
// Notes:
380-
//
381-
// - If opts.Reconnect is zero (default), then connection either already connected
382-
// or error is returned.
383-
//
384-
// - If opts.Reconnect is non-zero, then error will be returned only if authorization
385-
// fails. But if Tarantool is not reachable, then it will make an attempt to reconnect later
386-
// and will not finish to make attempts on authorization failures.
387-
func Connect(addr string, opts Opts) (conn *Connection, err error) {
378+
func Connect(ctx context.Context, addr string, opts Opts) (conn *Connection, err error) {
388379
conn = &Connection{
389380
addr: addr,
390381
requestId: 0,
@@ -432,25 +423,8 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
432423

433424
conn.cond = sync.NewCond(&conn.mutex)
434425

435-
if err = conn.createConnection(false); err != nil {
436-
ter, ok := err.(Error)
437-
if conn.opts.Reconnect <= 0 {
438-
return nil, err
439-
} else if ok && (ter.Code == iproto.ER_NO_SUCH_USER ||
440-
ter.Code == iproto.ER_CREDS_MISMATCH) {
441-
// Reported auth errors immediately.
442-
return nil, err
443-
} else {
444-
// Without SkipSchema it is useless.
445-
go func(conn *Connection) {
446-
conn.mutex.Lock()
447-
defer conn.mutex.Unlock()
448-
if err := conn.createConnection(true); err != nil {
449-
conn.closeConnection(err, true)
450-
}
451-
}(conn)
452-
err = nil
453-
}
426+
if err = conn.createConnection(ctx); err != nil {
427+
return nil, err
454428
}
455429

456430
go conn.pinger()
@@ -534,18 +508,11 @@ func (conn *Connection) cancelFuture(fut *Future, err error) {
534508
}
535509
}
536510

537-
func (conn *Connection) dial() (err error) {
511+
func (conn *Connection) dial(ctx context.Context) error {
538512
opts := conn.opts
539-
dialTimeout := opts.Reconnect / 2
540-
if dialTimeout == 0 {
541-
dialTimeout = 500 * time.Millisecond
542-
} else if dialTimeout > 5*time.Second {
543-
dialTimeout = 5 * time.Second
544-
}
545513

546514
var c Conn
547-
c, err = conn.opts.Dialer.Dial(conn.addr, DialOpts{
548-
DialTimeout: dialTimeout,
515+
c, err := conn.opts.Dialer.Dial(ctx, conn.addr, DialOpts{
549516
IoTimeout: opts.Timeout,
550517
Transport: opts.Transport,
551518
Ssl: opts.Ssl,
@@ -555,7 +522,7 @@ func (conn *Connection) dial() (err error) {
555522
Password: opts.Pass,
556523
})
557524
if err != nil {
558-
return
525+
return err
559526
}
560527

561528
conn.Greeting.Version = c.Greeting().Version
@@ -605,7 +572,7 @@ func (conn *Connection) dial() (err error) {
605572
conn.shutdownWatcher = watcher
606573
}
607574

608-
return
575+
return nil
609576
}
610577

611578
func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32,
@@ -658,34 +625,18 @@ func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32,
658625
return
659626
}
660627

661-
func (conn *Connection) createConnection(reconnect bool) (err error) {
662-
var reconnects uint
663-
for conn.c == nil && conn.state == connDisconnected {
664-
now := time.Now()
665-
err = conn.dial()
666-
if err == nil || !reconnect {
667-
if err == nil {
668-
conn.notify(Connected)
669-
}
670-
return
671-
}
672-
if conn.opts.MaxReconnects > 0 && reconnects > conn.opts.MaxReconnects {
673-
conn.opts.Logger.Report(LogLastReconnectFailed, conn, err)
674-
err = ClientError{ErrConnectionClosed, "last reconnect failed"}
675-
// mark connection as closed to avoid reopening by another goroutine
676-
return
628+
func (conn *Connection) createConnection(ctx context.Context) error {
629+
var err error
630+
if conn.c == nil && conn.state == connDisconnected {
631+
if err = conn.dial(ctx); err == nil {
632+
conn.notify(Connected)
633+
return nil
677634
}
678-
conn.opts.Logger.Report(LogReconnectFailed, conn, reconnects, err)
679-
conn.notify(ReconnectFailed)
680-
reconnects++
681-
conn.mutex.Unlock()
682-
time.Sleep(time.Until(now.Add(conn.opts.Reconnect)))
683-
conn.mutex.Lock()
684635
}
685636
if conn.state == connClosed {
686637
err = ClientError{ErrConnectionClosed, "using closed connection"}
687638
}
688-
return
639+
return err
689640
}
690641

691642
func (conn *Connection) closeConnection(neterr error, forever bool) (err error) {
@@ -727,11 +678,57 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
727678
return
728679
}
729680

681+
func (conn *Connection) getDialTimeout() time.Duration {
682+
dialTimeout := conn.opts.Reconnect / 2
683+
if dialTimeout == 0 {
684+
dialTimeout = 500 * time.Millisecond
685+
} else if dialTimeout > 5*time.Second {
686+
dialTimeout = 5 * time.Second
687+
}
688+
return dialTimeout
689+
}
690+
691+
func (conn *Connection) runReconnects() error {
692+
dialTimeout := conn.getDialTimeout()
693+
var reconnects uint
694+
var err error
695+
696+
for conn.opts.MaxReconnects == 0 || reconnects <= conn.opts.MaxReconnects {
697+
now := time.Now()
698+
699+
ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
700+
err = conn.createConnection(ctx)
701+
cancel()
702+
703+
if err != nil {
704+
if clientErr, ok := err.(ClientError); ok &&
705+
clientErr.Code == ErrConnectionClosed {
706+
return err
707+
}
708+
} else {
709+
return nil
710+
}
711+
712+
conn.opts.Logger.Report(LogReconnectFailed, conn, reconnects, err)
713+
conn.notify(ReconnectFailed)
714+
reconnects++
715+
conn.mutex.Unlock()
716+
717+
time.Sleep(time.Until(now.Add(conn.opts.Reconnect)))
718+
719+
conn.mutex.Lock()
720+
}
721+
722+
conn.opts.Logger.Report(LogLastReconnectFailed, conn, err)
723+
// mark connection as closed to avoid reopening by another goroutine
724+
return ClientError{ErrConnectionClosed, "last reconnect failed"}
725+
}
726+
730727
func (conn *Connection) reconnectImpl(neterr error, c Conn) {
731728
if conn.opts.Reconnect > 0 {
732729
if c == conn.c {
733730
conn.closeConnection(neterr, false)
734-
if err := conn.createConnection(true); err != nil {
731+
if err := conn.runReconnects(); err != nil {
735732
conn.closeConnection(err, true)
736733
}
737734
}

crud/example_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package crud_test
22

33
import (
4+
"context"
45
"fmt"
56
"reflect"
67
"time"
@@ -21,7 +22,9 @@ var exampleOpts = tarantool.Opts{
2122
}
2223

2324
func exampleConnect() *tarantool.Connection {
24-
conn, err := tarantool.Connect(exampleServer, exampleOpts)
25+
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
26+
defer cancel()
27+
conn, err := tarantool.Connect(ctx, exampleServer, exampleOpts)
2528
if err != nil {
2629
panic("Connection is not established: " + err.Error())
2730
}

crud/tarantool_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package crud_test
22

33
import (
4+
"context"
45
"fmt"
56
"log"
67
"os"
@@ -108,7 +109,10 @@ var object = crud.MapObject{
108109

109110
func connect(t testing.TB) *tarantool.Connection {
110111
for i := 0; i < 10; i++ {
111-
conn, err := tarantool.Connect(server, opts)
112+
ctx, cancel := context.WithTimeout(context.Background(),
113+
test_helpers.GetConnectTimeout())
114+
conn, err := tarantool.Connect(ctx, server, opts)
115+
cancel()
112116
if err != nil {
113117
t.Fatalf("Failed to connect: %s", err)
114118
}

datetime/example_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package datetime_test
1010

1111
import (
12+
"context"
1213
"fmt"
1314
"time"
1415

@@ -23,7 +24,9 @@ func Example() {
2324
User: "test",
2425
Pass: "test",
2526
}
26-
conn, err := tarantool.Connect("127.0.0.1:3013", opts)
27+
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
28+
defer cancel()
29+
conn, err := tarantool.Connect(ctx, "127.0.0.1:3013", opts)
2730
if err != nil {
2831
fmt.Printf("Error in connect is %v", err)
2932
return

decimal/example_test.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package decimal_test
1010

1111
import (
12+
"context"
1213
"log"
1314
"time"
1415

@@ -28,7 +29,17 @@ func Example() {
2829
User: "test",
2930
Pass: "test",
3031
}
31-
client, err := tarantool.Connect(server, opts)
32+
var client *tarantool.Connection
33+
var err error
34+
for i := uint(0); i < opts.MaxReconnects; i++ {
35+
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
36+
client, err = tarantool.Connect(ctx, server, opts)
37+
cancel()
38+
if err == nil {
39+
break
40+
}
41+
time.Sleep(opts.Reconnect)
42+
}
3243
if err != nil {
3344
log.Fatalf("Failed to connect: %s", err.Error())
3445
}

0 commit comments

Comments
 (0)