Skip to content

Commit aa29a82

Browse files
committed
api: graceful Connection.Close()
Connection.Close() waits for all active requests to complete. Part of #257
1 parent 3877378 commit aa29a82

File tree

3 files changed

+67
-97
lines changed

3 files changed

+67
-97
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1212

1313
### Changed
1414

15+
- Connection.Close() waits for all active requests to complete (#257)
16+
1517
### Fixed
1618

1719
- crud tests with Tarantool 3.0 (#293)

connection.go

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ const (
3737
Disconnected
3838
// ReconnectFailed signals that attempt to reconnect has failed.
3939
ReconnectFailed
40-
// Either reconnect attempts exhausted, or explicit Close is called.
41-
Closed
4240
// Shutdown signals that shutdown callback is processing.
4341
Shutdown
42+
// Either reconnect attempts exhausted, or explicit Close is called.
43+
Closed
4444

4545
// LogReconnectFailed is logged when reconnect attempt failed.
4646
LogReconnectFailed ConnLogKind = iota + 1
@@ -456,10 +456,7 @@ func (conn *Connection) ClosedNow() bool {
456456
// Close closes Connection.
457457
// After this method called, there is no way to reopen this Connection.
458458
func (conn *Connection) Close() error {
459-
err := ClientError{ErrConnectionClosed, "connection closed by client"}
460-
conn.mutex.Lock()
461-
defer conn.mutex.Unlock()
462-
return conn.closeConnection(err, true)
459+
return conn.shutdown(true)
463460
}
464461

465462
// Addr returns a configured address of Tarantool socket.
@@ -1532,17 +1529,27 @@ func shutdownEventCallback(event WatchEvent) {
15321529
// step 2.
15331530
val, ok := event.Value.(bool)
15341531
if ok && val {
1535-
go event.Conn.shutdown()
1532+
go event.Conn.shutdown(false)
15361533
}
15371534
}
15381535

1539-
func (conn *Connection) shutdown() {
1536+
func (conn *Connection) shutdown(forever bool) error {
15401537
// Forbid state changes.
15411538
conn.mutex.Lock()
15421539
defer conn.mutex.Unlock()
15431540

15441541
if !atomic.CompareAndSwapUint32(&conn.state, connConnected, connShutdown) {
1545-
return
1542+
if forever {
1543+
err := ClientError{ErrConnectionClosed, "connection closed by client"}
1544+
return conn.closeConnection(err, true)
1545+
}
1546+
return nil
1547+
}
1548+
1549+
if forever {
1550+
// We don't want to reconnect any more.
1551+
conn.opts.Reconnect = 0
1552+
conn.opts.MaxReconnects = 0
15461553
}
15471554

15481555
conn.cond.Broadcast()
@@ -1551,7 +1558,7 @@ func (conn *Connection) shutdown() {
15511558
c := conn.c
15521559
for {
15531560
if (atomic.LoadUint32(&conn.state) != connShutdown) || (c != conn.c) {
1554-
return
1561+
return nil
15551562
}
15561563
if atomic.LoadInt64(&conn.requestCnt) == 0 {
15571564
break
@@ -1563,14 +1570,19 @@ func (conn *Connection) shutdown() {
15631570
conn.cond.Wait()
15641571
}
15651572

1566-
// Start to reconnect based on common rules, same as in net.box.
1567-
// Reconnect also closes the connection: server waits until all
1568-
// subscribed connections are terminated.
1569-
// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
1570-
// step 3.
1571-
conn.reconnectImpl(
1572-
ClientError{
1573+
if forever {
1574+
err := ClientError{ErrConnectionClosed, "connection closed by client"}
1575+
return conn.closeConnection(err, true)
1576+
} else {
1577+
// Start to reconnect based on common rules, same as in net.box.
1578+
// Reconnect also closes the connection: server waits until all
1579+
// subscribed connections are terminated.
1580+
// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
1581+
// step 3.
1582+
conn.reconnectImpl(ClientError{
15731583
ErrConnectionClosed,
15741584
"connection closed after server shutdown",
15751585
}, conn.c)
1586+
return nil
1587+
}
15761588
}

shutdown_test.go

Lines changed: 36 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,42 @@ func TestGracefulShutdown(t *testing.T) {
143143
testGracefulShutdown(t, conn, &inst)
144144
}
145145

146+
func TestGracefulShutdownOnClose(t *testing.T) {
147+
test_helpers.SkipIfWatchersUnsupported(t)
148+
149+
var inst test_helpers.TarantoolInstance
150+
var conn *Connection
151+
var err error
152+
153+
inst, err = test_helpers.StartTarantool(shtdnSrvOpts)
154+
require.Nil(t, err)
155+
defer test_helpers.StopTarantoolWithCleanup(inst)
156+
157+
conn = test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts)
158+
defer conn.Close()
159+
160+
// Send request with sleep.
161+
evalSleep := 3 // in seconds
162+
require.Lessf(t,
163+
time.Duration(evalSleep)*time.Second,
164+
shtdnClntOpts.Timeout,
165+
"test request won't be failed by timeout")
166+
167+
req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg})
168+
fut := conn.Do(req)
169+
170+
// Close the connection.
171+
conn.Close()
172+
173+
// Connection is closed.
174+
require.Equal(t, true, conn.ClosedNow())
175+
176+
// Check that request was successful.
177+
resp, err := fut.Get()
178+
require.Nilf(t, err, "sleep request no error")
179+
require.NotNilf(t, resp, "sleep response exists")
180+
}
181+
146182
func TestGracefulShutdownWithReconnect(t *testing.T) {
147183
test_helpers.SkipIfWatchersUnsupported(t)
148184

@@ -222,86 +258,6 @@ func TestNoGracefulShutdown(t *testing.T) {
222258
"server went down without any additional waiting")
223259
}
224260

225-
func TestGracefulShutdownRespectsClose(t *testing.T) {
226-
test_helpers.SkipIfWatchersUnsupported(t)
227-
228-
var inst test_helpers.TarantoolInstance
229-
var conn *Connection
230-
var err error
231-
232-
inst, err = test_helpers.StartTarantool(shtdnSrvOpts)
233-
require.Nil(t, err)
234-
defer test_helpers.StopTarantoolWithCleanup(inst)
235-
236-
conn = test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts)
237-
defer conn.Close()
238-
239-
// Create a helper watcher to ensure that async
240-
// shutdown is set up.
241-
helperCh := make(chan WatchEvent, 10)
242-
helperW, herr := conn.NewWatcher("box.shutdown", func(event WatchEvent) {
243-
helperCh <- event
244-
})
245-
require.Nil(t, herr)
246-
defer helperW.Unregister()
247-
<-helperCh
248-
249-
// Set a big timeout so it would be easy to differ
250-
// if server went down on timeout or after all connections were terminated.
251-
serverShutdownTimeout := 60 // in seconds
252-
_, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout})
253-
require.Nil(t, err)
254-
255-
// Send request with sleep.
256-
evalSleep := 10 // in seconds
257-
require.Lessf(t,
258-
time.Duration(evalSleep)*time.Second,
259-
shtdnClntOpts.Timeout,
260-
"test request won't be failed by timeout")
261-
262-
req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg})
263-
264-
fut := conn.Do(req)
265-
266-
// SIGTERM the server.
267-
shutdownStart := time.Now()
268-
require.Nil(t, inst.Cmd.Process.Signal(syscall.SIGTERM))
269-
270-
// Close the connection.
271-
conn.Close()
272-
273-
// Connection is closed.
274-
require.Equal(t, true, conn.ClosedNow())
275-
276-
// Check that request was interrupted.
277-
_, err = fut.Get()
278-
require.NotNilf(t, err, "sleep request error")
279-
280-
// Wait until server go down.
281-
_, err = inst.Cmd.Process.Wait()
282-
require.Nil(t, err)
283-
shutdownFinish := time.Now()
284-
shutdownTime := shutdownFinish.Sub(shutdownStart)
285-
286-
// Help test helpers to properly clean up.
287-
inst.Cmd.Process = nil
288-
289-
// Check that server finished without waiting for eval to finish.
290-
require.Lessf(t,
291-
shutdownTime,
292-
time.Duration(evalSleep/2)*time.Second,
293-
"server went down without any additional waiting")
294-
295-
// Check that it wasn't a timeout.
296-
require.Lessf(t,
297-
shutdownTime,
298-
time.Duration(serverShutdownTimeout/2)*time.Second,
299-
"server went down not by timeout")
300-
301-
// Connection is still closed.
302-
require.Equal(t, true, conn.ClosedNow())
303-
}
304-
305261
func TestGracefulShutdownNotRacesWithRequestReconnect(t *testing.T) {
306262
test_helpers.SkipIfWatchersUnsupported(t)
307263

0 commit comments

Comments
 (0)