Skip to content

Commit 9149711

Browse files
committed
api: block Connect() on failure if Reconnect > 0
This patch makes Connect() retry connection attempts if opts.Reconnect is greater than 0. The delay between connection attempts is opts.Reconnect. If opts.MaxReconnects > 0 then the maximum number of attempts is equal to it, otherwise the maximum number of attempts is unlimited. Connect() now also blocks until a connection is established, provided context is cancelled or the number of attempts is exhausted. Closes #436
1 parent 252c3b7 commit 9149711

File tree

3 files changed

+134
-10
lines changed

3 files changed

+134
-10
lines changed

CHANGELOG.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,19 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
66
and this project adheres to [Semantic
77
Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
88

9+
## [Unreleased]
10+
11+
### Added
12+
13+
### Changed
14+
15+
- Connect() now retry the connection if a failure occurs and opts.Reconnect > 0.
16+
The number of attempts is equal to opts.MaxReconnects or unlimited if
17+
opts.MaxReconnects == 0. Connect() blocks until a connection is established,
18+
the context is cancelled, or the number of attempts is exhausted (#436).
19+
20+
### Fixed
21+
922
## [v2.3.0] - 2025-03-11
1023

1124
The release extends box.info responses and ConnectionPool.GetInfo return data.

connection.go

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,14 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
9292
case LogReconnectFailed:
9393
reconnects := v[0].(uint)
9494
err := v[1].(error)
95-
log.Printf("tarantool: reconnect (%d/%d) to %s failed: %s",
96-
reconnects, conn.opts.MaxReconnects, conn.Addr(), err)
95+
addr := conn.Addr()
96+
if addr == nil {
97+
log.Printf("tarantool: connect (%d/%d) failed: %s",
98+
reconnects, conn.opts.MaxReconnects, err)
99+
} else {
100+
log.Printf("tarantool: reconnect (%d/%d) to %s failed: %s",
101+
reconnects, conn.opts.MaxReconnects, addr, err)
102+
}
97103
case LogLastReconnectFailed:
98104
err := v[0].(error)
99105
log.Printf("tarantool: last reconnect to %s failed: %s, giving it up",
@@ -362,8 +368,17 @@ func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, e
362368

363369
conn.cond = sync.NewCond(&conn.mutex)
364370

365-
if err = conn.createConnection(ctx); err != nil {
366-
return nil, err
371+
if conn.opts.Reconnect > 0 {
372+
conn.mutex.Lock()
373+
err = conn.runReconnects(ctx)
374+
conn.mutex.Unlock()
375+
if err != nil {
376+
return nil, err
377+
}
378+
} else {
379+
if err = conn.createConnection(ctx); err != nil {
380+
return nil, err
381+
}
367382
}
368383

369384
go conn.pinger()
@@ -616,19 +631,23 @@ func (conn *Connection) getDialTimeout() time.Duration {
616631
return dialTimeout
617632
}
618633

619-
func (conn *Connection) runReconnects() error {
634+
func (conn *Connection) runReconnects(ctx context.Context) error {
620635
dialTimeout := conn.getDialTimeout()
621636
var reconnects uint
622637
var err error
623638

624639
for conn.opts.MaxReconnects == 0 || reconnects <= conn.opts.MaxReconnects {
625-
now := time.Now()
640+
t := time.NewTimer(conn.opts.Reconnect)
641+
defer t.Stop()
626642

627-
ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
628-
err = conn.createConnection(ctx)
643+
localCtx, cancel := context.WithTimeout(ctx, dialTimeout)
644+
err = conn.createConnection(localCtx)
629645
cancel()
630646

631647
if err != nil {
648+
if ctx.Err() != nil {
649+
return err
650+
}
632651
if clientErr, ok := err.(ClientError); ok &&
633652
clientErr.Code == ErrConnectionClosed {
634653
return err
@@ -642,7 +661,10 @@ func (conn *Connection) runReconnects() error {
642661
reconnects++
643662
conn.mutex.Unlock()
644663

645-
time.Sleep(time.Until(now.Add(conn.opts.Reconnect)))
664+
select {
665+
case <-ctx.Done():
666+
case <-t.C:
667+
}
646668

647669
conn.mutex.Lock()
648670
}
@@ -656,7 +678,7 @@ func (conn *Connection) reconnectImpl(neterr error, c Conn) {
656678
if conn.opts.Reconnect > 0 {
657679
if c == conn.c {
658680
conn.closeConnection(neterr, false)
659-
if err := conn.runReconnects(); err != nil {
681+
if err := conn.runReconnects(context.TODO()); err != nil {
660682
conn.closeConnection(err, true)
661683
}
662684
}

tarantool_test.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3972,6 +3972,95 @@ func TestConnect_context_cancel(t *testing.T) {
39723972
}
39733973
}
39743974

3975+
func TestConnectIsBlocked(t *testing.T) {
3976+
const server = "127.0.0.1:3015"
3977+
3978+
testDialer := dialer
3979+
testDialer.Address = server
3980+
3981+
ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
3982+
defer cancel()
3983+
delay := 2 * time.Second
3984+
errors := make(chan error)
3985+
go func() {
3986+
now := time.Now()
3987+
reconnectOpts := opts
3988+
reconnectOpts.Reconnect = 100 * time.Millisecond
3989+
reconnectOpts.MaxReconnects = 100
3990+
conn, err := Connect(ctx, testDialer, reconnectOpts)
3991+
if err != nil {
3992+
errors <- fmt.Errorf("Connection was not established: %v", err)
3993+
return
3994+
}
3995+
defer conn.Close()
3996+
if time.Now().Before(now.Add(delay)) {
3997+
errors <- fmt.Errorf("Connection was established too early")
3998+
return
3999+
}
4000+
close(errors)
4001+
}()
4002+
4003+
time.Sleep(delay)
4004+
inst, err := test_helpers.StartTarantool(test_helpers.StartOpts{
4005+
Dialer: testDialer,
4006+
InitScript: "config.lua",
4007+
Listen: server,
4008+
WaitStart: 100 * time.Millisecond,
4009+
ConnectRetry: 10,
4010+
RetryTimeout: 500 * time.Millisecond,
4011+
})
4012+
defer test_helpers.StopTarantoolWithCleanup(inst)
4013+
if err != nil {
4014+
t.Fatalf("Unable to start Tarantool: %s", err)
4015+
}
4016+
if err := <-errors; err != nil {
4017+
t.Fatal(err)
4018+
}
4019+
}
4020+
4021+
func TestConnectIsBlockedUntilContextExpires(t *testing.T) {
4022+
const server = "127.0.0.1:3015"
4023+
4024+
testDialer := dialer
4025+
testDialer.Address = server
4026+
4027+
now := time.Now()
4028+
delay := 2 * time.Second
4029+
ctx, cancel := context.WithTimeout(context.TODO(), delay)
4030+
defer cancel()
4031+
reconnectOpts := opts
4032+
reconnectOpts.Reconnect = 100 * time.Millisecond
4033+
reconnectOpts.MaxReconnects = 100
4034+
_, err := Connect(ctx, testDialer, reconnectOpts)
4035+
if err == nil {
4036+
t.Fatal("Connection was unexpectedly established.")
4037+
}
4038+
if time.Now().Before(now.Add(delay)) {
4039+
t.Fatal("Connect() was unblocked too early")
4040+
}
4041+
}
4042+
4043+
func TestConnectIsUnblockedAfterMaxAttempts(t *testing.T) {
4044+
const server = "127.0.0.1:3015"
4045+
4046+
testDialer := dialer
4047+
testDialer.Address = server
4048+
4049+
now := time.Now()
4050+
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
4051+
defer cancel()
4052+
reconnectOpts := opts
4053+
reconnectOpts.Reconnect = 100 * time.Millisecond
4054+
reconnectOpts.MaxReconnects = 10
4055+
_, err := Connect(ctx, testDialer, reconnectOpts)
4056+
if err == nil {
4057+
t.Fatal("Connection was unexpectedly established.")
4058+
}
4059+
if time.Now().Before(now.Add(time.Second)) {
4060+
t.Fatal("Connect() was unblocked too early")
4061+
}
4062+
}
4063+
39754064
func buildSidecar(dir string) error {
39764065
goPath, err := exec.LookPath("go")
39774066
if err != nil {

0 commit comments

Comments
 (0)