Skip to content

Commit 9735c15

Browse files
committed
Some new features
Iterations over Secondary Indexes, ascending, start_element and stop_iteration function
1 parent e4d7977 commit 9735c15

File tree

2 files changed

+392
-32
lines changed

2 files changed

+392
-32
lines changed

expirationd.lua

Lines changed: 132 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ local constants = {
4545
-- factor for recalculation of vinyl space size
4646
default_vinyl_assumed_space_len_factor = 2,
4747
-- default function on full scan
48-
default_on_full_scan = function(...) end
48+
default_on_full_scan = function(...) end,
49+
-- default iterating over the loop will go in ascending index
50+
iterator = "GE"
4951
}
5052

5153
-- ========================================================================= --
@@ -57,10 +59,10 @@ local constants = {
5759
-- ------------------------------------------------------------------------- --
5860

5961
-- get all fields in primary key(composite possible) from tuple
60-
local function construct_key(space_id, tuple)
62+
local function construct_key(expire_index, tuple)
6163
return fun.map(
6264
function(x) return tuple[x.fieldno] end,
63-
box.space[space_id].index[0].parts
65+
expire_index.parts
6466
):totable()
6567
end
6668

@@ -69,78 +71,77 @@ local function expiration_process(task, tuple)
6971
task.checked_tuples_count = task.checked_tuples_count + 1
7072
if task.is_tuple_expired(task.args, tuple) then
7173
task.expired_tuples_count = task.expired_tuples_count + 1
72-
task.process_expired_tuple(task.space_id, task.args, tuple)
74+
task.process_expired_tuple(task.space_id, task.args, tuple, task)
7375
end
7476
end
7577

7678
-- stop for some time
77-
local function suspend_basic(scan_space, task, len)
79+
local function suspend_basic(task, len)
7880
local delay = (task.tuples_per_iteration * task.full_scan_time)
7981
delay = math.min(delay / len, task.iteration_delay)
8082
fiber.sleep(delay)
8183
end
8284

83-
local function suspend(scan_space, task)
85+
local function suspend(task)
8486
-- Return the number of tuples in the space
85-
local space_len = scan_space:len()
87+
local space_len = task.expire_index:len()
8688
if space_len > 0 then
87-
suspend_basic(scan_space, task, space_len)
89+
suspend_basic(task, space_len)
8890
end
8991
end
9092

9193
-- delete with some suspend and some tuples limit
92-
local function tree_index_iter(scan_space, task)
94+
local function tree_index_iter(task)
9395
-- iteration with GT iterator
94-
local params = {iterator = 'GT', limit = task.tuples_per_iteration}
96+
local params = {iterator = task.iterator, limit = task.tuples_per_iteration}
9597
local last_id
96-
local tuples = scan_space.index[0]:select({}, params)
98+
local tuples = task.expire_index:select(task.start_element, params)
9799
while #tuples > 0 do
98100
last_id = tuples[#tuples]
99101
for _, tuple in ipairs(tuples) do
102+
if task:stop_iteration() then goto done end
100103
expiration_process(task, tuple)
101104
end
102-
suspend(scan_space, task)
103-
local key = construct_key(scan_space.id, last_id)
105+
suspend(task)
106+
local key = construct_key(task.expire_index, last_id)
104107
-- select all greater then last key
105-
tuples = scan_space.index[0]:select(key, params)
108+
tuples = task.expire_index:select(key, params)
106109
end
107-
110+
::done::
108111
end
109112

110-
local function hash_index_iter(scan_space, task)
113+
local function hash_index_iter(task)
111114
-- iteration for hash index
112115
local checked_tuples_count = 0
113-
for _, tuple in scan_space.index[0]:pairs(nil, {iterator = box.index.ALL}) do
116+
for _, tuple in task.expire_index:pairs(nil, {iterator = box.index.ALL}) do
114117
checked_tuples_count = checked_tuples_count + 1
118+
if task:stop_iteration() then break end
115119
expiration_process(task, tuple)
116120
-- find out if the worker can go to sleep
117121
if checked_tuples_count >= task.tuples_per_iteration then
118122
checked_tuples_count = 0
119-
suspend(scan_space, task)
123+
suspend(task)
120124
end
121125
end
122126
end
123127

124128
local function default_do_worker_iteration(task)
125-
local scan_space = box.space[task.space_id]
126-
local index_type = scan_space.index[0].type
129+
local index_type = task.expire_index.type
127130

128131
-- full index scan loop
129132
if index_type == 'HASH' then
130-
hash_index_iter(scan_space, task)
133+
hash_index_iter(task)
131134
else
132-
tree_index_iter(scan_space, task)
135+
tree_index_iter(task)
133136
end
134137
end
135138

136139
local function vinyl_do_worker_iteration(task)
137-
local scan_space = box.space[task.space_id]
138-
139140
local checked_tuples_count = 0
140141
local space_len = task.vinyl_assumed_space_len
141142

142-
local params = {iterator = 'GT', limit = task.tuples_per_iteration}
143-
local tuples = scan_space.index[0]:select({}, params)
143+
local params = {iterator = task.iterator, limit = task.tuples_per_iteration}
144+
local tuples = task.expire_index:select(task.start_element, params)
144145
while true do
145146
local tuple_cnt = #tuples
146147
if tuple_cnt == 0 then
@@ -149,16 +150,24 @@ local function vinyl_do_worker_iteration(task)
149150
local last_id = nil
150151
for _, tuple in ipairs(tuples) do
151152
last_id = tuple
153+
if task:stop_iteration() then
154+
checked_tuples_count = checked_tuples_count + tuple_cnt
155+
if checked_tuples_count > space_len then
156+
space_len = task.vinyl_assumed_space_len_factor * space_len
157+
end
158+
goto done
159+
end
152160
expiration_process(task, tuple)
153161
end
154162
checked_tuples_count = checked_tuples_count + tuple_cnt
155163
if checked_tuples_count > space_len then
156164
space_len = task.vinyl_assumed_space_len_factor * space_len
157165
end
158-
local key = construct_key(scan_space.id, last_id)
159-
suspend_basic(scan_space, task, space_len)
160-
tuples = scan_space.index[0]:select(key, params)
166+
local key = construct_key(task.expire_index, last_id)
167+
suspend_basic(task, space_len)
168+
tuples = task.expire_index:select(key, params)
161169
end
170+
::done::
162171
task.vinyl_assumed_space_len = checked_tuples_count
163172
end
164173

@@ -267,6 +276,8 @@ local function create_task(name)
267276
is_tuple_expired = nil,
268277
process_expired_tuple = nil,
269278
args = nil,
279+
expire_index = nil,
280+
start_element = nil,
270281
iteration_delay = constants.max_delay,
271282
full_scan_delay = constants.max_delay,
272283
tuples_per_iteration = constants.default_tuples_per_iteration,
@@ -276,7 +287,9 @@ local function create_task(name)
276287
on_full_scan_error = constants.default_on_full_scan,
277288
on_full_scan_success = constants.default_on_full_scan,
278289
on_full_scan_start = constants.default_on_full_scan,
279-
on_full_scan_complete = constants.default_on_full_scan
290+
on_full_scan_complete = constants.default_on_full_scan,
291+
stop_iteration = constants.default_on_full_scan,
292+
iterator = constants.iterator
280293
}, { __index = Task_methods })
281294
return task
282295
end
@@ -296,8 +309,9 @@ local function get_task(name)
296309
end
297310

298311
-- default process_expired_tuple function
299-
local function default_tuple_drop(space_id, args, tuple)
300-
box.space[space_id]:delete(construct_key(space_id, tuple))
312+
-- task as parameter was added to preserve backward compatibility
313+
local function default_tuple_drop(space_id, args, tuple, task)
314+
task.expire_index:delete(construct_key(task.expire_index, tuple))
301315
end
302316

303317

@@ -315,6 +329,10 @@ end
315329
-- options = { -- (table with named options)
316330
-- * process_expired_tuple -- applied to expired tuples, receives
317331
-- (space_id, args, tuple) as arguments
332+
-- * index -- name of index, need to have an index on this name (default primary index)
333+
-- * ascending -- false if descending (default true)
334+
-- * start_element -- must be the same data type as index field or fields (default nil)
335+
-- * stop_iteration -- call function before check is tuple expired, if false the iteration will be stopped
318336
-- * on_full_scan_start -- call function on starting full scan iteration
319337
-- * on_full_scan_complete -- call function on complete full scan iteration
320338
-- * on_full_scan_success -- call function on success full scan iteration
@@ -366,6 +384,88 @@ local function expirationd_run_task(name, space_id, is_tuple_expired, options)
366384
end
367385
task.process_expired_tuple = options.process_expired_tuple or default_tuple_drop
368386

387+
-- validate index field
388+
--local expire_index
389+
--if options.field then
390+
-- if type(options.field) ~= "number" and type(options.field) ~= "string" then
391+
-- error("field has a wrong type: " .. type(options.field))
392+
-- end
393+
-- local format_av = box.space[space_id]:format()
394+
-- local format = {}
395+
-- local have_format = false
396+
-- -- add in format numbers of fields
397+
-- -- it is needed to take field_no if field is string e.g. 'name'
398+
-- for no, f in pairs(format_av) do
399+
-- format[ f.name ] = {
400+
-- name = f.name;
401+
-- type = f.type;
402+
-- no = no;
403+
-- }
404+
-- format[ no ] = format[ f.name ];
405+
-- have_format = true
406+
-- end
407+
-- for _, idx in pairs(box.space[space_id].index) do
408+
-- for _, part in pairs(idx.parts) do
409+
-- format[ part.fieldno ] = format[ part.fieldno ] or { no = part.fieldno }
410+
-- format[ part.fieldno ].type = part.type
411+
-- end
412+
-- end
413+
--
414+
-- local expire_field_no
415+
-- if have_format and type(options.field) == "string" then
416+
-- expire_field_no = format[ options.field ].no
417+
-- else
418+
-- expire_field_no = options.field
419+
-- end
420+
-- if type(expire_field_no) ~= 'number' then
421+
-- error("Need correct field option")
422+
-- end
423+
--
424+
-- for _, index in pairs(box.space[space_id].index) do
425+
-- -- we use only first part of index,
426+
-- -- because we will have problems with the starting element
427+
-- -- perhaps we should first of all take an index consisting only of our field
428+
-- if index.parts[1].fieldno == expire_field_no then
429+
-- expire_index = index
430+
-- end
431+
-- end
432+
-- expire_index = expire_index
433+
--else
434+
-- -- take default primary index
435+
-- expire_index = box.space[space_id].index[0]
436+
--end
437+
--task.expire_index = expire_index
438+
439+
-- validate index
440+
local expire_index = box.space[space_id].index[0]
441+
if options.index then
442+
for _, index in pairs(box.space[space_id].index) do
443+
if index.name == options.index then
444+
expire_index = index
445+
end
446+
end
447+
end
448+
task.expire_index = expire_index
449+
450+
-- check ascending
451+
if options.ascending ~= nil then
452+
if type(options.ascending) ~= 'boolean' then
453+
error("Invalid type of ascending")
454+
end
455+
if not options.ascending then
456+
task.iterator = "LE"
457+
end
458+
end
459+
460+
task.start_element = options.start_element
461+
462+
if options.stop_iteration ~= nil then
463+
if type(options.stop_iteration) ~= 'function' then
464+
error("invalid type of stop_iteration is not function")
465+
end
466+
task.stop_iteration = options.stop_iteration
467+
end
468+
369469
-- check expire and process after expiration handler's arguments
370470
task.args = options.args
371471

0 commit comments

Comments
 (0)