@@ -74,7 +74,7 @@ local function expiration_process(task, tuple)
74
74
end
75
75
76
76
-- yield for some time
77
- local function suspend_basic (scan_space , task , len )
77
+ local function suspend_basic (task , len )
78
78
local delay = (task .tuples_per_iteration * task .full_scan_time )
79
79
delay = math.min (delay / len , task .iteration_delay )
80
80
fiber .sleep (delay )
@@ -84,81 +84,39 @@ local function suspend(scan_space, task)
84
84
-- Return the number of tuples in the space
85
85
local space_len = scan_space :len ()
86
86
if space_len > 0 then
87
- suspend_basic (scan_space , task , space_len )
87
+ suspend_basic (task , space_len )
88
88
end
89
89
end
90
90
91
- local function tree_index_iter (scan_space , task )
92
- -- iteration with GT iterator
93
- local params = {iterator = ' GT' , limit = task .tuples_per_iteration }
94
- local last_id
95
- local tuples = scan_space .index [0 ]:select ({}, params )
96
- while # tuples > 0 do
97
- last_id = tuples [# tuples ]
98
- for _ , tuple in ipairs (tuples ) do
99
- expiration_process (task , tuple )
100
- end
101
- suspend (scan_space , task )
102
- local key = construct_key (scan_space .id , last_id )
103
- tuples = scan_space .index [0 ]:select (key , params )
104
- end
105
-
106
- end
107
-
108
- local function hash_index_iter (scan_space , task )
109
- -- iteration for hash index
91
+ local function default_do_worker_iteration (task )
92
+ -- full index scan loop
93
+ local scan_space = box .space [task .space_id ]
94
+ local space_len = task .vinyl_assumed_space_len
110
95
local checked_tuples_count = 0
96
+ local vinyl_checked_tuples_count = 0
111
97
for _ , tuple in scan_space .index [0 ]:pairs (nil , {iterator = box .index .ALL }) do
112
98
checked_tuples_count = checked_tuples_count + 1
99
+ vinyl_checked_tuples_count = vinyl_checked_tuples_count + 1
113
100
expiration_process (task , tuple )
114
101
-- find out if the worker can go to sleep
102
+ -- if the batch is full
115
103
if checked_tuples_count >= task .tuples_per_iteration then
116
104
checked_tuples_count = 0
117
- suspend (scan_space , task )
105
+ if scan_space .engine == ' vinyl' then
106
+ if vinyl_checked_tuples_count > space_len then
107
+ space_len = task .vinyl_assumed_space_len_factor * space_len
108
+ end
109
+ suspend_basic (task , space_len )
110
+ else
111
+ suspend (scan_space , task )
112
+ end
118
113
end
119
114
end
120
- end
121
-
122
- local function default_do_worker_iteration (task )
123
- local scan_space = box .space [task .space_id ]
124
- local index_type = scan_space .index [0 ].type
125
-
126
- -- full index scan loop
127
- if index_type == ' HASH' then
128
- hash_index_iter (scan_space , task )
129
- else
130
- tree_index_iter (scan_space , task )
115
+ if scan_space .engine == ' vinyl' then
116
+ task .vinyl_assumed_space_len = vinyl_checked_tuples_count
131
117
end
132
118
end
133
119
134
- local function vinyl_do_worker_iteration (task )
135
- local scan_space = box .space [task .space_id ]
136
-
137
- local checked_tuples_count = 0
138
- local space_len = task .vinyl_assumed_space_len
139
-
140
- local params = {iterator = ' GT' , limit = task .tuples_per_iteration }
141
- local tuples = scan_space .index [0 ]:select ({}, params )
142
- while true do
143
- local tuple_cnt = # tuples
144
- if tuple_cnt == 0 then
145
- break
146
- end
147
- local last_id = nil
148
- for _ , tuple in ipairs (tuples ) do
149
- last_id = tuple
150
- expiration_process (task , tuple )
151
- end
152
- checked_tuples_count = checked_tuples_count + tuple_cnt
153
- if checked_tuples_count > space_len then
154
- space_len = task .vinyl_assumed_space_len_factor * space_len
155
- end
156
- local key = construct_key (scan_space .id , last_id )
157
- suspend_basic (scan_space , task , space_len )
158
- tuples = scan_space .index [0 ]:select (key , params )
159
- end
160
- task .vinyl_assumed_space_len = checked_tuples_count
161
- end
162
120
163
121
local function worker_loop (task )
164
122
-- detach worker from the guardian and attach it to sched fiber
@@ -404,11 +362,7 @@ local function expirationd_run_task(name, space_id, is_tuple_expired, options)
404
362
task .vinyl_assumed_space_len_factor = options .vinyl_assumed_space_len_factor
405
363
end
406
364
407
- if box .space [task .space_id ].engine == ' vinyl' then
408
- task .do_worker_iteration = vinyl_do_worker_iteration
409
- else
410
- task .do_worker_iteration = default_do_worker_iteration
411
- end
365
+ task .do_worker_iteration = default_do_worker_iteration
412
366
413
367
if options .iteration_delay ~= nil then
414
368
if type (options .iteration_delay ) ~= ' number' then
0 commit comments