
Commit 9dfa931

Purge stale node state from quorum queues
When a node is disconnected and then removed from the RabbitMQ cluster, it is possible that quorum queues retain some state for this node. This change purges any enqueuer or consumer state for pids belonging to nodes that are no longer in the RabbitMQ cluster. [#164214265]
1 parent c722c96 commit 9dfa931
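In outline, the change has two halves: the quorum queue state machine now reports every node it still holds enqueuer or consumer state for on each tick, and the tick handler compares that list against the current cluster membership, pipelining a purge command for anything stale. A condensed sketch of that comparison follows (the function name detect_stale/2 is illustrative and not part of the commit; the calls it makes appear in the diff below and only run inside a broker):

%% Sketch only: condenses the logic added to rabbit_quorum_queue:handle_tick/3.
%% detect_stale/2 is an illustrative name, not a function introduced by this commit.
detect_stale(QName, KnownNodes) ->
    ExpectedNodes = rabbit_mnesia:cluster_nodes(all),
    case KnownNodes -- ExpectedNodes of
        [] ->
            ok;
        Stale ->
            %% ask the queue's Ra leader to drop enqueuer/consumer state
            %% held for the stale nodes
            {ok, Q} = rabbit_amqqueue:lookup(QName),
            ok = ra:pipeline_command(amqqueue:get_pid(Q),
                                     rabbit_fifo:make_purge_nodes(Stale))
    end.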

File tree: 4 files changed, 158 additions and 33 deletions


src/rabbit_fifo.erl

Lines changed: 80 additions & 23 deletions
@@ -62,6 +62,7 @@
          make_discard/2,
          make_credit/4,
          make_purge/0,
+         make_purge_nodes/1,
          make_update_config/1
         ]).

@@ -83,6 +84,7 @@
                  delivery_count :: non_neg_integer(),
                  drain :: boolean()}).
 -record(purge, {}).
+-record(purge_nodes, {nodes :: [node()]}).
 -record(update_config, {config :: config()}).
 
 -opaque protocol() ::
@@ -93,6 +95,7 @@
     #discard{} |
     #credit{} |
     #purge{} |
+    #purge_nodes{} |
     #update_config{}.
 
 -type command() :: protocol() | ra_machine:builtin_command().
@@ -396,28 +399,9 @@ apply(Meta, {down, Pid, noconnection},
                    _ ->
                        [{monitor, node, Node}]
                end ++ Effects1,
-    %% TODO: should we run a checkout here?
     checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects);
-apply(Meta, {down, Pid, _Info}, #?MODULE{consumers = Cons0,
-                                         enqueuers = Enqs0} = State0) ->
-    % Remove any enqueuer for the same pid and enqueue any pending messages
-    % This should be ok as we won't see any more enqueues from this pid
-    State1 = case maps:take(Pid, Enqs0) of
-                 {#enqueuer{pending = Pend}, Enqs} ->
-                     lists:foldl(fun ({_, RIdx, RawMsg}, S) ->
-                                         enqueue(RIdx, RawMsg, S)
-                                 end, State0#?MODULE{enqueuers = Enqs}, Pend);
-                 error ->
-                     State0
-             end,
-    {Effects1, State2} = handle_waiting_consumer_down(Pid, State1),
-    % return checked out messages to main queue
-    % Find the consumers for the down pid
-    DownConsumers = maps:keys(
-                      maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)),
-    {State, Effects} = lists:foldl(fun(ConsumerId, {S, E}) ->
-                                           cancel_consumer(ConsumerId, S, E, down)
-                                   end, {State2, Effects1}, DownConsumers),
+apply(Meta, {down, Pid, _Info}, State0) ->
+    {State, Effects} = handle_down(Pid, State0),
     checkout(Meta, State, Effects);
 apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
                                      enqueuers = Enqs0,
@@ -448,16 +432,50 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
                               Acc
                       end, {Cons0, SQ0, Monitors}, Cons0),
     Waiting = update_waiting_consumer_status(Node, State0, up),
-    State1 = State0#?MODULE{consumers = Cons1, enqueuers = Enqs1,
+    State1 = State0#?MODULE{consumers = Cons1,
+                            enqueuers = Enqs1,
                             service_queue = SQ,
                             waiting_consumers = Waiting},
     {State, Effects} = activate_next_consumer(State1, Effects1),
     checkout(Meta, State, Effects);
 apply(_, {nodedown, _Node}, State) ->
     {State, ok};
+apply(_, #purge_nodes{nodes = Nodes}, State0) ->
+    {State, Effects} = lists:foldl(fun(Node, {S, E}) ->
+                                           purge_node(Node, S, E)
+                                   end, {State0, []}, Nodes),
+    {State, ok, Effects};
 apply(Meta, #update_config{config = Conf}, State) ->
     checkout(Meta, update_config(Conf, State), []).
 
+purge_node(Node, State, Effects) ->
+    lists:foldl(fun(Pid, {S0, E0}) ->
+                        {S, E} = handle_down(Pid, S0),
+                        {S, E0 ++ E}
+                end, {State, Effects}, all_pids_for(Node, State)).
+
+%% any downs that are not noconnection
+handle_down(Pid, #?MODULE{consumers = Cons0,
+                          enqueuers = Enqs0} = State0) ->
+    % Remove any enqueuer for the same pid and enqueue any pending messages
+    % This should be ok as we won't see any more enqueues from this pid
+    State1 = case maps:take(Pid, Enqs0) of
+                 {#enqueuer{pending = Pend}, Enqs} ->
+                     lists:foldl(fun ({_, RIdx, RawMsg}, S) ->
+                                         enqueue(RIdx, RawMsg, S)
+                                 end, State0#?MODULE{enqueuers = Enqs}, Pend);
+                 error ->
+                     State0
+             end,
+    {Effects1, State2} = handle_waiting_consumer_down(Pid, State1),
+    % return checked out messages to main queue
+    % Find the consumers for the down pid
+    DownConsumers = maps:keys(
+                      maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)),
+    lists:foldl(fun(ConsumerId, {S, E}) ->
+                        cancel_consumer(ConsumerId, S, E, down)
+                end, {State2, Effects1}, DownConsumers).
+
 consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = competing}}) ->
     fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) ->
         consumer_update_active_effects(State, ConsumerId, Consumer, Active,
@@ -556,8 +574,10 @@ tick(_Ts, #?MODULE{cfg = #cfg{name = Name,
                query_consumer_count(State), % Consumers
                EnqueueBytes,
                CheckoutBytes},
+    %% TODO: call a handler that works out if any known nodes need to be
+    %% purged and emit a command effect to append this to the log
     [{mod_call, rabbit_quorum_queue,
-      handle_tick, [QName, Metrics]}, {aux, emit}].
+      handle_tick, [QName, Metrics, all_nodes(State)]}, {aux, emit}].
 
 -spec overview(state()) -> map().
 overview(#?MODULE{consumers = Cons,
@@ -1495,6 +1515,10 @@ make_credit(ConsumerId, Credit, DeliveryCount, Drain) ->
 -spec make_purge() -> protocol().
 make_purge() -> #purge{}.
 
+-spec make_purge_nodes([node()]) -> protocol().
+make_purge_nodes(Nodes) ->
+    #purge_nodes{nodes = Nodes}.
+
 -spec make_update_config(config()) -> protocol().
 make_update_config(Config) ->
     #update_config{config = Config}.
@@ -1532,6 +1556,39 @@ message_size(Msg) ->
     %% probably only hit this for testing so ok to use erts_debug
     erts_debug:size(Msg).
 
+all_nodes(#?MODULE{consumers = Cons0,
+                   enqueuers = Enqs0,
+                   waiting_consumers = WaitingConsumers0}) ->
+    Nodes0 = maps:fold(fun({_, P}, _, Acc) ->
+                               Acc#{node(P) => ok}
+                       end, #{}, Cons0),
+    Nodes1 = maps:fold(fun(P, _, Acc) ->
+                               Acc#{node(P) => ok}
+                       end, Nodes0, Enqs0),
+    maps:keys(
+      lists:foldl(fun({{_, P}, _}, Acc) ->
+                          Acc#{node(P) => ok}
+                  end, Nodes1, WaitingConsumers0)).
+
+all_pids_for(Node, #?MODULE{consumers = Cons0,
+                            enqueuers = Enqs0,
+                            waiting_consumers = WaitingConsumers0}) ->
+    Cons = maps:fold(fun({_, P}, _, Acc)
+                           when node(P) =:= Node ->
+                             [P | Acc];
+                        (_, _, Acc) -> Acc
+                     end, [], Cons0),
+    Enqs = maps:fold(fun(P, _, Acc)
+                           when node(P) =:= Node ->
+                             [P | Acc];
+                        (_, _, Acc) -> Acc
+                     end, Cons, Enqs0),
+    lists:foldl(fun({{_, P}, _}, Acc)
+                      when node(P) =:= Node ->
+                        [P | Acc];
+                   (_, Acc) -> Acc
+                end, Enqs, WaitingConsumers0).
+
 suspected_pids_for(Node, #?MODULE{consumers = Cons0,
                                   enqueuers = Enqs0,
                                   waiting_consumers = WaitingConsumers0}) ->
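The all_nodes/1 helper above deduplicates node names by folding pids into a map whose keys act as a set, then returning the keys. A minimal, standalone illustration of the same pattern (module name and input are invented for the example):

-module(node_set_example).
-export([distinct_nodes/1]).

%% Fold pids into a map used as a set, then return the keys.
%% This mirrors the map-as-set trick used by all_nodes/1 above.
distinct_nodes(Pids) ->
    maps:keys(lists:foldl(fun(Pid, Acc) ->
                                  Acc#{node(Pid) => ok}
                          end, #{}, Pids)).

Calling node_set_example:distinct_nodes([self(), self()]) returns a single-element list containing the local node name, no matter how many pids from that node are passed in.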

src/rabbit_quorum_queue.erl

Lines changed: 19 additions & 3 deletions
@@ -28,7 +28,7 @@
 -export([cluster_state/1, status/2]).
 -export([update_consumer_handler/8, update_consumer/9]).
 -export([cancel_consumer_handler/2, cancel_consumer/3]).
--export([become_leader/2, handle_tick/2]).
+-export([become_leader/2, handle_tick/3]).
 -export([rpc_delete_metrics/1]).
 -export([format/1]).
 -export([open_files/1]).
@@ -243,7 +243,9 @@ rpc_delete_metrics(QName) ->
     ets:delete(queue_metrics, QName),
     ok.
 
-handle_tick(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) ->
+handle_tick(QName,
+            {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack},
+            Nodes) ->
     %% this makes calls to remote processes so cannot be run inside the
     %% ra server
     Self = self(),
@@ -266,7 +268,21 @@ handle_tick(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) ->
                      {messages_ready, MR},
                      {messages_unacknowledged, MU},
                      {reductions, R}]),
-            ok = repair_leader_record(QName, Self)
+            ok = repair_leader_record(QName, Self),
+            ExpectedNodes = rabbit_mnesia:cluster_nodes(all),
+            case Nodes -- ExpectedNodes of
+                [] ->
+                    ok;
+                Stale ->
+                    rabbit_log:info("~s: stale nodes detected. Purging ~w~n",
+                                    [rabbit_misc:rs(QName), Stale]),
+                    %% pipeline purge command
+                    {ok, Q} = rabbit_amqqueue:lookup(QName),
+                    ok = ra:pipeline_command(amqqueue:get_pid(Q),
+                                             rabbit_fifo:make_purge_nodes(Stale)),
+
+                    ok
+            end
         end),
     ok.
 
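The stale-node check in handle_tick/3 above hinges on Erlang's -- list subtraction: any node the queue still knows about that is absent from rabbit_mnesia:cluster_nodes(all) is treated as stale. A quick shell illustration (node names are made up):

1> Known = [rabbit@a, rabbit@b, rabbit@removed].
[rabbit@a,rabbit@b,rabbit@removed]
2> Expected = [rabbit@a, rabbit@b].
[rabbit@a,rabbit@b]
3> Known -- Expected.
[rabbit@removed]

That [rabbit@removed] result is what gets wrapped in rabbit_fifo:make_purge_nodes/1 and pipelined to the queue's Ra leader.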

test/rabbit_fifo_SUITE.erl

Lines changed: 58 additions & 6 deletions
@@ -480,9 +480,12 @@ tick_test(_) ->
     {S3, {_, _}} = deq(4, Cid2, unsettled, S2),
     {S4, _, _} = apply(meta(5), rabbit_fifo:make_return(Cid, [MsgId]), S3),
 
-    [{mod_call, _, _,
+    [{mod_call, rabbit_quorum_queue, handle_tick,
       [#resource{},
-       {?FUNCTION_NAME, 1, 1, 2, 1, 3, 3}]}, {aux, emit}] = rabbit_fifo:tick(1, S4),
+       {?FUNCTION_NAME, 1, 1, 2, 1, 3, 3},
+       [_Node]
+      ]},
+     {aux, emit}] = rabbit_fifo:tick(1, S4),
     ok.
 

@@ -921,10 +924,11 @@ single_active_consumer_all_disconnected_test(_) ->
 
 single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) ->
     State0 = init(#{name => ?FUNCTION_NAME,
-                    queue_resource => rabbit_misc:r("/", queue,
-                        atom_to_binary(?FUNCTION_NAME, utf8)),
-                    release_cursor_interval => 0,
-                    single_active_consumer_on => true}),
+                    queue_resource =>
+                        rabbit_misc:r("/", queue,
+                                      atom_to_binary(?FUNCTION_NAME, utf8)),
+                    release_cursor_interval => 0,
+                    single_active_consumer_on => true}),
 
     DummyFunction = fun() -> ok end,
     Pid1 = spawn(DummyFunction),
@@ -1203,6 +1207,54 @@ single_active_with_credited_test(_) ->
                  State3#rabbit_fifo.waiting_consumers),
     ok.
 
+purge_nodes_test(_) ->
+    Node = purged@node,
+    ThisNode = node(),
+    EnqPid = test_util:fake_pid(Node),
+    EnqPid2 = test_util:fake_pid(node()),
+    ConPid = test_util:fake_pid(Node),
+    Cid = {<<"tag">>, ConPid},
+    % WaitingPid = test_util:fake_pid(Node),
+
+    State0 = init(#{name => ?FUNCTION_NAME,
+                    queue_resource => rabbit_misc:r("/", queue,
+                        atom_to_binary(?FUNCTION_NAME, utf8)),
+                    single_active_consumer_on => false}),
+    {State1, _, _} = apply(meta(1),
+                           rabbit_fifo:make_enqueue(EnqPid, 1, msg1),
+                           State0),
+    {State2, _, _} = apply(meta(2),
+                           rabbit_fifo:make_enqueue(EnqPid2, 1, msg2),
+                           State1),
+    {State3, _} = check(Cid, 3, 1000, State2),
+    {State4, _, _} = apply(meta(4),
+                           {down, EnqPid, noconnection},
+                           State3),
+    ?assertMatch(
+       [{mod_call, rabbit_quorum_queue, handle_tick,
+         [#resource{}, _Metrics,
+          [ThisNode, Node]
+         ]},
+        {aux, emit}], rabbit_fifo:tick(1, State4)),
+    %% assert there are both enqueuers and consumers
+    {State, _, _} = apply(meta(5),
+                          rabbit_fifo:make_purge_nodes([Node]),
+                          State4),
+
+    %% assert there are no enqueuers nor consumers
+    ?assertMatch(#rabbit_fifo{enqueuers = Enqs} when map_size(Enqs) == 1,
+                 State),
+
+    ?assertMatch(#rabbit_fifo{consumers = Cons} when map_size(Cons) == 0,
+                 State),
+    ?assertMatch(
+       [{mod_call, rabbit_quorum_queue, handle_tick,
+         [#resource{}, _Metrics,
+          [ThisNode]
+         ]},
+        {aux, emit}], rabbit_fifo:tick(1, State)),
+    ok.
+
 meta(Idx) ->
     #{index => Idx, term => 1}.
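The assertions in purge_nodes_test above use ?assertMatch with a when guard so the match only succeeds for the expected map sizes. A minimal standalone eunit sketch of that guard form (module name and data are invented for illustration):

-module(assert_match_example).
-include_lib("eunit/include/eunit.hrl").

%% Pattern plus guard: the assertion passes only if consumers is empty
%% and exactly one enqueuer remains, similar to the checks in the suite.
map_size_guard_test() ->
    State = #{enqueuers => #{self() => pending}, consumers => #{}},
    ?assertMatch(#{enqueuers := Enqs, consumers := Cons}
                     when map_size(Enqs) == 1 andalso map_size(Cons) == 0,
                 State).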

test/rabbit_fifo_int_SUITE.erl

Lines changed: 1 addition & 1 deletion
@@ -54,7 +54,7 @@ end_per_group(_, Config) ->
 
 init_per_testcase(TestCase, Config) ->
     meck:new(rabbit_quorum_queue, [passthrough]),
-    meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _) -> ok end),
+    meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _, _) -> ok end),
     meck:expect(rabbit_quorum_queue, cancel_consumer_handler,
                 fun (_, _) -> ok end),
     ra_server_sup_sup:remove_all(),
