From c0c80fcfa7b9cad7c139c0b0c7d49f2035c334ea Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Thu, 8 May 2025 13:55:51 +0100 Subject: [PATCH 01/13] Extend QQ grow command to support target quorum cluster size --- deps/rabbit/src/rabbit_quorum_queue.erl | 69 ++++++++++++++----- .../cli/queues/commands/grow_command.ex | 21 ++++-- 2 files changed, 70 insertions(+), 20 deletions(-) diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 5ae9a8a73973..3f889a202060 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -1543,29 +1543,20 @@ shrink_all(Node) -> amqqueue:get_type(Q) == ?MODULE, lists:member(Node, get_nodes(Q))]. - +-spec grow(node() | integer(), binary(), binary(), all | even) -> + [{rabbit_amqqueue:name(), + {ok, pos_integer()} | {error, pos_integer(), term()}}]. grow(Node, VhostSpec, QueueSpec, Strategy) -> grow(Node, VhostSpec, QueueSpec, Strategy, promotable). --spec grow(node(), binary(), binary(), all | even, membership()) -> +-spec grow(node() | integer(), binary(), binary(), all | even, membership()) -> [{rabbit_amqqueue:name(), {ok, pos_integer()} | {error, pos_integer(), term()}}]. -grow(Node, VhostSpec, QueueSpec, Strategy, Membership) -> +grow(Node, VhostSpec, QueueSpec, Strategy, Membership) when is_atom(Node) -> Running = rabbit_nodes:list_running(), [begin Size = length(get_nodes(Q)), - QName = amqqueue:get_name(Q), - ?LOG_INFO("~ts: adding a new member (replica) on node ~w", - [rabbit_misc:rs(QName), Node]), - case add_member(Q, Node, Membership) of - ok -> - {QName, {ok, Size + 1}}; - {error, Err} -> - ?LOG_WARNING( - "~ts: failed to add member (replica) on node ~w, error: ~w", - [rabbit_misc:rs(QName), Node, Err]), - {QName, {error, Size, Err}} - end + maybe_grow(Q, Node, Membership, Size) end || Q <- rabbit_amqqueue:list(), amqqueue:get_type(Q) == ?MODULE, @@ -1575,7 +1566,53 @@ grow(Node, VhostSpec, QueueSpec, Strategy, Membership) -> lists:member(Node, Running), matches_strategy(Strategy, get_nodes(Q)), is_match(amqqueue:get_vhost(Q), VhostSpec) andalso - is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ]. + is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ]; + +grow(QuorumClusterSize, VhostSpec, QueueSpec, Strategy, Membership) + when is_integer(QuorumClusterSize) -> + Running = rabbit_nodes:list_running(), + TotalRunning = length(Running), + + TargetQuorumClusterSize = + if QuorumClusterSize > TotalRunning -> + %% we cant grow beyond total running nodes + TotalRunning; + true -> + QuorumClusterSize + end, + + lists:flatten( + [begin + QNodes = get_nodes(Q), + case length(QNodes) of + Size when Size < TargetQuorumClusterSize -> + TargetAvailableNodes = Running -- QNodes, + Node = hd(TargetAvailableNodes), + maybe_grow(Q, Node, Membership, Size); + _ -> + [] + end + end + || _ <- lists:seq(1, TargetQuorumClusterSize), + Q <- rabbit_amqqueue:list(), + amqqueue:get_type(Q) == ?MODULE, + matches_strategy(Strategy, get_nodes(Q)), + is_match(amqqueue:get_vhost(Q), VhostSpec) andalso + is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)]). + +maybe_grow(Q, Node, Membership, Size) -> + QName = amqqueue:get_name(Q), + ?LOG_INFO("~ts: adding a new member (replica) on node ~w", + [rabbit_misc:rs(QName), Node]), + case add_member(Q, Node, Membership) of + ok -> + {QName, {ok, Size + 1}}; + {error, Err} -> + ?LOG_WARNING( + "~ts: failed to add member (replica) on node ~w, error: ~w", + [rabbit_misc:rs(QName), Node, Err]), + {QName, {error, Size, Err}} + end. -spec transfer_leadership(amqqueue:amqqueue(), node()) -> {migrated, node()} | {not_migrated, atom()}. transfer_leadership(Q, Destination) -> diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex index f1ada3a383bb..8df0b54a6f0e 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex @@ -39,6 +39,11 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do {:validation_failure, "strategy '#{s}' is not recognised."} end + def validate([n, _], _) + when (is_integer(n) and n <= 0) do + {:validation_failure, "target quorum cluster size '#{n}' must be greater than 0."} + end + def validate(_, %{membership: m}) when not (m == "promotable" or m == "non_voter" or @@ -60,14 +65,22 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do ) end - def run([node, strategy], %{ + def run([node_or_quorum_cluster_size, strategy], %{ node: node_name, vhost_pattern: vhost_pat, queue_pattern: queue_pat, membership: membership, errors_only: errors_only }) do - args = [to_atom(node), vhost_pat, queue_pat, to_atom(strategy)] + + node_or_quorum_cluster_size = + if is_integer(node_or_quorum_cluster_size) do + node_or_quorum_cluster_size + else + to_atom(node_or_quorum_cluster_size) + end + + args = [node_or_quorum_cluster_size, vhost_pat, queue_pat, to_atom(strategy)] args = case to_atom(membership) do @@ -108,11 +121,11 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do def usage, do: - "grow [--vhost-pattern ] [--queue-pattern ] [--membership ]" + "grow [--vhost-pattern ] [--queue-pattern ] [--membership ]" def usage_additional do [ - ["", "node name to place replicas on"], + ["", "node name to place replicas on or desired quorum cluster size"], [ "", "add a member for all matching queues or just those whose membership count is an even number" From ae1156d5331504019a31893595bc39f51c414193 Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Thu, 8 May 2025 14:04:51 +0100 Subject: [PATCH 02/13] Add tests for QQ grow to target quorum cluster size command --- deps/rabbit/test/quorum_queue_SUITE.erl | 74 ++++++++++++++++++- .../test/queues/grow_command_test.exs | 29 +++++++- 2 files changed, 100 insertions(+), 3 deletions(-) diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 2ae9f23d4060..c4fc6a124350 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -115,7 +115,8 @@ groups() -> node_removal_is_not_quorum_critical, select_nodes_with_least_replicas, select_nodes_with_least_replicas_node_down, - subscribe_from_each + subscribe_from_each, + grow_queue ]}, @@ -1790,6 +1791,77 @@ dont_leak_file_handles(Config) -> rabbit_ct_client_helpers:close_channel(C), ok. +grow_queue(Config) -> + [Server0, Server1, _Server2, _Server3, _Server4] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + QQ = ?config(queue_name, Config), + AQ = ?config(alt_queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-quorum-initial-group-size">>, long, 5}])), + ?assertEqual({'queue.declare_ok', AQ, 0, 0}, + declare(Ch, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-quorum-initial-group-size">>, long, 5}])), + + QQs = [QQ, AQ], + MsgCount = 3, + + [begin + RaName = ra_name(Q), + rabbit_ct_client_helpers:publish(Ch, Q, MsgCount), + wait_for_messages_ready([Server0], RaName, MsgCount), + {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]), + #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + ?assertEqual(5, length(Nodes0)) + end || Q <- QQs], + + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + force_all_queues_shrink_member_to_current_member, []), + + TargetClusterSize_1 = 1, + assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount), + + %% grow queues to node 'Server1' + TargetClusterSize_2 = 2, + rpc:call(Server0, rabbit_quorum_queue, grow, [Server1, <<"/">>, <<".*">>, all]), + assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount), + + %% grow queues to quorum cluster size '2' has no effect + rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_2, <<"/">>, <<".*">>, all]), + assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount), + + %% grow queues to quorum cluster size '3' + TargetClusterSize_3 = 3, + rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_3, <<"/">>, <<".*">>, all]), + assert_grown_queues(QQs, Server0, TargetClusterSize_3, MsgCount), + + %% grow queues to quorum cluster size '5' + TargetClusterSize_5 = 5, + rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_5, <<"/">>, <<".*">>, all]), + assert_grown_queues(QQs, Server0, TargetClusterSize_5, MsgCount), + + %% shrink all queues again + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + force_all_queues_shrink_member_to_current_member, []), + + assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount), + + %% grow queues to quorum cluster size > '5' (limit = 5). + TargetClusterSize_10 = 10, + rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_10, <<"/">>, <<".*">>, all]), + assert_grown_queues(QQs, Server0, TargetClusterSize_5, MsgCount). + +assert_grown_queues(Qs, Node, TargetClusterSize, MsgCount) -> + [begin + RaName = ra_name(Q), + wait_for_messages_ready([Node], RaName, MsgCount), + {ok, Q0} = rpc:call(Node, rabbit_amqqueue, lookup, [Q, <<"/">>]), + #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + ?assertEqual(TargetClusterSize, length(Nodes0)) + end || Q <- Qs]. + gh_12635(Config) -> check_quorum_queues_v4_compat(Config), diff --git a/deps/rabbitmq_cli/test/queues/grow_command_test.exs b/deps/rabbitmq_cli/test/queues/grow_command_test.exs index 2b1aab070317..f09c15761703 100644 --- a/deps/rabbitmq_cli/test/queues/grow_command_test.exs +++ b/deps/rabbitmq_cli/test/queues/grow_command_test.exs @@ -82,12 +82,37 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommandTest do {:validation_failure, "voter status 'banana' is not recognised."} end + test "validate: when target quorum cluster size greater than zero, returns a success" do + assert @command.validate([7, "all"], %{membership: "voter"}) == :ok + end + + test "validate: when target quorum cluster size is zero, returns failure" do + assert @command.validate([0, "all"], %{membership: "voter"}) == + {:validation_failure, "target quorum cluster size '0' must be greater than 0."} + end + + test "validate: when target quorum cluster size is less than zero, returns failure" do + assert @command.validate([-1, "all"], %{membership: "voter"}) == + {:validation_failure, "target quorum cluster size '-1' must be greater than 0."} + end + + @tag test_timeout: 3000 + test "run: targeting an unreachable node throws a badrpc when growing to a target node", context do + assert match?( + {:badrpc, _}, + @command.run( + ["target@node", "all"], + Map.merge(context[:opts], %{node: :jake@thedog}) + ) + ) + end + @tag test_timeout: 3000 - test "run: targeting an unreachable node throws a badrpc", context do + test "run: targeting an unreachable node throws a badrpc when growing to a target quorum cluster size", context do assert match?( {:badrpc, _}, @command.run( - ["quorum-queue-a", "all"], + [5, "all"], Map.merge(context[:opts], %{node: :jake@thedog}) ) ) From a79170eb3e27defdc6f56409047282312185fe58 Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Thu, 8 May 2025 14:20:47 +0100 Subject: [PATCH 03/13] Update QQ grow command tests to reflect correct use of queue-pattern option --- .../test/queues/grow_command_test.exs | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/deps/rabbitmq_cli/test/queues/grow_command_test.exs b/deps/rabbitmq_cli/test/queues/grow_command_test.exs index f09c15761703..0b28f3957eef 100644 --- a/deps/rabbitmq_cli/test/queues/grow_command_test.exs +++ b/deps/rabbitmq_cli/test/queues/grow_command_test.exs @@ -44,55 +44,55 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommandTest do end test "validate: when one argument is provided, returns a failure" do - assert @command.validate(["quorum-queue-a"], %{}) == {:validation_failure, :not_enough_args} + assert @command.validate(["target@node"], %{}) == {:validation_failure, :not_enough_args} end test "validate: when a node and even are provided, returns a success" do - assert @command.validate(["quorum-queue-a", "even"], %{}) == :ok + assert @command.validate(["target@node", "even"], %{}) == :ok end test "validate: when a node and all are provided, returns a success" do - assert @command.validate(["quorum-queue-a", "all"], %{}) == :ok + assert @command.validate(["target@node", "all"], %{}) == :ok end test "validate: when a node and something else is provided, returns a failure" do - assert @command.validate(["quorum-queue-a", "banana"], %{}) == + assert @command.validate(["target@node", "banana"], %{}) == {:validation_failure, "strategy 'banana' is not recognised."} end test "validate: when three arguments are provided, returns a failure" do - assert @command.validate(["quorum-queue-a", "extra-arg", "another-extra-arg"], %{}) == + assert @command.validate(["target@node", "extra-arg", "another-extra-arg"], %{}) == {:validation_failure, :too_many_args} end test "validate: when membership promotable is provided, returns a success" do - assert @command.validate(["quorum-queue-a", "all"], %{membership: "promotable"}) == :ok + assert @command.validate(["target@node", "all"], %{membership: "promotable", queue_pattern: "qq.*"}) == :ok end test "validate: when membership voter is provided, returns a success" do - assert @command.validate(["quorum-queue-a", "all"], %{membership: "voter"}) == :ok + assert @command.validate(["target@node", "all"], %{membership: "voter", queue_pattern: "qq.*"}) == :ok end test "validate: when membership non_voter is provided, returns a success" do - assert @command.validate(["quorum-queue-a", "all"], %{membership: "non_voter"}) == :ok + assert @command.validate(["target@node", "all"], %{membership: "non_voter", queue_pattern: "qq.*"}) == :ok end test "validate: when wrong membership is provided, returns failure" do - assert @command.validate(["quorum-queue-a", "all"], %{membership: "banana"}) == + assert @command.validate(["target@node", "all"], %{membership: "banana", queue_pattern: "qq.*"}) == {:validation_failure, "voter status 'banana' is not recognised."} end test "validate: when target quorum cluster size greater than zero, returns a success" do - assert @command.validate([7, "all"], %{membership: "voter"}) == :ok + assert @command.validate([7, "all"], %{membership: "voter", queue_pattern: "qq.*"}) == :ok end test "validate: when target quorum cluster size is zero, returns failure" do - assert @command.validate([0, "all"], %{membership: "voter"}) == + assert @command.validate([0, "all"], %{membership: "voter", queue_pattern: "qq.*"}) == {:validation_failure, "target quorum cluster size '0' must be greater than 0."} end test "validate: when target quorum cluster size is less than zero, returns failure" do - assert @command.validate([-1, "all"], %{membership: "voter"}) == + assert @command.validate([-1, "all"], %{membership: "voter", queue_pattern: "qq.*"}) == {:validation_failure, "target quorum cluster size '-1' must be greater than 0."} end @@ -102,7 +102,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommandTest do {:badrpc, _}, @command.run( ["target@node", "all"], - Map.merge(context[:opts], %{node: :jake@thedog}) + Map.merge(context[:opts], %{node: :jake@thedog, queue_pattern: "qq.*"}) ) ) end @@ -113,7 +113,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommandTest do {:badrpc, _}, @command.run( [5, "all"], - Map.merge(context[:opts], %{node: :jake@thedog}) + Map.merge(context[:opts], %{node: :jake@thedog, queue_pattern: "qq.*"}) ) ) end From 41c55db0a34a1fbb221f69ff4ddfd0edb74a73e9 Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Thu, 8 May 2025 15:06:52 +0100 Subject: [PATCH 04/13] Dont allow QQ grow for target quorum cluster size less than 0 in rabbit_quorum_queue api --- deps/rabbit/src/rabbit_quorum_queue.erl | 11 +++++++++-- deps/rabbit/test/quorum_queue_SUITE.erl | 13 +++++++++++-- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 3f889a202060..ab7bb19922ee 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -1569,7 +1569,7 @@ grow(Node, VhostSpec, QueueSpec, Strategy, Membership) when is_atom(Node) -> is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ]; grow(QuorumClusterSize, VhostSpec, QueueSpec, Strategy, Membership) - when is_integer(QuorumClusterSize) -> + when is_integer(QuorumClusterSize), QuorumClusterSize > 0 -> Running = rabbit_nodes:list_running(), TotalRunning = length(Running), @@ -1598,7 +1598,14 @@ grow(QuorumClusterSize, VhostSpec, QueueSpec, Strategy, Membership) amqqueue:get_type(Q) == ?MODULE, matches_strategy(Strategy, get_nodes(Q)), is_match(amqqueue:get_vhost(Q), VhostSpec) andalso - is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)]). + is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)]); + +grow(QuorumClusterSize, _VhostSpec, _QueueSpec, _Strategy, _Membership) + when is_integer(QuorumClusterSize) -> + rabbit_log:warning( + "cannot grow queues to a quorum cluster size less than zero (~tp)", + [QuorumClusterSize]), + {error, bad_quorum_cluster_size}. maybe_grow(Q, Node, Membership, Size) -> QName = amqqueue:get_name(Q), diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index c4fc6a124350..a87a4a4e81dd 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -1845,13 +1845,22 @@ grow_queue(Config) -> %% shrink all queues again rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, force_all_queues_shrink_member_to_current_member, []), - assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount), %% grow queues to quorum cluster size > '5' (limit = 5). TargetClusterSize_10 = 10, rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_10, <<"/">>, <<".*">>, all]), - assert_grown_queues(QQs, Server0, TargetClusterSize_5, MsgCount). + assert_grown_queues(QQs, Server0, TargetClusterSize_5, MsgCount), + + %% shrink all queues again + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + force_all_queues_shrink_member_to_current_member, []), + assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount), + + %% attempt to grow queues to quorum cluster size < '0'. + BadTargetClusterSize = -5, + ?assertEqual({error, bad_quorum_cluster_size}, + rpc:call(Server0, rabbit_quorum_queue, grow, [BadTargetClusterSize, <<"/">>, <<".*">>, all])). assert_grown_queues(Qs, Node, TargetClusterSize, MsgCount) -> [begin From 0ed90fe00314843f0563100ed4e7db74debba7c4 Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Fri, 9 May 2025 15:23:50 +0100 Subject: [PATCH 05/13] Randomly select next target node on grow to quorum cluster size --- deps/rabbit/src/rabbit_quorum_queue.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index ab7bb19922ee..6d182a4e3f78 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -1587,7 +1587,8 @@ grow(QuorumClusterSize, VhostSpec, QueueSpec, Strategy, Membership) case length(QNodes) of Size when Size < TargetQuorumClusterSize -> TargetAvailableNodes = Running -- QNodes, - Node = hd(TargetAvailableNodes), + N = length(TargetAvailableNodes), + Node = lists:nth(rand:uniform(N), TargetAvailableNodes), maybe_grow(Q, Node, Membership, Size); _ -> [] From 84f6d1b9291d35d5b0c4b652a777e7c5bcfcc04c Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Wed, 14 May 2025 17:23:52 +0100 Subject: [PATCH 06/13] Ensure to only grow QQs when all existing members are in 'voter' status --- deps/rabbit/src/rabbit_quorum_queue.erl | 40 ++++++++++++---- deps/rabbit/test/quorum_queue_SUITE.erl | 48 +++++++++++++++---- .../cli/queues/commands/grow_command.ex | 5 ++ .../test/queues/grow_command_test.exs | 11 ++++- 4 files changed, 86 insertions(+), 18 deletions(-) diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 6d182a4e3f78..c1b5fdb09839 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -1609,19 +1609,43 @@ grow(QuorumClusterSize, _VhostSpec, _QueueSpec, _Strategy, _Membership) {error, bad_quorum_cluster_size}. maybe_grow(Q, Node, Membership, Size) -> + QNodes = get_nodes(Q), + maybe_grow(Q, Node, Membership, Size, QNodes). + +maybe_grow(Q, Node, Membership, Size, QNodes) -> QName = amqqueue:get_name(Q), - ?LOG_INFO("~ts: adding a new member (replica) on node ~w", - [rabbit_misc:rs(QName), Node]), - case add_member(Q, Node, Membership) of - ok -> - {QName, {ok, Size + 1}}; - {error, Err} -> + {ok, RaName} = qname_to_internal_name(QName), + case check_all_memberships(RaName, QNodes, voter) of + true -> + ?LOG_INFO("~ts: adding a new member (replica) on node ~w", + [rabbit_misc:rs(QName), Node]), + case add_member(Q, Node, Membership) of + ok -> + {QName, {ok, Size + 1}}; + {error, Err} -> + ?LOG_WARNING( + "~ts: failed to add member (replica) on node ~w, error: ~w", + [rabbit_misc:rs(QName), Node, Err]), + {QName, {error, Size, Err}} + end; + false -> + Err = {error, non_voters_found}, ?LOG_WARNING( - "~ts: failed to add member (replica) on node ~w, error: ~w", - [rabbit_misc:rs(QName), Node, Err]), + "~ts: failed to add member (replica) on node ~w, error: ~w", + [rabbit_misc:rs(QName), Node, Err]), {QName, {error, Size, Err}} end. +check_all_memberships(RaName, QNodes, CompareMembership) -> + case rpc:multicall(QNodes, ets, lookup, [ra_state, RaName]) of + {Result, []} -> + lists:all( + fun(M) -> M == CompareMembership end, + [Membership || [{_RaName, _RaState, Membership}] <- Result]); + _ -> + false + end. + -spec transfer_leadership(amqqueue:amqqueue(), node()) -> {migrated, node()} | {not_migrated, atom()}. transfer_leadership(Q, Destination) -> {RaName, _} = Pid = amqqueue:get_pid(Q), diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index a87a4a4e81dd..dbdcb5511ddc 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -1792,7 +1792,7 @@ dont_leak_file_handles(Config) -> ok. grow_queue(Config) -> - [Server0, Server1, _Server2, _Server3, _Server4] = + [Server0, Server1, Server2, _Server3, _Server4] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), @@ -1825,34 +1825,41 @@ grow_queue(Config) -> %% grow queues to node 'Server1' TargetClusterSize_2 = 2, - rpc:call(Server0, rabbit_quorum_queue, grow, [Server1, <<"/">>, <<".*">>, all]), + Result1 = rpc:call(Server0, rabbit_quorum_queue, grow, [Server1, <<"/">>, <<".*">>, all]), + %% [{{resource,<<"/">>,queue,<<"grow_queue">>},{ok,2}}, + %% {{resource,<<"/">>,queue,<<"grow_queue_alt">>},{ok,2}},...] + ?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result1)), assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount), %% grow queues to quorum cluster size '2' has no effect - rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_2, <<"/">>, <<".*">>, all]), + Result2 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_2, <<"/">>, <<".*">>, all]), + ?assertEqual([], Result2), assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount), %% grow queues to quorum cluster size '3' TargetClusterSize_3 = 3, - rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_3, <<"/">>, <<".*">>, all]), + Result3 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_3, <<"/">>, <<".*">>, all, voter]), + ?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result3)), assert_grown_queues(QQs, Server0, TargetClusterSize_3, MsgCount), %% grow queues to quorum cluster size '5' TargetClusterSize_5 = 5, - rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_5, <<"/">>, <<".*">>, all]), + Result4 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_5, <<"/">>, <<".*">>, all, voter]), + ?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result4)), assert_grown_queues(QQs, Server0, TargetClusterSize_5, MsgCount), - %% shrink all queues again + %% shrink all queues again down to 1 member rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, force_all_queues_shrink_member_to_current_member, []), assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount), %% grow queues to quorum cluster size > '5' (limit = 5). TargetClusterSize_10 = 10, - rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_10, <<"/">>, <<".*">>, all]), + Result5 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_10, <<"/">>, <<".*">>, all]), + ?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result5)), assert_grown_queues(QQs, Server0, TargetClusterSize_5, MsgCount), - %% shrink all queues again + %% shrink all queues again down to 1 member rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, force_all_queues_shrink_member_to_current_member, []), assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount), @@ -1860,7 +1867,30 @@ grow_queue(Config) -> %% attempt to grow queues to quorum cluster size < '0'. BadTargetClusterSize = -5, ?assertEqual({error, bad_quorum_cluster_size}, - rpc:call(Server0, rabbit_quorum_queue, grow, [BadTargetClusterSize, <<"/">>, <<".*">>, all])). + rpc:call(Server0, rabbit_quorum_queue, grow, [BadTargetClusterSize, <<"/">>, <<".*">>, all])), + + %% shrink all queues again down to 1 member + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + force_all_queues_shrink_member_to_current_member, []), + assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount), + + %% grow queues to node 'Server1': non_voter + rpc:call(Server0, rabbit_quorum_queue, grow, [Server1, <<"/">>, <<".*">>, all, non_voter]), + assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount), + + %% grow queues to node 'Server2': fail, non_voters found + Result6 = rpc:call(Server0, rabbit_quorum_queue, grow, [Server2, <<"/">>, <<".*">>, all, voter]), + %% [{{resource,<<"/">>,queue,<<"grow_queue">>},{error, 2, {error, non_voters_found}}, + %% {{resource,<<"/">>,queue,<<"grow_queue_alt">>},{error, 2, {error, non_voters_found}},...] + ?assert(lists:all( + fun({_, Err}) -> Err =:= {error, TargetClusterSize_2, {error, non_voters_found}} end, Result6)), + assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount), + + %% grow queues to target quorum cluster size '5': fail, non_voters found + Result7 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_5, <<"/">>, <<".*">>, all]), + ?assert(lists:all( + fun({_, Err}) -> Err =:= {error, TargetClusterSize_2, {error, non_voters_found}} end, Result7)), + assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount). assert_grown_queues(Qs, Node, TargetClusterSize, MsgCount) -> [begin diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex index 8df0b54a6f0e..d6a9913881ed 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex @@ -44,6 +44,11 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do {:validation_failure, "target quorum cluster size '#{n}' must be greater than 0."} end + def validate([n, _], %{membership: m}) + when (is_integer(n) and not (m == "voter" or m == "promotable")) do + {:validation_failure, "voter status '#{m}' must be 'voter' or 'promotable' to grow to target quorum cluster size '#{n}'."} + end + def validate(_, %{membership: m}) when not (m == "promotable" or m == "non_voter" or diff --git a/deps/rabbitmq_cli/test/queues/grow_command_test.exs b/deps/rabbitmq_cli/test/queues/grow_command_test.exs index 0b28f3957eef..70467d0eaddf 100644 --- a/deps/rabbitmq_cli/test/queues/grow_command_test.exs +++ b/deps/rabbitmq_cli/test/queues/grow_command_test.exs @@ -82,10 +82,14 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommandTest do {:validation_failure, "voter status 'banana' is not recognised."} end - test "validate: when target quorum cluster size greater than zero, returns a success" do + test "validate: when target quorum cluster size greater than zero and membership is voter, returns a success" do assert @command.validate([7, "all"], %{membership: "voter", queue_pattern: "qq.*"}) == :ok end + test "validate: when target quorum cluster size greater than zero and membership is promotable, returns a success" do + assert @command.validate([5, "all"], %{membership: "promotable", queue_pattern: "qq.*"}) == :ok + end + test "validate: when target quorum cluster size is zero, returns failure" do assert @command.validate([0, "all"], %{membership: "voter", queue_pattern: "qq.*"}) == {:validation_failure, "target quorum cluster size '0' must be greater than 0."} @@ -96,6 +100,11 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommandTest do {:validation_failure, "target quorum cluster size '-1' must be greater than 0."} end + test "validate: when target quorum cluster size is provided and membership is not voter, returns failure" do + assert @command.validate([5, "all"], %{membership: "non_voter", queue_pattern: "qq.*"}) == + {:validation_failure, "voter status 'non_voter' must be 'voter' or 'promotable' to grow to target quorum cluster size '5'."} + end + @tag test_timeout: 3000 test "run: targeting an unreachable node throws a badrpc when growing to a target node", context do assert match?( From 50b3e9cee27367dee80f85dcc0cec80fa988ddc4 Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Tue, 27 May 2025 18:03:05 +0100 Subject: [PATCH 07/13] Update QQ grow validate_execution_environment step to only check existing cluster membership when growing to a node. --- .../lib/rabbitmq/cli/queues/commands/grow_command.ex | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex index d6a9913881ed..d7bf472092b4 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex @@ -64,7 +64,13 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do Validators.chain( [ &Validators.rabbit_is_running/2, - &Validators.existing_cluster_member/2 + fn args = [n, _], opts -> + if is_integer(n) do + :ok + else + Validators.existing_cluster_member(args, opts) + end + end ], [args, opts] ) From 2c813ad5a176ce9de27686522a3b5e7cdae18aab Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Thu, 29 May 2025 16:34:28 +0100 Subject: [PATCH 08/13] Use Integer.parse/1 to mediate and parse target-cluster-size argument in QQ grow-to-N command and update banner --- .../cli/queues/commands/grow_command.ex | 50 +++++++++++++------ .../test/queues/grow_command_test.exs | 12 ++--- 2 files changed, 41 insertions(+), 21 deletions(-) diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex index d7bf472092b4..4fe82833f2d8 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex @@ -33,30 +33,40 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do {:validation_failure, :too_many_args} end - def validate([_, s], _) + def validate(args = [n, s], opts) do + case Integer.parse(n) do + {cluster_size, _} when is_integer(cluster_size) -> + do_validate([cluster_size, s], opts) + + :error -> + do_validate(args, opts) + end + end + + def do_validate([_, s], _) when not (s == "all" or s == "even") do {:validation_failure, "strategy '#{s}' is not recognised."} end - def validate([n, _], _) + def do_validate([n, _], _) when (is_integer(n) and n <= 0) do {:validation_failure, "target quorum cluster size '#{n}' must be greater than 0."} end - def validate([n, _], %{membership: m}) + def do_validate([n, _], %{membership: m}) when (is_integer(n) and not (m == "voter" or m == "promotable")) do {:validation_failure, "voter status '#{m}' must be 'voter' or 'promotable' to grow to target quorum cluster size '#{n}'."} end - def validate(_, %{membership: m}) + def do_validate(_, %{membership: m}) when not (m == "promotable" or m == "non_voter" or m == "voter") do {:validation_failure, "voter status '#{m}' is not recognised."} end - def validate(_, _) do + def do_validate(_, _) do :ok end @@ -65,10 +75,12 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do [ &Validators.rabbit_is_running/2, fn args = [n, _], opts -> - if is_integer(n) do - :ok - else - Validators.existing_cluster_member(args, opts) + case Integer.parse(n) do + {cluster_size, _} when is_integer(cluster_size) -> + :ok + + :error -> + Validators.existing_cluster_member(args, opts) end end ], @@ -85,10 +97,12 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do }) do node_or_quorum_cluster_size = - if is_integer(node_or_quorum_cluster_size) do - node_or_quorum_cluster_size - else - to_atom(node_or_quorum_cluster_size) + case Integer.parse(node_or_quorum_cluster_size) do + {cluster_size, _} when is_integer(cluster_size) -> + cluster_size + + :error -> + to_atom(node_or_quorum_cluster_size) end args = [node_or_quorum_cluster_size, vhost_pat, queue_pat, to_atom(strategy)] @@ -160,8 +174,14 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do do: "Grows quorum queue clusters by adding a member (replica) on the specified node for all matching queues" - def banner([node, strategy], _) do - "Growing #{strategy} quorum queues on #{node}..." + def banner([node_or_quorum_cluster_size, strategy], %{queue_pattern: queue_pattern}) do + case Integer.parse(node_or_quorum_cluster_size) do + {cluster_size, _} when is_integer(cluster_size) -> + "Growing #{strategy} quorum queues matching '#{queue_pattern}' to a target cluster size of '#{cluster_size}'..." + + :error -> + "Growing #{strategy} quorum queues matching '#{queue_pattern}' to #{node_or_quorum_cluster_size}..." + end end # diff --git a/deps/rabbitmq_cli/test/queues/grow_command_test.exs b/deps/rabbitmq_cli/test/queues/grow_command_test.exs index 70467d0eaddf..b8903dfc90c2 100644 --- a/deps/rabbitmq_cli/test/queues/grow_command_test.exs +++ b/deps/rabbitmq_cli/test/queues/grow_command_test.exs @@ -83,25 +83,25 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommandTest do end test "validate: when target quorum cluster size greater than zero and membership is voter, returns a success" do - assert @command.validate([7, "all"], %{membership: "voter", queue_pattern: "qq.*"}) == :ok + assert @command.validate(["7", "all"], %{membership: "voter", queue_pattern: "qq.*"}) == :ok end test "validate: when target quorum cluster size greater than zero and membership is promotable, returns a success" do - assert @command.validate([5, "all"], %{membership: "promotable", queue_pattern: "qq.*"}) == :ok + assert @command.validate(["5", "all"], %{membership: "promotable", queue_pattern: "qq.*"}) == :ok end test "validate: when target quorum cluster size is zero, returns failure" do - assert @command.validate([0, "all"], %{membership: "voter", queue_pattern: "qq.*"}) == + assert @command.validate(["0", "all"], %{membership: "voter", queue_pattern: "qq.*"}) == {:validation_failure, "target quorum cluster size '0' must be greater than 0."} end test "validate: when target quorum cluster size is less than zero, returns failure" do - assert @command.validate([-1, "all"], %{membership: "voter", queue_pattern: "qq.*"}) == + assert @command.validate(["-1", "all"], %{membership: "voter", queue_pattern: "qq.*"}) == {:validation_failure, "target quorum cluster size '-1' must be greater than 0."} end test "validate: when target quorum cluster size is provided and membership is not voter, returns failure" do - assert @command.validate([5, "all"], %{membership: "non_voter", queue_pattern: "qq.*"}) == + assert @command.validate(["5", "all"], %{membership: "non_voter", queue_pattern: "qq.*"}) == {:validation_failure, "voter status 'non_voter' must be 'voter' or 'promotable' to grow to target quorum cluster size '5'."} end @@ -121,7 +121,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommandTest do assert match?( {:badrpc, _}, @command.run( - [5, "all"], + ["5", "all"], Map.merge(context[:opts], %{node: :jake@thedog, queue_pattern: "qq.*"}) ) ) From 39776b9444c5addbcc9e8aa033a3bd5a1aee1533 Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Fri, 18 Jul 2025 14:29:07 +0100 Subject: [PATCH 09/13] Revert "Extend QQ grow command to support target quorum cluster size" This reverts commit 15ce0b51af294b15f183da57ea2e040362bccbe7. --- .../cli/queues/commands/grow_command.ex | 29 ++++--------------- 1 file changed, 5 insertions(+), 24 deletions(-) diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex index 4fe82833f2d8..87a1370882a9 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex @@ -49,17 +49,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do {:validation_failure, "strategy '#{s}' is not recognised."} end - def do_validate([n, _], _) - when (is_integer(n) and n <= 0) do - {:validation_failure, "target quorum cluster size '#{n}' must be greater than 0."} - end - - def do_validate([n, _], %{membership: m}) - when (is_integer(n) and not (m == "voter" or m == "promotable")) do - {:validation_failure, "voter status '#{m}' must be 'voter' or 'promotable' to grow to target quorum cluster size '#{n}'."} - end - - def do_validate(_, %{membership: m}) + def validate(_, %{membership: m}) when not (m == "promotable" or m == "non_voter" or m == "voter") do @@ -88,7 +78,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do ) end - def run([node_or_quorum_cluster_size, strategy], %{ + def run([node, strategy], %{ node: node_name, vhost_pattern: vhost_pat, queue_pattern: queue_pat, @@ -96,16 +86,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do errors_only: errors_only }) do - node_or_quorum_cluster_size = - case Integer.parse(node_or_quorum_cluster_size) do - {cluster_size, _} when is_integer(cluster_size) -> - cluster_size - - :error -> - to_atom(node_or_quorum_cluster_size) - end - - args = [node_or_quorum_cluster_size, vhost_pat, queue_pat, to_atom(strategy)] + args = [to_atom(node), vhost_pat, queue_pat, to_atom(strategy)] args = case to_atom(membership) do @@ -146,11 +127,11 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do def usage, do: - "grow [--vhost-pattern ] [--queue-pattern ] [--membership ]" + "grow [--vhost-pattern ] [--queue-pattern ] [--membership ]" def usage_additional do [ - ["", "node name to place replicas on or desired quorum cluster size"], + ["", "node name to place replicas on"], [ "", "add a member for all matching queues or just those whose membership count is an even number" From 3378ff0cf4ddf632dadbc53ff2f72ff17fcd0114 Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Fri, 18 Jul 2025 17:42:39 +0100 Subject: [PATCH 10/13] Revert QQ grow-to-N in original grow command --- .../cli/queues/commands/grow_command.ex | 35 +++---------------- .../test/queues/grow_command_test.exs | 34 ------------------ 2 files changed, 5 insertions(+), 64 deletions(-) diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex index 87a1370882a9..f1ada3a383bb 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex @@ -33,17 +33,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do {:validation_failure, :too_many_args} end - def validate(args = [n, s], opts) do - case Integer.parse(n) do - {cluster_size, _} when is_integer(cluster_size) -> - do_validate([cluster_size, s], opts) - - :error -> - do_validate(args, opts) - end - end - - def do_validate([_, s], _) + def validate([_, s], _) when not (s == "all" or s == "even") do {:validation_failure, "strategy '#{s}' is not recognised."} @@ -56,7 +46,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do {:validation_failure, "voter status '#{m}' is not recognised."} end - def do_validate(_, _) do + def validate(_, _) do :ok end @@ -64,15 +54,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do Validators.chain( [ &Validators.rabbit_is_running/2, - fn args = [n, _], opts -> - case Integer.parse(n) do - {cluster_size, _} when is_integer(cluster_size) -> - :ok - - :error -> - Validators.existing_cluster_member(args, opts) - end - end + &Validators.existing_cluster_member/2 ], [args, opts] ) @@ -85,7 +67,6 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do membership: membership, errors_only: errors_only }) do - args = [to_atom(node), vhost_pat, queue_pat, to_atom(strategy)] args = @@ -155,14 +136,8 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do do: "Grows quorum queue clusters by adding a member (replica) on the specified node for all matching queues" - def banner([node_or_quorum_cluster_size, strategy], %{queue_pattern: queue_pattern}) do - case Integer.parse(node_or_quorum_cluster_size) do - {cluster_size, _} when is_integer(cluster_size) -> - "Growing #{strategy} quorum queues matching '#{queue_pattern}' to a target cluster size of '#{cluster_size}'..." - - :error -> - "Growing #{strategy} quorum queues matching '#{queue_pattern}' to #{node_or_quorum_cluster_size}..." - end + def banner([node, strategy], _) do + "Growing #{strategy} quorum queues on #{node}..." end # diff --git a/deps/rabbitmq_cli/test/queues/grow_command_test.exs b/deps/rabbitmq_cli/test/queues/grow_command_test.exs index b8903dfc90c2..b4b8ada8acb7 100644 --- a/deps/rabbitmq_cli/test/queues/grow_command_test.exs +++ b/deps/rabbitmq_cli/test/queues/grow_command_test.exs @@ -82,29 +82,6 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommandTest do {:validation_failure, "voter status 'banana' is not recognised."} end - test "validate: when target quorum cluster size greater than zero and membership is voter, returns a success" do - assert @command.validate(["7", "all"], %{membership: "voter", queue_pattern: "qq.*"}) == :ok - end - - test "validate: when target quorum cluster size greater than zero and membership is promotable, returns a success" do - assert @command.validate(["5", "all"], %{membership: "promotable", queue_pattern: "qq.*"}) == :ok - end - - test "validate: when target quorum cluster size is zero, returns failure" do - assert @command.validate(["0", "all"], %{membership: "voter", queue_pattern: "qq.*"}) == - {:validation_failure, "target quorum cluster size '0' must be greater than 0."} - end - - test "validate: when target quorum cluster size is less than zero, returns failure" do - assert @command.validate(["-1", "all"], %{membership: "voter", queue_pattern: "qq.*"}) == - {:validation_failure, "target quorum cluster size '-1' must be greater than 0."} - end - - test "validate: when target quorum cluster size is provided and membership is not voter, returns failure" do - assert @command.validate(["5", "all"], %{membership: "non_voter", queue_pattern: "qq.*"}) == - {:validation_failure, "voter status 'non_voter' must be 'voter' or 'promotable' to grow to target quorum cluster size '5'."} - end - @tag test_timeout: 3000 test "run: targeting an unreachable node throws a badrpc when growing to a target node", context do assert match?( @@ -115,15 +92,4 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommandTest do ) ) end - - @tag test_timeout: 3000 - test "run: targeting an unreachable node throws a badrpc when growing to a target quorum cluster size", context do - assert match?( - {:badrpc, _}, - @command.run( - ["5", "all"], - Map.merge(context[:opts], %{node: :jake@thedog, queue_pattern: "qq.*"}) - ) - ) - end end From 03dffc634426cb87a812d9bba4a48388e4f888e8 Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Fri, 18 Jul 2025 18:10:34 +0100 Subject: [PATCH 11/13] use a separate rabbitmq-queues grow_to_count command --- .../queues/commands/grow_to_count_command.ex | 176 ++++++++++++++++++ .../queues/grow_to_count_command_test.exs | 109 +++++++++++ 2 files changed, 285 insertions(+) create mode 100644 deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_to_count_command.ex create mode 100644 deps/rabbitmq_cli/test/queues/grow_to_count_command_test.exs diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_to_count_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_to_count_command.ex new file mode 100644 index 000000000000..3e1b9054d539 --- /dev/null +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_to_count_command.ex @@ -0,0 +1,176 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +defmodule RabbitMQ.CLI.Queues.Commands.GrowToCountCommand do + alias RabbitMQ.CLI.Core.{DocGuide, Validators} + import RabbitMQ.CLI.Core.DataCoercion + + @behaviour RabbitMQ.CLI.CommandBehaviour + + defp default_opts, + do: %{vhost_pattern: ".*", queue_pattern: ".*", membership: "promotable", errors_only: false} + + def switches(), + do: [ + vhost_pattern: :string, + queue_pattern: :string, + membership: :string, + errors_only: :boolean + ] + + def merge_defaults(args, opts) do + {args, Map.merge(default_opts(), opts)} + end + + def validate(args, _) when length(args) < 2 do + {:validation_failure, :not_enough_args} + end + + def validate(args, _) when length(args) > 2 do + {:validation_failure, :too_many_args} + end + + def validate([_, s], _) + when not (s == "all" or + s == "even") do + {:validation_failure, "strategy '#{s}' is not recognised."} + end + + def validate([n, _], _) + when (is_integer(n) and n <= 0) do + {:validation_failure, "node count '#{n}' must be greater than 0."} + end + + def validate(_, %{membership: m}) + when not (m == "promotable" or + m == "non_voter" or + m == "voter") do + {:validation_failure, "voter status '#{m}' is not recognised."} + end + + def validate(_, _) do + :ok + end + + def validate_execution_environment(args, opts) do + Validators.chain( + [ + &Validators.rabbit_is_running/2 + ], + [args, opts] + ) + end + + def run([node_count, strategy], %{ + node: node_name, + vhost_pattern: vhost_pat, + queue_pattern: queue_pat, + membership: membership, + errors_only: errors_only + }) when is_integer(node_count) do + + args = [node_count, vhost_pat, queue_pat, to_atom(strategy)] + + args = + case to_atom(membership) do + :promotable -> args + other -> args ++ [other] + end + + case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :grow, args) do + {:error, _} = error -> + error + + {:badrpc, _} = error -> + error + + results when errors_only -> + for {{:resource, vhost, _kind, name}, {:error, _, _} = res} <- results, + do: [ + {:vhost, vhost}, + {:name, name}, + {:size, format_size(res)}, + {:result, format_result(res)} + ] + + results -> + for {{:resource, vhost, _kind, name}, res} <- results, + do: [ + {:vhost, vhost}, + {:name, name}, + {:size, format_size(res)}, + {:result, format_result(res)} + ] + end + end + + use RabbitMQ.CLI.DefaultOutput + + def formatter(), do: RabbitMQ.CLI.Formatters.Table + + def usage, + do: + "grow_to_count [--vhost-pattern ] [--queue-pattern ] [--membership ]" + + def usage_additional do + [ + ["", "number of nodes to place replicas on"], + [ + "", + "add a member for all matching queues or just those whose membership count is an even number" + ], + ["--queue-pattern ", "regular expression to match queue names"], + ["--vhost-pattern ", "regular expression to match virtual host names"], + ["--membership ", "add a promotable non-voter (default) or full voter"], + ["--errors-only", "only list queues which reported an error"] + ] + end + + def usage_doc_guides() do + [ + DocGuide.quorum_queues() + ] + end + + def help_section, do: :cluster_management + + def description, + do: + "Grows quorum queue clusters by adding member replicas on the specified number of nodes for all matching queues" + + def banner([node_count, strategy], _) do + "Growing #{strategy} quorum queues on #{node_count} nodes..." + end + + # + # Implementation + # + + defp format_size({:ok, size}) do + size + end + + defp format_size({:error, _size, :timeout}) do + # the actual size is uncertain here + "?" + end + + defp format_size({:error, size, _}) do + size + end + + defp format_result({:ok, _size}) do + "ok" + end + + defp format_result({:error, _size, :timeout}) do + "error: the operation timed out and may not have been completed" + end + + defp format_result({:error, _size, err}) do + to_string(:io_lib.format("error: ~W", [err, 10])) + end +end diff --git a/deps/rabbitmq_cli/test/queues/grow_to_count_command_test.exs b/deps/rabbitmq_cli/test/queues/grow_to_count_command_test.exs new file mode 100644 index 000000000000..6d866d2c5e8f --- /dev/null +++ b/deps/rabbitmq_cli/test/queues/grow_to_count_command_test.exs @@ -0,0 +1,109 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +defmodule RabbitMQ.CLI.Queues.Commands.GrowToCountCommandTest do + use ExUnit.Case, async: false + import TestHelper + + @command RabbitMQ.CLI.Queues.Commands.GrowToCountCommand + + setup_all do + RabbitMQ.CLI.Core.Distribution.start() + + :ok + end + + setup context do + {:ok, + opts: %{ + node: get_rabbit_hostname(), + timeout: context[:test_timeout] || 30000, + vhost_pattern: ".*", + queue_pattern: ".*", + membership: "promotable", + errors_only: false + }} + end + + test "merge_defaults: defaults to reporting complete results" do + assert @command.merge_defaults([], %{}) == + {[], + %{ + vhost_pattern: ".*", + queue_pattern: ".*", + errors_only: false, + membership: "promotable" + }} + end + + test "validate: when no arguments are provided, returns a failure" do + assert @command.validate([], %{}) == {:validation_failure, :not_enough_args} + end + + test "validate: when one argument is provided, returns a failure" do + assert @command.validate([5], %{}) == {:validation_failure, :not_enough_args} + end + + test "validate: when node count and even are provided, returns a success" do + assert @command.validate([7, "even"], %{}) == :ok + end + + test "validate: when node count and all are provided, returns a success" do + assert @command.validate([5, "all"], %{}) == :ok + end + + test "validate: when node count and something else is provided, returns a failure" do + assert @command.validate([7, "banana"], %{}) == + {:validation_failure, "strategy 'banana' is not recognised."} + end + + test "validate: when three arguments are provided, returns a failure" do + assert @command.validate([7, "extra-arg", "another-extra-arg"], %{}) == + {:validation_failure, :too_many_args} + end + + test "validate: when membership promotable is provided, returns a success" do + assert @command.validate([5, "all"], %{membership: "promotable"}) == :ok + end + + test "validate: when membership voter is provided, returns a success" do + assert @command.validate([7, "all"], %{membership: "voter"}) == :ok + end + + test "validate: when membership non_voter is provided, returns a success" do + assert @command.validate([5, "all"], %{membership: "non_voter"}) == :ok + end + + test "validate: when wrong membership is provided, returns failure" do + assert @command.validate(["quorum-queue-a", "all"], %{membership: "banana"}) == + {:validation_failure, "voter status 'banana' is not recognised."} + end + + test "validate: when node count greater than zero, returns a success" do + assert @command.validate([7, "all"], %{membership: "voter"}) == :ok + end + + test "validate: when node count is zero, returns failure" do + assert @command.validate([0, "all"], %{membership: "voter"}) == + {:validation_failure, "node count '0' must be greater than 0."} + end + + test "validate: when node count is less than zero, returns failure" do + assert @command.validate([-1, "all"], %{membership: "voter"}) == + {:validation_failure, "node count '-1' must be greater than 0."} + end + + @tag test_timeout: 3000 + test "run: targeting an unreachable node throws a badrpc when growing to a node count", context do + assert match?( + {:badrpc, _}, + @command.run( + [5, "all"], + Map.merge(context[:opts], %{node: :jake@thedog}) + ) + ) + end +end From 359b7e7feee815a6a37946398e8f0566f29a0e23 Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Fri, 18 Jul 2025 18:31:12 +0100 Subject: [PATCH 12/13] single validator in use and ensure test uses node count arg --- .../rabbitmq/cli/queues/commands/grow_to_count_command.ex | 7 +------ .../test/queues/grow_to_count_command_test.exs | 2 +- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_to_count_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_to_count_command.ex index 3e1b9054d539..be78119e5e6f 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_to_count_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_to_count_command.ex @@ -56,12 +56,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowToCountCommand do end def validate_execution_environment(args, opts) do - Validators.chain( - [ - &Validators.rabbit_is_running/2 - ], - [args, opts] - ) + Validators.rabbit_is_running(args, opts) end def run([node_count, strategy], %{ diff --git a/deps/rabbitmq_cli/test/queues/grow_to_count_command_test.exs b/deps/rabbitmq_cli/test/queues/grow_to_count_command_test.exs index 6d866d2c5e8f..861530f9a3c8 100644 --- a/deps/rabbitmq_cli/test/queues/grow_to_count_command_test.exs +++ b/deps/rabbitmq_cli/test/queues/grow_to_count_command_test.exs @@ -78,7 +78,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowToCountCommandTest do end test "validate: when wrong membership is provided, returns failure" do - assert @command.validate(["quorum-queue-a", "all"], %{membership: "banana"}) == + assert @command.validate([7, "all"], %{membership: "banana"}) == {:validation_failure, "voter status 'banana' is not recognised."} end From 936da119b492f9eb1eac01c4a8a212246de45d32 Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Fri, 10 Oct 2025 11:28:08 +0100 Subject: [PATCH 13/13] ensure grow_to_count node-count argument is an integer --- .../lib/rabbitmq/cli/queues/commands/grow_to_count_command.ex | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_to_count_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_to_count_command.ex index be78119e5e6f..256356bce8f7 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_to_count_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_to_count_command.ex @@ -21,7 +21,8 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowToCountCommand do errors_only: :boolean ] - def merge_defaults(args, opts) do + def merge_defaults([node_count | rem], opts) do + args = [String.to_integer(node_count) | rem] {args, Map.merge(default_opts(), opts)} end