Skip to content

Commit cd76247

Browse files
committed
Some new features
Iterations over Secondary Indexes, ascending, start_element and stop_iteration function
1 parent e4d7977 commit cd76247

File tree

4 files changed

+614
-35
lines changed

4 files changed

+614
-35
lines changed

Makefile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
all:
22
@echo "Only tests are available: make test"
33

4+
.PHONY: test
45
test:
6+
.rocks/bin/luatest -v
57
rm -rf *.xlog* *.snap 51{2,3,4,5,6,7}
68
INDEX_TYPE='TREE' SPACE_TYPE='vinyl' ./test.lua
79
rm -rf *.xlog* *.snap 51{2,3,4,5,6,7}

expirationd.lua

Lines changed: 175 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,13 @@ local constants = {
4545
-- factor for recalculation of vinyl space size
4646
default_vinyl_assumed_space_len_factor = 2,
4747
-- default function on full scan
48-
default_on_full_scan = function(...) end
48+
default_on_full_scan = function(...) end,
49+
-- default function for process_while
50+
default_process_while = function(...) return true end,
51+
-- default iterating over the loop will go in ascending index
52+
iterator = "GE",
53+
-- default atomic_iteration is false, batch of items doesn't include in one transaction
54+
atomic_iteration = false
4955
}
5056

5157
-- ========================================================================= --
@@ -57,10 +63,10 @@ local constants = {
5763
-- ------------------------------------------------------------------------- --
5864

5965
-- get all fields in primary key(composite possible) from tuple
60-
local function construct_key(space_id, tuple)
66+
local function construct_key(expire_index, tuple)
6167
return fun.map(
6268
function(x) return tuple[x.fieldno] end,
63-
box.space[space_id].index[0].parts
69+
expire_index.parts
6470
):totable()
6571
end
6672

@@ -69,95 +75,132 @@ local function expiration_process(task, tuple)
6975
task.checked_tuples_count = task.checked_tuples_count + 1
7076
if task.is_tuple_expired(task.args, tuple) then
7177
task.expired_tuples_count = task.expired_tuples_count + 1
72-
task.process_expired_tuple(task.space_id, task.args, tuple)
78+
task.process_expired_tuple(task.space_id, task.args, tuple, task)
7379
end
7480
end
7581

7682
-- stop for some time
77-
local function suspend_basic(scan_space, task, len)
83+
local function suspend_basic(task, len)
7884
local delay = (task.tuples_per_iteration * task.full_scan_time)
7985
delay = math.min(delay / len, task.iteration_delay)
8086
fiber.sleep(delay)
8187
end
8288

83-
local function suspend(scan_space, task)
89+
local function suspend(task)
8490
-- Return the number of tuples in the space
85-
local space_len = scan_space:len()
91+
local space_len = task.expire_index:len()
8692
if space_len > 0 then
87-
suspend_basic(scan_space, task, space_len)
93+
suspend_basic(task, space_len)
8894
end
8995
end
9096

9197
-- delete with some suspend and some tuples limit
92-
local function tree_index_iter(scan_space, task)
98+
local function tree_index_iter(task)
9399
-- iteration with GT iterator
94-
local params = {iterator = 'GT', limit = task.tuples_per_iteration}
100+
local params = {iterator = task.iterator, limit = task.tuples_per_iteration}
95101
local last_id
96-
local tuples = scan_space.index[0]:select({}, params)
102+
local tuples = task.expire_index:select(task.start_key, params)
97103
while #tuples > 0 do
98104
last_id = tuples[#tuples]
105+
if task.atomic_iteration then
106+
box.begin()
107+
end
99108
for _, tuple in ipairs(tuples) do
109+
if not task:process_while() then goto done end
100110
expiration_process(task, tuple)
101111
end
102-
suspend(scan_space, task)
103-
local key = construct_key(scan_space.id, last_id)
112+
if task.atomic_iteration then
113+
box.commit()
114+
end
115+
suspend(task)
116+
local key = construct_key(task.expire_index, last_id)
104117
-- select all greater then last key
105-
tuples = scan_space.index[0]:select(key, params)
118+
tuples = task.expire_index:select(key, {iterator = 'GT', limit = task.tuples_per_iteration})
119+
end
120+
::done::
121+
if task.atomic_iteration then
122+
box.commit()
106123
end
107-
108124
end
109125

110-
local function hash_index_iter(scan_space, task)
126+
local function hash_index_iter(task)
111127
-- iteration for hash index
112128
local checked_tuples_count = 0
113-
for _, tuple in scan_space.index[0]:pairs(nil, {iterator = box.index.ALL}) do
129+
if task.atomic_iteration then
130+
box.begin()
131+
end
132+
for _, tuple in task.expire_index:pairs(nil, {iterator = box.index.ALL}) do
114133
checked_tuples_count = checked_tuples_count + 1
134+
if not task:process_while() then break end
115135
expiration_process(task, tuple)
116136
-- find out if the worker can go to sleep
117137
if checked_tuples_count >= task.tuples_per_iteration then
138+
if task.atomic_iteration then
139+
box.commit()
140+
end
118141
checked_tuples_count = 0
119-
suspend(scan_space, task)
142+
suspend(task)
143+
if task.atomic_iteration then
144+
box.begin()
145+
end
120146
end
121147
end
148+
if task.atomic_iteration then
149+
box.commit()
150+
end
122151
end
123152

124153
local function default_do_worker_iteration(task)
125-
local scan_space = box.space[task.space_id]
126-
local index_type = scan_space.index[0].type
154+
local index_type = task.expire_index.type
127155

128156
-- full index scan loop
129157
if index_type == 'HASH' then
130-
hash_index_iter(scan_space, task)
158+
hash_index_iter(task)
131159
else
132-
tree_index_iter(scan_space, task)
160+
tree_index_iter(task)
133161
end
134162
end
135163

136164
local function vinyl_do_worker_iteration(task)
137-
local scan_space = box.space[task.space_id]
138-
139165
local checked_tuples_count = 0
140166
local space_len = task.vinyl_assumed_space_len
141167

142-
local params = {iterator = 'GT', limit = task.tuples_per_iteration}
143-
local tuples = scan_space.index[0]:select({}, params)
168+
local params = {iterator = task.iterator, limit = task.tuples_per_iteration}
169+
local tuples = task.expire_index:select(task.start_key, params)
144170
while true do
145171
local tuple_cnt = #tuples
146172
if tuple_cnt == 0 then
147173
break
148174
end
149175
local last_id = nil
176+
if task.atomic_iteration then
177+
box.begin()
178+
end
150179
for _, tuple in ipairs(tuples) do
151180
last_id = tuple
181+
if not task:process_while() then
182+
checked_tuples_count = checked_tuples_count + tuple_cnt
183+
if checked_tuples_count > space_len then
184+
space_len = task.vinyl_assumed_space_len_factor * space_len
185+
end
186+
goto done
187+
end
152188
expiration_process(task, tuple)
153189
end
190+
if task.atomic_iteration then
191+
box.commit()
192+
end
154193
checked_tuples_count = checked_tuples_count + tuple_cnt
155194
if checked_tuples_count > space_len then
156195
space_len = task.vinyl_assumed_space_len_factor * space_len
157196
end
158-
local key = construct_key(scan_space.id, last_id)
159-
suspend_basic(scan_space, task, space_len)
160-
tuples = scan_space.index[0]:select(key, params)
197+
local key = construct_key(task.expire_index, last_id)
198+
suspend_basic(task, space_len)
199+
tuples = task.expire_index:select(key, {iterator = "GT", limit = task.tuples_per_iteration})
200+
end
201+
::done::
202+
if task.atomic_iteration then
203+
box.commit()
161204
end
162205
task.vinyl_assumed_space_len = checked_tuples_count
163206
end
@@ -267,6 +310,8 @@ local function create_task(name)
267310
is_tuple_expired = nil,
268311
process_expired_tuple = nil,
269312
args = nil,
313+
expire_index = nil,
314+
start_key = nil,
270315
iteration_delay = constants.max_delay,
271316
full_scan_delay = constants.max_delay,
272317
tuples_per_iteration = constants.default_tuples_per_iteration,
@@ -276,7 +321,10 @@ local function create_task(name)
276321
on_full_scan_error = constants.default_on_full_scan,
277322
on_full_scan_success = constants.default_on_full_scan,
278323
on_full_scan_start = constants.default_on_full_scan,
279-
on_full_scan_complete = constants.default_on_full_scan
324+
on_full_scan_complete = constants.default_on_full_scan,
325+
process_while = constants.default_process_while,
326+
iterator = constants.iterator,
327+
atomic_iteration = constants.atomic_iteration
280328
}, { __index = Task_methods })
281329
return task
282330
end
@@ -296,8 +344,9 @@ local function get_task(name)
296344
end
297345

298346
-- default process_expired_tuple function
299-
local function default_tuple_drop(space_id, args, tuple)
300-
box.space[space_id]:delete(construct_key(space_id, tuple))
347+
-- task as parameter was added to preserve backward compatibility
348+
local function default_tuple_drop(space_id, args, tuple, task)
349+
task.expire_index:delete(construct_key(task.expire_index, tuple))
301350
end
302351

303352

@@ -315,6 +364,11 @@ end
315364
-- options = { -- (table with named options)
316365
-- * process_expired_tuple -- applied to expired tuples, receives
317366
-- (space_id, args, tuple) as arguments
367+
-- * index -- name of index, need to have an index on this name (default primary index)
368+
-- * atomic_iteration -- boolean, true if we want put all items from batch in one transaction
369+
-- * iterator -- iterator which can be used at the given index
370+
-- * start_key -- must be the same data type as index field or fields (default nil)
371+
-- * process_while -- call function before check is tuple expired, if false the iteration will be stopped
318372
-- * on_full_scan_start -- call function on starting full scan iteration
319373
-- * on_full_scan_complete -- call function on complete full scan iteration
320374
-- * on_full_scan_success -- call function on success full scan iteration
@@ -366,21 +420,108 @@ local function expirationd_run_task(name, space_id, is_tuple_expired, options)
366420
end
367421
task.process_expired_tuple = options.process_expired_tuple or default_tuple_drop
368422

423+
-- validate index
424+
local expire_index = box.space[space_id].index[0]
425+
if options.index then
426+
local index_exists = false
427+
for _, index in pairs(box.space[space_id].index) do
428+
if index.name == options.index then
429+
expire_index = index
430+
index_exists = true
431+
end
432+
end
433+
if not index_exists then
434+
error("Index with name " .. options.index .. " not exists")
435+
end
436+
end
437+
task.expire_index = expire_index
438+
439+
-- check iterator
440+
if options.iterator ~= nil then
441+
if type(options.iterator) ~= 'string' then
442+
error("Invalid type of iterator, expected string")
443+
end
444+
if expire_index.type == 'TREE' then
445+
if options.iterator ~= 'ALL' and
446+
options.iterator ~= 'EQ' and
447+
options.iterator ~= 'REQ' and
448+
options.iterator ~= 'GT' and
449+
options.iterator ~= 'GE' and
450+
options.iterator ~= 'LT' and
451+
options.iterator ~= 'LE' then
452+
error("Invalid iterator for TREE index, expected {ALL, EQ, REQ, GT, GE, LT, LE}")
453+
end
454+
elseif expire_index.type == 'HASH' then
455+
if options.iterator ~= 'ALL' and
456+
options.iterator ~= 'EQ' and
457+
options.iterator ~= 'GT' then
458+
error("Invalid iterator for HASH index, expected {ALL, EQ, GT}")
459+
end
460+
end
461+
task.iterator = options.iterator
462+
else
463+
if expire_index.type == 'HASH' then
464+
task.iterator = "ALL"
465+
end
466+
end
467+
468+
-- check start_key
469+
if options.start_key then
470+
if type(options.start_key) == "table" then
471+
if #expire_index.parts ~= #options.start_key then
472+
error("Wrong number of elements in start_key table, expected: " .. #expire_index.parts)
473+
end
474+
for i = 1, #expire_index.parts do
475+
log.info(expire_index.parts[i].type)
476+
if expire_index.parts[i].type ~= type(options.start_key[i]) then
477+
error("Wrong type in " .. i ..
478+
" element of start_key table, expected: " .. expire_index.parts[i].type ..
479+
", got: " .. type(options.start_key[i]))
480+
end
481+
end
482+
else
483+
if #expire_index.parts ~= 1 then
484+
error("Wrong number of elements in start_key table, expected: " .. #expire_index.parts)
485+
end
486+
if expire_index.parts[1].type ~= type(options.start_key) then
487+
error("Wrong type of start_key")
488+
end
489+
end
490+
task.start_key = options.start_key
491+
end
492+
493+
494+
-- check process_while
495+
if options.process_while ~= nil then
496+
if type(options.process_while) ~= 'function' then
497+
error("Invalid type of process_while is not function")
498+
end
499+
task.process_while = options.process_while
500+
end
501+
502+
-- check transaction
503+
if options.atomic_iteration ~= nil then
504+
if type(options.atomic_iteration) ~= 'boolean' then
505+
error("Invalid type of atomic_iteration, expected boolean")
506+
end
507+
task.atomic_iteration = options.atomic_iteration
508+
end
509+
369510
-- check expire and process after expiration handler's arguments
370511
task.args = options.args
371512

372513
-- check tuples per iteration (not required)
373514
if options.tuples_per_iteration ~= nil then
374515
if options.tuples_per_iteration <= 0 then
375-
error("invalid tuples per iteration parameter")
516+
error("Invalid tuples per iteration parameter")
376517
end
377518
task.tuples_per_iteration = options.tuples_per_iteration
378519
end
379520

380521
-- check full scan time
381522
if options.full_scan_time ~= nil then
382523
if options.full_scan_time <= 0 then
383-
error("invalid full scan time")
524+
error("Invalid full scan time")
384525
end
385526
task.full_scan_time = options.full_scan_time
386527
end

test.sh

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,9 @@ deb-src https://packagecloud.io/tarantool/${VERSION}/ubuntu/ $release main
1111
EOF
1212

1313
sudo apt-get update > /dev/null
14-
sudo apt-get -q -y install tarantool --force-yes
14+
sudo apt-get -q -y install tarantool tarantool-dev cartridge-cli npm --force-yes
15+
curl -sL https://deb.nodesource.com/setup_15.x | sudo bash -
16+
sudo apt-get install -y nodejs
17+
tarantoolctl rocks install luatest
18+
tarantoolctl rocks install cartridge
1519
make test

0 commit comments

Comments
 (0)