Skip to content

Commit a80f656

Browse files
committed
Add single active field when listing consumers
[#163089472] References rabbitmq/rabbitmq-management#649
1 parent bc05865 commit a80f656

File tree

4 files changed

+34
-12
lines changed

4 files changed

+34
-12
lines changed

src/rabbit_amqqueue.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@
224224

225225
-define(CONSUMER_INFO_KEYS,
226226
[queue_name, channel_pid, consumer_tag, ack_required, prefetch_count,
227-
arguments]).
227+
single_active, arguments]).
228228

229229
warn_file_limit() ->
230230
DurableQueues = find_recoverable_queues(),
@@ -958,8 +958,8 @@ emit_consumers_local(VHostPath, Ref, AggregatorPid) ->
958958
get_queue_consumer_info(Q, ConsumerInfoKeys) ->
959959
[lists:zip(ConsumerInfoKeys,
960960
[Q#amqqueue.name, ChPid, CTag,
961-
AckRequired, Prefetch, Args]) ||
962-
{ChPid, CTag, AckRequired, Prefetch, Args, _} <- consumers(Q)].
961+
AckRequired, Prefetch, SingleActive, Args]) ||
962+
{ChPid, CTag, AckRequired, Prefetch, SingleActive, Args, _} <- consumers(Q)].
963963

964964
stat(#amqqueue{type = quorum} = Q) -> rabbit_quorum_queue:stat(Q);
965965
stat(#amqqueue{pid = QPid}) -> delegate:invoke(QPid, {gen_server2, call, [stat, infinity]}).

src/rabbit_amqqueue_process.erl

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ terminate_shutdown(Fun, #q{status = Status} = State) ->
364364
QName = qname(State),
365365
notify_decorators(shutdown, State),
366366
[emit_consumer_deleted(Ch, CTag, QName, ActingUser) ||
367-
{Ch, CTag, _, _, _} <-
367+
{Ch, CTag, _, _, _, _, _} <-
368368
rabbit_queue_consumers:all(Consumers)],
369369
State1#q{backing_queue_state = Fun(BQS)}
370370
end.
@@ -1216,8 +1216,10 @@ handle_call({info, Items}, _From, State) ->
12161216
catch Error -> reply({error, Error}, State)
12171217
end;
12181218

1219-
handle_call(consumers, _From, State = #q{consumers = Consumers}) ->
1219+
handle_call(consumers, _From, State = #q{consumers = Consumers, single_active_consumer_on = false}) ->
12201220
reply(rabbit_queue_consumers:all(Consumers), State);
1221+
handle_call(consumers, _From, State = #q{consumers = Consumers, active_consumer = ActiveConsumer}) ->
1222+
reply(rabbit_queue_consumers:all(Consumers, ActiveConsumer), State);
12211223

12221224
handle_call({notify_down, ChPid}, _From, State) ->
12231225
%% we want to do this synchronously, so that auto_deleted queues

src/rabbit_fifo.erl

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -704,11 +704,21 @@ query_ra_indexes(#state{ra_indexes = RaIndexes}) ->
704704
query_consumer_count(#state{consumers = Consumers, waiting_consumers = WaitingConsumers}) ->
705705
maps:size(Consumers) + length(WaitingConsumers).
706706

707-
query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsumers}) ->
707+
query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsumers} = State) ->
708+
SingleActiveConsumer = query_single_active_consumer(State),
709+
IsSingleActiveConsumerFun = fun({Tag, Pid} = _ConsumerId) ->
710+
case SingleActiveConsumer of
711+
{value, {Tag, Pid}} ->
712+
true;
713+
_ ->
714+
false
715+
end
716+
end,
708717
FromConsumers = maps:map(fun ({Tag, Pid}, #consumer{meta = Meta}) ->
709718
{Pid, Tag,
710719
maps:get(ack, Meta, undefined),
711720
maps:get(prefetch, Meta, undefined),
721+
IsSingleActiveConsumerFun({Tag, Pid}),
712722
maps:get(args, Meta, []),
713723
maps:get(username, Meta, undefined)}
714724
end, Consumers),
@@ -717,6 +727,7 @@ query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsume
717727
{Pid, Tag,
718728
maps:get(ack, Meta, undefined),
719729
maps:get(prefetch, Meta, undefined),
730+
IsSingleActiveConsumerFun({Tag, Pid}),
720731
maps:get(args, Meta, []),
721732
maps:get(username, Meta, undefined)},
722733
Acc)

src/rabbit_queue_consumers.erl

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
-module(rabbit_queue_consumers).
1818

19-
-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
19+
-export([new/0, max_active_priority/1, inactive/1, all/1, all/2, count/0,
2020
unacknowledged_message_count/0, add/10, remove/3, erase_ch/2,
2121
send_drained/0, deliver/5, record_ack/3, subtract_acks/3,
2222
possibly_unblock/3,
@@ -117,16 +117,25 @@ max_active_priority(#state{consumers = Consumers}) ->
117117
inactive(#state{consumers = Consumers}) ->
118118
priority_queue:is_empty(Consumers).
119119

120-
all(#state{consumers = Consumers}) ->
121-
lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end,
122-
consumers(Consumers, []), all_ch_record()).
120+
all(State) ->
121+
all(State, none).
123122

124-
consumers(Consumers, Acc) ->
123+
all(#state{consumers = Consumers}, SingleActiveConsumer) ->
124+
lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, SingleActiveConsumer, Acc) end,
125+
consumers(Consumers, SingleActiveConsumer, []), all_ch_record()).
126+
127+
consumers(Consumers, SingleActiveConsumer, Acc) ->
125128
priority_queue:fold(
126129
fun ({ChPid, Consumer}, _P, Acc1) ->
127130
#consumer{tag = CTag, ack_required = Ack, prefetch = Prefetch,
128131
args = Args, user = Username} = Consumer,
129-
[{ChPid, CTag, Ack, Prefetch, Args, Username} | Acc1]
132+
IsSingleActive = case SingleActiveConsumer of
133+
{ChPid, Consumer} ->
134+
true;
135+
_ ->
136+
false
137+
end,
138+
[{ChPid, CTag, Ack, Prefetch, IsSingleActive, Args, Username} | Acc1]
130139
end, Acc, Consumers).
131140

132141
count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).

0 commit comments

Comments
 (0)