Skip to content

Commit 161ab21

Browse files
LeonidVasTotktonada
authored andcommitted
Add a check of the session state before take a task
We need to check of a session state before take a task after wait() because session maybe in disconnecting state and task will be hang in a take state after the session will be disconnected Fixes #104
1 parent 3ad7ab3 commit 161ab21

File tree

2 files changed

+71
-0
lines changed

2 files changed

+71
-0
lines changed

queue/abstract.lua

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ function tube.put(self, data, opts)
9393
end
9494

9595
local conds = {}
96+
local releasing_sessions = {}
9697

9798
function tube.take(self, timeout)
9899
timeout = time(timeout or TIMEOUT_INFINITY)
@@ -114,6 +115,13 @@ function tube.take(self, timeout)
114115
conds[fid]:free()
115116
box.space._queue_consumers:delete{ sid, fid }
116117

118+
-- We don't take a task if the session is in a
119+
-- disconnecting state.
120+
if releasing_sessions[fid] then
121+
releasing_sessions[fid] = nil
122+
return nil
123+
end
124+
117125
task = self.raw:take()
118126

119127
if task ~= nil then
@@ -376,6 +384,7 @@ function method._on_consumer_disconnect()
376384
box.space._queue_consumers:delete{ waiter[1], waiter[2] }
377385
local cond = conds[waiter[2]]
378386
if cond then
387+
releasing_sessions[waiter[2]] = true
379388
cond:signal(waiter[2])
380389
end
381390
end

t/120-take-task-after-reconnect.t

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
#!/usr/bin/env tarantool
2+
3+
local fiber = require('fiber')
4+
local netbox = require('net.box')
5+
local os = require('os')
6+
local queue = require('queue')
7+
local tap = require('tap')
8+
local tnt = require('t.tnt')
9+
10+
11+
local test = tap.test('take a task after reconnect')
12+
test:plan(1)
13+
14+
local listen = 'localhost:1918'
15+
tnt.cfg{listen = listen}
16+
17+
18+
local function test_take_task_after_disconnect(test)
19+
test:plan(1)
20+
local driver = 'fifottl'
21+
local tube = queue.create_tube('test_tube', driver,
22+
{if_not_exists = true})
23+
rawset(_G, 'queue', require('queue'))
24+
tube:grant('guest', {call = true})
25+
local task_id = tube:put('test_data')[1]
26+
-- Now we have one task in a ready state
27+
28+
local connection = netbox.connect(listen)
29+
local fiber_1 = fiber.create(function()
30+
connection:call('queue.tube.test_tube:take')
31+
connection:call('queue.tube.test_tube:take')
32+
end)
33+
34+
-- This is not a best practice but we need to use the fiber.sleep()
35+
-- (not fiber.yield()).
36+
-- Expected results from a sleep() calling:
37+
-- 1) Execute first connection:call('queue.tube.test_tube:take')
38+
-- Now one task in a taken state
39+
-- 2) Call the second connection:call('queue.tube.test_tube:take')
40+
-- and to hang the fiber_1
41+
-- 3) Start a fiber on the server side of connection which will execute
42+
-- second queue.tube.test_tube:take call and hang because the queue
43+
-- is empty
44+
fiber.sleep(0.1)
45+
46+
connection:close()
47+
48+
fiber.sleep(0.1)
49+
-- The taken task will be released (cause - disconnection).
50+
-- After that the fiber which waiting of a ready task (into take procedure)
51+
-- will try to take this task (before the fix).
52+
53+
54+
test:is(tube:peek(task_id)[2] == 'r', true, 'Task in ready state')
55+
end
56+
57+
58+
test:test('Don\'t take a task after disconnect', test_take_task_after_disconnect)
59+
60+
61+
tnt.finish()
62+
os.exit(test:check() and 0 or 1)

0 commit comments

Comments
 (0)