Skip to content

Commit 3fa50b3

Browse files
committed
refactoring: move time functions to util
The same time functions are used in several files, so move them to a separate file. Needed for #85
1 parent ca3cdee commit 3fa50b3

File tree

6 files changed

+77
-94
lines changed

6 files changed

+77
-94
lines changed

queue-scm-1.rockspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ build = {
2424
['queue.abstract.driver.utube'] = 'queue/abstract/driver/utube.lua',
2525
['queue.abstract.driver.limfifottl'] = 'queue/abstract/driver/limfifottl.lua',
2626
['queue.compat'] = 'queue/compat.lua',
27+
['queue.util'] = 'queue/util.lua',
2728
['queue'] = 'queue/init.lua'
2829
}
2930
}

queue/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/init.lua
22
DESTINATION ${TARANTOOL_INSTALL_LUADIR}/${PROJECT_NAME}/)
33
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/compat.lua
44
DESTINATION ${TARANTOOL_INSTALL_LUADIR}/${PROJECT_NAME}/)
5+
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/util.lua
6+
DESTINATION ${TARANTOOL_INSTALL_LUADIR}/${PROJECT_NAME}/)
57
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/abstract.lua
68
DESTINATION ${TARANTOOL_INSTALL_LUADIR}/${PROJECT_NAME}/)
79
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/abstract/state.lua

queue/abstract.lua

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ local fiber = require('fiber')
33

44
local state = require('queue.abstract.state')
55

6+
local util = require('queue.util')
67
local qc = require('queue.compat')
78
local num_type = qc.num_type
89
local str_type = qc.str_type
@@ -29,30 +30,6 @@ local queue = {
2930
}),
3031
stat = {}
3132
}
32-
local MAX_TIMEOUT = 365 * 86400 * 100 -- MAX_TIMEOUT == 100 years
33-
local TIMEOUT_INFINITY = 18446744073709551615ULL -- Set to TIMEOUT_INFINITY
34-
-- instead
35-
-- returns time for next event
36-
local function time(tm)
37-
if tm == nil then
38-
tm = fiber.time64()
39-
elseif tm < 0 then
40-
tm = 0
41-
else
42-
tm = tm * 1000000
43-
end
44-
return 0ULL + tm
45-
end
46-
47-
local function event_time(tm)
48-
if tm == nil or tm < 0 then
49-
tm = 0
50-
elseif tm > MAX_TIMEOUT then
51-
return TIMEOUT_INFINITY
52-
end
53-
tm = 0ULL + tm * 1000000 + fiber.time64()
54-
return tm
55-
end
5633

5734
local function tube_release_all_tasks(tube)
5835
local prefix = ('queue: [tube "%s"] '):format(tube.name)
@@ -98,15 +75,15 @@ local conds = {}
9875
local releasing_connections = {}
9976

10077
function tube.take(self, timeout)
101-
timeout = time(timeout or TIMEOUT_INFINITY)
78+
timeout = util.time(timeout or util.TIMEOUT_INFINITY)
10279
local task = self.raw:take()
10380
if task ~= nil then
10481
return self.raw:normalize_task(task)
10582
end
10683

10784
while timeout > 0 do
10885
local started = fiber.time64()
109-
local time = event_time(timeout)
86+
local time = util.event_time(timeout)
11087
local tid = self.tube_id
11188
local fid = fiber.id()
11289
local conn_id = connection.id()
@@ -141,8 +118,8 @@ function tube.touch(self, id, delta)
141118
end
142119
if delta < 0 then -- if delta is lesser then 0, then it's zero
143120
delta = 0
144-
elseif delta > MAX_TIMEOUT then -- no ttl/ttr for this task
145-
delta = TIMEOUT_INFINITY
121+
elseif delta > util.MAX_TIMEOUT then -- no ttl/ttr for this task
122+
delta = util.TIMEOUT_INFINITY
146123
else -- convert to usec
147124
delta = delta * 1000000
148125
end

queue/abstract/driver/fifottl.lua

Lines changed: 13 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,14 @@ local log = require('log')
22
local fiber = require('fiber')
33
local state = require('queue.abstract.state')
44

5+
local util = require('queue.util')
56
local qc = require('queue.compat')
67
local num_type = qc.num_type
78
local str_type = qc.str_type
89

910
local tube = {}
1011
local method = {}
1112

12-
local TIMEOUT_INFINITY = 365 * 86400 * 500
13-
1413
local i_id = 1
1514
local i_status = 2
1615
local i_next_event = 3
@@ -20,24 +19,6 @@ local i_pri = 6
2019
local i_created = 7
2120
local i_data = 8
2221

23-
local function time(tm)
24-
if tm == nil then
25-
tm = fiber.time64()
26-
elseif tm < 0 then
27-
tm = 0
28-
else
29-
tm = tm * 1000000
30-
end
31-
return 0ULL + tm
32-
end
33-
34-
local function event_time(tm)
35-
if tm == nil or tm < 0 then
36-
tm = 0
37-
end
38-
return 0ULL + tm * 1000000 + fiber.time64()
39-
end
40-
4122
local function is_expired(task)
4223
local dead_event = task[i_created] + task[i_ttl]
4324
return (dead_event <= fiber.time64())
@@ -57,7 +38,7 @@ end
5738

5839
-- create space
5940
function tube.create_space(space_name, opts)
60-
opts.ttl = opts.ttl or TIMEOUT_INFINITY
41+
opts.ttl = opts.ttl or util.MAX_TIMEOUT
6142
opts.ttr = opts.ttr or opts.ttl
6243
opts.pri = opts.pri or 0
6344

@@ -107,9 +88,9 @@ local ttl_states = { state.READY, state.BURIED }
10788
local ttr_state = { state.TAKEN }
10889

10990
local function fifottl_fiber_iteration(self, processed)
110-
local now = time()
91+
local now = util.time()
11192
local task = nil
112-
local estimated = TIMEOUT_INFINITY
93+
local estimated = util.MAX_TIMEOUT
11394

11495
-- delayed tasks
11596
task = self.space.index.watch:min(delayed_state)
@@ -234,20 +215,20 @@ function method.put(self, data, opts)
234215
if opts.delay ~= nil and opts.delay > 0 then
235216
status = state.DELAYED
236217
ttl = ttl + opts.delay
237-
next_event = event_time(opts.delay)
218+
next_event = util.event_time(opts.delay)
238219
else
239220
status = state.READY
240-
next_event = event_time(ttl)
221+
next_event = util.event_time(ttl)
241222
end
242223

243224
local task = self.space:insert{
244225
id,
245226
status,
246227
next_event,
247-
time(ttl),
248-
time(ttr),
228+
util.time(ttl),
229+
util.time(ttr),
249230
pri,
250-
time(),
231+
util.time(),
251232
data
252233
}
253234
self:on_task_change(task, 'put')
@@ -261,7 +242,7 @@ function method.touch(self, id, delta)
261242
{'+', i_ttl, delta},
262243
{'+', i_ttr, delta}
263244
}
264-
if delta == TIMEOUT_INFINITY then
245+
if delta == util.MAX_TIMEOUT then
265246
ops = {
266247
{'=', i_next_event, delta},
267248
{'=', i_ttl, delta},
@@ -288,7 +269,7 @@ function method.take(self)
288269
return
289270
end
290271

291-
local next_event = time() + task[i_ttr]
272+
local next_event = util.time() + task[i_ttr]
292273
local dead_event = task[i_created] + task[i_ttl]
293274
if next_event > dead_event then
294275
next_event = dead_event
@@ -322,8 +303,8 @@ function method.release(self, id, opts)
322303
if opts.delay ~= nil and opts.delay > 0 then
323304
task = self.space:update(id, {
324305
{ '=', i_status, state.DELAYED },
325-
{ '=', i_next_event, event_time(opts.delay) },
326-
{ '+', i_ttl, time(opts.delay) }
306+
{ '=', i_next_event, util.event_time(opts.delay) },
307+
{ '+', i_ttl, util.time(opts.delay) }
327308
})
328309
else
329310
task = self.space:update(id, {

queue/abstract/driver/utubettl.lua

Lines changed: 15 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,14 @@ local fiber = require('fiber')
33

44
local state = require('queue.abstract.state')
55

6+
local util = require('queue.util')
67
local qc = require('queue.compat')
78
local num_type = qc.num_type
89
local str_type = qc.str_type
910

1011
local tube = {}
1112
local method = {}
1213

13-
local TIMEOUT_INFINITY = 365 * 86400 * 500
14-
1514
local i_id = 1
1615
local i_status = 2
1716
local i_next_event = 3
@@ -22,24 +21,6 @@ local i_created = 7
2221
local i_utube = 8
2322
local i_data = 9
2423

25-
local function time(tm)
26-
if tm == nil then
27-
tm = fiber.time64()
28-
elseif tm < 0 then
29-
tm = 0
30-
else
31-
tm = tm * 1000000
32-
end
33-
return 0ULL + tm
34-
end
35-
36-
local function event_time(tm)
37-
if tm == nil or tm < 0 then
38-
tm = 0
39-
end
40-
return 0ULL + tm * 1000000 + fiber.time64()
41-
end
42-
4324
local function is_expired(task)
4425
local dead_event = task[i_created] + task[i_ttl]
4526
return (dead_event <= fiber.time64())
@@ -59,7 +40,7 @@ end
5940

6041
-- create space
6142
function tube.create_space(space_name, opts)
62-
opts.ttl = opts.ttl or TIMEOUT_INFINITY
43+
opts.ttl = opts.ttl or util.MAX_TIMEOUT
6344
opts.ttr = opts.ttr or opts.ttl
6445
opts.pri = opts.pri or 0
6546

@@ -114,9 +95,9 @@ local ttl_states = { state.READY, state.BURIED }
11495
local ttr_state = { state.TAKEN }
11596

11697
local function utubettl_fiber_iteration(self, processed)
117-
local now = time()
98+
local now = util.time()
11899
local task = nil
119-
local estimated = TIMEOUT_INFINITY
100+
local estimated = util.MAX_TIMEOUT
120101

121102
-- delayed tasks
122103
task = self.space.index.watch:min(delayed_state)
@@ -242,28 +223,28 @@ function method.put(self, data, opts)
242223
if opts.delay ~= nil and opts.delay > 0 then
243224
status = state.DELAYED
244225
ttl = ttl + opts.delay
245-
next_event = event_time(opts.delay)
226+
next_event = util.event_time(opts.delay)
246227
else
247228
status = state.READY
248-
next_event = event_time(ttl)
229+
next_event = util.event_time(ttl)
249230
end
250231

251232
local task = self.space:insert{
252233
id,
253234
status,
254235
next_event,
255-
time(ttl),
256-
time(ttr),
236+
util.time(ttl),
237+
util.time(ttr),
257238
pri,
258-
time(),
239+
util.time(),
259240
tostring(opts.utube),
260241
data
261242
}
262243
self:on_task_change(task, 'put')
263244
return task
264245
end
265246

266-
local TIMEOUT_INFINITY_TIME = time(TIMEOUT_INFINITY)
247+
local TIMEOUT_INFINITY_TIME = util.time(util.MAX_TIMEOUT)
267248

268249
-- touch task
269250
function method.touch(self, id, delta)
@@ -272,7 +253,7 @@ function method.touch(self, id, delta)
272253
{'+', i_ttl, delta},
273254
{'+', i_ttr, delta}
274255
}
275-
if delta == TIMEOUT_INFINITY then
256+
if delta == util.MAX_TIMEOUT then
276257
ops = {
277258
{'=', i_next_event, delta},
278259
{'=', i_ttl, delta},
@@ -291,7 +272,7 @@ function method.take(self)
291272
if t[2] ~= state.READY then
292273
break
293274
elseif not is_expired(t) then
294-
local next_event = time() + t[i_ttr]
275+
local next_event = util.time() + t[i_ttr]
295276
local taken = self.space.index.utube:min{state.TAKEN, t[i_utube]}
296277
if taken == nil or taken[i_status] ~= state.TAKEN then
297278
t = self.space:update(t[1], {
@@ -344,16 +325,16 @@ function method.release(self, id, opts)
344325
if opts.delay ~= nil and opts.delay > 0 then
345326
task = self.space:update(id, {
346327
{ '=', i_status, state.DELAYED },
347-
{ '=', i_next_event, event_time(opts.delay) },
348-
{ '+', i_ttl, time(opts.delay) }
328+
{ '=', i_next_event, util.event_time(opts.delay) },
329+
{ '+', i_ttl, util.time(opts.delay) }
349330
})
350331
if task ~= nil then
351332
return process_neighbour(self, task, 'release')
352333
end
353334
else
354335
task = self.space:update(id, {
355336
{ '=', i_status, state.READY },
356-
{ '=', i_next_event, time(task[i_created] + task[i_ttl]) }
337+
{ '=', i_next_event, util.time(task[i_created] + task[i_ttl]) }
357338
})
358339
end
359340
self:on_task_change(task, 'release')

queue/util.lua

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
local fiber = require('fiber')
2+
3+
-- MAX_TIMEOUT == 100 years
4+
local MAX_TIMEOUT = 365 * 86400 * 100
5+
-- Set to TIMEOUT_INFINITY
6+
-- instead returns time for next event
7+
local TIMEOUT_INFINITY = 18446744073709551615ULL
8+
9+
local function time(tm)
10+
if tm == nil then
11+
tm = fiber.time64()
12+
elseif tm < 0 then
13+
tm = 0
14+
else
15+
tm = tm * 1000000
16+
end
17+
return 0ULL + tm
18+
end
19+
20+
local function event_time(tm)
21+
if tm == nil or tm < 0 then
22+
tm = 0
23+
elseif tm > MAX_TIMEOUT then
24+
return TIMEOUT_INFINITY
25+
end
26+
tm = 0ULL + tm * 1000000 + fiber.time64()
27+
return tm
28+
end
29+
30+
local util = {
31+
MAX_TIMEOUT = MAX_TIMEOUT,
32+
TIMEOUT_INFINITY = TIMEOUT_INFINITY
33+
}
34+
35+
-- methods
36+
local method = {
37+
time = time,
38+
event_time = event_time
39+
}
40+
41+
return setmetatable(util, { __index = method })

0 commit comments

Comments
 (0)