
Commit b850e37

Keep exclusive queues with Khepri + network partition
[Why]

With Mnesia, when the network partition handling strategy is set to `pause_minority`, nodes on the "minority side" are stopped. Thus, the exclusive queues that were hosted by nodes on that minority side are lost:

* Consumers connected to these nodes are disconnected because the nodes are stopped.
* Queue records on the majority side are deleted from the metadata store.

This was fine with Mnesia and the way this partition handling strategy is implemented. However, it does not work with Khepri, because the nodes on the "minority side" continue to run and serve clients. The cluster therefore ends up in an inconsistent situation:

1. The "majority side" deleted the queue records.
2. When the network partition is resolved, the "minority side" receives the record deletion, but the queue processes continue to run.

The same applied to auto-delete queues.

[How]

With Khepri, we no longer delete transient queue records merely because a node went down. Thanks to this, an exclusive or auto-delete queue and its consumer(s) are not affected by a network partition: they continue to work.

However, if a node is really lost, we still need to clean up dead queue records. This was already done for durable queues with both Mnesia and Khepri. But with Khepri, transient queue records persist in the store like durable ones (unlike with Mnesia). That's why this commit changes the clean-up function `rabbit_amqqueue:forget_all_durable/1` into `rabbit_amqqueue:forget_all/1`, which deletes the records of all queues that were hosted on the given node, regardless of whether they are transient or durable.

In addition, the queue process now retries the deletion of its underlying record indefinitely when no other process is waiting for a reply from it, which is the case for queues deleted because of an internal event (such as the exclusive/auto-delete conditions); a simplified sketch of this retry loop follows this message. Thanks to this, the queue process does its best to delete its record in the face of a network partition, whether the consumers go away during or after that partition. The node monitor also drives some failsafe code that cleans up the record if the queue process was killed before it could delete it itself.

Fixes #12949, #12597, #14527.
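A minimal sketch of the retry loop described above, assuming only that `rabbit_amqqueue:internal_delete/3` returns `{error, timeout}` when the metadata store has no quorum (for example during a partition). The function name is illustrative; the real implementation, `infinite_internal_delete/3` in the `rabbit_amqqueue_process.erl` diff below, additionally converts Mnesia-era throws into errors.

    %% Illustrative sketch only: keep retrying the metadata-store deletion
    %% until it no longer times out, i.e. until the partition heals or this
    %% node stops.
    delete_record_until_it_sticks(Q, ActingUser, Reason) ->
        case rabbit_amqqueue:internal_delete(Q, ActingUser, Reason) of
            {error, timeout} ->
                delete_record_until_it_sticks(Q, ActingUser, Reason);
            Other ->
                Other
        end.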
1 parent a5fed7b commit b850e37

File tree

8 files changed, +722 -95 lines changed


deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 27 additions & 23 deletions
@@ -9,7 +9,7 @@
 
 -export([recover/1, stop/1, start/1, declare/6, declare/7,
          delete_immediately/1, delete_exclusive/2, delete/4, purge/1,
-         forget_all_durable/1]).
+         forget_all/1]).
 -export([pseudo_queue/2, pseudo_queue/3]).
 -export([exists/1, lookup/1, lookup/2, lookup_durable_queue/1,
          not_found_or_absent_dirty/1,
@@ -1882,19 +1882,19 @@ internal_delete(Queue, ActingUser, Reason) ->
                  {user_who_performed_action, ActingUser}])
     end.
 
--spec forget_all_durable(node()) -> 'ok'.
+-spec forget_all(node()) -> 'ok'.
 
-%% TODO this is used by `rabbit_mnesia:remove_node_if_mnesia_running`
-%% Does it make any sense once mnesia is not used/removed?
-forget_all_durable(Node) ->
-    ?LOG_INFO("Will remove all classic queues from node ~ts. The node is likely being removed from the cluster.", [Node]),
+%% This is used by `rabbit_mnesia:remove_node_if_mnesia_running/1' and
+%% `rabbit_khepri:remove_*_member/1'.
+forget_all(Node) ->
+    ?LOG_INFO("Will remove all queues from node ~ts. The node is likely being removed from the cluster.", [Node]),
     UpdateFun = fun(Q) ->
                         forget_node_for_queue(Q)
                 end,
     FilterFun = fun(Q) ->
                         is_local_to_node(amqqueue:get_pid(Q), Node)
                 end,
-    rabbit_db_queue:foreach_durable(UpdateFun, FilterFun).
+    rabbit_db_queue:foreach(UpdateFun, FilterFun).
 
 forget_node_for_queue(Q)
   when ?amqqueue_is_quorum(Q) ->
@@ -1936,27 +1936,31 @@ is_dead_exclusive(Q) when ?amqqueue_exclusive_owner_is_pid(Q) ->
 
 -spec on_node_up(node()) -> 'ok'.
 
-on_node_up(_Node) ->
-    ok.
+on_node_up(Node) ->
+    case rabbit_khepri:is_enabled() of
+        true ->
+            %% With Khepri, we try to delete transient queues now because it's
+            %% possible any updates timed out because of the lack of a quorum
+            %% while `Node' was down.
+            ok = delete_transient_queues_on_node(Node);
+        false ->
+            ok
+    end.
 
 -spec on_node_down(node()) -> 'ok'.
 
 on_node_down(Node) ->
-    case delete_transient_queues_on_node(Node) of
-        ok ->
+    case rabbit_khepri:is_enabled() of
+        true ->
+            %% With Khepri, we don't delete transient/exclusive queues. There
+            %% may be a network partition and the node will be reachable again
+            %% after the partition is repaired.
+            %%
+            %% If the node will never come back, it will likely be removed from
+            %% the cluster. We take care of transient queues at that time.
             ok;
-        {error, timeout} ->
-            %% This case is possible when running Khepri. The node going down
-            %% could leave the cluster in a minority so the command to delete
-            %% the transient queue records would fail. Also see
-            %% `rabbit_khepri:init/0': we also try this deletion when the node
-            %% restarts - a time that the cluster is very likely to have a
-            %% majority - to ensure these records are deleted.
-            ?LOG_WARNING("transient queues for node '~ts' could not be "
-                         "deleted because of a timeout. These queues "
-                         "will be removed when node '~ts' restarts or "
-                         "is removed from the cluster.", [Node, Node]),
-            ok
+        false ->
+            ok = delete_transient_queues_on_node(Node)
     end.
 
 -spec delete_transient_queues_on_node(Node) -> Ret when
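The transient queues that the reworked `on_node_down/1` above now leaves alone are typically server-named exclusive (or auto-delete) queues. As a rough client-side illustration only, using the Erlang AMQP 0-9-1 client (the connection defaults and function name are placeholders and not part of this commit), such a queue is declared like this:

    %% Illustration only: declare a server-named, exclusive queue. With
    %% Khepri, this queue and its consumer keep working across a network
    %% partition instead of being dropped when the hosting node is reported
    %% down.
    -include_lib("amqp_client/include/amqp_client.hrl").

    declare_exclusive_queue() ->
        {ok, Conn} = amqp_connection:start(#amqp_params_network{}),
        {ok, Ch} = amqp_connection:open_channel(Conn),
        #'queue.declare_ok'{queue = QName} =
            amqp_channel:call(Ch, #'queue.declare'{queue = <<>>,
                                                   exclusive = true}),
        {Conn, Ch, QName}.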

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 52 additions & 12 deletions
@@ -352,21 +352,61 @@ terminate_delete(EmitStats, Reason0, ReplyTo,
                                     fun() -> emit_stats(State) end);
            true -> ok
         end,
-        %% This try-catch block transforms throws to errors since throws are not
-        %% logged. When mnesia is removed this `try` can be removed: Khepri
-        %% returns errors as error tuples instead.
-        Reply = try rabbit_amqqueue:internal_delete(Q, ActingUser, Reason0) of
-                    ok ->
-                        {ok, Len};
-                    {error, _} = Err ->
-                        Err
-                catch
-                    {error, ReasonE} -> error(ReasonE)
-                end,
-        send_reply(ReplyTo, Reply),
+        case ReplyTo of
+            _ when ReplyTo =/= none ->
+                %% This try-catch block transforms throws to errors since
+                %% throws are not logged. When mnesia is removed this `try`
+                %% can be removed: Khepri returns errors as error tuples
+                %% instead.
+                Reply = try rabbit_amqqueue:internal_delete(Q, ActingUser, Reason0) of
+                            ok ->
+                                {ok, Len};
+                            {error, _} = Err ->
+                                Err
+                        catch
+                            {error, ReasonE} -> error(ReasonE)
+                        end,
+                send_reply(ReplyTo, Reply);
+            none ->
+                %% No processes are waiting for this queue process to exit. We
+                %% can handle the deletion of the queue record differently: if
+                %% the deletion times out, we retry indefinitely.
+                %%
+                %% For instance, this allows an auto-delete queue process to
+                %% wait and retry until a network partition is resolved (or
+                %% this node stops of course). This reduces the risk of a
+                %% "leak" of a queue record in the metadata store.
+                %%
+                %% If for whatever reason the queue record is still leaked
+                %% (this process could not delete it before it was killed), the
+                %% "leaked" queue record will be cleaned up when the partition
+                %% is solved (or this node is removed from the cluster).
+                %% Indeed, when the partition is solved, all nodes are notified
+                %% with the `node_up' message from `rabbit_node_monitor'. This
+                %% calls `rabbit_amqqueue:on_node_up/1' which will delete any
+                %% transient queues.
+                _ = infinite_internal_delete(Q, ActingUser, Reason0),
+                ok
+        end,
         BQS1
     end.
 
+infinite_internal_delete(Q, ActingUser, Reason) ->
+    %% This try-catch block transforms throws to errors since throws are not
+    %% logged. When mnesia is removed this `try` can be removed: Khepri returns
+    %% errors as error tuples instead.
+    Ret = try
+              rabbit_amqqueue:internal_delete(Q, ActingUser, Reason)
+          catch
+              {error, ReasonE} -> error(ReasonE)
+          end,
+    case Ret of
+        {error, timeout} ->
+            infinite_internal_delete(Q, ActingUser, Reason);
+        _ ->
+            Ret
+    end.
+
 terminated_by({terminated_by, auto_delete}) ->
     ?INTERNAL_USER;
 terminated_by({terminated_by, ActingUser}) ->

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 16 additions & 13 deletions
@@ -32,7 +32,8 @@
          delete/2,
          update/2,
          update_decorators/2,
-         exists/1
+         exists/1,
+         foreach/2
         ]).
 
 %% Once mnesia is removed, all transient entities will be deleted. These can be replaced
@@ -57,8 +58,7 @@
 %% Only used by rabbit_amqqueue:forget_node_for_queue, which is only called
 %% by `rabbit_mnesia:remove_node_if_mnesia_running`. Thus, once mnesia and/or
 %% HA queues are removed it can be deleted.
--export([foreach_durable/2,
-         internal_delete/3]).
+-export([internal_delete/3]).
 
 %% Storing it on Khepri is not needed, this function is just used in
 %% rabbit_quorum_queue to ensure the queue is present in the rabbit_queue
@@ -1263,20 +1263,26 @@ foreach_transient_in_khepri(UpdateFun) ->
     end.
 
 %% -------------------------------------------------------------------
-%% foreach_durable().
+%% foreach().
 %% -------------------------------------------------------------------
 
--spec foreach_durable(UpdateFun, FilterFun) -> ok when
+-spec foreach(UpdateFun, FilterFun) -> ok when
       UpdateFun :: fun((Queue) -> any()),
       FilterFun :: fun((Queue) -> boolean()).
-%% @doc Applies `UpdateFun' to all durable queue records that match `FilterFun'.
+%% @doc Applies `UpdateFun' to all queue records that match `FilterFun'.
+%%
+%% With Mnesia, only durable queues are considered because we use the durable
+%% queues table.
+%%
+%% With Khepri, all queues are considered because they are all in the same
+%% "table".
 %%
 %% @private
 
-foreach_durable(UpdateFun, FilterFun) ->
+foreach(UpdateFun, FilterFun) ->
     rabbit_khepri:handle_fallback(
       #{mnesia => fun() -> foreach_durable_in_mnesia(UpdateFun, FilterFun) end,
-        khepri => fun() -> foreach_durable_in_khepri(UpdateFun, FilterFun) end
+        khepri => fun() -> foreach_in_khepri(UpdateFun, FilterFun) end
       }).
 
 foreach_durable_in_mnesia(UpdateFun, FilterFun) ->
@@ -1292,11 +1298,8 @@ foreach_durable_in_mnesia(UpdateFun, FilterFun) ->
         end),
     ok.
 
-foreach_durable_in_khepri(UpdateFun, FilterFun) ->
-    Path = khepri_queue_path(
-             ?KHEPRI_WILDCARD_STAR,
-             #if_data_matches{
-                pattern = amqqueue:pattern_match_on_durable(true)}),
+foreach_in_khepri(UpdateFun, FilterFun) ->
+    Path = khepri_queue_path(?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
     case rabbit_khepri:filter(Path, fun(_, #{data := Q}) ->
                                             FilterFun(Q)
                                     end) of
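In this commit the renamed `rabbit_db_queue:foreach/2` is only driven by `rabbit_amqqueue:forget_all/1` (first diff above). To make the UpdateFun/FilterFun contract concrete, here is a hypothetical caller; the function name and log message are illustrative and not part of this commit:

    %% Hypothetical example: apply an action to every queue record whose
    %% classic queue process lives on Node, using the same UpdateFun/FilterFun
    %% shape as rabbit_amqqueue:forget_all/1.
    log_classic_queues_on_node(Node) ->
        rabbit_db_queue:foreach(
          fun(Q) ->
                  logger:info("Queue ~ts is hosted on ~ts",
                              [rabbit_misc:rs(amqqueue:get_name(Q)), Node])
          end,
          fun(Q) ->
                  Pid = amqqueue:get_pid(Q),
                  is_pid(Pid) andalso node(Pid) =:= Node
          end).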

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 2 additions & 2 deletions
@@ -670,7 +670,7 @@ remove_reachable_member(NodeToRemove) ->
            NodeToRemove, khepri_cluster, reset, [?RA_CLUSTER_NAME]),
     case Ret of
         ok ->
-            rabbit_amqqueue:forget_all_durable(NodeToRemove),
+            rabbit_amqqueue:forget_all(NodeToRemove),
             ?LOG_DEBUG(
                "Node ~s removed from Khepri cluster \"~s\"",
                [NodeToRemove, ?RA_CLUSTER_NAME],
@@ -692,7 +692,7 @@ remove_down_member(NodeToRemove) ->
     Ret = ra:remove_member(ServerRef, ServerId, Timeout),
     case Ret of
         {ok, _, _} ->
-            rabbit_amqqueue:forget_all_durable(NodeToRemove),
+            rabbit_amqqueue:forget_all(NodeToRemove),
             ?LOG_DEBUG(
                "Node ~s removed from Khepri cluster \"~s\"",
                [NodeToRemove, ?RA_CLUSTER_NAME],

deps/rabbit/src/rabbit_mnesia.erl

Lines changed: 1 addition & 1 deletion
@@ -916,7 +916,7 @@ remove_node_if_mnesia_running(Node) ->
             case mnesia:del_table_copy(schema, Node) of
                 {atomic, ok} ->
                     rabbit_node_monitor:notify_left_cluster(Node),
-                    rabbit_amqqueue:forget_all_durable(Node),
+                    rabbit_amqqueue:forget_all(Node),
                     ok;
                 {aborted, Reason} ->
                     {error, {failed_to_remove_node, Node, Reason}}

deps/rabbit/test/bindings_SUITE.erl

Lines changed: 28 additions & 17 deletions
@@ -96,25 +96,36 @@ end_per_group(_, Config) ->
                rabbit_ct_broker_helpers:teardown_steps()).
 
 init_per_testcase(Testcase, Config) ->
-    Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
-    rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
-    Name = rabbit_data_coercion:to_binary(Testcase),
-    rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_exchange, [Name]),
-    Config2 = rabbit_ct_helpers:set_config(Config1,
-                                           [{queue_name, Name},
-                                            {alt_queue_name, <<Name/binary, "_alt">>},
-                                            {exchange_name, Name}
-                                           ]),
-    rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()).
+    case {Testcase, rabbit_ct_broker_helpers:configured_metadata_store(Config)} of
+        {transient_queue_on_node_down, khepri} ->
+            {skip, "Test irrelevant with Khepri"};
+        _ ->
+            Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
+            rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
+            Name = rabbit_data_coercion:to_binary(Testcase),
+            rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_exchange, [Name]),
+            Config2 = rabbit_ct_helpers:set_config(
+                        Config1,
+                        [{queue_name, Name},
+                         {alt_queue_name, <<Name/binary, "_alt">>},
+                         {exchange_name, Name}
+                        ]),
+            rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps())
+    end.
 
 end_per_testcase(Testcase, Config) ->
-    rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
-    rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_exchange,
-                                 [?config(exchange_name, Config)]),
-    Config1 = rabbit_ct_helpers:run_steps(
-                Config,
-                rabbit_ct_client_helpers:teardown_steps()),
-    rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+    case {Testcase, rabbit_ct_broker_helpers:configured_metadata_store(Config)} of
+        {transient_queue_on_node_down, khepri} ->
+            Config;
+        _ ->
+            rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
+            rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_exchange,
+                                         [?config(exchange_name, Config)]),
+            Config1 = rabbit_ct_helpers:run_steps(
+                        Config,
+                        rabbit_ct_client_helpers:teardown_steps()),
+            rabbit_ct_helpers:testcase_finished(Config1, Testcase)
+    end.
 
 %% -------------------------------------------------------------------
 %% Testcases.
