Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 64 additions & 31 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@ start_cluster(Q) ->
?SNAPSHOT_INTERVAL),
RaConfs = [make_ra_conf(NewQ, ServerId, TickTimeout, SnapshotInterval)
|| ServerId <- members(NewQ)],
Timeout = erpc_timeout(Leader, ?START_CLUSTER_RPC_TIMEOUT),
try erpc:call(Leader, ra, start_cluster, [?RA_SYSTEM, RaConfs], Timeout) of
try erpc_call(Leader, ra, start_cluster, [?RA_SYSTEM, RaConfs],
?START_CLUSTER_RPC_TIMEOUT) of
{ok, _, _} ->
%% ensure the latest config is evaluated properly
%% even when running the machine version from 0
Expand Down Expand Up @@ -285,17 +285,23 @@ single_active_consumer_on(Q) ->
_ -> false
end.

update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args) ->
local_or_remote_handler(ChPid, rabbit_quorum_queue, update_consumer,
[QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args]).
update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired,
Prefetch, Active, ActivityStatus, Args) ->
catch local_or_remote_handler(ChPid, rabbit_quorum_queue, update_consumer,
[QName, ChPid, ConsumerTag, Exclusive,
AckRequired, Prefetch, Active,
ActivityStatus, Args]).

update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args) ->
catch rabbit_core_metrics:consumer_updated(ChPid, ConsumerTag, Exclusive, AckRequired,
QName, Prefetch, Active, ActivityStatus, Args).
update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch,
Active, ActivityStatus, Args) ->
catch rabbit_core_metrics:consumer_updated(ChPid, ConsumerTag, Exclusive,
AckRequired,
QName, Prefetch, Active,
ActivityStatus, Args).

cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer,
[QName, ChPid, ConsumerTag]).
catch local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer,
[QName, ChPid, ConsumerTag]).

cancel_consumer(QName, ChPid, ConsumerTag) ->
catch rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName),
Expand All @@ -309,7 +315,7 @@ local_or_remote_handler(ChPid, Module, Function, Args) ->
false ->
%% this could potentially block for a while if the node is
%% in disconnected state or tcp buffers are full
rpc:cast(Node, Module, Function, Args)
erpc:cast(Node, Module, Function, Args)
end.

become_leader(QName, Name) ->
Expand All @@ -329,8 +335,8 @@ become_leader(QName, Name) ->
case rabbit_amqqueue:lookup(QName) of
{ok, Q0} when ?is_amqqueue(Q0) ->
Nodes = get_nodes(Q0),
[rpc:call(Node, ?MODULE, rpc_delete_metrics,
[QName], ?RPC_TIMEOUT)
[_ = erpc_call(Node, ?MODULE, rpc_delete_metrics,
[QName], ?RPC_TIMEOUT)
|| Node <- Nodes, Node =/= node()];
_ ->
ok
Expand Down Expand Up @@ -676,8 +682,8 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
end,
notify_decorators(QName, shutdown),
ok = delete_queue_data(QName, ActingUser),
rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
?RPC_TIMEOUT),
_ = erpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
?RPC_TIMEOUT),
{ok, ReadyMsgs};
{error, {no_more_servers_to_try, Errs}} ->
case lists:all(fun({{error, noproc}, _}) -> true;
Expand Down Expand Up @@ -971,9 +977,8 @@ cleanup_data_dir() ->
end
|| Q <- rabbit_amqqueue:list_by_type(?MODULE),
lists:member(node(), get_nodes(Q))],
NoQQClusters = rabbit_ra_registry:list_not_quorum_clusters(),
Registered = ra_directory:list_registered(?RA_SYSTEM),
Running = Names ++ NoQQClusters,
Running = Names,
_ = [maybe_delete_data_dir(UId) || {Name, UId} <- Registered,
not lists:member(Name, Running)],
ok.
Expand Down Expand Up @@ -1436,9 +1441,11 @@ i(memory, Q) when ?is_amqqueue(Q) ->
i(state, Q) when ?is_amqqueue(Q) ->
{Name, Node} = amqqueue:get_pid(Q),
%% Check against the leader or last known leader
case rpc:call(Node, ?MODULE, cluster_state, [Name], ?RPC_TIMEOUT) of
{badrpc, _} -> down;
State -> State
case erpc_call(Node, ?MODULE, cluster_state, [Name], ?RPC_TIMEOUT) of
{error, _} ->
down;
State ->
State
end;
i(local_state, Q) when ?is_amqqueue(Q) ->
{Name, _} = amqqueue:get_pid(Q),
Expand All @@ -1457,7 +1464,7 @@ i(online, Q) -> online(Q);
i(leader, Q) -> leader(Q);
i(open_files, Q) when ?is_amqqueue(Q) ->
{Name, _} = amqqueue:get_pid(Q),
Nodes = get_nodes(Q),
Nodes = get_connected_nodes(Q),
{Data, _} = rpc:multicall(Nodes, ?MODULE, open_files, [Name]),
lists:flatten(Data);
i(single_active_consumer_pid, Q) when ?is_amqqueue(Q) ->
Expand Down Expand Up @@ -1559,7 +1566,7 @@ peek(_Pos, Q) when ?is_amqqueue(Q) andalso ?amqqueue_is_classic(Q) ->
{error, classic_queue_not_supported}.

online(Q) when ?is_amqqueue(Q) ->
Nodes = get_nodes(Q),
Nodes = get_connected_nodes(Q),
{Name, _} = amqqueue:get_pid(Q),
[Node || Node <- Nodes, is_process_alive(Name, Node)].

Expand All @@ -1568,7 +1575,10 @@ format(Q) when ?is_amqqueue(Q) ->
[{members, Nodes}, {online, online(Q)}, {leader, leader(Q)}].

is_process_alive(Name, Node) ->
erlang:is_pid(rpc:call(Node, erlang, whereis, [Name], ?RPC_TIMEOUT)).
%% don't attempt rpc if node is not already connected
%% as this function is used for metrics and stats and the additional
%% latency isn't warranted
erlang:is_pid(erpc_call(Node, erlang, whereis, [Name], ?RPC_TIMEOUT)).

-spec quorum_messages(rabbit_amqqueue:name()) -> non_neg_integer().

Expand Down Expand Up @@ -1626,6 +1636,10 @@ get_nodes(Q) when ?is_amqqueue(Q) ->
#{nodes := Nodes} = amqqueue:get_type_state(Q),
Nodes.

get_connected_nodes(Q) when ?is_amqqueue(Q) ->
ErlangNodes = [node() | nodes()],
[N || N <- get_nodes(Q), lists:member(N, ErlangNodes)].

update_type_state(Q, Fun) when ?is_amqqueue(Q) ->
Ts = amqqueue:get_type_state(Q),
amqqueue:set_type_state(Q, Fun(Ts)).
Expand Down Expand Up @@ -1691,18 +1705,37 @@ prepare_content(Content) ->
%% rabbit_fifo can directly parse it without having to decode again.
Content.

erpc_timeout(Node, _)
when Node =:= node() ->
%% Only timeout 'infinity' optimises the local call in OTP 23-25 avoiding a new process being spawned:
%% https://github.com/erlang/otp/blob/47f121af8ee55a0dbe2a8c9ab85031ba052bad6b/lib/kernel/src/erpc.erl#L121
infinity;
erpc_timeout(_, Timeout) ->
Timeout.

ets_lookup_element(Tbl, Key, Pos, Default) ->
try ets:lookup_element(Tbl, Key, Pos) of
V -> V
catch
_:badarg ->
Default
end.

erpc_call(Node, M, F, A, Timeout)
when is_integer(Timeout) andalso Node == node() ->
%% Only timeout 'infinity' optimises the local call in OTP 23-25 avoiding a new process being spawned:
%% https://github.com/erlang/otp/blob/47f121af8ee55a0dbe2a8c9ab85031ba052bad6b/lib/kernel/src/erpc.erl#L121
try erpc:call(Node, M, F, A, Timeout) of
Result ->
Result
catch
error:Err ->
{error, Err}
end;
erpc_call(Node, M, F, A, Timeout) ->
case lists:member(Node, nodes()) of
true ->
try erpc:call(Node, M, F, A, Timeout) of
Result ->
Result
catch
error:Err ->
{error, Err}
end;
false ->
{error, noconnection}
end.


2 changes: 1 addition & 1 deletion deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1751,7 +1751,7 @@ add_member(Config) ->
ok = rabbit_control_helper:command(stop_app, Server1),
ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []),
rabbit_control_helper:command(start_app, Server1),
?assertEqual(ok, rpc:call(Server0, rabbit_quorum_queue, add_member,
?assertEqual(ok, rpc:call(Server1, rabbit_quorum_queue, add_member,
[<<"/">>, QQ, Server1, 5000])),
Info = rpc:call(Server0, rabbit_quorum_queue, infos,
[rabbit_misc:r(<<"/">>, queue, QQ)]),
Expand Down