Skip to content

Commit 897e7f6

Browse files
committed
Implement rabbit_quorum_queue:stat/1
It must return number of ready messages and consumers when leader is available
1 parent 655934d commit 897e7f6

File tree

3 files changed

+48
-5
lines changed

3 files changed

+48
-5
lines changed

src/rabbit_fifo.erl

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@
5757
make_discard/2,
5858
make_credit/4,
5959
make_purge/0,
60-
make_update_config/1
60+
make_update_config/1,
61+
make_stat/0
6162
]).
6263

6364
-type raw_msg() :: term().
@@ -133,6 +134,7 @@
133134
drain :: boolean()}).
134135
-record(purge, {}).
135136
-record(update_config, {config :: config()}).
137+
-record(stat, {}).
136138

137139

138140

@@ -144,7 +146,8 @@
144146
#discard{} |
145147
#credit{} |
146148
#purge{} |
147-
#update_config{}.
149+
#update_config{} |
150+
#stat{}.
148151

149152
-type command() :: protocol() | ra_machine:builtin_command().
150153
%% all the command types suppored by ra fifo
@@ -431,6 +434,19 @@ apply(#{index := RaftIdx}, #purge{},
431434
%% reverse the effects ourselves
432435
{State, {purge, Total},
433436
lists:reverse([garbage_collection | Effects])};
437+
apply(_, #stat{}, #state{name = Name,
438+
messages = Messages,
439+
ra_indexes = Indexes,
440+
consumers = Cons,
441+
msg_bytes_enqueue = EnqueueBytes,
442+
msg_bytes_checkout = CheckoutBytes} = State) ->
443+
Metrics = {maps:size(Messages), % Ready
444+
num_checked_out(State), % checked out
445+
rabbit_fifo_index:size(Indexes), %% Total
446+
maps:size(Cons), % Consumers
447+
EnqueueBytes,
448+
CheckoutBytes},
449+
{State, {stat, Metrics}, []};
434450
apply(_, {down, ConsumerPid, noconnection},
435451
#state{consumers = Cons0,
436452
enqueuers = Enqs0} = State0) ->
@@ -1181,6 +1197,9 @@ make_purge() -> #purge{}.
11811197
make_update_config(Config) ->
11821198
#update_config{config = Config}.
11831199

1200+
-spec make_stat() -> protocol().
1201+
make_stat() -> #stat{}.
1202+
11841203
add_bytes_enqueue(Msg, #state{msg_bytes_enqueue = Enqueue} = State) ->
11851204
Bytes = message_size(Msg),
11861205
State#state{msg_bytes_enqueue = Enqueue + Bytes}.

src/rabbit_fifo_client.erl

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838
untracked_enqueue/2,
3939
purge/1,
4040
cluster_name/1,
41-
update_machine_state/2
41+
update_machine_state/2,
42+
stat/1
4243
]).
4344

4445
-include_lib("ra/include/ra.hrl").
@@ -398,6 +399,23 @@ purge(Node) ->
398399
Err
399400
end.
400401

402+
-spec stat(ra_server_id()) -> {ok, {non_neg_integer(), non_neg_integer(),
403+
non_neg_integer(), non_neg_integer(),
404+
non_neg_integer(), non_neg_integer()}}
405+
| {error | timeout, term()}.
406+
stat(Servers) ->
407+
try_process_stat(Servers, rabbit_fifo:make_stat()).
408+
409+
try_process_stat([Server | Rem], Cmd) ->
410+
case ra:process_command(Server, Cmd, 30000) of
411+
{ok, {stat, Reply}, _} ->
412+
{ok, Reply};
413+
Err when length(Rem) =:= 0 ->
414+
Err;
415+
_ ->
416+
try_process_stat(Rem, Cmd)
417+
end.
418+
401419
%% @doc returns the cluster name
402420
-spec cluster_name(state()) -> cluster_name().
403421
cluster_name(#state{cluster_name = ClusterName}) ->

src/rabbit_quorum_queue.erl

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -424,8 +424,14 @@ infos(QName) ->
424424
info(Q, Items) ->
425425
[{Item, i(Item, Q)} || Item <- Items].
426426

427-
stat(_Q) ->
428-
{ok, 0, 0}. %% TODO length, consumers count
427+
stat(#amqqueue{pid = {Name, _}, quorum_nodes = Nodes}) ->
428+
case rabbit_fifo_client:stat([{Name, N} || N <- Nodes]) of
429+
{ok, {Ready, _, _, Consumers, _, _}} ->
430+
{ok, Ready, Consumers};
431+
_ ->
432+
%% Leader is not available, cluster might be in minority
433+
{ok, 0, 0}
434+
end.
429435

430436
purge(Node) ->
431437
rabbit_fifo_client:purge(Node).

0 commit comments

Comments
 (0)