Skip to content

Release all taken tasks on start #106

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,7 @@ If there are no 'ready' tasks in the queue, returns nil.
* `tube:bury(task_id)` - buries a task
* `tube:kick(count)` - digs out `count` tasks
* `tube:peek(task_id)` - return the task state by ID
* `tube:tasks_by_state(task_state)` - return the iterator to tasks in a certain state
* `tube:truncate()` - delete all tasks from the tube. Note that `tube:truncate`
must be called only by the user who created this tube (has space ownership) OR
under a `setuid` function. Read more about `setuid` functions
Expand Down
26 changes: 25 additions & 1 deletion queue/abstract.lua
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,26 @@ queue.driver = {
limfifottl = require('queue.abstract.driver.limfifottl')
}

local function tube_release_all_tasks(tube)
local prefix = ('queue: [tube "%s"] '):format(tube.name)

-- We lean on stable iterators in this function.
-- https://github.com/tarantool/tarantool/issues/1796
if not qc.check_version({1, 7, 5}) then
log.error(prefix .. 'no stable iterator support: skip task releasing')
log.error(prefix .. 'some tasks may stuck in taken state perpetually')
log.error(prefix .. 'update tarantool to >= 1.7.5 or take the risk')
end

log.info(prefix .. 'releasing all taken task (may take a while)')
local released = 0
for _, task in tube.raw:tasks_by_state(state.TAKEN) do
tube.raw:release(task[1])
released = released + 1
end
log.info(prefix .. ('released %d tasks'):format(released))
end

-- tube methods
local tube = {}

Expand Down Expand Up @@ -515,7 +535,11 @@ function method.start()
})
end

_queue:pairs():each(recreate_tube)
for _, tube_tuple in _queue:pairs() do
local tube = recreate_tube(tube_tuple)
-- gh-66: release all taken tasks on start
tube_release_all_tasks(tube)
end

session.on_disconnect(queue._on_consumer_disconnect)
return queue
Expand Down
5 changes: 5 additions & 0 deletions queue/abstract/driver/fifo.lua
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ function method.peek(self, id)
return self.space:get{id}
end

-- get iterator to tasks in a certain state
function method.tasks_by_state(self, task_state)
return self.space.index.status:pairs(task_state)
end

function method.truncate(self)
self.space:truncate()
end
Expand Down
5 changes: 5 additions & 0 deletions queue/abstract/driver/fifottl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,11 @@ function method.peek(self, id)
return self.space:get{id}
end

-- get iterator to tasks in a certain state
function method.tasks_by_state(self, task_state)
return self.space.index.status:pairs(task_state)
end

function method.truncate(self)
self.space:truncate()
end
Expand Down
5 changes: 5 additions & 0 deletions queue/abstract/driver/utube.lua
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ function method.peek(self, id)
return self.space:get{id}
end

-- get iterator to tasks in a certain state
function method.tasks_by_state(self, task_state)
return self.space.index.status:pairs(task_state)
end

function method.truncate(self)
self.space:truncate()
end
Expand Down
5 changes: 5 additions & 0 deletions queue/abstract/driver/utubettl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,11 @@ function method.peek(self, id)
return self.space:get{id}
end

-- get iterator to tasks in a certain state
function method.tasks_by_state(self, task_state)
return self.space.index.status:pairs(task_state)
end

function method.truncate(self)
self.space:truncate()
end
Expand Down
2 changes: 1 addition & 1 deletion t/000-init.t
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,5 @@ test:test('access to queue after box.cfg{}', function(test)
end)

tnt.finish()
os.exit(test:check() == true and 0 or -1)
os.exit(test:check() and 0 or 1)
-- vim: set ft=lua :
2 changes: 1 addition & 1 deletion t/001-tube-init.t
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ test:test('test queue mock addition', function(test)
end)

tnt.finish()
os.exit(test:check() == true and 0 or -1)
os.exit(test:check() and 0 or 1)
-- vim: set ft=lua :
34 changes: 32 additions & 2 deletions t/010-fifo.t
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ local yaml = require('yaml')
local fiber = require('fiber')

local test = require('tap').test()
test:plan(14)
test:plan(15)

local queue = require('queue')
local state = require('queue.abstract.state')
Expand Down Expand Up @@ -292,6 +292,36 @@ test:test('if_not_exists test', function(test)
test:isnt(tube, tube_new, "if_not_exists if tube doesn't exists")
end)

test:test('Get tasks by state test', function(test)
test:plan(2)
local tube = queue.create_tube('test_task_it', 'fifo')

for i = 1, 10 do
tube:put('test_data' .. tostring(i))
end
for i = 1, 4 do
tube:take(0.001)
end

local count_taken = 0
local count_ready = 0

for _, task in tube.raw:tasks_by_state(state.READY) do
if task[2] == state.READY then
count_ready = count_ready + 1
end
end

for _, task in tube.raw:tasks_by_state(state.TAKEN) do
if task[2] == state.TAKEN then
count_taken = count_taken + 1
end
end

test:is(count_ready, 6, 'Check tasks count in a ready state')
test:is(count_taken, 4, 'Check tasks count in a taken state')
end)

tnt.finish()
os.exit(test:check() == true and 0 or -1)
os.exit(test:check() and 0 or 1)
-- vim: set ft=lua :
33 changes: 31 additions & 2 deletions t/020-fifottl.t
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
local fiber = require('fiber')

local test = require('tap').test()
test:plan(15)
test:plan(16)

local queue = require('queue')
local state = require('queue.abstract.state')
Expand Down Expand Up @@ -262,7 +262,36 @@ test:test('buried task in a dropped queue', function(test)
test:ok(true, 'queue does not hang')
end)

test:test('Get tasks by state test', function(test)
test:plan(2)
local tube = queue.create_tube('test_task_it', 'fifottl')

for i = 1, 10 do
tube:put('test_data' .. tostring(i))
end
for i = 1, 4 do
tube:take(0.001)
end

local count_taken = 0
local count_ready = 0

for _, task in tube.raw:tasks_by_state(state.READY) do
if task[2] == state.READY then
count_ready = count_ready + 1
end
end

for _, task in tube.raw:tasks_by_state(state.TAKEN) do
if task[2] == state.TAKEN then
count_taken = count_taken + 1
end
end

test:is(count_ready, 6, 'Check tasks count in a ready state')
test:is(count_taken, 4, 'Check tasks count in a taken state')
end)

tnt.finish()
os.exit(test:check() == true and 0 or -1)
os.exit(test:check() and 0 or 1)
-- vim: set ft=lua :
34 changes: 32 additions & 2 deletions t/030-utube.t
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ local yaml = require('yaml')
local fiber = require('fiber')

local test = (require('tap')).test()
test:plan(11)
test:plan(12)

local queue = require('queue')
local state = require('queue.abstract.state')
Expand Down Expand Up @@ -156,6 +156,36 @@ test:test('if_not_exists test', function(test)
test:isnt(tube, tube_new, "if_not_exists if tube doesn't exists")
end)

test:test('Get tasks by state test', function(test)
test:plan(2)
local tube = queue.create_tube('test_task_it', 'utube')

for i = 1, 10 do
tube:put('test_data' .. tostring(i), { utube = i })
end
for i = 1, 4 do
tube:take(0.001)
end

local count_taken = 0
local count_ready = 0

for _, task in tube.raw:tasks_by_state(state.READY) do
if task[2] == state.READY then
count_ready = count_ready + 1
end
end

for _, task in tube.raw:tasks_by_state(state.TAKEN) do
if task[2] == state.TAKEN then
count_taken = count_taken + 1
end
end

test:is(count_ready, 6, 'Check tasks count in a ready state')
test:is(count_taken, 4, 'Check tasks count in a taken state')
end)

tnt.finish()
os.exit(test:check() == true and 0 or -1)
os.exit(test:check() and 0 or 1)
-- vim: set ft=lua :
34 changes: 32 additions & 2 deletions t/040-utubettl.t
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ local yaml = require('yaml')
local fiber = require('fiber')

local test = (require('tap')).test()
test:plan(16)
test:plan(17)

local queue = require('queue')
local state = require('queue.abstract.state')
Expand Down Expand Up @@ -257,6 +257,36 @@ test:test('ttl after delay test', function(test)
test:is(task.ttr, TTR * 1000000, 'check TTR after release')
end)

test:test('Get tasks by state test', function(test)
test:plan(2)
local tube = queue.create_tube('test_task_it', 'utubettl')

for i = 1, 10 do
tube:put('test_data' .. tostring(i), { utube = i })
end
for i = 1, 4 do
tube:take(0.001)
end

local count_taken = 0
local count_ready = 0

for _, task in tube.raw:tasks_by_state(state.READY) do
if task[2] == state.READY then
count_ready = count_ready + 1
end
end

for _, task in tube.raw:tasks_by_state(state.TAKEN) do
if task[2] == state.TAKEN then
count_taken = count_taken + 1
end
end

test:is(count_ready, 6, 'Check tasks count in a ready state')
test:is(count_taken, 4, 'Check tasks count in a taken state')
end)

tnt.finish()
os.exit(test:check() == true and 0 or -1)
os.exit(test:check() and 0 or 1)
-- vim: set ft=lua :
2 changes: 1 addition & 1 deletion t/050-ttl.t
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,5 @@ test:test('many messages, one queue utttl', function (test)
end)

tnt.finish()
os.exit(test:check() == true and 0 or -1)
os.exit(test:check() and 0 or 1)
-- vim: set ft=lua :
2 changes: 1 addition & 1 deletion t/060-async.t
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,5 @@ end)


tnt.finish()
os.exit(test:check() == true and 0 or -1)
os.exit(test:check() and 0 or 1)
-- vim: set ft=lua:
2 changes: 1 addition & 1 deletion t/070-compat.t
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,5 @@ test:test("check compatibility names", function(test)
test:is(str_name("1.7.1-168"), "str", "check old name (str)")
end)

os.exit(test:check() == true and 0 or -1)
os.exit(test:check() and 0 or 1)
-- vim: set ft=lua:
2 changes: 1 addition & 1 deletion t/080-otc-cb.t
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,5 @@ test:test('on_task_change callback', function(test)
end)

tnt.finish()
os.exit(test:check() == true and 0 or -1)
os.exit(test:check() and 0 or 1)
-- vim: set ft=lua :
2 changes: 1 addition & 1 deletion t/090-grant-check.t
Original file line number Diff line number Diff line change
Expand Up @@ -166,5 +166,5 @@ end)

tnt.finish()

os.exit(test:check() == true and 0 or -1)
os.exit(test:check() and 0 or 1)
-- vim: set ft=lua :
2 changes: 1 addition & 1 deletion t/100-limfifottl.t
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,5 @@ else
end

tnt.finish()
os.exit(test:check() == true and 0 or -1)
os.exit(test:check() and 0 or 1)
-- vim: set ft=lua :