diff --git a/CHANGELOG.md b/CHANGELOG.md index c2d03b21f..94c20c265 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. - Datetime location after encode + decode is unequal (#217) - Wrong interval arithmetic with timezones (#221) - Invalid MsgPack if STREAM_ID > 127 (#224) +- queue.Take() returns an invalid task (#222) ## [1.8.0] - 2022-08-17 diff --git a/connection_pool/connection_pool_test.go b/connection_pool/connection_pool_test.go index 091d5216c..60c9b4f91 100644 --- a/connection_pool/connection_pool_test.go +++ b/connection_pool/connection_pool_test.go @@ -337,7 +337,7 @@ func TestConnectionHandlerOpenUpdateClose(t *testing.T) { for i := 0; i < 100; i++ { // Wait for read_only update, it should report about close connection // with old role. - if h.deactivated >= 1 { + if h.discovered >= 3 { break } time.Sleep(poolOpts.CheckTimeout) diff --git a/queue/example_connection_pool_test.go b/queue/example_connection_pool_test.go index 59f5020ea..214e67274 100644 --- a/queue/example_connection_pool_test.go +++ b/queue/example_connection_pool_test.go @@ -3,6 +3,7 @@ package queue_test import ( "fmt" "sync" + "sync/atomic" "time" "github.com/google/uuid" @@ -22,6 +23,7 @@ type QueueConnectionHandler struct { err error mutex sync.Mutex masterUpdated chan struct{} + masterCnt int32 } // QueueConnectionHandler implements the ConnectionHandler interface. @@ -87,6 +89,7 @@ func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection, } } + atomic.AddInt32(&h.masterCnt, 1) fmt.Printf("Master %s is ready to work!\n", conn.Addr()) return nil @@ -95,6 +98,9 @@ func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection, // Deactivated doesn't do anything useful for the example. func (h *QueueConnectionHandler) Deactivated(conn *tarantool.Connection, role connection_pool.Role) error { + if role == connection_pool.MasterRole { + atomic.AddInt32(&h.masterCnt, -1) + } return nil } @@ -184,8 +190,18 @@ func Example_connectionPool() { return } + for i := 0; i < 2 && atomic.LoadInt32(&h.masterCnt) != 1; i++ { + // The pool does not immediately detect role switching. It may happen + // that requests will be sent to RO instances. In that case q.Take() + // method will return a nil value. + // + // We need to make the example test output deterministic so we need to + // avoid it here. But in real life, you need to take this into account. + time.Sleep(poolOpts.CheckTimeout) + } + // Take a data from the new master instance. - task, err := q.TakeTimeout(1 * time.Second) + task, err := q.Take() if err != nil { fmt.Println("Unable to got task:", err) } else if task == nil { diff --git a/queue/queue.go b/queue/queue.go index a4e9af029..df13f09bb 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -467,6 +467,14 @@ func (qd *queueData) DecodeMsgpack(d *decoder) error { } qd.task = &Task{data: qd.result, q: qd.q} - d.Decode(&qd.task) + if err = d.Decode(&qd.task); err != nil { + return err + } + + if qd.task.Data() == nil { + // It may happen if the decoder has a code.Nil value inside. As a + // result, the task will not be decoded. + qd.task = nil + } return nil } diff --git a/queue/task.go b/queue/task.go index 348d8610d..c1b0aad98 100644 --- a/queue/task.go +++ b/queue/task.go @@ -29,13 +29,9 @@ func (t *Task) DecodeMsgpack(d *decoder) error { return err } if t.data != nil { - if err = d.Decode(t.data); err != nil { - return fmt.Errorf("fffuuuu: %s", err) - } - } else { - if t.data, err = d.DecodeInterface(); err != nil { - return err - } + d.Decode(t.data) + } else if t.data, err = d.DecodeInterface(); err != nil { + return err } return nil }