From 5adab736ebaa13cb77a1f94a99c3373731a4ad8f Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Fri, 6 Sep 2024 16:49:54 +0100 Subject: [PATCH 1/5] Shutdown peer QQ FSMs on connected nodes on force-shrink execution for cluster wide consistency, ensuring only the leader is active/running (cherry picked from commit b675ce29f022bc9d46f20ef32e065b0bb9684c8b) (cherry picked from commit d9de6d989c7b8482f410e75d6791cdc1dc3ec8db) (cherry picked from commit 5e3e58a0283d0a1a6dc2b718cf54f0c9bbdf52d5) --- deps/rabbit/src/rabbit_quorum_queue.erl | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index a818e3cd5f34..28fdf19d3534 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -1303,6 +1303,7 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> _ = rabbit_amqqueue:update(QName, Fun), case ra:force_delete_server(?RA_SYSTEM, ServerId) of ok -> + rabbit_log:info("Deleted a replica of quorum ~ts on node ~ts", [rabbit_misc:rs(QName), Node]), ok; {error, {badrpc, nodedown}} -> ok; @@ -1893,6 +1894,7 @@ force_shrink_member_to_current_member(VHost, Name) -> case rabbit_amqqueue:lookup(QName) of {ok, Q} when ?is_amqqueue(Q) -> {RaName, _} = amqqueue:get_pid(Q), + OtherNodes = lists:delete(Node, get_nodes(Q)), ok = ra_server_proc:force_shrink_members_to_current_member({RaName, Node}), Fun = fun (Q0) -> TS0 = amqqueue:get_type_state(Q0), @@ -1900,6 +1902,7 @@ force_shrink_member_to_current_member(VHost, Name) -> amqqueue:set_type_state(Q, TS) end, _ = rabbit_amqqueue:update(QName, Fun), + _ = [ra:force_delete_server(?RA_SYSTEM, {RaName, N}) || N <- OtherNodes], rabbit_log:warning("Disaster recovery procedure: shrinking finished"); _ -> rabbit_log:warning("Disaster recovery procedure: shrinking failed, queue ~p not found at vhost ~p", [Name, VHost]), @@ -1912,6 +1915,7 @@ force_all_queues_shrink_member_to_current_member() -> _ = 
[begin QName = amqqueue:get_name(Q), {RaName, _} = amqqueue:get_pid(Q), + OtherNodes = lists:delete(Node, get_nodes(Q)), rabbit_log:warning("Disaster recovery procedure: shrinking queue ~p", [QName]), ok = ra_server_proc:force_shrink_members_to_current_member({RaName, Node}), Fun = fun (QQ) -> @@ -1919,7 +1923,8 @@ force_all_queues_shrink_member_to_current_member() -> TS = TS0#{nodes => [Node]}, amqqueue:set_type_state(QQ, TS) end, - _ = rabbit_amqqueue:update(QName, Fun) + _ = rabbit_amqqueue:update(QName, Fun), + _ = [ra:force_delete_server(?RA_SYSTEM, {RaName, N}) || N <- OtherNodes] end || Q <- rabbit_amqqueue:list(), amqqueue:get_type(Q) == ?MODULE], rabbit_log:warning("Disaster recovery procedure: shrinking finished"), ok. From c65cb4c09a1b0a1fe7f3750ef183392df3d9b6e8 Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Tue, 1 Oct 2024 15:16:16 +0100 Subject: [PATCH 2/5] QQ tests for force-shrink to current member operations (cherry picked from commit 60ee35ea7e269dca3eecc84a68fd1a5feaa64ec2) (cherry picked from commit 10dbde1f7102fb53d99c9c27ffc6ad7ff0094f8b) (cherry picked from commit 2e37ac607d7ffe5f02d60ea5c472b4b28148a991) # Conflicts: # deps/rabbit/test/quorum_queue_SUITE.erl --- deps/rabbit/test/quorum_queue_SUITE.erl | 269 ++++++++++++++++++++++++ 1 file changed, 269 insertions(+) diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 946c750ec475..bcf81aadcd92 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -92,7 +92,15 @@ groups() -> leader_locator_policy, status, format, +<<<<<<< HEAD add_member_2 +======= + add_member_2, + single_active_consumer_priority_take_over, + single_active_consumer_priority, + force_shrink_member_to_current_member, + force_all_queues_shrink_member_to_current_member +>>>>>>> 2e37ac607d (QQ tests for force-shrink to current member operations) ] ++ all_tests()}, {cluster_size_5, [], [start_queue, @@ -992,6 +1000,267 @@ 
consume_in_minority(Config) -> rabbit_quorum_queue:restart_server({RaName, Server2}), ok. +<<<<<<< HEAD +======= +single_active_consumer_priority_take_over(Config) -> + check_quorum_queues_v4_compat(Config), + + [Server0, Server1, _Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server0), + Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server1), + QName = ?config(queue_name, Config), + Q1 = <<QName/binary, "_1">>, + RaNameQ1 = binary_to_atom(<<"%2F", "_", Q1/binary>>, utf8), + QueryFun = fun rabbit_fifo:query_single_active_consumer/1, + Args = [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-single-active-consumer">>, bool, true}], + ?assertEqual({'queue.declare_ok', Q1, 0, 0}, declare(Ch1, Q1, Args)), + ok = subscribe(Ch1, Q1, false, <<"ch1-ctag1">>, [{"x-priority", byte, 1}]), + ?assertMatch({ok, {_, {value, {<<"ch1-ctag1">>, _}}}, _}, + rpc:call(Server0, ra, local_query, [RaNameQ1, QueryFun])), + #'confirm.select_ok'{} = amqp_channel:call(Ch2, #'confirm.select'{}), + publish_confirm(Ch2, Q1), + %% higher priority consumer attaches + ok = subscribe(Ch2, Q1, false, <<"ch2-ctag1">>, [{"x-priority", byte, 3}]), + + %% Q1 should still have Ch1 as consumer as it has pending messages + ?assertMatch({ok, {_, {value, {<<"ch1-ctag1">>, _}}}, _}, + rpc:call(Server0, ra, local_query, + [RaNameQ1, QueryFun])), + + %% ack the message + receive + {#'basic.deliver'{consumer_tag = <<"ch1-ctag1">>, + delivery_tag = DeliveryTag}, _} -> + amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}) + after 5000 -> + flush(1), + exit(basic_deliver_timeout) + end, + + ?awaitMatch({ok, {_, {value, {<<"ch2-ctag1">>, _}}}, _}, + rpc:call(Server0, ra, local_query, [RaNameQ1, QueryFun]), + ?DEFAULT_AWAIT), + ok. 
+ +single_active_consumer_priority(Config) -> + check_quorum_queues_v4_compat(Config), + [Server0, Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server0), + Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server1), + Ch3 = rabbit_ct_client_helpers:open_channel(Config, Server2), + QName = ?config(queue_name, Config), + Q1 = <<QName/binary, "_1">>, + Q2 = <<QName/binary, "_2">>, + Q3 = <<QName/binary, "_3">>, + Args = [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-single-active-consumer">>, bool, true}], + ?assertEqual({'queue.declare_ok', Q1, 0, 0}, declare(Ch1, Q1, Args)), + ?assertEqual({'queue.declare_ok', Q2, 0, 0}, declare(Ch2, Q2, Args)), + ?assertEqual({'queue.declare_ok', Q3, 0, 0}, declare(Ch3, Q3, Args)), + + ok = subscribe(Ch1, Q1, false, <<"ch1-ctag1">>, [{"x-priority", byte, 3}]), + ok = subscribe(Ch1, Q2, false, <<"ch1-ctag2">>, [{"x-priority", byte, 2}]), + ok = subscribe(Ch1, Q3, false, <<"ch1-ctag3">>, [{"x-priority", byte, 1}]), + + + ok = subscribe(Ch2, Q1, false, <<"ch2-ctag1">>, [{"x-priority", byte, 1}]), + ok = subscribe(Ch2, Q2, false, <<"ch2-ctag2">>, [{"x-priority", byte, 3}]), + ok = subscribe(Ch2, Q3, false, <<"ch2-ctag3">>, [{"x-priority", byte, 2}]), + + ok = subscribe(Ch3, Q1, false, <<"ch3-ctag1">>, [{"x-priority", byte, 2}]), + ok = subscribe(Ch3, Q2, false, <<"ch3-ctag2">>, [{"x-priority", byte, 1}]), + ok = subscribe(Ch3, Q3, false, <<"ch3-ctag3">>, [{"x-priority", byte, 3}]), + + + RaNameQ1 = binary_to_atom(<<"%2F", "_", Q1/binary>>, utf8), + RaNameQ2 = binary_to_atom(<<"%2F", "_", Q2/binary>>, utf8), + RaNameQ3 = binary_to_atom(<<"%2F", "_", Q3/binary>>, utf8), + %% assert each queue has a different consumer + QueryFun = fun rabbit_fifo:query_single_active_consumer/1, + + %% Q1 should have the consumer on Ch1 + ?assertMatch({ok, {_, {value, {<<"ch1-ctag1">>, _}}}, _}, + rpc:call(Server0, ra, local_query, [RaNameQ1, QueryFun])), + + %% Q2 Ch2 + ?assertMatch({ok, {_, {value, 
{<<"ch2-ctag2">>, _}}}, _}, + rpc:call(Server1, ra, local_query, [RaNameQ2, QueryFun])), + + %% Q3 Ch3 + ?assertMatch({ok, {_, {value, {<<"ch3-ctag3">>, _}}}, _}, + rpc:call(Server2, ra, local_query, [RaNameQ3, QueryFun])), + + %% close Ch3 + _ = rabbit_ct_client_helpers:close_channel(Ch3), + flush(100), + + %% assert Q3 has Ch2 (priority 2) as consumer + ?assertMatch({ok, {_, {value, {<<"ch2-ctag3">>, _}}}, _}, + rpc:call(Server2, ra, local_query, [RaNameQ3, QueryFun])), + + %% close Ch2 + _ = rabbit_ct_client_helpers:close_channel(Ch2), + flush(100), + + %% assert all queues as has Ch1 as consumer + ?assertMatch({ok, {_, {value, {<<"ch1-ctag1">>, _}}}, _}, + rpc:call(Server0, ra, local_query, [RaNameQ1, QueryFun])), + ?assertMatch({ok, {_, {value, {<<"ch1-ctag2">>, _}}}, _}, + rpc:call(Server0, ra, local_query, [RaNameQ2, QueryFun])), + ?assertMatch({ok, {_, {value, {<<"ch1-ctag3">>, _}}}, _}, + rpc:call(Server0, ra, local_query, [RaNameQ3, QueryFun])), + ok. + +force_shrink_member_to_current_member(Config) -> + [Server0, Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + rabbit_ct_client_helpers:publish(Ch, QQ, 3), + wait_for_messages_ready([Server0], RaName, 3), + + {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]), + #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + ?assertEqual(3, length(Nodes0)), + + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + force_shrink_member_to_current_member, [<<"/">>, QQ]), + + wait_for_messages_ready([Server0], RaName, 3), + + {ok, Q1} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]), + #{nodes := Nodes1} = amqqueue:get_type_state(Q1), + ?assertEqual(1, length(Nodes1)), + + %% grow queues back to all nodes + 
[rpc:call(Server0, rabbit_quorum_queue, grow, [S, <<"/">>, <<".*">>, all]) || S <- [Server1, Server2]], + + wait_for_messages_ready([Server0], RaName, 3), + {ok, Q2} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]), + #{nodes := Nodes2} = amqqueue:get_type_state(Q2), + ?assertEqual(3, length(Nodes2)). + +force_all_queues_shrink_member_to_current_member(Config) -> + [Server0, Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + QQ = ?config(queue_name, Config), + AQ = ?config(alt_queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', AQ, 0, 0}, + declare(Ch, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + QQs = [QQ, AQ], + + [begin + RaName = ra_name(Q), + rabbit_ct_client_helpers:publish(Ch, Q, 3), + wait_for_messages_ready([Server0], RaName, 3), + {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]), + #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + ?assertEqual(3, length(Nodes0)) + end || Q <- QQs], + + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + force_all_queues_shrink_member_to_current_member, []), + + [begin + RaName = ra_name(Q), + wait_for_messages_ready([Server0], RaName, 3), + {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]), + #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + ?assertEqual(1, length(Nodes0)) + end || Q <- QQs], + + %% grow queues back to all nodes + [rpc:call(Server0, rabbit_quorum_queue, grow, [S, <<"/">>, <<".*">>, all]) || S <- [Server1, Server2]], + + [begin + RaName = ra_name(Q), + wait_for_messages_ready([Server0], RaName, 3), + {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]), + #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + ?assertEqual(3, length(Nodes0)) + end || Q <- QQs]. 
+ +priority_queue_fifo(Config) -> + %% testing: if hi priority messages are published before lo priority + %% messages they are always consumed first (fifo) + check_quorum_queues_v4_compat(Config), + [Server0 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + Queue = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Queue, 0, 0}, + declare(Ch, Queue, + [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ExpectedHi = + [begin + MsgP5 = integer_to_binary(P), + ok = amqp_channel:cast(Ch, #'basic.publish'{routing_key = Queue}, + #amqp_msg{props = #'P_basic'{priority = P}, + payload = MsgP5}), + MsgP5 + %% high priority is > 4 + end || P <- lists:seq(5, 10)], + + ExpectedLo = + [begin + MsgP1 = integer_to_binary(P), + ok = amqp_channel:cast(Ch, #'basic.publish'{routing_key = Queue}, + #amqp_msg{props = #'P_basic'{priority = P}, + payload = MsgP1}), + MsgP1 + end || P <- lists:seq(0, 4)], + + validate_queue(Ch, Queue, ExpectedHi ++ ExpectedLo), + ok. 
+ +priority_queue_2_1_ratio(Config) -> + %% testing: if lo priority messages are published before hi priority + %% messages are consumed in a 2:1 hi to lo ratio + check_quorum_queues_v4_compat(Config), + [Server0 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + Queue = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Queue, 0, 0}, + declare(Ch, Queue, + [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ExpectedLo = + [begin + MsgP1 = integer_to_binary(P), + ok = amqp_channel:cast(Ch, #'basic.publish'{routing_key = Queue}, + #amqp_msg{props = #'P_basic'{priority = P}, + payload = MsgP1}), + MsgP1 + end || P <- lists:seq(0, 4)], + ExpectedHi = + [begin + MsgP5 = integer_to_binary(P), + ok = amqp_channel:cast(Ch, #'basic.publish'{routing_key = Queue}, + #amqp_msg{props = #'P_basic'{priority = P}, + payload = MsgP5}), + MsgP5 + %% high priority is > 4 + end || P <- lists:seq(5, 14)], + + Expected = lists_interleave(ExpectedLo, ExpectedHi), + + validate_queue(Ch, Queue, Expected), + ok. 
+ +>>>>>>> 2e37ac607d (QQ tests for force-shrink to current member operations) reject_after_leader_transfer(Config) -> [Server0, Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), From 4a7a355ecc1115cd41cac702c9b794fd09061eed Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Wed, 2 Oct 2024 11:28:41 +0100 Subject: [PATCH 3/5] Implement force_vhost_queues_shrink_member_to_current_member/1 (cherry picked from commit c26aa3b1c76dd82fb9e67852b6a2f030a92bf7cc) (cherry picked from commit b03637f8ecbcbf8085100c809cde59abad760d70) (cherry picked from commit 023a46e33d5724c2226a7f05a52c122479fd4465) --- deps/rabbit/src/rabbit_quorum_queue.erl | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 28fdf19d3534..76d30ee7f799 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -76,6 +76,7 @@ -export([validate_policy/1, merge_policy_value/3]). -export([force_shrink_member_to_current_member/2, + force_vhost_queues_shrink_member_to_current_member/1, force_all_queues_shrink_member_to_current_member/0]). -ifdef(TEST). @@ -1909,8 +1910,17 @@ force_shrink_member_to_current_member(VHost, Name) -> {error, not_found} end. +force_vhost_queues_shrink_member_to_current_member(VHost) when is_binary(VHost) -> + rabbit_log:warning("Disaster recovery procedure: shrinking all quorum queues in vhost ~tp to a single node cluster", [VHost]), + ListQQs = fun() -> rabbit_amqqueue:list(VHost) end, + force_all_queues_shrink_member_to_current_member(ListQQs). + force_all_queues_shrink_member_to_current_member() -> rabbit_log:warning("Disaster recovery procedure: shrinking all quorum queues to a single node cluster"), + ListQQs = fun() -> rabbit_amqqueue:list() end, + force_all_queues_shrink_member_to_current_member(ListQQs). 
+ +force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(ListQQFun) -> Node = node(), _ = [begin QName = amqqueue:get_name(Q), @@ -1925,7 +1935,7 @@ force_all_queues_shrink_member_to_current_member() -> end, _ = rabbit_amqqueue:update(QName, Fun), _ = [ra:force_delete_server(?RA_SYSTEM, {RaName, N}) || N <- OtherNodes] - end || Q <- rabbit_amqqueue:list(), amqqueue:get_type(Q) == ?MODULE], + end || Q <- ListQQFun(), amqqueue:get_type(Q) == ?MODULE], rabbit_log:warning("Disaster recovery procedure: shrinking finished"), ok. From d72d4e2f8df38850b2d8c2475d319ed3a622958b Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Wed, 2 Oct 2024 12:28:18 +0100 Subject: [PATCH 4/5] Add test for QQ force_vhost_queues_shrink_member_to_current_member/1 (cherry picked from commit de0c0dbd89b7c278a1145833cbeda6b7d3de34eb) (cherry picked from commit c9d97e61de062ca05b060b7ab327837e54e226d2) (cherry picked from commit 6c55cb1fdc82b4274d1d439e2ffb8f9ecf08fa43) # Conflicts: # deps/rabbit/test/quorum_queue_SUITE.erl --- deps/rabbit/test/quorum_queue_SUITE.erl | 71 +++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index bcf81aadcd92..7aceed422a8a 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -99,8 +99,13 @@ groups() -> single_active_consumer_priority_take_over, single_active_consumer_priority, force_shrink_member_to_current_member, +<<<<<<< HEAD force_all_queues_shrink_member_to_current_member >>>>>>> 2e37ac607d (QQ tests for force-shrink to current member operations) +======= + force_all_queues_shrink_member_to_current_member, + force_vhost_queues_shrink_member_to_current_member +>>>>>>> 6c55cb1fdc (Add test for QQ force_vhost_queues_shrink_member_to_current_member/1) ] ++ all_tests()}, {cluster_size_5, [], [start_queue, @@ -1195,6 +1200,72 @@ force_all_queues_shrink_member_to_current_member(Config) -> 
?assertEqual(3, length(Nodes0)) end || Q <- QQs]. +force_vhost_queues_shrink_member_to_current_member(Config) -> + [Server0, Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch0 = rabbit_ct_client_helpers:open_channel(Config, Server0), + QQ = ?config(queue_name, Config), + AQ = ?config(alt_queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch0, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', AQ, 0, 0}, + declare(Ch0, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + QQs = [QQ, AQ], + + VHost1 = <<"/">>, + VHost2 = <<"another-vhost">>, + VHosts = [VHost1, VHost2], + + User = ?config(rmq_username, Config), + ok = rabbit_ct_broker_helpers:add_vhost(Config, Server0, VHost2, User), + ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost2), + Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, Server0, VHost2), + {ok, Ch1} = amqp_connection:open_channel(Conn1), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch1, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', AQ, 0, 0}, + declare(Ch1, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + [rabbit_ct_client_helpers:publish(Ch, Q, 3) || Q <- QQs, Ch <- [Ch0, Ch1]], + + [begin + QQRes = rabbit_misc:r(VHost, queue, Q), + {ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]), + wait_for_messages_ready([Server0], RaName, 3), + {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]), + #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + ?assertEqual(3, length(Nodes0)) + end || Q <- QQs, VHost <- VHosts], + + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + force_vhost_queues_shrink_member_to_current_member, [VHost2]), + + [begin + QQRes = rabbit_misc:r(VHost, queue, Q), + {ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]), + 
wait_for_messages_ready([Server0], RaName, 3), + {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]), + #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + case VHost of + VHost1 -> ?assertEqual(3, length(Nodes0)); + VHost2 -> ?assertEqual(1, length(Nodes0)) + end + end || Q <- QQs, VHost <- VHosts], + + %% grow queues back to all nodes in VHost2 only + [rpc:call(Server0, rabbit_quorum_queue, grow, [S, VHost2, <<".*">>, all]) || S <- [Server1, Server2]], + + [begin + QQRes = rabbit_misc:r(VHost, queue, Q), + {ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]), + wait_for_messages_ready([Server0], RaName, 3), + {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]), + #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + ?assertEqual(3, length(Nodes0)) + end || Q <- QQs, VHost <- VHosts]. + priority_queue_fifo(Config) -> %% testing: if hi priority messages are published before lo priority %% messages they are always consumed first (fifo) From a60a649eb89cca73eca9ab63521c1e7d3f951933 Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Thu, 3 Oct 2024 10:57:11 +0100 Subject: [PATCH 5/5] Update QQ force-shrink logging (cherry picked from commit dd5ec3ccc0715ac47c2dc2f82191263bfc860204) (cherry picked from commit 16170d093bac1df4766ba36e17c50f281ba2466c) (cherry picked from commit d8c01a30caea53aeb803ff0fb4572a89125b9085) --- deps/rabbit/src/rabbit_quorum_queue.erl | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 76d30ee7f799..3e78a7100734 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -1889,9 +1889,10 @@ notify_decorators(QName, F, A) -> is_stateful() -> true. 
force_shrink_member_to_current_member(VHost, Name) -> - rabbit_log:warning("Disaster recovery procedure: shrinking ~p queue at vhost ~p to a single node cluster", [Name, VHost]), Node = node(), QName = rabbit_misc:r(VHost, queue, Name), + QNameFmt = rabbit_misc:rs(QName), + rabbit_log:warning("Shrinking ~ts to a single node: ~ts", [QNameFmt, Node]), case rabbit_amqqueue:lookup(QName) of {ok, Q} when ?is_amqqueue(Q) -> {RaName, _} = amqqueue:get_pid(Q), @@ -1904,19 +1905,19 @@ force_shrink_member_to_current_member(VHost, Name) -> end, _ = rabbit_amqqueue:update(QName, Fun), _ = [ra:force_delete_server(?RA_SYSTEM, {RaName, N}) || N <- OtherNodes], - rabbit_log:warning("Disaster recovery procedure: shrinking finished"); + rabbit_log:warning("Shrinking ~ts finished", [QNameFmt]); _ -> - rabbit_log:warning("Disaster recovery procedure: shrinking failed, queue ~p not found at vhost ~p", [Name, VHost]), + rabbit_log:warning("Shrinking failed, ~ts not found", [QNameFmt]), {error, not_found} end. force_vhost_queues_shrink_member_to_current_member(VHost) when is_binary(VHost) -> - rabbit_log:warning("Disaster recovery procedure: shrinking all quorum queues in vhost ~tp to a single node cluster", [VHost]), + rabbit_log:warning("Shrinking all quorum queues in vhost '~ts' to a single node: ~ts", [VHost, node()]), ListQQs = fun() -> rabbit_amqqueue:list(VHost) end, force_all_queues_shrink_member_to_current_member(ListQQs). force_all_queues_shrink_member_to_current_member() -> - rabbit_log:warning("Disaster recovery procedure: shrinking all quorum queues to a single node cluster"), + rabbit_log:warning("Shrinking all quorum queues to a single node: ~ts", [node()]), ListQQs = fun() -> rabbit_amqqueue:list() end, force_all_queues_shrink_member_to_current_member(ListQQs). 
@@ -1926,7 +1927,7 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis QName = amqqueue:get_name(Q), {RaName, _} = amqqueue:get_pid(Q), OtherNodes = lists:delete(Node, get_nodes(Q)), - rabbit_log:warning("Disaster recovery procedure: shrinking queue ~p", [QName]), + rabbit_log:warning("Shrinking queue ~ts to a single node: ~ts", [rabbit_misc:rs(QName), Node]), ok = ra_server_proc:force_shrink_members_to_current_member({RaName, Node}), Fun = fun (QQ) -> TS0 = amqqueue:get_type_state(QQ), @@ -1936,7 +1937,7 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis _ = rabbit_amqqueue:update(QName, Fun), _ = [ra:force_delete_server(?RA_SYSTEM, {RaName, N}) || N <- OtherNodes] end || Q <- ListQQFun(), amqqueue:get_type(Q) == ?MODULE], - rabbit_log:warning("Disaster recovery procedure: shrinking finished"), + rabbit_log:warning("Shrinking finished"), ok. is_minority(All, Up) ->