Skip to content

cluster: expel instances removed from config in cluster.sync #424

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

- Fixed a bug when `server:grep_log()` failed to find a string logged in
`server:exec()` called immediately before it (gh-421).
- Fixed a bug when it wasn't possible to reload the cluster config with
`cluster:reload()` after removing an instance with `cluster:sync()`.
Also added an option to `cluster:sync()` to start/stop added/removed
instances (gh-423).

## 1.1.0

Expand Down
135 changes: 97 additions & 38 deletions luatest/cluster.lua
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,46 @@ end

-- {{{ Helpers

-- Start all instances in the list.
--
-- @tab[opt] opts Options.
-- @bool[opt] opts.wait_until_ready Wait until servers are ready
-- (default: true).
-- @bool[opt] opts.wait_until_running Wait until servers are running
-- (default: wait_until_ready).
local function start_instances(servers, opts)
for _, iserver in ipairs(servers) do
iserver:start({wait_until_ready = false})
end

-- wait_until_ready is true by default.
local wait_until_ready = true
if opts ~= nil and opts.wait_until_ready ~= nil then
wait_until_ready = opts.wait_until_ready
end

if wait_until_ready then
for _, iserver in ipairs(servers) do
iserver:wait_until_ready()
end
end

-- wait_until_running is equal to wait_until_ready by default.
local wait_until_running = wait_until_ready
if opts ~= nil and opts.wait_until_running ~= nil then
wait_until_running = opts.wait_until_running
end

if wait_until_running then
for _, iserver in ipairs(servers) do
helpers.retrying({timeout = 60}, function()
assertions.assert_equals(iserver:eval('return box.info.status'),
'running')
end)
end
end
end

-- Collect names of all the instances defined in the config
-- in the alphabetical order.
local function instance_names_from_config(config)
Expand Down Expand Up @@ -131,39 +171,11 @@ end
--
-- @tab[opt] opts Cluster startup options.
-- @bool[opt] opts.wait_until_ready Wait until servers are ready
-- (default: false).
-- (default: true).
-- @bool[opt] opts.wait_until_running Wait until servers are running
-- (default: wait_until_ready).
function Cluster:start(opts)
self:each(function(iserver)
iserver:start({wait_until_ready = false})
end)

-- wait_until_ready is true by default.
local wait_until_ready = true
if opts ~= nil and opts.wait_until_ready ~= nil then
wait_until_ready = opts.wait_until_ready
end

if wait_until_ready then
self:each(function(iserver)
iserver:wait_until_ready()
end)
end

-- wait_until_running is equal to wait_until_ready by default.
local wait_until_running = wait_until_ready
if opts ~= nil and opts.wait_until_running ~= nil then
wait_until_running = opts.wait_until_running
end

if wait_until_running then
self:each(function(iserver)
helpers.retrying({timeout = 60}, function()
assertions.assert_equals(iserver:eval('return box.info.status'),
'running')
end)

end)
end
start_instances(self._servers, opts)
end

--- Start the given instance.
Expand All @@ -187,8 +199,12 @@ function Cluster:drop()
for _, iserver in ipairs(self._servers or {}) do
iserver:drop()
end
for _, iserver in ipairs(self._expelled_servers or {}) do
iserver:drop()
end
self._servers = nil
self._server_map = nil
self._expelled_servers = nil
end

--- Sync the cluster object with the new config.
Expand All @@ -197,25 +213,67 @@ end
--
-- * Write the new config into the config file.
-- * Update the internal list of instances.
-- * Optionally starts instances added to the config and stops instances
-- removed from the config.
--
-- @tab config New config.
function Cluster:sync(config)
-- @tab[opt] opts Options.
-- @bool[opt] opts.start_stop Start/stop added/removed servers
-- (default: false).
-- @bool[opt] opts.wait_until_ready Wait until servers are ready
-- (default: true; used only if start_stop is set).
-- @bool[opt] opts.wait_until_running Wait until servers are running
-- (default: wait_until_ready; used only if start_stop is set).
function Cluster:sync(config, opts)
assert(type(config) == 'table')

local instance_names = instance_names_from_config(config)

treegen.write_file(self._dir, self._config_file_rel, yaml.encode(config))

for i, name in ipairs(instance_names) do
if self._server_map[name] == nil then
local iserver = server:new(fun.chain(self._server_opts, {
local server_map = self._server_map
self._server_map = {}
self._servers = {}
local new_servers = {}

for _, name in ipairs(instance_names) do
local iserver = server_map[name]
if iserver == nil then
iserver = server:new(fun.chain(self._server_opts, {
alias = name,
}):tomap())
table.insert(self._servers, i, iserver)
self._server_map[name] = iserver
table.insert(new_servers, iserver)
else
server_map[name] = nil
end
self._server_map[name] = iserver
table.insert(self._servers, iserver)
end

local expelled_servers = {}
for _, iserver in pairs(server_map) do
table.insert(expelled_servers, iserver)
end

-- Sort expelled servers by name for reproducibility.
table.sort(expelled_servers, function(a, b) return a.alias < b.alias end)

-- Add expelled servers to the list to be dropped with the cluster.
for _, iserver in pairs(expelled_servers) do
table.insert(self._expelled_servers, iserver)
end

local start_stop = false
if opts ~= nil and opts.start_stop ~= nil then
start_stop = opts.start_stop
end

if start_stop then
start_instances(new_servers, opts)
for _, iserver in ipairs(expelled_servers) do
iserver:stop()
end
end
end

--- Reload configuration on all the instances.
Expand Down Expand Up @@ -297,6 +355,7 @@ function Cluster:new(config, server_opts, opts)
-- Store a cluster object in 'g'.
self._servers = servers
self._server_map = server_map
self._expelled_servers = {}
self._dir = dir
self._config_file_rel = config_file_rel
self._server_opts = server_opts
Expand Down
76 changes: 70 additions & 6 deletions test/cluster_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,6 @@ g.test_sync = function()
c:start()
assert_instance_running(c, 'i-001')

c:stop()
assert_instance_stopped(c, 'i-001')

local config2 = cbuilder:new()
:use_group('g-001')
:use_replicaset('r-001')
Expand All @@ -156,18 +153,85 @@ g.test_sync = function()

:config()

local server1 = c['i-001']

c:sync(config2)

t.assert_equals(c:size(), 3)
-- Check that the server that was removed from the config is expelled
-- from the cluster but not stopped.
t.assert_equals(c:size(), 2)
t.assert_is(c['i-001'], nil)
t.assert_is_not(server1.process, nil)

c:start_instance('i-002')
c:start_instance('i-003')
-- Check that the new servers are not started.
assert_instance_stopped(c, 'i-002')
assert_instance_stopped(c, 'i-003')

c:start()
assert_instance_running(c, 'i-002')
assert_instance_running(c, 'i-003')

-- Check config reload works after sync.
c:reload()

c:stop()
assert_instance_stopped(c, 'i-002')
assert_instance_stopped(c, 'i-003')

-- Starting/stopping the cluster shouldn't affect the expelled server.
-- However, dropping the cluster should also drop the expelled server.
t.assert_is_not(server1.process, nil)
c:drop()
t.assert_is(server1.process, nil)
end

g.test_sync_start_stop = function()
t.run_only_if(utils.version_current_ge_than(3, 0, 0),
[[Declarative configuration works on Tarantool 3.0.0+.
See tarantool/tarantool@13149d65bc9d for details]])

t.assert_equals(g._cluster, nil)

local config = cbuilder:new()
:use_group('g-001')
:use_replicaset('r-001')
:add_instance('i-001', {})
:config()

local c = cluster:new(config, server_opts)

t.assert_equals(c:size(), 1)

c:start()
assert_instance_running(c, 'i-001')

local config2 = cbuilder:new()
:use_group('g-001')
:use_replicaset('r-001')
:add_instance('i-002', {})

:use_group('g-002')
:use_replicaset('r-002')
:add_instance('i-003', {})

:config()

local server1 = c['i-001']

c:sync(config2, {start_stop = true})

-- Check that the server that was removed from the config is expelled
-- from the cluster and stopped.
t.assert_equals(c:size(), 2)
t.assert_is(c['i-001'], nil)
t.assert_is(server1.process, nil)

-- Check that the new servers are started.
assert_instance_running(c, 'i-002')
assert_instance_running(c, 'i-003')

-- Check config reload works after sync.
c:reload()
end

g.test_reload = function()
Expand Down
Loading