Skip to content

Commit 1c856ee

Browse files
authored
Merge pull request #14654 from rabbitmq/mergify/bp/v4.2.x/pr-14573
Keep exclusive/auto-delete queues with Khepri + network partition (backport #14573)
2 parents 1703e44 + 4826a38 commit 1c856ee

File tree

9 files changed

+803
-105
lines changed

9 files changed

+803
-105
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
-export([recover/1, stop/1, start/1, declare/6, declare/7,
1111
delete_immediately/1, delete_exclusive/2, delete/4, purge/1,
12-
forget_all_durable/1]).
12+
forget_all/1]).
1313
-export([pseudo_queue/2, pseudo_queue/3]).
1414
-export([exists/1, lookup/1, lookup/2, lookup_durable_queue/1,
1515
not_found_or_absent_dirty/1,
@@ -1882,19 +1882,19 @@ internal_delete(Queue, ActingUser, Reason) ->
18821882
{user_who_performed_action, ActingUser}])
18831883
end.
18841884

1885-
-spec forget_all_durable(node()) -> 'ok'.
1885+
-spec forget_all(node()) -> 'ok'.
18861886

1887-
%% TODO this is used by `rabbit_mnesia:remove_node_if_mnesia_running`
1888-
%% Does it make any sense once mnesia is not used/removed?
1889-
forget_all_durable(Node) ->
1890-
?LOG_INFO("Will remove all classic queues from node ~ts. The node is likely being removed from the cluster.", [Node]),
1887+
%% This is used by `rabbit_mnesia:remove_node_if_mnesia_running/1' and
1888+
%% `rabbit_khepri:remove_*_member/1'.
1889+
forget_all(Node) ->
1890+
?LOG_INFO("Will remove all queues from node ~ts. The node is likely being removed from the cluster.", [Node]),
18911891
UpdateFun = fun(Q) ->
18921892
forget_node_for_queue(Q)
18931893
end,
18941894
FilterFun = fun(Q) ->
18951895
is_local_to_node(amqqueue:get_pid(Q), Node)
18961896
end,
1897-
rabbit_db_queue:foreach_durable(UpdateFun, FilterFun).
1897+
rabbit_db_queue:foreach(UpdateFun, FilterFun).
18981898

18991899
forget_node_for_queue(Q)
19001900
when ?amqqueue_is_quorum(Q) ->
@@ -1936,27 +1936,31 @@ is_dead_exclusive(Q) when ?amqqueue_exclusive_owner_is_pid(Q) ->
19361936

19371937
-spec on_node_up(node()) -> 'ok'.
19381938

1939-
on_node_up(_Node) ->
1940-
ok.
1939+
on_node_up(Node) ->
1940+
case rabbit_khepri:is_enabled() of
1941+
true ->
1942+
%% With Khepri, we try to delete transient queues now because it's
1943+
%% possible any updates timed out because of the lack of a quorum
1944+
%% while `Node' was down.
1945+
ok = delete_transient_queues_on_node(Node);
1946+
false ->
1947+
ok
1948+
end.
19411949

19421950
-spec on_node_down(node()) -> 'ok'.
19431951

19441952
on_node_down(Node) ->
1945-
case delete_transient_queues_on_node(Node) of
1946-
ok ->
1953+
case rabbit_khepri:is_enabled() of
1954+
true ->
1955+
%% With Khepri, we don't delete transient/exclusive queues. There
1956+
%% may be a network partition and the node will be reachable again
1957+
%% after the partition is repaired.
1958+
%%
1959+
%% If the node will never come back, it will likely be removed from
1960+
%% the cluster. We take care of transient queues at that time.
19471961
ok;
1948-
{error, timeout} ->
1949-
%% This case is possible when running Khepri. The node going down
1950-
%% could leave the cluster in a minority so the command to delete
1951-
%% the transient queue records would fail. Also see
1952-
%% `rabbit_khepri:init/0': we also try this deletion when the node
1953-
%% restarts - a time that the cluster is very likely to have a
1954-
%% majority - to ensure these records are deleted.
1955-
?LOG_WARNING("transient queues for node '~ts' could not be "
1956-
"deleted because of a timeout. These queues "
1957-
"will be removed when node '~ts' restarts or "
1958-
"is removed from the cluster.", [Node, Node]),
1959-
ok
1962+
false ->
1963+
ok = delete_transient_queues_on_node(Node)
19601964
end.
19611965

19621966
-spec delete_transient_queues_on_node(Node) -> Ret when

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 57 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -352,21 +352,66 @@ terminate_delete(EmitStats, Reason0, ReplyTo,
352352
fun() -> emit_stats(State) end);
353353
true -> ok
354354
end,
355-
%% This try-catch block transforms throws to errors since throws are not
356-
%% logged. When mnesia is removed this `try` can be removed: Khepri
357-
%% returns errors as error tuples instead.
358-
Reply = try rabbit_amqqueue:internal_delete(Q, ActingUser, Reason0) of
359-
ok ->
360-
{ok, Len};
361-
{error, _} = Err ->
362-
Err
363-
catch
364-
{error, ReasonE} -> error(ReasonE)
365-
end,
366-
send_reply(ReplyTo, Reply),
355+
case ReplyTo of
356+
_ when ReplyTo =/= none ->
357+
Reply = case delete_queue_record(Q, ActingUser, Reason0) of
358+
ok ->
359+
{ok, Len};
360+
{error, _} = Err ->
361+
Err
362+
end,
363+
send_reply(ReplyTo, Reply);
364+
none ->
365+
%% No processes are waiting for this queue process to exit. We
366+
%% can handle the deletion of the queue record differently: if
367+
%% the deletion times out, we retry indefinitely.
368+
%%
369+
%% For instance, this allows an auto-delete queue process to
370+
%% wait and retry until a network partition is resolved (or
371+
%% this node stops of course). This reduces the risk of a
372+
%% "leak" of a queue record in the metadata store.
373+
%%
374+
%% If for whatever reason the queue record is still leaked
375+
%% (this process could not delete it before it was killed), the
376+
%% "leaked" queue record will be cleaned up when the partition
377+
%% is solved (or this node is removed from the cluster).
378+
%% Indeed, when the partition is solved, all nodes are notified
379+
%% with the `node_up' message from `rabbit_node_monitor'. This
380+
%% calls `rabbit_amqqueue:on_node_up/1' which will delete any
381+
%% transient queues.
382+
%%
383+
%% This infinite delete attempts loop is executed in a
384+
%% separate process to let this queue process exits. This way,
385+
%% connections will be notified that the queue process is
386+
%% gone.
387+
worker_pool:submit_async(
388+
fun() ->
389+
_ = infinite_internal_delete(Q, ActingUser, Reason0)
390+
end),
391+
ok
392+
end,
367393
BQS1
368394
end.
369395

396+
infinite_internal_delete(Q, ActingUser, Reason) ->
397+
case delete_queue_record(Q, ActingUser, Reason) of
398+
{error, timeout} ->
399+
_ = rabbit_khepri:fence(infinity),
400+
infinite_internal_delete(Q, ActingUser, Reason);
401+
Ret ->
402+
Ret
403+
end.
404+
405+
delete_queue_record(Q, ActingUser, Reason) ->
406+
%% This try-catch block transforms throws to errors since throws are not
407+
%% logged. When mnesia is removed this `try` can be removed: Khepri returns
408+
%% errors as error tuples instead.
409+
try
410+
rabbit_amqqueue:internal_delete(Q, ActingUser, Reason)
411+
catch
412+
{error, ReasonE} -> error(ReasonE)
413+
end.
414+
370415
terminated_by({terminated_by, auto_delete}) ->
371416
?INTERNAL_USER;
372417
terminated_by({terminated_by, ActingUser}) ->

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232
delete/2,
3333
update/2,
3434
update_decorators/2,
35-
exists/1
35+
exists/1,
36+
foreach/2
3637
]).
3738

3839
%% Once mnesia is removed, all transient entities will be deleted. These can be replaced
@@ -57,8 +58,7 @@
5758
%% Only used by rabbit_amqqueue:forget_node_for_queue, which is only called
5859
%% by `rabbit_mnesia:remove_node_if_mnesia_running`. Thus, once mnesia and/or
5960
%% HA queues are removed it can be deleted.
60-
-export([foreach_durable/2,
61-
internal_delete/3]).
61+
-export([internal_delete/3]).
6262

6363
%% Storing it on Khepri is not needed, this function is just used in
6464
%% rabbit_quorum_queue to ensure the queue is present in the rabbit_queue
@@ -1263,20 +1263,26 @@ foreach_transient_in_khepri(UpdateFun) ->
12631263
end.
12641264

12651265
%% -------------------------------------------------------------------
1266-
%% foreach_durable().
1266+
%% foreach().
12671267
%% -------------------------------------------------------------------
12681268

1269-
-spec foreach_durable(UpdateFun, FilterFun) -> ok when
1269+
-spec foreach(UpdateFun, FilterFun) -> ok when
12701270
UpdateFun :: fun((Queue) -> any()),
12711271
FilterFun :: fun((Queue) -> boolean()).
1272-
%% @doc Applies `UpdateFun' to all durable queue records that match `FilterFun'.
1272+
%% @doc Applies `UpdateFun' to all queue records that match `FilterFun'.
1273+
%%
1274+
%% With Mnesia, only durable queues are considered because we use the durable
1275+
%% queues table.
1276+
%%
1277+
%% With Khepri, all queues are considered because they are all in the same
1278+
%% "table".
12731279
%%
12741280
%% @private
12751281

1276-
foreach_durable(UpdateFun, FilterFun) ->
1282+
foreach(UpdateFun, FilterFun) ->
12771283
rabbit_khepri:handle_fallback(
12781284
#{mnesia => fun() -> foreach_durable_in_mnesia(UpdateFun, FilterFun) end,
1279-
khepri => fun() -> foreach_durable_in_khepri(UpdateFun, FilterFun) end
1285+
khepri => fun() -> foreach_in_khepri(UpdateFun, FilterFun) end
12801286
}).
12811287

12821288
foreach_durable_in_mnesia(UpdateFun, FilterFun) ->
@@ -1292,11 +1298,8 @@ foreach_durable_in_mnesia(UpdateFun, FilterFun) ->
12921298
end),
12931299
ok.
12941300

1295-
foreach_durable_in_khepri(UpdateFun, FilterFun) ->
1296-
Path = khepri_queue_path(
1297-
?KHEPRI_WILDCARD_STAR,
1298-
#if_data_matches{
1299-
pattern = amqqueue:pattern_match_on_durable(true)}),
1301+
foreach_in_khepri(UpdateFun, FilterFun) ->
1302+
Path = khepri_queue_path(?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
13001303
case rabbit_khepri:filter(Path, fun(_, #{data := Q}) ->
13011304
FilterFun(Q)
13021305
end) of

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -670,7 +670,7 @@ remove_reachable_member(NodeToRemove) ->
670670
NodeToRemove, khepri_cluster, reset, [?RA_CLUSTER_NAME]),
671671
case Ret of
672672
ok ->
673-
rabbit_amqqueue:forget_all_durable(NodeToRemove),
673+
rabbit_amqqueue:forget_all(NodeToRemove),
674674
?LOG_DEBUG(
675675
"Node ~s removed from Khepri cluster \"~s\"",
676676
[NodeToRemove, ?RA_CLUSTER_NAME],
@@ -692,7 +692,7 @@ remove_down_member(NodeToRemove) ->
692692
Ret = ra:remove_member(ServerRef, ServerId, Timeout),
693693
case Ret of
694694
{ok, _, _} ->
695-
rabbit_amqqueue:forget_all_durable(NodeToRemove),
695+
rabbit_amqqueue:forget_all(NodeToRemove),
696696
?LOG_DEBUG(
697697
"Node ~s removed from Khepri cluster \"~s\"",
698698
[NodeToRemove, ?RA_CLUSTER_NAME],

deps/rabbit/src/rabbit_mnesia.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -916,7 +916,7 @@ remove_node_if_mnesia_running(Node) ->
916916
case mnesia:del_table_copy(schema, Node) of
917917
{atomic, ok} ->
918918
rabbit_node_monitor:notify_left_cluster(Node),
919-
rabbit_amqqueue:forget_all_durable(Node),
919+
rabbit_amqqueue:forget_all(Node),
920920
ok;
921921
{aborted, Reason} ->
922922
{error, {failed_to_remove_node, Node, Reason}}

deps/rabbit/src/rabbit_node_monitor.erl

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -430,16 +430,8 @@ handle_call(status, _From, State = #state{partitions = Partitions}) ->
430430
handle_call(_Request, _From, State) ->
431431
{noreply, State}.
432432

433-
handle_cast(notify_node_up, State = #state{guid = GUID}) ->
434-
Nodes = rabbit_nodes:list_reachable() -- [node()],
435-
gen_server:abcast(Nodes, ?SERVER,
436-
{node_up, node(), rabbit_db_cluster:node_type(), GUID}),
437-
%% register other active rabbits with this rabbit
438-
DiskNodes = rabbit_db_cluster:disc_members(),
439-
[gen_server:cast(?SERVER, {node_up, N, case lists:member(N, DiskNodes) of
440-
true -> disc;
441-
false -> ram
442-
end}) || N <- Nodes],
433+
handle_cast(notify_node_up, State) ->
434+
do_notify_node_up(State),
443435
{noreply, State};
444436

445437
%%----------------------------------------------------------------------------
@@ -665,6 +657,12 @@ handle_info({nodedown, Node, Info}, State) ->
665657

666658
handle_info({nodeup, Node, _Info}, State) ->
667659
?LOG_INFO("node ~tp up", [Node]),
660+
%% We notify that `rabbit' is up here too (in addition to the message sent
661+
%% explicitly by a boot step. That's because nodes may go down then up
662+
%% during a network partition, and with Khepri, nodes are not restarted
663+
%% (unlike with some partition handling strategies used with Mnesia), and
664+
%% thus the boot steps are not executed.
665+
do_notify_node_up(State),
668666
{noreply, State};
669667

670668
handle_info({mnesia_system_event,
@@ -854,6 +852,20 @@ wait_for_cluster_recovery(Condition) ->
854852
wait_for_cluster_recovery(Condition)
855853
end.
856854

855+
do_notify_node_up(#state{guid = GUID}) ->
856+
Nodes = rabbit_nodes:list_reachable() -- [node()],
857+
gen_server:abcast(Nodes, ?SERVER,
858+
{node_up, node(), rabbit_db_cluster:node_type(), GUID}),
859+
%% register other active rabbits with this rabbit
860+
DiskNodes = rabbit_db_cluster:disc_members(),
861+
_ = [gen_server:cast(
862+
?SERVER,
863+
{node_up, N, case lists:member(N, DiskNodes) of
864+
true -> disc;
865+
false -> ram
866+
end}) || N <- Nodes],
867+
ok.
868+
857869
handle_dead_rabbit(Node, State) ->
858870
%% TODO: This may turn out to be a performance hog when there are
859871
%% lots of nodes. We really only need to execute some of these

deps/rabbit/test/bindings_SUITE.erl

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -96,25 +96,36 @@ end_per_group(_, Config) ->
9696
rabbit_ct_broker_helpers:teardown_steps()).
9797

9898
init_per_testcase(Testcase, Config) ->
99-
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
100-
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
101-
Name = rabbit_data_coercion:to_binary(Testcase),
102-
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_exchange, [Name]),
103-
Config2 = rabbit_ct_helpers:set_config(Config1,
104-
[{queue_name, Name},
105-
{alt_queue_name, <<Name/binary, "_alt">>},
106-
{exchange_name, Name}
107-
]),
108-
rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()).
99+
case {Testcase, rabbit_ct_broker_helpers:configured_metadata_store(Config)} of
100+
{transient_queue_on_node_down, khepri} ->
101+
{skip, "Test irrelevant with Khepri"};
102+
_ ->
103+
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
104+
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
105+
Name = rabbit_data_coercion:to_binary(Testcase),
106+
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_exchange, [Name]),
107+
Config2 = rabbit_ct_helpers:set_config(
108+
Config1,
109+
[{queue_name, Name},
110+
{alt_queue_name, <<Name/binary, "_alt">>},
111+
{exchange_name, Name}
112+
]),
113+
rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps())
114+
end.
109115

110116
end_per_testcase(Testcase, Config) ->
111-
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
112-
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_exchange,
113-
[?config(exchange_name, Config)]),
114-
Config1 = rabbit_ct_helpers:run_steps(
115-
Config,
116-
rabbit_ct_client_helpers:teardown_steps()),
117-
rabbit_ct_helpers:testcase_finished(Config1, Testcase).
117+
case {Testcase, rabbit_ct_broker_helpers:configured_metadata_store(Config)} of
118+
{transient_queue_on_node_down, khepri} ->
119+
Config;
120+
_ ->
121+
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
122+
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_exchange,
123+
[?config(exchange_name, Config)]),
124+
Config1 = rabbit_ct_helpers:run_steps(
125+
Config,
126+
rabbit_ct_client_helpers:teardown_steps()),
127+
rabbit_ct_helpers:testcase_finished(Config1, Testcase)
128+
end.
118129

119130
%% -------------------------------------------------------------------
120131
%% Testcases.

0 commit comments

Comments
 (0)