Skip to content

Commit 10e7657

Browse files
feature: support graceful shutdown
After this patch, a connection to Tarantool 2.10 or newer supports server graceful shutdown [1] if WatchersFeature is required by the connection options. In this case, the server waits until all client requests are finished and the client disconnects before going down (the server may also go down by timeout). The client reconnects if the connection options enable reconnect. [1] https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/ Closes #214
1 parent 03a1cc1 commit 10e7657

File tree

4 files changed

+564
-14
lines changed

4 files changed

+564
-14
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1515
- Error type support in MessagePack (#209)
1616
- Event subscription support (#119)
1717
- Session settings support (#215)
18+
- Support graceful shutdown (#214)
1819

1920
### Changed
2021

connection.go

Lines changed: 136 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,17 @@ const ignoreStreamId = 0
2525
const (
2626
connDisconnected = 0
2727
connConnected = 1
28-
connClosed = 2
28+
connShutdown = 2
29+
connClosed = 3
2930
)
3031

3132
// Transport identifiers: an empty string and "ssl".
// NOTE(review): presumably compared against the transport configured in the
// connection options when dialing — confirm against the dialing code.
const (
3233
connTransportNone = ""
3334
connTransportSsl = "ssl"
3435
)
3536

37+
// shutdownEventKey is the watcher key that Connect subscribes to (when
// WatchersFeature is required) in order to be notified of a server shutdown.
const shutdownEventKey = "box.shutdown"
38+
3639
type ConnEventKind int
3740
type ConnLogKind int
3841

@@ -45,6 +48,8 @@ const (
4548
ReconnectFailed
4649
// Either reconnect attempts exhausted, or explicit Close is called.
4750
Closed
51+
// Shutdown signals that the shutdown callback is being processed.
52+
Shutdown
4853

4954
// LogReconnectFailed is logged when reconnect attempt failed.
5055
LogReconnectFailed ConnLogKind = iota + 1
@@ -134,10 +139,19 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
134139
// always returns array of array (array of tuples for space related methods).
135140
// For Eval* and Call* Tarantool always returns array, but does not forces
136141
// array of arrays.
142+
//
143+
// If connected to Tarantool 2.10 or newer and WatchersFeature is required,
144+
// the connection supports server graceful shutdown. In this case, the server
145+
// waits until all client requests are finished and the client disconnects
146+
// before going down (the server may also go down by timeout). The client
147+
// reconnects if the connection options enable reconnect.
148+
//
149+
// More on graceful shutdown: https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
137150
type Connection struct {
138151
addr string
139152
c net.Conn
140153
mutex sync.Mutex
154+
cond *sync.Cond
141155
// Schema contains schema loaded on connection.
142156
Schema *Schema
143157
// requestId contains the last request ID for requests with nil context.
@@ -162,6 +176,11 @@ type Connection struct {
162176
serverProtocolInfo ProtocolInfo
163177
// watchMap is a map of key -> chan watchState.
164178
watchMap sync.Map
179+
180+
// shutdownWatcher is the "box.shutdown" event watcher.
181+
shutdownWatcher Watcher
182+
// requestCnt is a counter of active requests.
183+
requestCnt uint32
165184
}
166185

167186
var _ = Connector(&Connection{}) // Check compatibility with connector interface.
@@ -385,6 +404,8 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
385404
conn.opts.Logger = defaultLogger{}
386405
}
387406

407+
conn.cond = sync.NewCond(&conn.mutex)
408+
388409
if err = conn.createConnection(false); err != nil {
389410
ter, ok := err.(Error)
390411
if conn.opts.Reconnect <= 0 {
@@ -421,6 +442,16 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
421442
}
422443
}
423444

445+
// Subscribe shutdown event to process graceful shutdown.
446+
if conn.isWatchersRequired() {
447+
watcher, werr := conn.NewWatcher(shutdownEventKey, shutdownEventCallback)
448+
if werr != nil {
449+
conn.closeConnection(werr, true)
450+
return nil, werr
451+
}
452+
conn.shutdownWatcher = watcher
453+
}
454+
424455
return conn, err
425456
}
426457

@@ -589,6 +620,7 @@ func (conn *Connection) dial() (err error) {
589620
conn.lockShards()
590621
conn.c = connection
591622
atomic.StoreUint32(&conn.state, connConnected)
623+
conn.cond.Broadcast()
592624
conn.unlockShards()
593625
go conn.writer(w, connection)
594626
go conn.reader(r, connection)
@@ -762,10 +794,17 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
762794
if conn.state != connClosed {
763795
close(conn.control)
764796
atomic.StoreUint32(&conn.state, connClosed)
797+
conn.cond.Broadcast()
798+
// Free the resources.
799+
if conn.shutdownWatcher != nil {
800+
conn.shutdownWatcher.Unregister()
801+
conn.shutdownWatcher = nil
802+
}
765803
conn.notify(Closed)
766804
}
767805
} else {
768806
atomic.StoreUint32(&conn.state, connDisconnected)
807+
conn.cond.Broadcast()
769808
conn.notify(Disconnected)
770809
}
771810
if conn.c != nil {
@@ -784,9 +823,7 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
784823
return
785824
}
786825

787-
func (conn *Connection) reconnect(neterr error, c net.Conn) {
788-
conn.mutex.Lock()
789-
defer conn.mutex.Unlock()
826+
func (conn *Connection) reconnectImpl(neterr error, c net.Conn) {
790827
if conn.opts.Reconnect > 0 {
791828
if c == conn.c {
792829
conn.closeConnection(neterr, false)
@@ -799,6 +836,13 @@ func (conn *Connection) reconnect(neterr error, c net.Conn) {
799836
}
800837
}
801838

839+
// reconnect is the locking wrapper around reconnectImpl: it takes conn.mutex,
// runs reconnectImpl(neterr, c) and then broadcasts on conn.cond so that
// goroutines blocked in conn.cond.Wait() re-check their conditions.
// The mutex is released on return.
func (conn *Connection) reconnect(neterr error, c net.Conn) {
840+
conn.mutex.Lock()
841+
defer conn.mutex.Unlock()
842+
conn.reconnectImpl(neterr, c)
843+
// Wake up waiters on the condition variable after the reconnect attempt.
conn.cond.Broadcast()
844+
}
845+
802846
func (conn *Connection) lockShards() {
803847
for i := range conn.shard {
804848
conn.shard[i].rmut.Lock()
@@ -1026,6 +1070,15 @@ func (conn *Connection) newFuture(ctx context.Context) (fut *Future) {
10261070
fut.done = nil
10271071
shard.rmut.Unlock()
10281072
return
1073+
case connShutdown:
1074+
fut.err = ClientError{
1075+
ErrConnectionShutdown,
1076+
"server shutdown in progress",
1077+
}
1078+
fut.ready = nil
1079+
fut.done = nil
1080+
shard.rmut.Unlock()
1081+
return
10291082
}
10301083
pos := (fut.requestId / conn.opts.Concurrency) & (requestsMap - 1)
10311084
if ctx != nil {
@@ -1086,6 +1139,7 @@ func (conn *Connection) send(req Request, streamId uint64) *Future {
10861139
if fut.ready == nil {
10871140
return fut
10881141
}
1142+
10891143
if req.Ctx() != nil {
10901144
select {
10911145
case <-req.Ctx().Done():
@@ -1094,10 +1148,17 @@ func (conn *Connection) send(req Request, streamId uint64) *Future {
10941148
default:
10951149
}
10961150
}
1151+
1152+
if conn.shutdownWatcher != nil {
1153+
atomic.AddUint32(&(conn.requestCnt), uint32(1))
1154+
}
1155+
10971156
conn.putFuture(fut, req, streamId)
1157+
10981158
if req.Ctx() != nil {
10991159
go conn.contextWatchdog(fut, req.Ctx())
11001160
}
1161+
11011162
return fut
11021163
}
11031164

@@ -1164,6 +1225,15 @@ func (conn *Connection) markDone(fut *Future) {
11641225
if conn.rlimit != nil {
11651226
<-conn.rlimit
11661227
}
1228+
1229+
if conn.shutdownWatcher != nil {
1230+
// This is the approach recommended by the Go documentation
1231+
// for decrementing an atomic uint32.
1232+
// https://pkg.go.dev/sync/atomic#AddUint32
1233+
if atomic.AddUint32(&(conn.requestCnt), ^uint32(0)) == 0 {
1234+
conn.cond.Broadcast()
1235+
}
1236+
}
11671237
}
11681238

11691239
func (conn *Connection) peekFuture(reqid uint32) (fut *Future) {
@@ -1458,6 +1528,15 @@ func subscribeWatchChannel(conn *Connection, key string) (chan watchState, error
14581528
return st, nil
14591529
}
14601530

1531+
// isWatchersRequired reports whether WatchersFeature is listed among the
// features in conn.opts.RequiredProtocolInfo.Features.
func (conn *Connection) isWatchersRequired() bool {
1532+
for _, feature := range conn.opts.RequiredProtocolInfo.Features {
1533+
if feature == WatchersFeature {
1534+
return true
1535+
}
1536+
}
1537+
return false
1538+
}
1539+
14611540
// NewWatcher creates a new Watcher object for the connection.
14621541
//
14631542
// You need to require WatchersFeature to use watchers, see examples for the
@@ -1496,15 +1575,7 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
14961575
// asynchronous. We do not expect any response from a Tarantool instance
14971576
// That's why we can't just check the Tarantool response for an unsupported
14981577
// request error.
1499-
watchersRequired := false
1500-
for _, feature := range conn.opts.RequiredProtocolInfo.Features {
1501-
if feature == WatchersFeature {
1502-
watchersRequired = true
1503-
break
1504-
}
1505-
}
1506-
1507-
if !watchersRequired {
1578+
if !conn.isWatchersRequired() {
15081579
err := fmt.Errorf("the feature %s must be required by connection "+
15091580
"options to create a watcher", WatchersFeature)
15101581
return nil, err
@@ -1563,7 +1634,11 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
15631634

15641635
if state.cnt == 0 {
15651636
// The last one sends IPROTO_UNWATCH.
1566-
conn.Do(newUnwatchRequest(key)).Get()
1637+
if !conn.ClosedNow() {
1638+
// conn.ClosedNow() check is a workaround for calling
1639+
// Unregister from closeConnection().
1640+
conn.Do(newUnwatchRequest(key)).Get()
1641+
}
15671642
conn.watchMap.Delete(key)
15681643
close(state.unready)
15691644
}
@@ -1666,3 +1741,50 @@ func (conn *Connection) ServerProtocolInfo() ProtocolInfo {
16661741
func (conn *Connection) ClientProtocolInfo() ProtocolInfo {
16671742
// Return the result of Clone() rather than clientProtocolInfo itself.
// NOTE(review): presumably Clone() yields an independent copy so callers
// cannot mutate the package-level value — confirm against ProtocolInfo.Clone.
return clientProtocolInfo.Clone()
16681743
}
1744+
1745+
// shutdownEventCallback is the watch callback that Connect registers for the
// "box.shutdown" event. When the event value is the boolean true, it starts
// processShutdown for the event's connection in a new goroutine; any other
// value (non-bool or false) is ignored.
func shutdownEventCallback(event WatchEvent) {
1746+
// Receives "true" on server shutdown.
1747+
// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
1748+
// step 2.
1749+
val, ok := event.Value.(bool)
1750+
if ok && val {
1751+
// Run asynchronously: processShutdown may block on conn.cond.
go event.Conn.processShutdown()
1752+
}
1753+
}
1754+
1755+
// processShutdown performs the client side of a server graceful shutdown.
//
// While holding conn.mutex it:
//  1. stores connShutdown into conn.state and notifies with the Shutdown event;
//  2. waits on conn.cond until the state is no longer connShutdown, the
//     requestCnt counter reaches zero, or conn.c is replaced;
//  3. if the state is still connShutdown and conn.c is unchanged, calls
//     reconnectImpl with an ErrConnectionClosed ClientError.
func (conn *Connection) processShutdown() {
1756+
// Forbid state changes.
1757+
conn.mutex.Lock()
1758+
defer conn.mutex.Unlock()
1759+
1760+
atomic.StoreUint32(&(conn.state), connShutdown)
1761+
conn.notify(Shutdown)
1762+
1763+
// Remember the current net connection to detect its replacement.
c := conn.c
1764+
for (atomic.LoadUint32(&(conn.state)) == connShutdown) &&
1765+
(atomic.LoadUint32(&(conn.requestCnt)) != 0) &&
1766+
(c == conn.c) {
1767+
// Use a cond var on conn.mutex since request execution may
1768+
// call reconnect(). It is ok if the state changes as part of
1769+
// reconnect since the Tarantool server won't allow reconnecting
1770+
// in the middle of shutting down.
1771+
conn.cond.Wait()
1772+
}
1773+
// Do not unregister the shutdown watcher explicitly here since connection
1774+
// teardown has the same effect. To clean up connection resources,
1775+
// unregister on full close.
1776+
1777+
if (atomic.LoadUint32(&(conn.state)) == connShutdown) &&
1778+
(c == conn.c) {
1779+
// Start to reconnect based on common rules, same as in net.box.
1780+
// Reconnect also closes the connection: the server waits until all
1781+
// subscribed connections are terminated.
1782+
// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
1783+
// step 3.
1784+
conn.reconnectImpl(
1785+
ClientError{
1786+
ErrConnectionClosed,
1787+
"connection closed after server shutdown",
1788+
}, conn.c)
1789+
}
1790+
}

errors.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ const (
5555
ErrProtocolError = 0x4000 + iota
5656
ErrTimeouted = 0x4000 + iota
5757
ErrRateLimited = 0x4000 + iota
58+
ErrConnectionShutdown = 0x4000 + iota
5859
)
5960

6061
// Tarantool server error codes.

0 commit comments

Comments
 (0)