Skip to content

Commit b617395

Browse files
committed
refactoring: move task check to "taken" into a separate function
Needed for #85
1 parent 8efc8c5 commit b617395

File tree

1 file changed

+19
-16
lines changed

1 file changed

+19
-16
lines changed

queue/abstract.lua

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,16 @@ function tube.take(self, timeout)
125125
end
126126
end
127127

128+
--- Checks is the task has been taken in a session with
129+
-- connection id == "conn_id".
130+
-- Throw an error on failure.
131+
local function check_task_on_taken(tube_id, task_id, conn_id)
132+
local _taken = box.space._queue_taken:get{conn_id, tube_id, task_id}
133+
if _taken == nil then
134+
error("Task was not taken in the session")
135+
end
136+
end
137+
128138
function tube.touch(self, id, delta)
129139
if delta == nil then
130140
return
@@ -140,10 +150,7 @@ function tube.touch(self, id, delta)
140150
return
141151
end
142152

143-
local _taken = box.space._queue_taken:get{connection.id(), self.tube_id, id}
144-
if _taken == nil then
145-
error("Task was not taken in the session")
146-
end
153+
check_task_on_taken(self.tube_id, id, connection.id())
147154

148155
local space_name = box.space._queue:get{self.name}[3]
149156
queue.stat[space_name]:inc('touch')
@@ -152,16 +159,14 @@ function tube.touch(self, id, delta)
152159
end
153160

154161
function tube.ack(self, id)
155-
local _taken = box.space._queue_taken:get{connection.id(), self.tube_id, id}
156-
if _taken == nil then
157-
error("Task was not taken in the session")
158-
end
162+
local conn_id = connection.id()
163+
check_task_on_taken(self.tube_id, id, conn_id)
159164
local tube = box.space._queue:get{self.name}
160165
local space_name = tube[3]
161166

162167
self:peek(id)
163168
-- delete task
164-
box.space._queue_taken:delete{connection.id(), self.tube_id, id}
169+
box.space._queue_taken:delete{conn_id, self.tube_id, id}
165170
local result = self.raw:normalize_task(
166171
self.raw:delete(id):transform(2, 1, state.DONE)
167172
)
@@ -173,10 +178,7 @@ end
173178

174179
local function tube_release_internal(self, id, opts, connection_id)
175180
opts = opts or {}
176-
local _taken = box.space._queue_taken:get{connection_id, self.tube_id, id}
177-
if _taken == nil then
178-
error("Task was not taken in the session")
179-
end
181+
check_task_on_taken(self.tube_id, id, connection_id)
180182

181183
box.space._queue_taken:delete{connection_id, self.tube_id, id}
182184
self:peek(id)
@@ -197,9 +199,10 @@ end
197199

198200
function tube.bury(self, id)
199201
local task = self:peek(id)
200-
local _taken = box.space._queue_taken:get{connection.id(), self.tube_id, id}
201-
if _taken ~= nil then
202-
box.space._queue_taken:delete{connection.id(), self.tube_id, id}
202+
local conn_id = connection.id()
203+
local is_taken, _ = pcall(check_task_on_taken, self.tube_id, id, conn_id)
204+
if is_taken then
205+
box.space._queue_taken:delete{conn_id, self.tube_id, id}
203206
end
204207
if task[2] == state.BURIED then
205208
return task

0 commit comments

Comments
 (0)