Skip to content

Commit be60c41

Browse files
committed
api: block Connect() on failure if Reconnect > 0
This patch makes Connect() retry connection attempts if opts.Reconnect is greater than 0. The delay between connection attempts is opts.Reconnect. If opts.MaxReconnects > 0 then the maximum number of attempts is equal to it, otherwise the maximum number of attempts is unlimited. Connect() now also blocks until a connection is established, provided context is cancelled or the number of attempts is exhausted. Closes #436
1 parent 252c3b7 commit be60c41

File tree

3 files changed

+165
-14
lines changed

3 files changed

+165
-14
lines changed

CHANGELOG.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,19 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
66
and this project adheres to [Semantic
77
Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
88

9+
## [Unreleased]
10+
11+
### Added
12+
13+
### Changed
14+
15+
- Connect() now retry the connection if a failure occurs and opts.Reconnect > 0.
16+
The number of attempts is equal to opts.MaxReconnects or unlimited if
17+
opts.MaxReconnects == 0. Connect() blocks until a connection is established,
18+
the context is cancelled, or the number of attempts is exhausted (#436).
19+
20+
### Fixed
21+
922
## [v2.3.0] - 2025-03-11
1023

1124
The release extends box.info responses and ConnectionPool.GetInfo return data.

connection.go

Lines changed: 54 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,24 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
9292
case LogReconnectFailed:
9393
reconnects := v[0].(uint)
9494
err := v[1].(error)
95-
log.Printf("tarantool: reconnect (%d/%d) to %s failed: %s",
96-
reconnects, conn.opts.MaxReconnects, conn.Addr(), err)
95+
addr := conn.Addr()
96+
if addr == nil {
97+
log.Printf("tarantool: connect (%d/%d) failed: %s",
98+
reconnects, conn.opts.MaxReconnects, err)
99+
} else {
100+
log.Printf("tarantool: reconnect (%d/%d) to %s failed: %s",
101+
reconnects, conn.opts.MaxReconnects, addr, err)
102+
}
97103
case LogLastReconnectFailed:
98104
err := v[0].(error)
99-
log.Printf("tarantool: last reconnect to %s failed: %s, giving it up",
100-
conn.Addr(), err)
105+
addr := conn.Addr()
106+
if addr == nil {
107+
log.Printf("tarantool: last connect failed: %s, giving it up",
108+
err)
109+
} else {
110+
log.Printf("tarantool: last reconnect to %s failed: %s, giving it up",
111+
addr, err)
112+
}
101113
case LogUnexpectedResultId:
102114
header := v[0].(Header)
103115
log.Printf("tarantool: connection %s got unexpected request ID (%d) in response "+
@@ -362,8 +374,20 @@ func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, e
362374

363375
conn.cond = sync.NewCond(&conn.mutex)
364376

365-
if err = conn.createConnection(ctx); err != nil {
366-
return nil, err
377+
if conn.opts.Reconnect > 0 {
378+
// We don't need these mutex.Lock()/mutex.Unlock() here, but
379+
// runReconnects() expects mutex.Lock() to be set, so it's
380+
// easier to add them instead of reworking runReconnects().
381+
conn.mutex.Lock()
382+
err = conn.runReconnects(ctx)
383+
conn.mutex.Unlock()
384+
if err != nil {
385+
return nil, err
386+
}
387+
} else {
388+
if err = conn.connect(ctx); err != nil {
389+
return nil, err
390+
}
367391
}
368392

369393
go conn.pinger()
@@ -553,7 +577,7 @@ func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32,
553577
return
554578
}
555579

556-
func (conn *Connection) createConnection(ctx context.Context) error {
580+
func (conn *Connection) connect(ctx context.Context) error {
557581
var err error
558582
if conn.c == nil && conn.state == connDisconnected {
559583
if err = conn.dial(ctx); err == nil {
@@ -616,19 +640,30 @@ func (conn *Connection) getDialTimeout() time.Duration {
616640
return dialTimeout
617641
}
618642

619-
func (conn *Connection) runReconnects() error {
643+
func (conn *Connection) runReconnects(ctx context.Context) error {
620644
dialTimeout := conn.getDialTimeout()
621645
var reconnects uint
622646
var err error
623647

648+
t := time.NewTicker(conn.opts.Reconnect)
649+
defer t.Stop()
624650
for conn.opts.MaxReconnects == 0 || reconnects <= conn.opts.MaxReconnects {
625-
now := time.Now()
626-
627-
ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
628-
err = conn.createConnection(ctx)
651+
localCtx, cancel := context.WithTimeout(ctx, dialTimeout)
652+
err = conn.connect(localCtx)
629653
cancel()
630654

631655
if err != nil {
656+
// The error will most likely be the one that Dialer
657+
// returns to us due to the context being cancelled.
658+
// Although this is not guaranteed. For example,
659+
// if the dialer may throw another error before checking
660+
// the context, and the context has already been
661+
// canceled. Or the context was not canceled after
662+
// the error was thrown, but before the context was
663+
// checked here.
664+
if ctx.Err() != nil {
665+
return err
666+
}
632667
if clientErr, ok := err.(ClientError); ok &&
633668
clientErr.Code == ErrConnectionClosed {
634669
return err
@@ -642,7 +677,12 @@ func (conn *Connection) runReconnects() error {
642677
reconnects++
643678
conn.mutex.Unlock()
644679

645-
time.Sleep(time.Until(now.Add(conn.opts.Reconnect)))
680+
select {
681+
case <-ctx.Done():
682+
// Since the context is cancelled, we don't need to do anything.
683+
// Conn.connect() will return the correct error.
684+
case <-t.C:
685+
}
646686

647687
conn.mutex.Lock()
648688
}
@@ -656,7 +696,7 @@ func (conn *Connection) reconnectImpl(neterr error, c Conn) {
656696
if conn.opts.Reconnect > 0 {
657697
if c == conn.c {
658698
conn.closeConnection(neterr, false)
659-
if err := conn.runReconnects(); err != nil {
699+
if err := conn.runReconnects(context.Background()); err != nil {
660700
conn.closeConnection(err, true)
661701
}
662702
}

tarantool_test.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3972,6 +3972,104 @@ func TestConnect_context_cancel(t *testing.T) {
39723972
}
39733973
}
39743974

3975+
// A dialer that rejects the first few connection requests.
3976+
type mockSlowDialer struct {
3977+
counter *int
3978+
original NetDialer
3979+
}
3980+
3981+
func (m mockSlowDialer) Dial(ctx context.Context, opts DialOpts) (Conn, error) {
3982+
*m.counter++
3983+
if *m.counter < 10 {
3984+
return nil, fmt.Errorf("Too early: %v", *m.counter)
3985+
}
3986+
return m.original.Dial(ctx, opts)
3987+
}
3988+
3989+
func TestConnectIsBlocked(t *testing.T) {
3990+
const server = "127.0.0.1:3015"
3991+
testDialer := dialer
3992+
testDialer.Address = server
3993+
3994+
inst, err := test_helpers.StartTarantool(test_helpers.StartOpts{
3995+
Dialer: testDialer,
3996+
InitScript: "config.lua",
3997+
Listen: server,
3998+
WaitStart: 100 * time.Millisecond,
3999+
ConnectRetry: 10,
4000+
RetryTimeout: 500 * time.Millisecond,
4001+
})
4002+
defer test_helpers.StopTarantoolWithCleanup(inst)
4003+
if err != nil {
4004+
t.Fatalf("Unable to start Tarantool: %s", err)
4005+
}
4006+
4007+
var counter int
4008+
mockDialer := mockSlowDialer{original: testDialer, counter: &counter}
4009+
ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
4010+
defer cancel()
4011+
4012+
reconnectOpts := opts
4013+
reconnectOpts.Reconnect = 100 * time.Millisecond
4014+
reconnectOpts.MaxReconnects = 100
4015+
conn, err := Connect(ctx, mockDialer, reconnectOpts)
4016+
if err != nil {
4017+
t.Fatalf("Connection was not established: %v", err)
4018+
}
4019+
4020+
conn.Close()
4021+
4022+
if counter < 10 {
4023+
t.Fatalf("Expected no less than 10, got %d", counter)
4024+
}
4025+
}
4026+
4027+
func TestConnectIsBlockedUntilContextExpires(t *testing.T) {
4028+
const server = "127.0.0.1:3015"
4029+
4030+
testDialer := dialer
4031+
testDialer.Address = server
4032+
4033+
ctx, cancel := test_helpers.GetConnectContext()
4034+
defer cancel()
4035+
4036+
reconnectOpts := opts
4037+
reconnectOpts.Reconnect = 100 * time.Millisecond
4038+
reconnectOpts.MaxReconnects = 100
4039+
_, err := Connect(ctx, testDialer, reconnectOpts)
4040+
if err == nil {
4041+
t.Fatal("Connection was unexpectedly established.")
4042+
}
4043+
4044+
exp := "failed to dial: dial tcp 127.0.0.1:3015: i/o timeout"
4045+
if err.Error() != exp {
4046+
t.Fatalf("Expected '%s', got '%v'", exp, err)
4047+
}
4048+
}
4049+
4050+
func TestConnectIsUnblockedAfterMaxAttempts(t *testing.T) {
4051+
const server = "127.0.0.1:3015"
4052+
4053+
testDialer := dialer
4054+
testDialer.Address = server
4055+
4056+
ctx, cancel := test_helpers.GetConnectContext()
4057+
defer cancel()
4058+
4059+
reconnectOpts := opts
4060+
reconnectOpts.Reconnect = 100 * time.Millisecond
4061+
reconnectOpts.MaxReconnects = 1
4062+
_, err := Connect(ctx, testDialer, reconnectOpts)
4063+
if err == nil {
4064+
t.Fatal("Connection was unexpectedly established.")
4065+
}
4066+
4067+
exp := "last reconnect failed"
4068+
if !strings.Contains(err.Error(), exp) {
4069+
t.Fatalf("Expected '%s', got '%v'", exp, err)
4070+
}
4071+
}
4072+
39754073
func buildSidecar(dir string) error {
39764074
goPath, err := exec.LookPath("go")
39774075
if err != nil {

0 commit comments

Comments
 (0)