Skip to content

Commit fae98b6

Browse files
committed
bugfix: flaky queue/Example_connectionPool
We need to wait a ready state for a queue. We also need to wait for a success queue configuration on all instances before start a work. Closes #278
1 parent 9385498 commit fae98b6

File tree

1 file changed

+34
-19
lines changed

1 file changed

+34
-19
lines changed

queue/example_connection_pool_test.go

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ type QueueConnectionHandler struct {
1818
name string
1919
cfg queue.Cfg
2020

21-
uuid uuid.UUID
22-
registered bool
23-
err error
24-
mutex sync.Mutex
25-
masterUpdated chan struct{}
26-
masterCnt int32
21+
uuid uuid.UUID
22+
registered bool
23+
err error
24+
mutex sync.Mutex
25+
updated chan struct{}
26+
masterCnt int32
2727
}
2828

2929
// QueueConnectionHandler implements the ConnectionHandler interface.
@@ -32,9 +32,9 @@ var _ connection_pool.ConnectionHandler = &QueueConnectionHandler{}
3232
// NewQueueConnectionHandler creates a QueueConnectionHandler object.
3333
func NewQueueConnectionHandler(name string, cfg queue.Cfg) *QueueConnectionHandler {
3434
return &QueueConnectionHandler{
35-
name: name,
36-
cfg: cfg,
37-
masterUpdated: make(chan struct{}, 10),
35+
name: name,
36+
cfg: cfg,
37+
updated: make(chan struct{}, 10),
3838
}
3939
}
4040

@@ -53,14 +53,25 @@ func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
5353
}
5454

5555
master := role == connection_pool.MasterRole
56-
if master {
57-
defer func() {
58-
h.masterUpdated <- struct{}{}
59-
}()
56+
57+
q := queue.New(conn, h.name)
58+
59+
// Check is queue ready to work.
60+
if state, err := q.State(); err != nil {
61+
h.updated <- struct{}{}
62+
h.err = err
63+
return err
64+
} else if master && state != queue.RunningState {
65+
return fmt.Errorf("queue state is not RUNNING: %d", state)
66+
} else if !master && state != queue.InitState && state != queue.WaitingState {
67+
return fmt.Errorf("queue state is not INIT and not WAITING: %d", state)
6068
}
6169

70+
defer func() {
71+
h.updated <- struct{}{}
72+
}()
73+
6274
// Set up a queue module configuration for an instance.
63-
q := queue.New(conn, h.name)
6475
opts := queue.CfgOpts{InReplicaset: true, Ttr: 60 * time.Second}
6576

6677
if h.err = q.Cfg(opts); h.err != nil {
@@ -106,7 +117,7 @@ func (h *QueueConnectionHandler) Deactivated(conn *tarantool.Connection,
106117

107118
// Closes closes a QueueConnectionHandler object.
108119
func (h *QueueConnectionHandler) Close() {
109-
close(h.masterUpdated)
120+
close(h.updated)
110121
}
111122

112123
// Example demonstrates how to use the queue package with the connection_pool
@@ -155,8 +166,10 @@ func Example_connectionPool() {
155166
}
156167
defer connPool.Close()
157168

158-
// Wait for a master instance identification in the queue.
159-
<-h.masterUpdated
169+
// Wait for a queue initialization and master instance identification in
170+
// the queue.
171+
<-h.updated
172+
<-h.updated
160173
if h.err != nil {
161174
fmt.Printf("Unable to identify in the pool: %s", h.err)
162175
return
@@ -183,8 +196,10 @@ func Example_connectionPool() {
183196
return
184197
}
185198

186-
// Wait for a new master instance re-identification.
187-
<-h.masterUpdated
199+
// Wait for a replica instance connection and a new master instance
200+
// re-identification.
201+
<-h.updated
202+
<-h.updated
188203
h.mutex.Lock()
189204
err = h.err
190205
h.mutex.Unlock()

0 commit comments

Comments
 (0)