diff --git a/deps/rabbit/src/amqqueue.erl b/deps/rabbit/src/amqqueue.erl index 2d416582ceb6..c054051c461a 100644 --- a/deps/rabbit/src/amqqueue.erl +++ b/deps/rabbit/src/amqqueue.erl @@ -66,10 +66,12 @@ pattern_match_on_type/1, pattern_match_on_durable/1, pattern_match_on_type_and_durable/2, + pattern_match_on_type_and_vhost/2, reset_decorators/1, set_immutable/1, qnode/1, to_printable/1, + to_printable/2, macros/0]). -define(record_version, amqqueue_v2). @@ -531,6 +533,12 @@ pattern_match_on_durable(IsDurable) -> pattern_match_on_type_and_durable(Type, IsDurable) -> #amqqueue{type = Type, durable = IsDurable, _ = '_'}. +-spec pattern_match_on_type_and_vhost(atom(), binary()) -> + amqqueue_pattern(). + +pattern_match_on_type_and_vhost(Type, VHost) -> + #amqqueue{type = Type, vhost = VHost, _ = '_'}. + -spec reset_decorators(amqqueue()) -> amqqueue(). reset_decorators(#amqqueue{} = Queue) -> @@ -564,6 +572,14 @@ to_printable(#amqqueue{name = QName = #resource{name = Name}, <<"virtual_host">> => VHost, <<"type">> => Type}. +-spec to_printable(rabbit_types:r(queue), atom() | binary()) -> #{binary() => any()}. +to_printable(QName = #resource{name = Name, virtual_host = VHost}, Type) -> + _ = rabbit_queue_type:discover(Type), + #{<<"readable_name">> => rabbit_data_coercion:to_binary(rabbit_misc:rs(QName)), + <<"name">> => Name, + <<"virtual_host">> => VHost, + <<"type">> => Type}. + % private macros() -> diff --git a/deps/rabbit/src/rabbit_db_queue.erl b/deps/rabbit/src/rabbit_db_queue.erl index 1c7254e418ad..18590879ae0b 100644 --- a/deps/rabbit/src/rabbit_db_queue.erl +++ b/deps/rabbit/src/rabbit_db_queue.erl @@ -21,6 +21,7 @@ get_all/0, get_all/1, get_all_by_type/1, + get_all_by_type_and_vhost/2, get_all_by_type_and_node/3, list/0, count/0, @@ -829,6 +830,28 @@ get_all_by_type(Type) -> khepri => fun() -> get_all_by_pattern_in_khepri(Pattern) end }). +%% ------------------------------------------------------------------- +%% get_all_by_type_and_vhost(). +%% ------------------------------------------------------------------- + +-spec get_all_by_type_and_vhost(Type, VHost) -> [Queue] when + Type :: atom(), + VHost :: binary(), + Queue :: amqqueue:amqqueue(). + +%% @doc Gets all queues belonging to the given type and vhost +%% +%% @returns a list of queue records. +%% +%% @private + +get_all_by_type_and_vhost(Type, VHost) -> + Pattern = amqqueue:pattern_match_on_type_and_vhost(Type, VHost), + rabbit_khepri:handle_fallback( + #{mnesia => fun() -> get_all_by_pattern_in_mnesia(Pattern) end, + khepri => fun() -> get_all_by_pattern_in_khepri(Pattern) end + }). + get_all_by_pattern_in_mnesia(Pattern) -> rabbit_db:list_in_mnesia(?MNESIA_TABLE, Pattern). diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index c9fb877b38dc..69dc09b97c19 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -82,6 +82,9 @@ file_handle_other_reservation/0, file_handle_release_reservation/0]). +-export([leader_health_check/2, + run_leader_health_check/4]). + -ifdef(TEST). -export([filter_promotable/2, ra_machine_config/1]). @@ -144,6 +147,8 @@ -define(SNAPSHOT_INTERVAL, 8192). %% the ra default is 4096 % -define(UNLIMITED_PREFETCH_COUNT, 2000). %% something large for ra -define(MIN_CHECKPOINT_INTERVAL, 8192). %% the ra default is 16384 +-define(LEADER_HEALTH_CHECK_TIMEOUT, 5_000). +-define(GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT, 60_000). %%----------- QQ policies --------------------------------------------------- @@ -2145,3 +2150,75 @@ file_handle_other_reservation() -> file_handle_release_reservation() -> ok. +leader_health_check(QueueNameOrRegEx, VHost) -> + %% Set a process limit threshold to 20% of ErlangVM process limit, beyond which + %% we cannot spawn any new processes for executing QQ leader health checks. + ProcessLimitThreshold = round(0.2 * erlang:system_info(process_limit)), + + leader_health_check(QueueNameOrRegEx, VHost, ProcessLimitThreshold). + +leader_health_check(QueueNameOrRegEx, VHost, ProcessLimitThreshold) -> + Qs = + case VHost of + across_all_vhosts -> + rabbit_db_queue:get_all_by_type(?MODULE); + VHost when is_binary(VHost) -> + rabbit_db_queue:get_all_by_type_and_vhost(?MODULE, VHost) + end, + check_process_limit_safety(length(Qs), ProcessLimitThreshold), + ParentPID = self(), + HealthCheckRef = make_ref(), + HealthCheckPids = + lists:flatten( + [begin + {resource, _VHostN, queue, QueueName} = QResource = amqqueue:get_name(Q), + case re:run(QueueName, QueueNameOrRegEx, [{capture, none}]) of + match -> + {ClusterName, _} = rabbit_amqqueue:pid_of(Q), + _Pid = spawn(fun() -> run_leader_health_check(ClusterName, QResource, HealthCheckRef, ParentPID) end); + _ -> + [] + end + end || Q <- Qs, amqqueue:get_type(Q) == ?MODULE]), + Result = wait_for_leader_health_checks(HealthCheckRef, length(HealthCheckPids), []), + _ = spawn(fun() -> maybe_log_leader_health_check_result(Result) end), + Result. + +run_leader_health_check(ClusterName, QResource, HealthCheckRef, From) -> + Leader = ra_leaderboard:lookup_leader(ClusterName), + + %% Ignoring result here is required to clear a diayzer warning. + _ = + case ra_server_proc:ping(Leader, ?LEADER_HEALTH_CHECK_TIMEOUT) of + {pong,leader} -> + From ! {ok, HealthCheckRef, QResource}; + _ -> + From ! {error, HealthCheckRef, QResource} + end, + ok. + +wait_for_leader_health_checks(_Ref, 0, UnhealthyAcc) -> UnhealthyAcc; +wait_for_leader_health_checks(Ref, N, UnhealthyAcc) -> + receive + {ok, Ref, _QResource} -> + wait_for_leader_health_checks(Ref, N - 1, UnhealthyAcc); + {error, Ref, QResource} -> + wait_for_leader_health_checks(Ref, N - 1, [amqqueue:to_printable(QResource, ?MODULE) | UnhealthyAcc]) + after + ?GLOBAL_LEADER_HEALTH_CHECK_TIMEOUT -> + UnhealthyAcc + end. + +check_process_limit_safety(QCount, ProcessLimitThreshold) -> + case (erlang:system_info(process_count) + QCount) >= ProcessLimitThreshold of + true -> + rabbit_log:warning("Leader health check not permitted, process limit threshold will be exceeded."), + throw({error, leader_health_check_process_limit_exceeded}); + false -> + ok + end. + +maybe_log_leader_health_check_result([]) -> ok; +maybe_log_leader_health_check_result(Result) -> + Qs = lists:map(fun(R) -> catch maps:get(<<"readable_name">>, R) end, Result), + rabbit_log:warning("Leader health check result (unhealthy leaders detected): ~tp", [Qs]). diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index fdb0a8c5dd8a..6a3167bdcc51 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -192,7 +192,8 @@ all_tests() -> priority_queue_2_1_ratio, requeue_multiple_true, requeue_multiple_false, - subscribe_from_each + subscribe_from_each, + leader_health_check ]. memory_tests() -> @@ -4145,6 +4146,129 @@ amqpl_headers(Config) -> ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, multiple = true}). +leader_health_check(Config) -> + VHost1 = <<"vhost1">>, + VHost2 = <<"vhost2">>, + + set_up_vhost(Config, VHost1), + set_up_vhost(Config, VHost2), + + %% check empty vhost + ?assertEqual([], + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check, + [<<".*">>, VHost1])), + ?assertEqual([], + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check, + [<<".*">>, across_all_vhosts])), + + Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost1), + {ok, Ch1} = amqp_connection:open_channel(Conn1), + + Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost2), + {ok, Ch2} = amqp_connection:open_channel(Conn2), + + Qs1 = [<<"Q.1">>, <<"Q.2">>, <<"Q.3">>], + Qs2 = [<<"Q.4">>, <<"Q.5">>, <<"Q.6">>], + + %% in vhost1 + [?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"quorum">>}])) + || Q <- Qs1], + + %% in vhost2 + [?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch2, Q, [{<<"x-queue-type">>, longstr, <<"quorum">>}])) + || Q <- Qs2], + + %% test sucessful health checks in vhost1, vhost2, across_all_vhosts + ?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check, + [<<".*">>, VHost1])), + ?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check, + [<<"Q.*">>, VHost1])), + [?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check, + [Q, VHost1])) || Q <- Qs1], + + ?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check, + [<<".*">>, VHost2])), + ?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check, + [<<"Q.*">>, VHost2])), + [?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check, + [Q, VHost2])) || Q <- Qs2], + + ?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check, + [<<".*">>, across_all_vhosts])), + ?assertEqual([], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check, + [<<"Q.*">>, across_all_vhosts])), + + %% clear leaderboard + Qs = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, list, []), + + [{_Q1_ClusterName, _Q1Res}, + {_Q2_ClusterName, _Q2Res}, + {_Q3_ClusterName, _Q3Res}, + {_Q4_ClusterName, _Q4Res}, + {_Q5_ClusterName, _Q5Res}, + {_Q6_ClusterName, _Q6Res}] = QQ_Clusters = + lists:usort( + [begin + {ClusterName, _} = amqqueue:get_pid(Q), + {ClusterName, amqqueue:get_name(Q)} + end + || Q <- Qs, amqqueue:get_type(Q) == rabbit_quorum_queue]), + + [Q1Data, Q2Data, Q3Data, Q4Data, Q5Data, Q6Data] = QQ_Data = + [begin + rabbit_ct_broker_helpers:rpc(Config, 0, ra_leaderboard, clear, [Q_ClusterName]), + _QData = amqqueue:to_printable(Q_Res, rabbit_quorum_queue) + end + || {Q_ClusterName, Q_Res} <- QQ_Clusters], + + %% test failed health checks in vhost1, vhost2, across_all_vhosts + ?assertEqual([Q1Data], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check, + [<<"Q.1">>, VHost1])), + ?assertEqual([Q2Data], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check, + [<<"Q.2">>, VHost1])), + ?assertEqual([Q3Data], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check, + [<<"Q.3">>, VHost1])), + ?assertEqual([Q1Data, Q2Data, Q3Data], + lists:usort(rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check, + [<<".*">>, VHost1]))), + ?assertEqual([Q1Data, Q2Data, Q3Data], + lists:usort(rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check, + [<<"Q.*">>, VHost1]))), + + ?assertEqual([Q4Data], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check, + [<<"Q.4">>, VHost2])), + ?assertEqual([Q5Data], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check, + [<<"Q.5">>, VHost2])), + ?assertEqual([Q6Data], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check, + [<<"Q.6">>, VHost2])), + ?assertEqual([Q4Data, Q5Data, Q6Data], + lists:usort(rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check, + [<<".*">>, VHost2]))), + ?assertEqual([Q4Data, Q5Data, Q6Data], + lists:usort(rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check, + [<<"Q.*">>, VHost2]))), + + ?assertEqual(QQ_Data, + lists:usort(rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check, + [<<"Q.*">>, across_all_vhosts]))), + ?assertEqual(QQ_Data, + lists:usort(rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, leader_health_check, + [<<"Q.*">>, across_all_vhosts]))), + + %% cleanup + [?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch1, #'queue.delete'{queue = Q})) + || Q <- Qs1], + [?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch1, #'queue.delete'{queue = Q})) + || Q <- Qs2], + + amqp_connection:close(Conn1), + amqp_connection:close(Conn2). + + leader_locator_client_local(Config) -> [Server1 | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Q = ?config(queue_name, Config), @@ -4465,6 +4589,11 @@ declare_passive(Ch, Q, Args) -> auto_delete = false, passive = true, arguments = Args}). + +set_up_vhost(Config, VHost) -> + rabbit_ct_broker_helpers:add_vhost(Config, VHost), + rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VHost). + assert_queue_type(Server, Q, Expected) -> assert_queue_type(Server, <<"/">>, Q, Expected). diff --git a/deps/rabbit/test/rabbit_db_queue_SUITE.erl b/deps/rabbit/test/rabbit_db_queue_SUITE.erl index e1db66a8bf5c..c80b1fcfba8f 100644 --- a/deps/rabbit/test/rabbit_db_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_db_queue_SUITE.erl @@ -35,6 +35,7 @@ all_tests() -> get_all, get_all_by_vhost, get_all_by_type, + get_all_by_type_and_vhost, get_all_by_type_and_node, list, count, @@ -198,6 +199,30 @@ get_all_by_type1(_Config) -> ?assertEqual([Q4], rabbit_db_queue:get_all_by_type(rabbit_stream_queue)), passed. +get_all_by_type_and_vhost(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_all_by_type_and_vhost1, [Config]). + +get_all_by_type_and_vhost1(_Config) -> + VHost1 = <<"carrots">>, + VHost2 = <<"cabage">>, + QName = rabbit_misc:r(VHost1, queue, <<"test-queue">>), + QName2 = rabbit_misc:r(VHost2, queue, <<"test-queue2">>), + QName3 = rabbit_misc:r(VHost2, queue, <<"test-queue3">>), + QName4 = rabbit_misc:r(VHost1, queue, <<"test-queue4">>), + Q = new_queue(QName, rabbit_classic_queue), + Q2 = new_queue(QName2, rabbit_quorum_queue), + Q3 = new_queue(QName3, rabbit_quorum_queue), + Q4 = new_queue(QName4, rabbit_stream_queue), + Quorum = lists:sort([Q2, Q3]), + ?assertEqual([], rabbit_db_queue:get_all_by_type_and_vhost(rabbit_classic_queue, VHost1)), + ?assertEqual([], lists:sort(rabbit_db_queue:get_all_by_type_and_vhost(rabbit_quorum_queue, VHost2))), + ?assertEqual([], rabbit_db_queue:get_all_by_type_and_vhost(rabbit_stream_queue, VHost1)), + set_list([Q, Q2, Q3, Q4]), + ?assertEqual([Q], rabbit_db_queue:get_all_by_type_and_vhost(rabbit_classic_queue, VHost1)), + ?assertEqual(Quorum, lists:sort(rabbit_db_queue:get_all_by_type_and_vhost(rabbit_quorum_queue, VHost2))), + ?assertEqual([Q4], rabbit_db_queue:get_all_by_type_and_vhost(rabbit_stream_queue, VHost1)), + passed. + get_all_by_type_and_node(Config) -> passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_all_by_type_and_node1, [Config]). diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/core/output.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/core/output.ex index 48c1283ed59b..58d9e611e32e 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/core/output.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/core/output.ex @@ -18,6 +18,10 @@ defmodule RabbitMQ.CLI.Core.Output do :ok end + def format_output({:ok, :check_passed, output}, formatter, options) do + {:ok, formatter.format_output(output, options)} + end + def format_output({:ok, output}, formatter, options) do {:ok, formatter.format_output(output, options)} end diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/diagnostics/commands/check_for_quorum_queues_without_an_elected_leader_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/diagnostics/commands/check_for_quorum_queues_without_an_elected_leader_command.ex new file mode 100644 index 000000000000..0cf5dae2d57c --- /dev/null +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/diagnostics/commands/check_for_quorum_queues_without_an_elected_leader_command.ex @@ -0,0 +1,105 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2025 VMware, Inc. or its affiliates. All rights reserved. + +defmodule RabbitMQ.CLI.Diagnostics.Commands.CheckForQuorumQueuesWithoutAnElectedLeaderCommand do + alias RabbitMQ.CLI.Core.{Config, DocGuide} + + @behaviour RabbitMQ.CLI.CommandBehaviour + + import RabbitMQ.CLI.Core.Platform, only: [line_separator: 0] + + def switches(), do: [across_all_vhosts: :boolean] + + def scopes(), do: [:diagnostics] + + def merge_defaults(args, opts) do + {args, Map.merge(%{across_all_vhosts: false, vhost: "/"}, opts)} + end + + use RabbitMQ.CLI.Core.AcceptsOnePositionalArgument + use RabbitMQ.CLI.Core.RequiresRabbitAppRunning + + def run([pattern] = _args, %{node: node_name, vhost: vhost, across_all_vhosts: across_all_vhosts_opt}) do + vhost = if across_all_vhosts_opt, do: :across_all_vhosts, else: vhost + + case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :leader_health_check, [pattern, vhost]) do + [] -> + :ok + + error_or_leaderless_queues -> + {:error, error_or_leaderless_queues} + end + end + + def output(:ok, %{node: node_name, formatter: "json"}) do + {:ok, + %{ + "result" => "ok", + "message" => + "Node #{node_name} reported all quorum queue as having responsive leader replicas" + }} + end + + def output(:ok, %{node: node_name} = opts) do + case Config.output_less?(opts) do + true -> + {:ok, :check_passed} + false -> + {:ok, "Node #{node_name} reported all quorum queue as having responsive leader replicas"} + end + end + + def output({:error, error_or_leaderless_queues}, %{node: node_name, formatter: "json"}) when is_list(error_or_leaderless_queues) do + {:error, :check_failed, + %{ + "result" => "error", + "queues" => error_or_leaderless_queues, + "message" => "Node #{node_name} reported quorum queues with a missing (not elected) or unresponsive leader replica" + }} + end + + def output({:error, error_or_leaderless_queues}, opts) when is_list(error_or_leaderless_queues) do + case Config.output_less?(opts) do + true -> + {:error, :check_failed} + false -> + lines = queue_lines(error_or_leaderless_queues) + {:error, :check_failed, Enum.join(lines, line_separator())} + end + end + + def usage() do + "check_for_quorum_queues_without_an_elected_leader [--vhost ] [--across-all-vhosts] " + end + + def usage_additional do + [ + ["", "regular expression pattern used to match quorum queues"], + ["--across-all-vhosts", "run this health check across all existing virtual hosts"] + ] + end + + def help_section(), do: :observability_and_health_checks + + def usage_doc_guides() do + [ + DocGuide.monitoring(), + DocGuide.quorum_queues() + ] + end + + def description(), do: "Checks that quorum queue have elected and available leader replicas" + + def banner([name], %{across_all_vhosts: true}), + do: "Checking leader replicas of quorum queues matching '#{name}' in all vhosts ..." + + def banner([name], %{vhost: vhost}), + do: "Checking leader replicas of quorum queues matching '#{name}' in vhost #{vhost} ..." + + def queue_lines(qs) do + for q <- qs, do: "#{q["readable_name"]} does not have an elected leader replica or the replica was unresponsive" + end +end diff --git a/deps/rabbitmq_cli/test/diagnostics/check_for_quorum_queues_without_an_elected_leader_command_test.exs b/deps/rabbitmq_cli/test/diagnostics/check_for_quorum_queues_without_an_elected_leader_command_test.exs new file mode 100644 index 000000000000..fc2759d88eef --- /dev/null +++ b/deps/rabbitmq_cli/test/diagnostics/check_for_quorum_queues_without_an_elected_leader_command_test.exs @@ -0,0 +1,53 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +defmodule CheckForQuorumQueuesWithoutAnElectedLeaderCommandTest do + use ExUnit.Case, async: false + import TestHelper + + @command RabbitMQ.CLI.Diagnostics.Commands.CheckForQuorumQueuesWithoutAnElectedLeaderCommand + + setup_all do + RabbitMQ.CLI.Core.Distribution.start() + + :ok + end + + setup context do + {:ok, + opts: %{ + node: get_rabbit_hostname(), + timeout: context[:test_timeout] || 30000 + }} + end + + test "validate: treats no arguments as a failure" do + assert @command.validate([], %{}) == {:validation_failure, :not_enough_args} + end + + test "validate: accepts a single positional argument" do + assert @command.validate(["quorum.queue.*"], %{}) == :ok + end + + test "validate: when two or more arguments are provided, returns a failure" do + assert @command.validate(["quorum.queue.*", "one-extra-arg"], %{}) == + {:validation_failure, :too_many_args} + + assert @command.validate(["quorum.queue.*", "extra-arg", "another-extra-arg"], %{}) == + {:validation_failure, :too_many_args} + end + + @tag test_timeout: 3000 + test "run: targeting an unreachable node throws a badrpc" do + assert match?( + {:error, {:badrpc, :nodedown}}, + @command.run( + ["quorum.queue.*"], + %{node: :jake@thedog, vhost: "/", across_all_vhosts: false, timeout: 200} + ) + ) + end +end