diff --git a/CHANGELOG.md b/CHANGELOG.md index 24e7c5f65..80fc40f5a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. - Several non-critical data race issues (#218) - ConnectionPool does not properly handle disconnection with Opts.Reconnect set (#272) +- Watcher events loss with a small per-request timeout (#284) ## [1.10.0] - 2022-12-31 diff --git a/connection.go b/connection.go index 7da3f26d0..65d21365c 100644 --- a/connection.go +++ b/connection.go @@ -1066,6 +1066,10 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) { } shard.bufmut.Unlock() + if firstWritten { + conn.dirtyShard <- shardn + } + if req.Async() { if fut = conn.fetchFuture(reqid); fut != nil { resp := &Response{ @@ -1076,10 +1080,6 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) { conn.markDone(fut) } } - - if firstWritten { - conn.dirtyShard <- shardn - } } func (conn *Connection) markDone(fut *Future) { @@ -1456,10 +1456,13 @@ func (conn *Connection) newWatcherImpl(key string, callback WatchCallback) (Watc st <- state if sendAck { - conn.Do(newWatchRequest(key)).Get() // We expect a reconnect and re-subscribe if it fails to // send the watch request. So it looks ok do not check a - // result. + // result. But we need to make sure that the re-watch + // request will not be finished by a small per-request + // timeout. + req := newWatchRequest(key).Context(context.Background()) + conn.Do(req).Get() } } @@ -1477,7 +1480,12 @@ func (conn *Connection) newWatcherImpl(key string, callback WatchCallback) (Watc if !conn.ClosedNow() { // conn.ClosedNow() check is a workaround for calling // Unregister from connectionClose(). - conn.Do(newUnwatchRequest(key)).Get() + // + // We need to make sure that the unwatch request will + // not be finished by a small per-request timeout to + // avoid lost of the request. + req := newUnwatchRequest(key).Context(context.Background()) + conn.Do(req).Get() } conn.watchMap.Delete(key) close(state.unready) diff --git a/tarantool_test.go b/tarantool_test.go index 125642dcf..3c2181f2e 100644 --- a/tarantool_test.go +++ b/tarantool_test.go @@ -3835,7 +3835,7 @@ func TestConnection_NewWatcher_concurrent(t *testing.T) { var wg sync.WaitGroup wg.Add(testConcurrency) - var ret error + errors := make(chan error, testConcurrency) for i := 0; i < testConcurrency; i++ { go func(i int) { defer wg.Done() @@ -3846,21 +3846,22 @@ func TestConnection_NewWatcher_concurrent(t *testing.T) { close(events) }) if err != nil { - ret = err + errors <- err } else { select { case <-events: case <-time.After(time.Second): - ret = fmt.Errorf("Unable to get an event %d", i) + errors <- fmt.Errorf("Unable to get an event %d", i) } watcher.Unregister() } }(i) } wg.Wait() + close(errors) - if ret != nil { - t.Fatalf("An error found: %s", ret) + for err := range errors { + t.Errorf("An error found: %s", err) } }