Skip to content

Commit ec3f83f

Browse files
committed
Pick new exclusive consumer after channel goes down
[#161090309] References #1743
1 parent a59ee6e commit ec3f83f

File tree

3 files changed

+178
-71
lines changed

3 files changed

+178
-71
lines changed

src/rabbit_amqqueue_process.erl

Lines changed: 87 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@
9595
mirroring_policy_version = 0,
9696
%% running | flow | idle
9797
status,
98+
%% true | false
9899
exclusive_consumer_on
99100
}).
100101

@@ -815,9 +816,10 @@ should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
815816
should_auto_delete(#q{has_had_consumers = false}) -> false;
816817
should_auto_delete(State) -> is_unused(State).
817818

818-
handle_ch_down(DownPid, State = #q{consumers = Consumers,
819-
exclusive_consumer = Holder,
820-
senders = Senders}) ->
819+
handle_ch_down(DownPid, State = #q{consumers = Consumers,
820+
exclusive_consumer = Holder,
821+
exclusive_consumer_on = ExclusiveConsumerOn,
822+
senders = Senders}) ->
821823
State1 = State#q{senders = case pmon:is_monitored(DownPid, Senders) of
822824
false ->
823825
Senders;
@@ -841,10 +843,7 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
841843
{ChAckTags, ChCTags, Consumers1} ->
842844
QName = qname(State1),
843845
[emit_consumer_deleted(DownPid, CTag, QName, ?INTERNAL_USER) || CTag <- ChCTags],
844-
Holder1 = case Holder of
845-
{DownPid, _} -> none;
846-
Other -> Other
847-
end,
846+
Holder1 = new_exclusive_consumer_after_channel_down(DownPid, Holder, ExclusiveConsumerOn, Consumers1),
848847
State2 = State1#q{consumers = Consumers1,
849848
exclusive_consumer = Holder1},
850849
notify_decorators(State2),
@@ -861,6 +860,22 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
861860
end
862861
end.
863862

863+
new_exclusive_consumer_after_channel_down(DownChPid, CurrentExclusiveConsumer, _ExclusiveConsumerIsOn = true, Consumers) ->
864+
case CurrentExclusiveConsumer of
865+
{DownChPid, _} ->
866+
case rabbit_queue_consumers:get_consumer(Consumers) of
867+
undefined -> none;
868+
Consumer -> Consumer
869+
end;
870+
false ->
871+
CurrentExclusiveConsumer
872+
end;
873+
new_exclusive_consumer_after_channel_down(DownChPid, CurrentExclusiveConsumer, _ExclusiveConsumerIsOn = false, _Consumers) ->
874+
case CurrentExclusiveConsumer of
875+
{DownChPid, _} -> none;
876+
Other -> Other
877+
end.
878+
864879
check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) ->
865880
in_use;
866881
check_exclusive_access(none, false, _State) ->
@@ -1010,11 +1025,13 @@ i(effective_policy_definition, #q{q = Q}) ->
10101025
end;
10111026
i(exclusive_consumer_pid, #q{exclusive_consumer = none}) ->
10121027
'';
1013-
i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) ->
1028+
i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTagOrConsumer}}) ->
10141029
ChPid;
10151030
i(exclusive_consumer_tag, #q{exclusive_consumer = none}) ->
10161031
'';
1017-
i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) ->
1032+
i(exclusive_consumer_tag, #q{exclusive_consumer_on = true, exclusive_consumer = {_ChPid, Consumer}}) ->
1033+
rabbit_queue_consumers:consumer_tag(Consumer);
1034+
i(exclusive_consumer_tag, #q{exclusive_consumer_on = false, exclusive_consumer = {_ChPid, ConsumerTag}}) ->
10181035
ConsumerTag;
10191036
i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
10201037
BQ:len(BQS);
@@ -1217,7 +1234,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
12171234
_From, State = #q{consumers = Consumers,
12181235
exclusive_consumer = Holder,
12191236
exclusive_consumer_on = ExclusiveConsumerOn}) ->
1220-
case ExclusiveConsumerOn of
1237+
State1 = case ExclusiveConsumerOn of
12211238
true ->
12221239
case ExclusiveConsume of
12231240
true ->
@@ -1229,7 +1246,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
12291246
PrefetchCount, Args, is_empty(State),
12301247
ActingUser, Consumers),
12311248

1232-
State1 = case Holder of
1249+
case Holder of
12331250
none ->
12341251
NewConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, Consumers1),
12351252
State#q{consumers = Consumers1,
@@ -1238,18 +1255,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
12381255
_ ->
12391256
State#q{consumers = Consumers1,
12401257
has_had_consumers = true}
1241-
end,
1242-
ok = maybe_send_reply(ChPid, OkMsg),
1243-
QName = qname(State1),
1244-
AckRequired = not NoAck,
1245-
rabbit_core_metrics:consumer_created(
1246-
ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
1247-
PrefetchCount, Args),
1248-
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
1249-
AckRequired, QName, PrefetchCount,
1250-
Args, none, ActingUser),
1251-
notify_decorators(State1),
1252-
reply(ok, run_message_queue(State1))
1258+
end
12531259
end;
12541260
false ->
12551261
case check_exclusive_access(Holder, ExclusiveConsume, State) of
@@ -1263,23 +1269,22 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
12631269
if ExclusiveConsume -> {ChPid, ConsumerTag};
12641270
true -> Holder
12651271
end,
1266-
State1 = State#q{consumers = Consumers1,
1267-
has_had_consumers = true,
1268-
exclusive_consumer = ExclusiveConsumer},
1269-
ok = maybe_send_reply(ChPid, OkMsg),
1270-
QName = qname(State1),
1271-
AckRequired = not NoAck,
1272-
rabbit_core_metrics:consumer_created(
1273-
ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
1274-
PrefetchCount, Args),
1275-
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
1276-
AckRequired, QName, PrefetchCount,
1277-
Args, none, ActingUser),
1278-
notify_decorators(State1),
1279-
reply(ok, run_message_queue(State1))
1272+
State#q{consumers = Consumers1,
1273+
has_had_consumers = true,
1274+
exclusive_consumer = ExclusiveConsumer}
12801275
end
1281-
end;
1282-
1276+
end,
1277+
ok = maybe_send_reply(ChPid, OkMsg),
1278+
QName = qname(State1),
1279+
AckRequired = not NoAck,
1280+
rabbit_core_metrics:consumer_created(
1281+
ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
1282+
PrefetchCount, Args),
1283+
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
1284+
AckRequired, QName, PrefetchCount,
1285+
Args, none, ActingUser),
1286+
notify_decorators(State1),
1287+
reply(ok, run_message_queue(State1));
12831288

12841289
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}, _From,
12851290
State = #q{consumers = Consumers,
@@ -1290,23 +1295,9 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}, _From,
12901295
not_found ->
12911296
reply(ok, State);
12921297
Consumers1 ->
1293-
Holder1 = case ExclusiveConsumerOn of
1294-
true ->
1295-
case rabbit_queue_consumers:is_same(ChPid, ConsumerTag, Holder) of
1296-
true ->
1297-
case rabbit_queue_consumers:get_consumer(Consumers1) of
1298-
undefined -> none;
1299-
Consumer -> Consumer
1300-
end;
1301-
false ->
1302-
Holder
1303-
end;
1304-
false ->
1305-
case Holder of
1306-
{ChPid, ConsumerTag} -> none;
1307-
_ -> Holder
1308-
end
1309-
end,
1298+
Holder1 = new_exclusive_consumer_after_basic_cancel(ChPid, ConsumerTag,
1299+
Holder, ExclusiveConsumerOn, Consumers1
1300+
),
13101301
State1 = State#q{consumers = Consumers1,
13111302
exclusive_consumer = Holder1},
13121303
emit_consumer_deleted(ChPid, ConsumerTag, qname(State1), ActingUser),
@@ -1378,6 +1369,24 @@ handle_call(sync_mirrors, _From, State) ->
13781369
handle_call(cancel_sync_mirrors, _From, State) ->
13791370
reply({ok, not_syncing}, State).
13801371

1372+
new_exclusive_consumer_after_basic_cancel(ChPid, ConsumerTag, CurrentExclusiveConsumer,
1373+
_ExclusiveConsumerIsOn = true, Consumers) ->
1374+
case rabbit_queue_consumers:is_same(ChPid, ConsumerTag, CurrentExclusiveConsumer) of
1375+
true ->
1376+
case rabbit_queue_consumers:get_consumer(Consumers) of
1377+
undefined -> none;
1378+
Consumer -> Consumer
1379+
end;
1380+
false ->
1381+
CurrentExclusiveConsumer
1382+
end;
1383+
new_exclusive_consumer_after_basic_cancel(ChPid, ConsumerTag, CurrentExclusiveConsumer,
1384+
_ExclusiveConsumerIsOn = false, _Consumers) ->
1385+
case CurrentExclusiveConsumer of
1386+
{ChPid, ConsumerTag} -> none;
1387+
_ -> CurrentExclusiveConsumer
1388+
end.
1389+
13811390
handle_cast(init, State) ->
13821391
try
13831392
init_it({no_barrier, non_clean_shutdown}, none, State)
@@ -1487,8 +1496,9 @@ handle_cast({credit, ChPid, CTag, Credit, Drain},
14871496
end);
14881497

14891498
handle_cast({force_event_refresh, Ref},
1490-
State = #q{consumers = Consumers,
1491-
exclusive_consumer = Exclusive}) ->
1499+
State = #q{consumers = Consumers,
1500+
exclusive_consumer = Exclusive,
1501+
exclusive_consumer_on = ExclusiveConsumerOn}) ->
14921502
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State), Ref),
14931503
QName = qname(State),
14941504
AllConsumers = rabbit_queue_consumers:all(Consumers),
@@ -1499,10 +1509,25 @@ handle_cast({force_event_refresh, Ref},
14991509
Args, Ref, ActingUser) ||
15001510
{Ch, CTag, AckRequired, Prefetch, Args, ActingUser}
15011511
<- AllConsumers];
1502-
{Ch, CTag} ->
1503-
[{Ch, CTag, AckRequired, Prefetch, Args, ActingUser}] = AllConsumers,
1504-
emit_consumer_created(
1505-
Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref, ActingUser)
1512+
{ExclusiveConsumerChannel, ConsumerOrConsumerTag} ->
1513+
case ExclusiveConsumerOn of
1514+
true ->
1515+
ExclusiveConsumerConsumerTag = rabbit_queue_consumers:consumer_tag(ConsumerOrConsumerTag),
1516+
[emit_consumer_created(
1517+
Ch, CTag,
1518+
case {Ch, CTag} of
1519+
{ExclusiveConsumerChannel, ExclusiveConsumerConsumerTag} -> true;
1520+
_ -> false
1521+
end,
1522+
AckRequired, QName, Prefetch,
1523+
Args, Ref, ActingUser) ||
1524+
{Ch, CTag, AckRequired, Prefetch, Args, ActingUser}
1525+
<- AllConsumers];
1526+
false ->
1527+
[{Ch, CTag, AckRequired, Prefetch, Args, ActingUser}] = AllConsumers,
1528+
emit_consumer_created(
1529+
Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref, ActingUser)
1530+
end
15061531
end,
15071532
noreply(rabbit_event:init_stats_timer(State, #q.stats_timer));
15081533

src/rabbit_queue_consumers.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121
send_drained/0, deliver/5, record_ack/3, subtract_acks/3,
2222
possibly_unblock/3,
2323
resume_fun/0, notify_sent_fun/1, activate_limit_fun/0,
24-
credit/6, utilisation/1, is_same/3, get_consumer/1, get/3]).
24+
credit/6, utilisation/1, is_same/3, get_consumer/1, get/3,
25+
consumer_tag/1]).
2526

2627
%%----------------------------------------------------------------------------
2728

@@ -97,6 +98,7 @@
9798
-spec credit(boolean(), integer(), boolean(), ch(), rabbit_types:ctag(),
9899
state()) -> 'unchanged' | {'unblocked', state()}.
99100
-spec utilisation(state()) -> ratio().
101+
-spec consumer_tag(consumer()) -> rabbit_types:ctag().
100102

101103
%%----------------------------------------------------------------------------
102104

@@ -391,6 +393,9 @@ get(ChPid, ConsumerTag, #state{consumers = Consumers}) ->
391393
{{value, Consumer, _Priority}, _Tail} -> Consumer
392394
end.
393395

396+
consumer_tag(#consumer{tag = CTag}) ->
397+
CTag.
398+
394399
%%----------------------------------------------------------------------------
395400

396401
parse_credit_args(Default, Args) ->

0 commit comments

Comments
 (0)