Skip to content

Commit b4d3545

Browse files
committed
Support single active consumer in quorum queue
Uses a buffer list for non-active consumers. The active consumer is stored in the usual consumers structure, so the logic around servicing consumers is kept the same. [#162582065] Fixes #1799
1 parent 6bec963 commit b4d3545

File tree

3 files changed

+188
-42
lines changed

3 files changed

+188
-42
lines changed

src/rabbit_fifo.erl

Lines changed: 88 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,12 @@
226226
%% This is done so that consumers are still served in a deterministic
227227
%% order on recovery.
228228
prefix_msg_counts = {0, 0} :: {Return :: non_neg_integer(),
229-
PrefixMsgs :: non_neg_integer()}
229+
PrefixMsgs :: non_neg_integer()},
230+
%% whether single active consumer is on or not for this queue
231+
single_active_consumer_on = false :: boolean(),
232+
%% waiting consumers, one is picked active consumer is cancelled or dies
233+
%% used only when single active consumer is on
234+
waiting_consumers = [] :: list()
230235
}).
231236

232237
-opaque state() :: #state{}.
@@ -262,9 +267,11 @@ update_state(Conf, State) ->
262267
DLH = maps:get(dead_letter_handler, Conf, undefined),
263268
BLH = maps:get(become_leader_handler, Conf, undefined),
264269
SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL),
270+
SingleActiveConsumerOn = maps:get(single_active_consumer_on, Conf, false),
265271
State#state{dead_letter_handler = DLH,
266272
become_leader_handler = BLH,
267-
shadow_copy_interval = SHI}.
273+
shadow_copy_interval = SHI,
274+
single_active_consumer_on = SingleActiveConsumerOn}.
268275

269276
% msg_ids are scoped per consumer
270277
% ra_indexes holds all raft indexes for enqueues currently on queue
@@ -667,6 +674,54 @@ num_checked_out(#state{consumers = Cons}) ->
667674
end, 0, maps:values(Cons)).
668675

669676
cancel_consumer(ConsumerId,
677+
{Effects0, #state{single_active_consumer_on = false} = S0}) ->
678+
%% general case, single active consumer off
679+
cancel_consumer0(ConsumerId, {Effects0, S0});
680+
cancel_consumer(ConsumerId,
681+
{Effects0, #state{single_active_consumer_on = true,
682+
waiting_consumers = WaitingConsumers } = S0}) when length(WaitingConsumers) == 0 ->
683+
%% single active consumer on, no consumers are waiting
684+
cancel_consumer0(ConsumerId, {Effects0, S0});
685+
cancel_consumer(ConsumerId,
686+
{Effects0, #state{consumers = Cons0,
687+
single_active_consumer_on = true,
688+
waiting_consumers = WaitingConsumers0 } = State0}) ->
689+
%% single active consumer on, consumers are waiting
690+
case maps:take(ConsumerId, Cons0) of
691+
{_CurrentActiveConsumer, _} ->
692+
% The active consumer is to be removed
693+
% Cancel it
694+
{Effects1, State1} = case maps:take(ConsumerId, Cons0) of
695+
{#consumer{checked_out = Checked0}, _} ->
696+
S = return_all(State0, Checked0),
697+
Effects = cancel_consumer_effects(ConsumerId, S, Effects0),
698+
{Effects, State0};
699+
error ->
700+
{Effects0, State0}
701+
end,
702+
% Take another one from the waiting consumers and put it in consumers
703+
{NewActiveConsumerId, NewActiveConsumer} = lists:nth(1, WaitingConsumers0),
704+
WaitingConsumers1 = lists:delete({NewActiveConsumerId, NewActiveConsumer}, WaitingConsumers0),
705+
#state{service_queue = ServiceQueue} = State0,
706+
ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId, NewActiveConsumer, ServiceQueue),
707+
State2 = State1#state{consumers = #{NewActiveConsumerId => NewActiveConsumer},
708+
service_queue = ServiceQueue1,
709+
waiting_consumers = WaitingConsumers1},
710+
{Effects1, State2};
711+
error ->
712+
% The cancelled consumer is not the active one
713+
% Just remove it from idle_consumers
714+
case lists:keytake(ConsumerId, 1, WaitingConsumers0) of
715+
{value, {ConsumerId, #consumer{checked_out = Checked0}}, WaitingConsumers1} ->
716+
S = return_all(State0, Checked0),
717+
Effects = cancel_consumer_effects(ConsumerId, S, Effects0),
718+
{Effects, #state{waiting_consumers = WaitingConsumers1}};
719+
false ->
720+
{Effects0, State0}
721+
end
722+
end.
723+
724+
cancel_consumer0(ConsumerId,
670725
{Effects0, #state{consumers = C0} = S0}) ->
671726
case maps:take(ConsumerId, C0) of
672727
{#consumer{checked_out = Checked0}, Cons} ->
@@ -677,7 +732,7 @@ cancel_consumer(ConsumerId,
677732
{[{aux, inactive} | Effects], S#state{consumers = Cons}};
678733
_ ->
679734
{Effects, S#state{consumers = Cons}}
680-
end;
735+
end;
681736
error ->
682737
% already removed - do nothing
683738
{Effects0, S0}
@@ -1053,23 +1108,42 @@ uniq_queue_in(Key, Queue) ->
10531108
end.
10541109

10551110

1056-
update_consumer(ConsumerId, Meta, {Life, Credit, Mode},
1111+
update_consumer(ConsumerId, Meta, Spec,
1112+
#state{single_active_consumer_on = false} = State0) ->
1113+
%% general case, single active consumer off
1114+
update_consumer0(ConsumerId, Meta, Spec, State0);
1115+
update_consumer(ConsumerId, Meta, Spec,
10571116
#state{consumers = Cons0,
1058-
service_queue = ServiceQueue0} = State0) ->
1117+
single_active_consumer_on = true} = State0) when map_size(Cons0) == 0 ->
1118+
%% single active consumer on, no one is consuming yet
1119+
update_consumer0(ConsumerId, Meta, Spec, State0);
1120+
update_consumer(ConsumerId, Meta, {Life, Credit, Mode},
1121+
#state{single_active_consumer_on = true,
1122+
waiting_consumers = WaitingConsumers0} = State0) ->
1123+
%% single active consumer on and one active consumer already
1124+
%% adding the new consumer to the waiting list
1125+
Consumer = #consumer{lifetime = Life, meta = Meta,
1126+
credit = Credit, credit_mode = Mode},
1127+
WaitingConsumers1 = WaitingConsumers0 ++ [{ConsumerId, Consumer}],
1128+
State0#state{waiting_consumers = WaitingConsumers1}.
1129+
1130+
update_consumer0(ConsumerId, Meta, {Life, Credit, Mode},
1131+
#state{consumers = Cons0,
1132+
service_queue = ServiceQueue0} = State0) ->
10591133
%% TODO: this logic may not be correct for updating a pre-existing consumer
10601134
Init = #consumer{lifetime = Life, meta = Meta,
10611135
credit = Credit, credit_mode = Mode},
10621136
Cons = maps:update_with(ConsumerId,
1063-
fun(S) ->
1064-
%% remove any in-flight messages from
1065-
%% the credit update
1066-
N = maps:size(S#consumer.checked_out),
1067-
C = max(0, Credit - N),
1068-
S#consumer{lifetime = Life,
1069-
credit = C}
1070-
end, Init, Cons0),
1137+
fun(S) ->
1138+
%% remove any in-flight messages from
1139+
%% the credit update
1140+
N = maps:size(S#consumer.checked_out),
1141+
C = max(0, Credit - N),
1142+
S#consumer{lifetime = Life,
1143+
credit = C}
1144+
end, Init, Cons0),
10711145
ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons),
1072-
ServiceQueue0),
1146+
ServiceQueue0),
10731147

10741148
State0#state{consumers = Cons, service_queue = ServiceQueue}.
10751149

src/rabbit_quorum_queue.erl

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@
3737
-export([policy_changed/2]).
3838
-export([cleanup_data_dir/0]).
3939

40-
-include_lib("rabbit_common/include/rabbit.hrl").
40+
%%-include_lib("rabbit_common/include/rabbit.hrl").
41+
-include_lib("rabbit.hrl").
4142
-include_lib("stdlib/include/qlc.hrl").
4243

4344
-type ra_server_id() :: {Name :: atom(), Node :: node()}.
@@ -151,7 +152,14 @@ ra_machine_config(Q = #amqqueue{name = QName}) ->
151152
#{dead_letter_handler => dlx_mfa(Q),
152153
queue_resource => QName,
153154
become_leader_handler => {?MODULE, become_leader, [QName]},
154-
metrics_handler => {?MODULE, update_metrics, [QName]}}.
155+
metrics_handler => {?MODULE, update_metrics, [QName]},
156+
single_active_consumer_on => single_active_consumer_on(Q)}.
157+
158+
single_active_consumer_on(#amqqueue{arguments = QArguments}) ->
159+
case rabbit_misc:table_lookup(QArguments, <<"x-single-active-consumer">>) of
160+
{bool, true} -> true;
161+
_ -> false
162+
end.
155163

156164
cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
157165
Node = node(ChPid),

test/single_active_consumer_SUITE.erl

Lines changed: 90 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,22 @@
2424

2525
all() ->
2626
[
27-
{group, default}
27+
{group, classic_queue}, {group, quorum_queue}
2828
].
2929

3030
groups() ->
3131
[
32-
{default, [], [
32+
{classic_queue, [], [
3333
all_messages_go_to_one_consumer,
3434
fallback_to_another_consumer_when_first_one_is_cancelled,
3535
fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled,
3636
amqp_exclusive_consume_fails_on_exclusive_consumer_queue
37+
]},
38+
{quorum_queue, [], [
39+
all_messages_go_to_one_consumer,
40+
fallback_to_another_consumer_when_first_one_is_cancelled,
41+
fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled
42+
%% amqp_exclusive_consume_fails_on_exclusive_consumer_queue % Exclusive consume not implemented in QQ
3743
]}
3844
].
3945

@@ -48,18 +54,37 @@ init_per_suite(Config) ->
4854

4955
end_per_suite(Config) ->
5056
rabbit_ct_helpers:run_teardown_steps(Config,
51-
rabbit_ct_client_helpers:teardown_steps() ++
52-
rabbit_ct_broker_helpers:teardown_steps()).
57+
rabbit_ct_client_helpers:teardown_steps() ++
58+
rabbit_ct_broker_helpers:teardown_steps()).
59+
60+
init_per_group(classic_queue, Config) ->
61+
[{single_active_consumer_queue_declare,
62+
#'queue.declare'{arguments = [
63+
{<<"x-single-active-consumer">>, bool, true},
64+
{<<"x-queue-type">>, longstr, <<"classic">>}
65+
],
66+
auto_delete = true}
67+
} | Config];
68+
init_per_group(quorum_queue, Config) ->
69+
[{single_active_consumer_queue_declare,
70+
#'queue.declare'{arguments = [
71+
{<<"x-single-active-consumer">>, bool, true},
72+
{<<"x-queue-type">>, longstr, <<"quorum">>}
73+
],
74+
durable = true, exclusive = false, auto_delete = false}
75+
} | Config].
76+
77+
end_per_group(_, Config) ->
78+
Config.
5379

5480
init_per_testcase(Testcase, Config) ->
5581
rabbit_ct_helpers:testcase_started(Config, Testcase).
56-
5782
end_per_testcase(Testcase, Config) ->
5883
rabbit_ct_helpers:testcase_finished(Config, Testcase).
5984

6085
all_messages_go_to_one_consumer(Config) ->
6186
{C, Ch} = connection_and_channel(Config),
62-
Q = queue_declare(Ch),
87+
Q = queue_declare(Ch, Config),
6388
NbMessages = 5,
6489
ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages}]),
6590
#'basic.consume_ok'{consumer_tag = CTag1} =
@@ -85,36 +110,52 @@ all_messages_go_to_one_consumer(Config) ->
85110

86111
fallback_to_another_consumer_when_first_one_is_cancelled(Config) ->
87112
{C, Ch} = connection_and_channel(Config),
88-
Q = queue_declare(Ch),
113+
Q = queue_declare(Ch, Config),
89114
NbMessages = 10,
90115
ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages}]),
91116
#'basic.consume_ok'{consumer_tag = CTag1} =
92117
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
93118
#'basic.consume_ok'{consumer_tag = CTag2} =
94119
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
95-
#'basic.consume_ok'{consumer_tag = _CTag3} =
120+
#'basic.consume_ok'{consumer_tag = CTag3} =
96121
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
97122

98123
Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
99124
[amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, NbMessages div 2)],
100125

101-
#'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag1}),
126+
{MessagesPerConsumer1, _} = wait_for_messages(NbMessages div 2),
127+
FirstActiveConsumerInList = maps:keys(maps:filter(fun(_CTag, MessageCount) -> MessageCount > 0 end, MessagesPerConsumer1)),
128+
?assertEqual(1, length(FirstActiveConsumerInList)),
129+
130+
FirstActiveConsumer = lists:nth(1, FirstActiveConsumerInList),
131+
#'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = FirstActiveConsumer}),
132+
133+
{cancel_ok, FirstActiveConsumer} = wait_for_cancel_ok(),
102134

103135
[amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(NbMessages div 2 + 1, NbMessages - 1)],
104136

105-
#'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag2}),
137+
{MessagesPerConsumer2, _} = wait_for_messages(NbMessages div 2 - 1),
138+
SecondActiveConsumerInList = maps:keys(maps:filter(
139+
fun(CTag, MessageCount) -> MessageCount > 0 andalso CTag /= FirstActiveConsumer end,
140+
MessagesPerConsumer2)
141+
),
142+
?assertEqual(1, length(SecondActiveConsumerInList)),
143+
SecondActiveConsumer = lists:nth(1, SecondActiveConsumerInList),
144+
145+
#'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = SecondActiveConsumer}),
106146

107147
amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}),
148+
wait_for_messages(1),
149+
150+
LastActiveConsumer = lists:nth(1, lists:delete(FirstActiveConsumer, lists:delete(SecondActiveConsumer, [CTag1, CTag2, CTag3]))),
108151

109152
receive
110153
{consumer_done, {MessagesPerConsumer, MessageCount}} ->
111154
?assertEqual(NbMessages, MessageCount),
112155
?assertEqual(3, maps:size(MessagesPerConsumer)),
113-
?assertEqual(NbMessages div 2, maps:get(CTag1, MessagesPerConsumer)),
114-
Counts = maps:values(MessagesPerConsumer),
115-
?assert(lists:member(NbMessages div 2, Counts)),
116-
?assert(lists:member(NbMessages div 2 - 1, Counts)),
117-
?assert(lists:member(1, Counts))
156+
?assertEqual(NbMessages div 2, maps:get(FirstActiveConsumer, MessagesPerConsumer)),
157+
?assertEqual(NbMessages div 2 - 1, maps:get(SecondActiveConsumer, MessagesPerConsumer)),
158+
?assertEqual(1, maps:get(LastActiveConsumer, MessagesPerConsumer))
118159
after 1000 ->
119160
throw(failed)
120161
end,
@@ -127,7 +168,7 @@ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config
127168
{C1, Ch1} = connection_and_channel(Config),
128169
{C2, Ch2} = connection_and_channel(Config),
129170
{C3, Ch3} = connection_and_channel(Config),
130-
Q = queue_declare(Ch),
171+
Q = queue_declare(Ch, Config),
131172
NbMessages = 10,
132173
Consumer1Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages div 2}]),
133174
Consumer2Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages div 2 - 1}]),
@@ -168,7 +209,7 @@ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config
168209

169210
amqp_exclusive_consume_fails_on_exclusive_consumer_queue(Config) ->
170211
{C, Ch} = connection_and_channel(Config),
171-
Q = queue_declare(Ch),
212+
Q = queue_declare(Ch, Config),
172213
?assertExit(
173214
{{shutdown, {server_initiated_close, 403, _}}, _},
174215
amqp_channel:call(Ch, #'basic.consume'{queue = Q, exclusive = true})
@@ -181,9 +222,8 @@ connection_and_channel(Config) ->
181222
{ok, Ch} = amqp_connection:open_channel(C),
182223
{C, Ch}.
183224

184-
queue_declare(Channel) ->
185-
Declare = #'queue.declare'{arguments = [{"x-single-active-consumer", bool, true}],
186-
auto_delete = true},
225+
queue_declare(Channel, Config) ->
226+
Declare = ?config(single_active_consumer_queue_declare, Config),
187227
#'queue.declare_ok'{queue = Q} = amqp_channel:call(Channel, Declare),
188228
Q.
189229

@@ -198,11 +238,12 @@ consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown}) ->
198238
{maps:update_with(CTag, fun(V) -> V + 1 end, MessagesPerConsumer),
199239
MessageCount + 1}};
200240
{#'basic.deliver'{consumer_tag = CTag}, _Content} ->
201-
consume({Parent,
202-
{maps:update_with(CTag, fun(V) -> V + 1 end, MessagesPerConsumer),
203-
MessageCount + 1},
204-
CountDown - 1});
205-
#'basic.cancel_ok'{} ->
241+
NewState = {maps:update_with(CTag, fun(V) -> V + 1 end, MessagesPerConsumer),
242+
MessageCount + 1},
243+
Parent ! {message, NewState},
244+
consume({Parent, NewState, CountDown - 1});
245+
#'basic.cancel_ok'{consumer_tag = CTag} ->
246+
Parent ! {cancel_ok, CTag},
206247
consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown});
207248
_ ->
208249
consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown})
@@ -216,7 +257,30 @@ consume_results() ->
216257
{consumer_done, {MessagesPerConsumer, MessageCount}} ->
217258
{MessagesPerConsumer, MessageCount};
218259
{consumer_timeout, {MessagesPerConsumer, MessageCount}} ->
219-
{MessagesPerConsumer, MessageCount}
260+
{MessagesPerConsumer, MessageCount};
261+
_ ->
262+
consume_results()
220263
after 1000 ->
221264
throw(failed)
265+
end.
266+
267+
wait_for_messages(ExpectedCount) ->
268+
wait_for_messages(ExpectedCount, {}).
269+
270+
wait_for_messages(0, State) ->
271+
State;
272+
wait_for_messages(ExpectedCount, _) ->
273+
receive
274+
{message, {MessagesPerConsumer, MessageCount}} ->
275+
wait_for_messages(ExpectedCount - 1, {MessagesPerConsumer, MessageCount})
276+
after 5000 ->
277+
throw(message_waiting_timeout)
278+
end.
279+
280+
wait_for_cancel_ok() ->
281+
receive
282+
{cancel_ok, CTag} ->
283+
{cancel_ok, CTag}
284+
after 5000 ->
285+
throw(consumer_cancel_ok_timeout)
222286
end.

0 commit comments

Comments
 (0)