@@ -203,8 +203,8 @@ start_cluster(Q) ->
203203 ? SNAPSHOT_INTERVAL ),
204204 RaConfs = [make_ra_conf (NewQ , ServerId , TickTimeout , SnapshotInterval )
205205 || ServerId <- members (NewQ )],
206- Timeout = erpc_timeout (Leader , ? START_CLUSTER_RPC_TIMEOUT ) ,
207- try erpc : call ( Leader , ra , start_cluster , [ ? RA_SYSTEM , RaConfs ], Timeout ) of
206+ try erpc_call (Leader , ra , start_cluster , [ ? RA_SYSTEM , RaConfs ] ,
207+ ? START_CLUSTER_RPC_TIMEOUT ) of
208208 {ok , _ , _ } ->
209209 % % ensure the latest config is evaluated properly
210210 % % even when running the machine version from 0
@@ -285,17 +285,23 @@ single_active_consumer_on(Q) ->
285285 _ -> false
286286 end .
287287
288- update_consumer_handler (QName , {ConsumerTag , ChPid }, Exclusive , AckRequired , Prefetch , Active , ActivityStatus , Args ) ->
289- local_or_remote_handler (ChPid , rabbit_quorum_queue , update_consumer ,
290- [QName , ChPid , ConsumerTag , Exclusive , AckRequired , Prefetch , Active , ActivityStatus , Args ]).
288+ update_consumer_handler (QName , {ConsumerTag , ChPid }, Exclusive , AckRequired ,
289+ Prefetch , Active , ActivityStatus , Args ) ->
290+ catch local_or_remote_handler (ChPid , rabbit_quorum_queue , update_consumer ,
291+ [QName , ChPid , ConsumerTag , Exclusive ,
292+ AckRequired , Prefetch , Active ,
293+ ActivityStatus , Args ]).
291294
292- update_consumer (QName , ChPid , ConsumerTag , Exclusive , AckRequired , Prefetch , Active , ActivityStatus , Args ) ->
293- catch rabbit_core_metrics :consumer_updated (ChPid , ConsumerTag , Exclusive , AckRequired ,
294- QName , Prefetch , Active , ActivityStatus , Args ).
295+ update_consumer (QName , ChPid , ConsumerTag , Exclusive , AckRequired , Prefetch ,
296+ Active , ActivityStatus , Args ) ->
297+ catch rabbit_core_metrics :consumer_updated (ChPid , ConsumerTag , Exclusive ,
298+ AckRequired ,
299+ QName , Prefetch , Active ,
300+ ActivityStatus , Args ).
295301
296302cancel_consumer_handler (QName , {ConsumerTag , ChPid }) ->
297- local_or_remote_handler (ChPid , rabbit_quorum_queue , cancel_consumer ,
298- [QName , ChPid , ConsumerTag ]).
303+ catch local_or_remote_handler (ChPid , rabbit_quorum_queue , cancel_consumer ,
304+ [QName , ChPid , ConsumerTag ]).
299305
300306cancel_consumer (QName , ChPid , ConsumerTag ) ->
301307 catch rabbit_core_metrics :consumer_deleted (ChPid , ConsumerTag , QName ),
@@ -309,7 +315,7 @@ local_or_remote_handler(ChPid, Module, Function, Args) ->
309315 false ->
310316 % % this could potentially block for a while if the node is
311317 % % in disconnected state or tcp buffers are full
312- rpc :cast (Node , Module , Function , Args )
318+ erpc :cast (Node , Module , Function , Args )
313319 end .
314320
315321become_leader (QName , Name ) ->
@@ -329,8 +335,8 @@ become_leader(QName, Name) ->
329335 case rabbit_amqqueue :lookup (QName ) of
330336 {ok , Q0 } when ? is_amqqueue (Q0 ) ->
331337 Nodes = get_nodes (Q0 ),
332- [rpc : call (Node , ? MODULE , rpc_delete_metrics ,
333- [QName ], ? RPC_TIMEOUT )
338+ [_ = erpc_call (Node , ? MODULE , rpc_delete_metrics ,
339+ [QName ], ? RPC_TIMEOUT )
334340 || Node <- Nodes , Node =/= node ()];
335341 _ ->
336342 ok
@@ -676,8 +682,8 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
676682 end ,
677683 notify_decorators (QName , shutdown ),
678684 ok = delete_queue_data (QName , ActingUser ),
679- rpc :call (LeaderNode , rabbit_core_metrics , queue_deleted , [QName ],
680- ? RPC_TIMEOUT ),
685+ _ = erpc :call (LeaderNode , rabbit_core_metrics , queue_deleted , [QName ],
686+ ? RPC_TIMEOUT ),
681687 {ok , ReadyMsgs };
682688 {error , {no_more_servers_to_try , Errs }} ->
683689 case lists :all (fun ({{error , noproc }, _ }) -> true ;
@@ -971,9 +977,8 @@ cleanup_data_dir() ->
971977 end
972978 || Q <- rabbit_amqqueue :list_by_type (? MODULE ),
973979 lists :member (node (), get_nodes (Q ))],
974- NoQQClusters = rabbit_ra_registry :list_not_quorum_clusters (),
975980 Registered = ra_directory :list_registered (? RA_SYSTEM ),
976- Running = Names ++ NoQQClusters ,
981+ Running = Names ,
977982 _ = [maybe_delete_data_dir (UId ) || {Name , UId } <- Registered ,
978983 not lists :member (Name , Running )],
979984 ok .
@@ -1436,9 +1441,11 @@ i(memory, Q) when ?is_amqqueue(Q) ->
14361441i (state , Q ) when ? is_amqqueue (Q ) ->
14371442 {Name , Node } = amqqueue :get_pid (Q ),
14381443 % % Check against the leader or last known leader
1439- case rpc :call (Node , ? MODULE , cluster_state , [Name ], ? RPC_TIMEOUT ) of
1440- {badrpc , _ } -> down ;
1441- State -> State
1444+ case erpc_call (Node , ? MODULE , cluster_state , [Name ], ? RPC_TIMEOUT ) of
1445+ {error , _ } ->
1446+ down ;
1447+ State ->
1448+ State
14421449 end ;
14431450i (local_state , Q ) when ? is_amqqueue (Q ) ->
14441451 {Name , _ } = amqqueue :get_pid (Q ),
@@ -1457,7 +1464,7 @@ i(online, Q) -> online(Q);
14571464i (leader , Q ) -> leader (Q );
14581465i (open_files , Q ) when ? is_amqqueue (Q ) ->
14591466 {Name , _ } = amqqueue :get_pid (Q ),
1460- Nodes = get_nodes (Q ),
1467+ Nodes = get_connected_nodes (Q ),
14611468 {Data , _ } = rpc :multicall (Nodes , ? MODULE , open_files , [Name ]),
14621469 lists :flatten (Data );
14631470i (single_active_consumer_pid , Q ) when ? is_amqqueue (Q ) ->
@@ -1559,7 +1566,7 @@ peek(_Pos, Q) when ?is_amqqueue(Q) andalso ?amqqueue_is_classic(Q) ->
15591566 {error , classic_queue_not_supported }.
15601567
15611568online (Q ) when ? is_amqqueue (Q ) ->
1562- Nodes = get_nodes (Q ),
1569+ Nodes = get_connected_nodes (Q ),
15631570 {Name , _ } = amqqueue :get_pid (Q ),
15641571 [Node || Node <- Nodes , is_process_alive (Name , Node )].
15651572
@@ -1568,7 +1575,10 @@ format(Q) when ?is_amqqueue(Q) ->
15681575 [{members , Nodes }, {online , online (Q )}, {leader , leader (Q )}].
15691576
15701577is_process_alive (Name , Node ) ->
1571- erlang :is_pid (rpc :call (Node , erlang , whereis , [Name ], ? RPC_TIMEOUT )).
1578+ % % don't attempt rpc if node is not already connected
1579+ % % as this function is used for metrics and stats and the additional
1580+ % % latency isn't warranted
1581+ erlang :is_pid (erpc_call (Node , erlang , whereis , [Name ], ? RPC_TIMEOUT )).
15721582
15731583-spec quorum_messages (rabbit_amqqueue :name ()) -> non_neg_integer ().
15741584
@@ -1626,6 +1636,10 @@ get_nodes(Q) when ?is_amqqueue(Q) ->
16261636 #{nodes := Nodes } = amqqueue :get_type_state (Q ),
16271637 Nodes .
16281638
1639+ get_connected_nodes (Q ) when ? is_amqqueue (Q ) ->
1640+ ErlangNodes = [node () | nodes ()],
1641+ [N || N <- get_nodes (Q ), lists :member (N , ErlangNodes )].
1642+
16291643update_type_state (Q , Fun ) when ? is_amqqueue (Q ) ->
16301644 Ts = amqqueue :get_type_state (Q ),
16311645 amqqueue :set_type_state (Q , Fun (Ts )).
@@ -1691,18 +1705,37 @@ prepare_content(Content) ->
16911705 % % rabbit_fifo can directly parse it without having to decode again.
16921706 Content .
16931707
1694- erpc_timeout (Node , _ )
1695- when Node =:= node () ->
1696- % % Only timeout 'infinity' optimises the local call in OTP 23-25 avoiding a new process being spawned:
1697- % % https://github.com/erlang/otp/blob/47f121af8ee55a0dbe2a8c9ab85031ba052bad6b/lib/kernel/src/erpc.erl#L121
1698- infinity ;
1699- erpc_timeout (_ , Timeout ) ->
1700- Timeout .
1701-
17021708ets_lookup_element (Tbl , Key , Pos , Default ) ->
17031709 try ets :lookup_element (Tbl , Key , Pos ) of
17041710 V -> V
17051711 catch
17061712 _ :badarg ->
17071713 Default
17081714 end .
1715+
1716+ erpc_call (Node , M , F , A , Timeout )
1717+ when is_integer (Timeout ) andalso Node == node () ->
1718+ % % Only timeout 'infinity' optimises the local call in OTP 23-25 avoiding a new process being spawned:
1719+ % % https://github.com/erlang/otp/blob/47f121af8ee55a0dbe2a8c9ab85031ba052bad6b/lib/kernel/src/erpc.erl#L121
1720+ try erpc :call (Node , M , F , A , Timeout ) of
1721+ Result ->
1722+ Result
1723+ catch
1724+ error :Err ->
1725+ {error , Err }
1726+ end ;
1727+ erpc_call (Node , M , F , A , Timeout ) ->
1728+ case lists :member (Node , nodes ()) of
1729+ true ->
1730+ try erpc :call (Node , M , F , A , Timeout ) of
1731+ Result ->
1732+ Result
1733+ catch
1734+ error :Err ->
1735+ {error , Err }
1736+ end ;
1737+ false ->
1738+ {error , noconnection }
1739+ end .
1740+
1741+
0 commit comments