Description
Problem
We want to use secondary indexes to iterate over the space, as indexpiration does, while keeping all expirationd features such as callbacks and hot reload.
Needs
- Make it possible to specify the index used for iteration
- Preserve backward compatibility: if no index is specified, iterate over the primary key as it is done now
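The two needs above boil down to a single lookup with a default. Below is a minimal sketch, assuming the new option is called `index` and accepts either an index name or an id. `resolve_expire_index` is a hypothetical helper, not part of expirationd, and the space is a plain table standing in for `box.space.to_expire` (in Tarantool, `space.index` can be addressed both by id and by name).

```lua
-- Hypothetical helper: pick the index to iterate over.
-- If opts.index is absent, fall back to the primary index (index 0),
-- which keeps the current behaviour for existing users.
local function resolve_expire_index(space, opts)
    local name = opts and opts.index
    if name == nil then
        return space.index[0]  -- backward compatible default
    end
    local index = space.index[name]
    if index == nil then
        error(("Index %s does not exist in space %s"):format(tostring(name), space.name), 2)
    end
    return index
end

-- Stand-in for box.space.to_expire: indexes reachable by id and by name.
local primary = {id = 0, name = 'primary'}
local exp = {id = 1, name = 'exp'}
local fake_space = {
    name = 'to_expire',
    index = {[0] = primary, [1] = exp, primary = primary, exp = exp},
}

print(resolve_expire_index(fake_space, {}).name)               -- primary
print(resolve_expire_index(fake_space, {index = 'exp'}).name)  -- exp
```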
Research
Iteration over a Secondary Index
Currently iteration is hardcoded to index 0, i.e. the primary index (TREE or HASH):
Lines 91 to 100 in 29d1a25
local tuples = scan_space.index[0]:select({}, params)
while #tuples > 0 do
    last_id = tuples[#tuples]
    for _, tuple in ipairs(tuples) do
        expiration_process(task, tuple)
    end
    suspend(scan_space, task)
    local key = construct_key(scan_space.id, last_id)
    tuples = scan_space.index[0]:select(key, params)
end
The logic in indexpiration is as follows: a single field holding the expiration time is used for removal (only the time or time64 kinds are available).
Tuples whose time value is greater than zero are walked in index order:
for _, t in expire_index:pairs({0}, {iterator = box.index.GT}) do
    -- check t and expire it if needed (body omitted)
end
The check itself is chosen by kind:
if opts.kind == 'time' or opts.kind == 'time64' then
    if not typeeq(expire_index.parts[1].type, 'num') then
        error(("Can't use field %s as %s"):format(opts.field, opts.kind), 2 + depth)
    end
    if opts.kind == 'time' then
        self.check = function(t)
            return t[expire_field_no] - clock.realtime()
        end
    elseif opts.kind == 'time64' then
        self.check = function(t)
            return tonumber(
                ffi.cast('int64_t', t[expire_field_no])
                - ffi.cast('int64_t', clock.realtime64())
            ) / 1e9
        end
    end
elseif _callable(opts.kind) then
    self.check = opts.kind
else
    error(("Unsupported kind: %s"):format(opts.kind), 2 + depth)
end
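To make the `time` kind concrete: the check returns the number of seconds left before the tuple expires, so zero or a negative value means "expired". The sketch below mirrors that logic in plain Lua, assuming the clock is injected as a function (in Tarantool it would be `clock.realtime`). It covers only `time` and plain functions; `time64` needs LuaJIT `ffi`, and indexpiration's `_callable` also accepts callable tables, which this sketch does not. `make_check` is a hypothetical name.

```lua
-- Hypothetical factory modelled on the indexpiration code above.
-- `now` is injected so the sketch runs outside Tarantool.
local function make_check(kind, field_no, now)
    if kind == 'time' then
        return function(t)
            -- seconds left: > 0 not expired yet, <= 0 already expired
            return t[field_no] - now()
        end
    elseif type(kind) == 'function' then
        return kind  -- user-supplied check
    end
    error(("Unsupported kind: %s"):format(tostring(kind)), 2)
end

local function fixed_now() return 1000 end
local check = make_check('time', 3, fixed_now)
print(check({'a', 'new', 1500}))  -- 500 (expires in 500 seconds)
print(check({'b', 'new', 900}))   -- -100 (already expired)
```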
In expirationd we can use the index specified in opts, and we probably also need to specify the element to start the iteration from, for example:
-- as @akudiyar noticed, we need a direction and a stop-iteration function
local first_iterator, next_iterator = 'GE', 'GT'
if not ascending then
    first_iterator, next_iterator = 'LE', 'LT'
end
local params = {iterator = first_iterator, limit = task.tuples_per_iteration}
local tuples = task.expire_index:select({start_element}, params)
-- later batches must start strictly after the last tuple already seen
params.iterator = next_iterator
local stopped = false
while #tuples > 0 and not stopped do
    local last_id = tuples[#tuples]
    for _, tuple in ipairs(tuples) do
        if stop_iteration(task) then
            stopped = true  -- leave the outer loop too, not only this one
            break
        end
        expiration_process(task, tuple)
    end
    if not stopped then
        suspend(scan_space, task)
        local key = construct_key(task.expire_index, last_id)
        -- select everything after the last key, in the same direction
        tuples = task.expire_index:select(key, params)
    end
end
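The loop above can be exercised without Tarantool. This sketch assumes a single-part key and uses `make_fake_index`, a hypothetical stand-in for a TREE index whose `select(key, {iterator, limit})` supports only GE/GT/LE/LT. The first batch uses GE/LE from the start element; every later batch uses GT/LT from the last key, so a tuple that was not expired is not visited twice. For a real non-unique index the continuation key must include the primary key parts as well, which is what `construct_key` is for.

```lua
-- Hypothetical stand-in for a TREE index over single-part numeric keys.
local function make_fake_index(keys)
    table.sort(keys)
    local index = {}
    function index:select(key, opts)
        local k, it, res = key[1], opts.iterator, {}
        local from, to, step = 1, #keys, 1
        if it == 'LE' or it == 'LT' then from, to, step = #keys, 1, -1 end
        for i = from, to, step do
            local v = keys[i]
            local ok = (it == 'GE' and v >= k) or (it == 'GT' and v > k)
                or (it == 'LE' and v <= k) or (it == 'LT' and v < k)
            if ok then
                res[#res + 1] = {v}
                if #res >= opts.limit then break end
            end
        end
        return res
    end
    return index
end

-- Batched scan in either direction with an early stop.
local function iterate(index, start, ascending, limit, stop, process)
    local first_it, next_it = 'GE', 'GT'
    if not ascending then first_it, next_it = 'LE', 'LT' end
    local tuples = index:select({start}, {iterator = first_it, limit = limit})
    while #tuples > 0 do
        for _, tuple in ipairs(tuples) do
            if stop() then return end  -- leaves both loops at once
            process(tuple)
        end
        -- continue strictly after the last key so nothing is visited twice
        local last = tuples[#tuples]
        tuples = index:select({last[1]}, {iterator = next_it, limit = limit})
    end
end

local seen = {}
iterate(make_fake_index({5, 1, 4, 2, 3}), 4, false, 2,
    function() return #seen >= 3 end,
    function(t) seen[#seen + 1] = t[1] end)
print(table.concat(seen, ' '))  -- 4 3 2
```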
Taking the field?
Do we need this feature at all or just specify the index?
We need to think about how to accept the field in these cases:
- If we want to use a multipart index, do we accept a list of fields? It might be better to accept the index directly instead of a field. indexpiration accepts a single field.
- If we accept only one field but the index consists of several parts: indexpiration always takes an index whose first part is our field, regardless of whether the index is multipart or single-part:
for _, index in pairs(box.space[space_id].index) do
    -- we use only the first part of the index,
    -- because we will have problems with the starting element;
    -- perhaps we should first of all take an index consisting only of our field
    if index.parts[1].fieldno == expire_field_no then
        expire_index = index
    end
end
Accordingly, the starting element has to be designed once we decide how to accept the field or fields. And of course, the starting element and the direction (ascending) cannot be used with a HASH index.
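The comment in the indexpiration loop suggests preferring an index made of the field alone. A minimal sketch of that rule, assuming index objects expose `id` and `parts[i].fieldno` as in `box.space.<name>.index`: return a single-part index on the field if there is one, otherwise the multipart index with the lowest id whose first part is the field. `find_index_by_field` is hypothetical and the indexes are plain tables.

```lua
-- Hypothetical helper, not part of expirationd or indexpiration.
local function find_index_by_field(indexes, fieldno)
    local candidate
    for _, index in pairs(indexes) do
        if index.parts[1].fieldno == fieldno then
            if #index.parts == 1 then
                -- exact match: the index consists only of this field
                return index
            end
            -- otherwise remember the multipart index with the lowest id,
            -- so the result does not depend on pairs() order
            if candidate == nil or index.id < candidate.id then
                candidate = index
            end
        end
    end
    return candidate
end

-- Plain tables shaped like box.space.to_expire.index entries.
local indexes = {
    [0] = {id = 0, name = 'primary', parts = {{fieldno = 1}}},
    [1] = {id = 1, name = 'exp', parts = {{fieldno = 3}, {fieldno = 1}}},
}
print(find_index_by_field(indexes, 3).name)  -- exp
print(find_index_by_field(indexes, 4))       -- nil
```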
Transactions
One transaction per batch.
A transaction per batch poses no problems. We also need to make sure that if stop_iteration fires, the current transaction is committed.
local tuples = task.expire_index:select(task.start_element, params)
while #tuples > 0 do
    local last_id = tuples[#tuples]
    if task.trx then
        box.begin()
    end
    for _, tuple in ipairs(tuples) do
        if task:stop_iteration() then
            -- commit the work already done in this batch before stopping
            if task.trx then
                box.commit()
            end
            goto done
        end
        expiration_process(task, tuple)
    end
    if task.trx then
        box.commit()
    end
    suspend(task)
    local key = construct_key(task.expire_index, last_id)
    -- select everything after the last key
    tuples = task.expire_index:select(key, params)
end
::done::
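One case the loop above does not cover is an error raised inside `expiration_process`: the transaction opened with `box.begin()` would stay open. A minimal sketch of one way to handle it, assuming we want to roll back the whole batch and re-raise: run the batch under `pcall`, commit on success (including when stop_iteration fired) and roll back on failure. `process_batch` is hypothetical, and `box` here is a stub that only records calls; in Tarantool the real `box.begin`, `box.commit` and `box.rollback` would be used.

```lua
-- Stub recording transaction calls; replace with the real box in Tarantool.
local log = {}
local box = {
    begin = function() log[#log + 1] = 'begin' end,
    commit = function() log[#log + 1] = 'commit' end,
    rollback = function() log[#log + 1] = 'rollback' end,
}

-- Hypothetical helper: process one batch in its own transaction.
-- Returns true if stop_iteration fired, false otherwise.
local function process_batch(task, tuples, process)
    if task.trx then box.begin() end
    local ok, res = pcall(function()
        for _, tuple in ipairs(tuples) do
            if task.stop_iteration(task) then return true end
            process(task, tuple)
        end
        return false
    end)
    if not ok then
        -- never leave a transaction open after a failed batch
        if task.trx then box.rollback() end
        error(res, 0)
    end
    if task.trx then box.commit() end  -- also commits when stopped early
    return res
end

local task = {trx = true, stop_iteration = function() return false end}
process_batch(task, {1, 2}, function() end)
local ok = pcall(process_batch, task, {1}, function() error('boom') end)
print(ok, table.concat(log, ' '))
```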
Pairs instead of select for tree index iteration
As @olegrok noted in #52 (comment), it is better to use pairs. For example, iteration over a hash index is already done with pairs:
Lines 104 to 116 in 29d1a25
local function hash_index_iter(scan_space, task)
    -- iteration for hash index
    local checked_tuples_count = 0
    for _, tuple in scan_space.index[0]:pairs(nil, {iterator = box.index.ALL}) do
        checked_tuples_count = checked_tuples_count + 1
        expiration_process(task, tuple)
        -- find out if the worker can go to sleep
        if checked_tuples_count >= task.tuples_per_iteration then
            checked_tuples_count = 0
            suspend(scan_space, task)
        end
    end
end
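The same counting pattern is not tied to hash indexes: it works with any pairs()-style iterator triple, so a tree index could reuse it. A minimal sketch, assuming the caller passes the generator, state and control values that `index:pairs()` returns; here `ipairs` over an array stands in for the index. Whether a given index iterator stays valid across `suspend()` should be checked per index type before relying on this. `scan_with_pairs` is a hypothetical name.

```lua
-- Hypothetical helper: drive any (gen, param, state) iterator and call
-- suspend() after every batch_size tuples, like hash_index_iter does.
local function scan_with_pairs(gen, param, state, batch_size, process, suspend)
    local checked = 0
    for _, tuple in gen, param, state do
        checked = checked + 1
        process(tuple)
        if checked >= batch_size then
            checked = 0
            suspend()
        end
    end
end

-- Capture all three values: passing ipairs(...) directly as a non-last
-- argument would keep only the generator.
local gen, param, state = ipairs({'a', 'b', 'c', 'd', 'e'})
local processed, suspends = {}, 0
scan_with_pairs(gen, param, state, 2,
    function(t) processed[#processed + 1] = t end,
    function() suspends = suspends + 1 end)
print(#processed, suspends)  -- 5 tuples, 2 suspends
```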
Proposed API
local format = {
[1] = {name = "id", type = "string"},
[2] = {name = "status", type = "string"},
[3] = {name = "deadline", type = "number"},
[4] = {name = "other", type = "number"},
}
box.schema.space.create('to_expire', {
format = format,
})
box.space.to_expire:create_index('primary', { unique = true, parts = {1, 'str'}, if_not_exists = true})
box.space.to_expire:create_index('exp', { unique = false, parts = { 3, 'number', 1, 'str' }, if_not_exists = true})
simple version
Could we call it start_key instead of start_element?
local clock = require('clock')

expirationd.start("clean_all", box.space.to_expire.id, is_expired,
    {
        index = 'exp',
        trx = true, -- one transaction per batch
        -- start from entries whose deadline is more than a year old
        start_element = function() return clock.time() - (365*24*60*60) end,
        iterator = 'LE', -- walk from that point towards older entries (descending)
        stop_full_scan = function(task)
            -- stop the full scan once many tuples have been expired
            if task.expired_tuples_count >= task.args.max_expired_tuples then
                task.expired_tuples_count = 0
                return true
            end
            return false
        end,
        args = {
            max_expired_tuples = 1000
        }
    }
)
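The research section notes that start_element and a direction make no sense for a HASH index, so start() could reject such options up front. A minimal sketch, assuming the allowed iterators for this option are only GE/GT/LE/LT and that the index object exposes its type as `index.type` ('TREE', 'HASH', ...), as Tarantool index objects do. `validate_scan_opts` is hypothetical and returns `nil, message` instead of raising.

```lua
-- Iterators that make sense together with start_element in this proposal.
local ORDERED = {GE = true, GT = true, LE = true, LT = true}

-- Hypothetical option check for expirationd.start().
local function validate_scan_opts(index, opts)
    if opts.iterator ~= nil and not ORDERED[opts.iterator] then
        return nil, ("unsupported iterator: %s"):format(tostring(opts.iterator))
    end
    if index.type == 'HASH' and (opts.start_element ~= nil or opts.iterator ~= nil) then
        return nil, "start_element and iterator require an ordered (TREE) index"
    end
    return true
end

print(validate_scan_opts({type = 'TREE'}, {iterator = 'LE', start_element = 0}))
print(validate_scan_opts({type = 'HASH'}, {iterator = 'GE'}))
```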
flexible versions
Mons generator
expirationd.start("clean_all", box.space.to_expire.id,
    function() return true end, -- is_tuple_expired always returns true
    {
        index = 'exp',
        trx = true, -- one transaction per batch
        -- to do this we should rewrite tree index iteration to use pairs
        iterate = function(task)
            -- entries whose deadline is more than a year old, newest first
            return task.space.index.exp:pairs({ clock.time() - (365*24*60*60) }, { iterator = 'LT' })
                :take_while(function(tuple)
                    -- return false if you want to stop the full scan
                    if is_too_many_expired(task) then
                        return false
                    end
                    return true
                end)
        end,
        args = {
            max_expired_tuples = 1000
        }
    }
)
Maybe we should combine the two implementations above; the interface is simpler without take_while:
expirationd.start("clean_all", box.space.to_expire.id,
    function() return true end, -- is_tuple_expired always returns true
    {
        index = 'exp',
        trx = true, -- one transaction per batch
        -- to do this we should rewrite tree index iteration to use pairs
        iterate = function(task)
            -- entries whose deadline is more than a year old, newest first
            return task.space.index.exp:pairs({ clock.time() - (365*24*60*60) }, { iterator = 'LT' })
        end,
        stop_full_scan = function(task)
            -- stop the full scan once many tuples have been expired
            if task.expired_tuples_count >= task.args.max_expired_tuples then
                task.expired_tuples_count = 0
                return true
            end
            return false
        end,
        args = {
            max_expired_tuples = 1000
        }
    }
)
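The combined version works if the worker itself wraps whatever `iterate(task)` returns with the `stop_full_scan` check, which is exactly what `take_while` did in the previous version. A minimal sketch of such a wrapper, assuming `iterate` returns a pairs()-style triple; `with_stop` is hypothetical and `ipairs` stands in for `index:pairs()`. The stop condition is checked before each next tuple is fetched.

```lua
-- Hypothetical worker-side wrapper: ends the iteration as soon as
-- stop(task) returns true, like take_while without the user calling it.
local function with_stop(task, stop, gen, param, state)
    return function(p, s)
        if stop(task) then return nil end  -- nil ends a generic for loop
        return gen(p, s)
    end, param, state
end

local task = {expired_tuples_count = 0, args = {max_expired_tuples = 2}}
local function stop_full_scan(t)
    return t.expired_tuples_count >= t.args.max_expired_tuples
end

local gen, param, state = ipairs({'a', 'b', 'c'})
for _, tuple in with_stop(task, stop_full_scan, gen, param, state) do
    -- pretend every tuple is expired and deleted
    task.expired_tuples_count = task.expired_tuples_count + 1
end
print(task.expired_tuples_count)  -- 2
```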
-- Template from Sasha
{
gen_first_batch_iterator_params = function(space)
local key = fiber.time() - 86400
local opts = {iterator = 'GT', limit = 1024}
return key, opts
end,
gen_next_batch_iterator_params = <...>,
}