From 74a68ff60b97b2b8bfe9fedd1cb08af3a7f81420 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?=
Date: Wed, 29 Oct 2025 15:36:59 +0100
Subject: [PATCH] WIP

---
 deps/rabbit/src/rabbit_db_cluster.erl | 113 ++++++++++++++++++++++-----
 1 file changed, 94 insertions(+), 19 deletions(-)

diff --git a/deps/rabbit/src/rabbit_db_cluster.erl b/deps/rabbit/src/rabbit_db_cluster.erl
index 1fd720e527f..f3245a693ff 100644
--- a/deps/rabbit/src/rabbit_db_cluster.erl
+++ b/deps/rabbit/src/rabbit_db_cluster.erl
@@ -9,6 +9,7 @@
 -module(rabbit_db_cluster).
 
 -include_lib("kernel/include/logger.hrl").
+-include_lib("stdlib/include/assert.hrl").
 
 -include_lib("rabbit_common/include/logging.hrl").
 
@@ -253,27 +254,40 @@ join_using_khepri(_ClusterNodes, ram = NodeType) ->
       RemoveWhenOffline :: boolean().
 %% @doc Removes `Node' from the cluster.
 
-forget_member(Node, RemoveWhenOffline) ->
-    case forget_member0(Node, RemoveWhenOffline) of
-        ok ->
-            rabbit_node_monitor:notify_left_cluster(Node);
-        Error ->
-            Error
+forget_member(Node, RemoveWhenOffline)
+  when is_atom(Node) andalso Node =/= node() ->
+    %% The lock is taken before entering `try' so that the `after' clause
+    %% only releases a lock that was actually acquired.
+    {ok, InitialState} = lock_cluster_changes(Node),
+    try
+        forget_member_locked(Node, RemoveWhenOffline)
+    after
+        unlock_cluster_changes(InitialState)
     end.
 
-forget_member0(Node, RemoveWhenOffline) ->
-    case rabbit:is_running(Node) of
-        false ->
-            ?LOG_DEBUG(
-               "DB: removing cluster member `~ts`", [Node],
-               #{domain => ?RMQLOG_DOMAIN_DB}),
-            case rabbit_khepri:is_enabled() of
-                true  -> forget_member_using_khepri(Node, RemoveWhenOffline);
-                false -> forget_member_using_mnesia(Node, RemoveWhenOffline)
-            end;
-        true ->
-            {error, {failed_to_remove_node, Node, rabbit_still_running}}
-    end.
+%% Removes `Node' from the metadata store, then runs the queue, stream and
+%% node monitor follow-up calls. Called from `forget_member/2' between
+%% `lock_cluster_changes/1' and `unlock_cluster_changes/1'.
+forget_member_locked(Node, RemoveWhenOffline) ->
+    ?LOG_DEBUG(
+       "DB: removing cluster member `~ts`", [Node],
+       #{domain => ?RMQLOG_DOMAIN_DB}),
+    ?assertNot(rabbit:is_running(Node)),
+    Ret = case rabbit_khepri:is_enabled() of
+              true  -> forget_member_using_khepri(Node, RemoveWhenOffline);
+              false -> forget_member_using_mnesia(Node, RemoveWhenOffline)
+          end,
+    case Ret of
+        ok ->
+            rabbit_amqqueue:forget_all(Node),
+            rabbit_quorum_queue:shrink_all(Node),
+            rabbit_stream_queue:delete_all_replicas(Node),
+            rabbit_stream_coordinator:forget_node(Node),
+            rabbit_node_monitor:notify_left_cluster(Node);
+        Error ->
+            %% The removal failed: skip the follow-up calls and report the
+            %% failure to the caller, as the previous code did.
+            Error
+    end.
 
 forget_member_using_mnesia(Node, RemoveWhenOffline) ->
     rabbit_mnesia:forget_cluster_node(Node, RemoveWhenOffline).
@@ -287,6 +301,67 @@ forget_member_using_khepri(_Node, true) ->
 forget_member_using_khepri(Node, false = _RemoveWhenOffline) ->
     rabbit_khepri:remove_member(Node).
 
+%% Stops RabbitMQ on `ChangingNode' if it is running, then acquires the
+%% feature flags registry state change lock. The returned state is the
+%% argument expected by `unlock_cluster_changes/1'.
+lock_cluster_changes(ChangingNode) ->
+    RabbitWasRunning = stop_rabbit_if_running(ChangingNode),
+    InitialState = #{changing_node => ChangingNode,
+                     rabbit_was_running => RabbitWasRunning},
+
+    %% We acquire the feature flags registry reload lock because between
+    %% the time we reset the registry (as part of `rabbit_db:reset/0') and
+    %% the states copy from the remote node, there could be a concurrent
+    %% reload of the registry (for instance because of peer discovery on
+    %% another node) with the default/empty states.
+    %%
+    %% To make this work, the lock is also acquired from
+    %% `rabbit_ff_registry_wrapper'.
+    rabbit_ff_registry_factory:acquire_state_change_lock(),
+    {ok, InitialState}.
+
+%% Stops RabbitMQ on the given node if it is running there and returns
+%% whether it was running.
+stop_rabbit_if_running(ThisNode) when ThisNode =:= node() ->
+    RabbitWasRunning = rabbit:is_running(),
+    case RabbitWasRunning of
+        true  -> ok = rabbit:stop();
+        false -> ok
+    end,
+    RabbitWasRunning;
+stop_rabbit_if_running(RemoteNode) when is_atom(RemoteNode) ->
+    try erpc:call(RemoteNode, rabbit, is_running, []) of
+        true ->
+            ok = erpc:call(RemoteNode, rabbit, stop, []),
+            true;
+        false ->
+            false
+    catch
+        %% `erpc:call/4' raises this when the node cannot be reached, e.g.
+        %% when the member being forgotten is already down. There is
+        %% nothing to stop in that case.
+        error:{erpc, noconnection} ->
+            false
+    end.
+
+%% Releases the lock taken by `lock_cluster_changes/1' and restarts
+%% RabbitMQ on the node if it was running before.
+unlock_cluster_changes(
+  #{changing_node := ChangingNode,
+    rabbit_was_running := RabbitWasRunning}) ->
+    rabbit_ff_registry_factory:release_state_change_lock(),
+    start_rabbit_if_was_running(ChangingNode, RabbitWasRunning),
+    ok.
+
+%% Starts RabbitMQ on the node again, but only if it was running before.
+start_rabbit_if_was_running(_ChangingNode, false = _RabbitWasRunning) ->
+    ok;
+start_rabbit_if_was_running(ThisNode, true = _RabbitWasRunning)
+  when ThisNode =:= node() ->
+    rabbit:start();
+start_rabbit_if_was_running(RemoteNode, true = _RabbitWasRunning) ->
+    erpc:call(RemoteNode, rabbit, start, []).
+
 %% -------------------------------------------------------------------
 %% Cluster update.
 %% -------------------------------------------------------------------