Skip to content

Commit 7ac378f

Browse files
Merge pull request #12475 from rabbitmq/rabbitmq-server-12468-for-v4.0.x
4.0.x, by @Ayanda-D: stop QQ replicas when a QQ is forced to shrink to a single replica
2 parents e192d6d + d8c01a3 commit 7ac378f

File tree

2 files changed

+173
-9
lines changed

2 files changed

+173
-9
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
-export([validate_policy/1, merge_policy_value/3]).
7575

7676
-export([force_shrink_member_to_current_member/2,
77+
force_vhost_queues_shrink_member_to_current_member/1,
7778
force_all_queues_shrink_member_to_current_member/0]).
7879

7980
%% for backwards compatibility
@@ -1376,6 +1377,7 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
13761377
_ = rabbit_amqqueue:update(QName, Fun),
13771378
case ra:force_delete_server(?RA_SYSTEM, ServerId) of
13781379
ok ->
1380+
rabbit_log:info("Deleted a replica of quorum ~ts on node ~ts", [rabbit_misc:rs(QName), Node]),
13791381
ok;
13801382
{error, {badrpc, nodedown}} ->
13811383
ok;
@@ -1951,41 +1953,55 @@ notify_decorators(QName, F, A) ->
19511953
%% Always returns `true'.
%% NOTE(review): presumably a queue-type callback telling the caller that this
%% queue type keeps client-side state -- confirm against the behaviour definition.
is_stateful() -> true.
19521954

19531955
%% @doc Forces the quorum queue `Name' in `VHost' to shrink to a single member
%% running on the local node.
%%
%% Steps, in order:
%%   1. force the local Ra member to drop all other members,
%%   2. rewrite the `nodes' entry of the queue's type state to `[node()]',
%%   3. force-delete the Ra servers of this queue on every other node that
%%      was listed as a member before the shrink.
%%
%% Returns the result of the final log call when the queue was found, or
%% `{error, not_found}' when the lookup does not return a queue record.
%% Crashes (badmatch) if the forced shrink in Ra does not return `ok'.
%%
%% NOTE(review): the lookup clause only checks `?is_amqqueue(Q)'; a queue of
%% another type would presumably fail on the `{RaName, _}' match below --
%% confirm whether callers can pass non-quorum queues.
force_shrink_member_to_current_member(VHost, Name) ->
    Node = node(),
    QName = rabbit_misc:r(VHost, queue, Name),
    QNameFmt = rabbit_misc:rs(QName),
    rabbit_log:warning("Shrinking ~ts to a single node: ~ts", [QNameFmt, Node]),
    case rabbit_amqqueue:lookup(QName) of
        {ok, Q} when ?is_amqqueue(Q) ->
            {RaName, _} = amqqueue:get_pid(Q),
            %% Capture the remote members before the record is rewritten,
            %% so their Ra servers can be removed afterwards.
            OtherNodes = lists:delete(Node, get_nodes(Q)),
            ok = ra_server_proc:force_shrink_members_to_current_member({RaName, Node}),
            Fun = fun (Q0) ->
                          TS0 = amqqueue:get_type_state(Q0),
                          TS = TS0#{nodes => [Node]},
                          %% Update the record handed to the fun (Q0), not
                          %% the copy looked up earlier (Q): writing into Q
                          %% would discard any change made to the stored
                          %% record since the lookup.
                          amqqueue:set_type_state(Q0, TS)
                  end,
            _ = rabbit_amqqueue:update(QName, Fun),
            %% Best effort: results of the remote deletions are ignored.
            _ = [ra:force_delete_server(?RA_SYSTEM, {RaName, N}) || N <- OtherNodes],
            rabbit_log:warning("Shrinking ~ts finished", [QNameFmt]);
        _ ->
            rabbit_log:warning("Shrinking failed, ~ts not found", [QNameFmt]),
            {error, not_found}
    end.
19721977

1978+
%% @doc Shrinks every quorum queue listed for `VHost' to a single member on
%% the local node. `VHost' must be a binary. Logs a warning and delegates to
%% force_all_queues_shrink_member_to_current_member/1 with a fun that lists
%% the queues of that vhost only.
force_vhost_queues_shrink_member_to_current_member(VHost) when is_binary(VHost) ->
    LocalNode = node(),
    rabbit_log:warning("Shrinking all quorum queues in vhost '~ts' to a single node: ~ts",
                       [VHost, LocalNode]),
    force_all_queues_shrink_member_to_current_member(
      fun() -> rabbit_amqqueue:list(VHost) end).
1982+
19731983
%% @doc Shrinks every quorum queue returned by `rabbit_amqqueue:list/0' to a
%% single member on the local node. Logs a warning and delegates to
%% force_all_queues_shrink_member_to_current_member/1.
force_all_queues_shrink_member_to_current_member() ->
    LocalNode = node(),
    rabbit_log:warning("Shrinking all quorum queues to a single node: ~ts", [LocalNode]),
    force_all_queues_shrink_member_to_current_member(fun rabbit_amqqueue:list/0).
1987+
1988+
%% Shrinks each quorum queue produced by `ListQQFun()' to a single member on
%% the local node. Queues whose type is not this module are skipped.
%%
%% For every matching queue, in order: force the local Ra member to become the
%% only member, set the `nodes' entry of the queue's type state to `[node()]',
%% then force-delete the Ra servers on the other previously listed nodes
%% (their results are ignored). A forced shrink that does not return `ok'
%% crashes the call and leaves the remaining queues unprocessed.
%%
%% Returns `ok'.
force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(ListQQFun) ->
    LocalNode = node(),
    ShrinkIfQuorum =
        fun(Queue) ->
                case amqqueue:get_type(Queue) == ?MODULE of
                    true ->
                        QName = amqqueue:get_name(Queue),
                        {RaName, _} = amqqueue:get_pid(Queue),
                        %% Remember the remote members before the record changes.
                        Peers = lists:delete(LocalNode, get_nodes(Queue)),
                        rabbit_log:warning("Shrinking queue ~ts to a single node: ~ts",
                                           [rabbit_misc:rs(QName), LocalNode]),
                        ok = ra_server_proc:force_shrink_members_to_current_member(
                               {RaName, LocalNode}),
                        SetSingleNode =
                            fun(QQ) ->
                                    OldState = amqqueue:get_type_state(QQ),
                                    NewState = OldState#{nodes => [LocalNode]},
                                    amqqueue:set_type_state(QQ, NewState)
                            end,
                        _ = rabbit_amqqueue:update(QName, SetSingleNode),
                        lists:foreach(
                          fun(Peer) ->
                                  _ = ra:force_delete_server(?RA_SYSTEM, {RaName, Peer}),
                                  ok
                          end, Peers);
                    false ->
                        ok
                end
        end,
    lists:foreach(ShrinkIfQuorum, ListQQFun()),
    rabbit_log:warning("Shrinking finished"),
    ok.
19902006

19912007
is_minority(All, Up) ->

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 149 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,10 @@ groups() ->
9292
format,
9393
add_member_2,
9494
single_active_consumer_priority_take_over,
95-
single_active_consumer_priority
95+
single_active_consumer_priority,
96+
force_shrink_member_to_current_member,
97+
force_all_queues_shrink_member_to_current_member,
98+
force_vhost_queues_shrink_member_to_current_member
9699
]
97100
++ all_tests()},
98101
{cluster_size_5, [], [start_queue,
@@ -1154,6 +1157,151 @@ single_active_consumer_priority(Config) ->
11541157
rpc:call(Server0, ra, local_query, [RaNameQ3, QueryFun])),
11551158
ok.
11561159

1160+
%% Declares one quorum queue on a three-node cluster, publishes 3 messages,
%% force-shrinks the queue to the first node and then grows it back.
%% The queue record must list 3 nodes before the shrink, 1 after it and 3
%% again after growing; the 3 ready messages are awaited after every step.
force_shrink_member_to_current_member(Config) ->
    [Server0, Server1, Server2] =
        rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

    Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
    QQ = ?config(queue_name, Config),
    ?assertEqual({'queue.declare_ok', QQ, 0, 0},
                 declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),

    RaName = ra_name(QQ),
    rabbit_ct_client_helpers:publish(Ch, QQ, 3),
    wait_for_messages_ready([Server0], RaName, 3),

    %% Reads the member list currently stored in the queue record.
    MemberNodes =
        fun() ->
                {ok, Queue} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]),
                #{nodes := Members} = amqqueue:get_type_state(Queue),
                Members
        end,

    Nodes0 = MemberNodes(),
    ?assertEqual(3, length(Nodes0)),

    rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
                                 force_shrink_member_to_current_member, [<<"/">>, QQ]),

    wait_for_messages_ready([Server0], RaName, 3),

    Nodes1 = MemberNodes(),
    ?assertEqual(1, length(Nodes1)),

    %% grow queues back to all nodes
    lists:foreach(
      fun(S) ->
              rpc:call(Server0, rabbit_quorum_queue, grow, [S, <<"/">>, <<".*">>, all])
      end, [Server1, Server2]),

    wait_for_messages_ready([Server0], RaName, 3),
    Nodes2 = MemberNodes(),
    ?assertEqual(3, length(Nodes2)).
1193+
1194+
%% Declares two quorum queues on a three-node cluster, publishes 3 messages to
%% each, shrinks all quorum queues to the first node and then grows them back.
%% Each queue record must list 3 nodes before the shrink, 1 after it and 3
%% again after growing; the 3 ready messages are awaited at every step.
force_all_queues_shrink_member_to_current_member(Config) ->
    [Server0, Server1, Server2] =
        rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

    Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
    QQ = ?config(queue_name, Config),
    AQ = ?config(alt_queue_name, Config),
    ?assertEqual({'queue.declare_ok', QQ, 0, 0},
                 declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
    ?assertEqual({'queue.declare_ok', AQ, 0, 0},
                 declare(Ch, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),

    QQs = [QQ, AQ],

    %% Waits for the 3 ready messages of queue Q, then checks how many nodes
    %% its record lists.
    CheckMembers =
        fun(Q, Expected) ->
                RaName = ra_name(Q),
                wait_for_messages_ready([Server0], RaName, 3),
                {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]),
                #{nodes := Nodes0} = amqqueue:get_type_state(Q0),
                ?assertEqual(Expected, length(Nodes0))
        end,

    lists:foreach(
      fun(Q) ->
              rabbit_ct_client_helpers:publish(Ch, Q, 3),
              CheckMembers(Q, 3)
      end, QQs),

    rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
                                 force_all_queues_shrink_member_to_current_member, []),

    lists:foreach(fun(Q) -> CheckMembers(Q, 1) end, QQs),

    %% grow queues back to all nodes
    lists:foreach(
      fun(S) ->
              rpc:call(Server0, rabbit_quorum_queue, grow, [S, <<"/">>, <<".*">>, all])
      end, [Server1, Server2]),

    lists:foreach(fun(Q) -> CheckMembers(Q, 3) end, QQs).
1238+
1239+
%% Declares the same two quorum queues in vhost "/" and in a second vhost,
%% publishes 3 messages to each, then shrinks only the queues of the second
%% vhost to the first node. Queues in "/" must keep 3 listed nodes while the
%% queues in the second vhost drop to 1; after growing the second vhost back,
%% every queue must list 3 nodes again.
%%
%% The connection to the second vhost is opened with
%% open_unmanaged_connection/3 and is not closed anywhere else in this test,
%% so it is closed here in an `after' clause, which also runs when an
%% assertion fails.
force_vhost_queues_shrink_member_to_current_member(Config) ->
    [Server0, Server1, Server2] =
        rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

    Ch0 = rabbit_ct_client_helpers:open_channel(Config, Server0),
    QQ = ?config(queue_name, Config),
    AQ = ?config(alt_queue_name, Config),
    ?assertEqual({'queue.declare_ok', QQ, 0, 0},
                 declare(Ch0, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
    ?assertEqual({'queue.declare_ok', AQ, 0, 0},
                 declare(Ch0, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),

    QQs = [QQ, AQ],

    VHost1 = <<"/">>,
    VHost2 = <<"another-vhost">>,
    VHosts = [VHost1, VHost2],

    User = ?config(rmq_username, Config),
    ok = rabbit_ct_broker_helpers:add_vhost(Config, Server0, VHost2, User),
    ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost2),
    Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, Server0, VHost2),
    try
        {ok, Ch1} = amqp_connection:open_channel(Conn1),
        ?assertEqual({'queue.declare_ok', QQ, 0, 0},
                     declare(Ch1, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
        ?assertEqual({'queue.declare_ok', AQ, 0, 0},
                     declare(Ch1, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),

        [rabbit_ct_client_helpers:publish(Ch, Q, 3) || Q <- QQs, Ch <- [Ch0, Ch1]],

        %% Waits for the 3 ready messages of queue Q in VHost, then checks
        %% how many nodes its record lists.
        CheckMembers =
            fun(VHost, Q, Expected) ->
                    QQRes = rabbit_misc:r(VHost, queue, Q),
                    {ok, RaName} = rpc:call(Server0, rabbit_queue_type_util,
                                            qname_to_internal_name, [QQRes]),
                    wait_for_messages_ready([Server0], RaName, 3),
                    {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]),
                    #{nodes := Nodes0} = amqqueue:get_type_state(Q0),
                    ?assertEqual(Expected, length(Nodes0))
            end,

        [CheckMembers(VHost, Q, 3) || Q <- QQs, VHost <- VHosts],

        rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
                                     force_vhost_queues_shrink_member_to_current_member,
                                     [VHost2]),

        %% Only the queues in VHost2 were shrunk.
        [begin
             Expected = case VHost of
                            VHost1 -> 3;
                            VHost2 -> 1
                        end,
             CheckMembers(VHost, Q, Expected)
         end || Q <- QQs, VHost <- VHosts],

        %% grow queues back to all nodes in VHost2 only
        [rpc:call(Server0, rabbit_quorum_queue, grow, [S, VHost2, <<".*">>, all])
         || S <- [Server1, Server2]],

        [CheckMembers(VHost, Q, 3) || Q <- QQs, VHost <- VHosts]
    after
        %% Release the unmanaged connection.
        %% NOTE(review): closing the connection presumably also closes the
        %% channel opened on it -- confirm.
        rabbit_ct_client_helpers:close_connection(Conn1)
    end.
1304+
11571305
priority_queue_fifo(Config) ->
11581306
%% testing: if hi priority messages are published before lo priority
11591307
%% messages they are always consumed first (fifo)

0 commit comments

Comments
 (0)