11
11
% % The Original Code is RabbitMQ.
12
12
% %
13
13
% % The Initial Developer of the Original Code is GoPivotal, Inc.
14
- % % Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
14
+ % % Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
15
15
% %
16
16
17
17
-module (rabbit_fifo ).
178
178
suspected_down = false :: boolean ()
179
179
}).
180
180
181
+ - type consumer () :: # consumer {}.
182
+
181
183
- record (enqueuer ,
182
184
{next_seqno = 1 :: msg_seqno (),
183
185
% out of order enqueues - sorted list
234
236
msg_bytes_enqueue = 0 :: non_neg_integer (),
235
237
msg_bytes_checkout = 0 :: non_neg_integer (),
236
238
% % whether single active consumer is on or not for this queue
237
- single_active_consumer_on = false :: boolean () ,
239
+ consumer_strategy = default :: default | single_active ,
238
240
% % waiting consumers, one is picked active consumer is cancelled or dies
239
241
% % used only when single active consumer is on
240
- waiting_consumers = [] :: list ()
242
+ waiting_consumers = [] :: [{ consumer_id (), consumer ()}]
241
243
}).
242
244
243
245
- opaque state () :: # state {}.
246
248
queue_resource := rabbit_types :r ('queue' ),
247
249
dead_letter_handler => applied_mfa (),
248
250
become_leader_handler => applied_mfa (),
249
- shadow_copy_interval => non_neg_integer ()}.
251
+ shadow_copy_interval => non_neg_integer (),
252
+ single_active_consumer_on => boolean ()}.
250
253
251
254
- export_type ([protocol / 0 ,
252
255
delivery / 0 ,
@@ -273,11 +276,16 @@ update_config(Conf, State) ->
273
276
DLH = maps :get (dead_letter_handler , Conf , undefined ),
274
277
BLH = maps :get (become_leader_handler , Conf , undefined ),
275
278
SHI = maps :get (shadow_copy_interval , Conf , ? SHADOW_COPY_INTERVAL ),
276
- SingleActiveConsumerOn = maps :get (single_active_consumer_on , Conf , false ),
279
+ ConsumerStrategy = case maps :get (single_active_consumer_on , Conf , false ) of
280
+ true ->
281
+ single_active ;
282
+ false ->
283
+ default
284
+ end ,
277
285
State # state {dead_letter_handler = DLH ,
278
286
become_leader_handler = BLH ,
279
287
shadow_copy_interval = SHI ,
280
- single_active_consumer_on = SingleActiveConsumerOn }.
288
+ consumer_strategy = ConsumerStrategy }.
281
289
282
290
zero (_ ) ->
283
291
0 .
@@ -705,51 +713,47 @@ num_checked_out(#state{consumers = Cons}) ->
705
713
end , 0 , maps :values (Cons )).
706
714
707
715
cancel_consumer (ConsumerId ,
708
- {Effects0 , # state {single_active_consumer_on = false } = S0 }) ->
716
+ {Effects0 , # state {consumer_strategy = default } = S0 }) ->
709
717
% % general case, single active consumer off
710
718
cancel_consumer0 (ConsumerId , {Effects0 , S0 });
711
719
cancel_consumer (ConsumerId ,
712
- {Effects0 , # state {single_active_consumer_on = true ,
713
- waiting_consumers = WaitingConsumers } = S0 }) when length ( WaitingConsumers ) == 0 ->
720
+ {Effects0 , # state {consumer_strategy = single_active ,
721
+ waiting_consumers = [] } = S0 }) ->
714
722
% % single active consumer on, no consumers are waiting
715
723
cancel_consumer0 (ConsumerId , {Effects0 , S0 });
716
724
cancel_consumer (ConsumerId ,
717
- {Effects0 , # state {consumers = Cons0 ,
718
- single_active_consumer_on = true ,
719
- waiting_consumers = WaitingConsumers0 } = State0 }) ->
725
+ {Effects0 , # state {consumers = Cons0 ,
726
+ consumer_strategy = single_active ,
727
+ waiting_consumers = WaitingConsumers0 } = State0 }) ->
720
728
% % single active consumer on, consumers are waiting
721
729
case maps :take (ConsumerId , Cons0 ) of
722
730
{_CurrentActiveConsumer , _ } ->
723
731
% The active consumer is to be removed
724
732
% Cancel it
725
733
{Effects1 , State1 } = case maps :take (ConsumerId , Cons0 ) of
726
- {# consumer {checked_out = Checked0 }, _ } ->
727
- S = return_all (State0 , Checked0 ),
728
- Effects = cancel_consumer_effects (ConsumerId , S , Effects0 ),
729
- {Effects , State0 };
730
- error ->
731
- {Effects0 , State0 }
732
- end ,
734
+ {# consumer {checked_out = Checked0 }, _ } ->
735
+ S = return_all (State0 , Checked0 ),
736
+ Effects = cancel_consumer_effects (ConsumerId , S , Effects0 ),
737
+ {Effects , State0 };
738
+ error ->
739
+ {Effects0 , State0 }
740
+ end ,
733
741
% Take another one from the waiting consumers and put it in consumers
734
- {NewActiveConsumerId , NewActiveConsumer } = lists :nth (1 , WaitingConsumers0 ),
735
- WaitingConsumers1 = lists :delete ({NewActiveConsumerId , NewActiveConsumer }, WaitingConsumers0 ),
742
+ [{NewActiveConsumerId , NewActiveConsumer } | RemainingWaitingConsumers ] = WaitingConsumers0 ,
736
743
# state {service_queue = ServiceQueue } = State0 ,
737
744
ServiceQueue1 = maybe_queue_consumer (NewActiveConsumerId , NewActiveConsumer , ServiceQueue ),
738
745
State2 = State1 # state {consumers = #{NewActiveConsumerId => NewActiveConsumer },
739
746
service_queue = ServiceQueue1 ,
740
- waiting_consumers = WaitingConsumers1 },
747
+ waiting_consumers = RemainingWaitingConsumers },
741
748
{Effects1 , State2 };
742
749
error ->
743
750
% The cancelled consumer is not the active one
744
751
% Just remove it from idle_consumers
745
- case lists :keytake (ConsumerId , 1 , WaitingConsumers0 ) of
746
- {value , {ConsumerId , # consumer {checked_out = Checked0 }}, WaitingConsumers1 } ->
747
- S = return_all (State0 , Checked0 ),
748
- Effects = cancel_consumer_effects (ConsumerId , S , Effects0 ),
749
- {Effects , # state {waiting_consumers = WaitingConsumers1 }};
750
- false ->
751
- {Effects0 , State0 }
752
- end
752
+ {value , {ConsumerId , # consumer {checked_out = Checked0 }}, WaitingConsumers1 } =
753
+ lists :keytake (ConsumerId , 1 , WaitingConsumers0 ),
754
+ S = return_all (State0 , Checked0 ),
755
+ Effects = cancel_consumer_effects (ConsumerId , S , Effects0 ),
756
+ {Effects , # state {waiting_consumers = WaitingConsumers1 }}
753
757
end .
754
758
755
759
cancel_consumer0 (ConsumerId ,
@@ -1149,17 +1153,17 @@ uniq_queue_in(Key, Queue) ->
1149
1153
1150
1154
1151
1155
update_consumer (ConsumerId , Meta , Spec ,
1152
- # state {single_active_consumer_on = false } = State0 ) ->
1156
+ # state {consumer_strategy = default } = State0 ) ->
1153
1157
% % general case, single active consumer off
1154
1158
update_consumer0 (ConsumerId , Meta , Spec , State0 );
1155
1159
update_consumer (ConsumerId , Meta , Spec ,
1156
- # state {consumers = Cons0 ,
1157
- single_active_consumer_on = true } = State0 ) when map_size (Cons0 ) == 0 ->
1160
+ # state {consumers = Cons0 ,
1161
+ consumer_strategy = single_active } = State0 ) when map_size (Cons0 ) == 0 ->
1158
1162
% % single active consumer on, no one is consuming yet
1159
1163
update_consumer0 (ConsumerId , Meta , Spec , State0 );
1160
1164
update_consumer (ConsumerId , Meta , {Life , Credit , Mode },
1161
- # state {single_active_consumer_on = true ,
1162
- waiting_consumers = WaitingConsumers0 } = State0 ) ->
1165
+ # state {consumer_strategy = single_active ,
1166
+ waiting_consumers = WaitingConsumers0 } = State0 ) ->
1163
1167
% % single active consumer on and one active consumer already
1164
1168
% % adding the new consumer to the waiting list
1165
1169
Consumer = # consumer {lifetime = Life , meta = Meta ,
0 commit comments