Skip to content

Commit ec76fef

Browse files
committed
Move consumer timeout tests to own SUITE
Also handle case where client does not support consumer cancellation and rename the queue_cleanup timer to a generic "tick" timer for channels to perform periodic activities. [#164212469]
1 parent d389d04 commit ec76fef

File tree

5 files changed

+307
-88
lines changed

5 files changed

+307
-88
lines changed

Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,8 @@ define PROJECT_ENV
126126
{vhost_restart_strategy, continue},
127127
%% {global, prefetch count}
128128
{default_consumer_prefetch, {false, 0}},
129-
{channel_queue_cleanup_interval, 60000},
129+
%% interval at which the channel can perform periodic actions
130+
{channel_tick_interval, 60000},
130131
%% Default max message size is 128 MB
131132
{max_message_size, 134217728}
132133
]

src/rabbit_channel.erl

Lines changed: 30 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@
169169
delivery_flow,
170170
interceptor_state,
171171
queue_states,
172-
queue_cleanup_timer
172+
tick_timer
173173
}).
174174

175175
-define(QUEUE, lqueue).
@@ -489,7 +489,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
489489
end,
490490
MaxMessageSize = get_max_message_size(),
491491
ConsumerTimeout = get_consumer_timeout(),
492-
rabbit_log:info("consumer timeout ~w", [ConsumerTimeout]),
493492
State = #ch{cfg = #conf{state = starting,
494493
protocol = Protocol,
495494
channel = Channel,
@@ -535,7 +534,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
535534
rabbit_event:if_enabled(State2, #ch.stats_timer,
536535
fun() -> emit_stats(State2) end),
537536
put_operation_timeout(),
538-
State3 = init_queue_cleanup_timer(State2),
537+
State3 = init_tick_timer(State2),
539538
{ok, State3, hibernate,
540539
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
541540

@@ -828,12 +827,13 @@ handle_info({{Ref, Node}, LateAnswer},
828827
[Channel, LateAnswer, Node]),
829828
noreply(State);
830829

831-
handle_info(queue_cleanup, State0 = #ch{cfg = #conf{channel = Channel,
832-
consumer_timeout = Timeout},
833-
queue_states = QueueStates0,
834-
queue_names = QNames,
835-
queue_consumers = QCons,
836-
unacked_message_q = UAMQ}) ->
830+
handle_info(tick, State0 = #ch{cfg = #conf{channel = Channel,
831+
capabilities = Capabilities,
832+
consumer_timeout = Timeout},
833+
queue_states = QueueStates0,
834+
queue_names = QNames,
835+
queue_consumers = QCons,
836+
unacked_message_q = UAMQ}) ->
837837
QueueStates1 =
838838
maps:filter(fun(_, QS) ->
839839
QName = rabbit_quorum_queue:queue_name(QS),
@@ -845,22 +845,24 @@ handle_info(queue_cleanup, State0 = #ch{cfg = #conf{channel = Channel,
845845
{value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}}
846846
when is_integer(Timeout)
847847
andalso Time < Now - Timeout ->
848-
case ConsumerTag of
849-
_ when is_integer(ConsumerTag) ->
850-
%% basic.get - there is no mechanims so we just crash the
851-
%% channel
848+
rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out "
849+
"waiting on ack",
850+
[rabbit_data_coercion:to_binary(ConsumerTag),
851+
Channel]),
852+
SupportsCancel = case rabbit_misc:table_lookup(
853+
Capabilities,
854+
<<"consumer_cancel_notify">>) of
855+
{bool, true} when is_binary(ConsumerTag) ->
856+
true;
857+
_ -> false
858+
end,
859+
case SupportsCancel of
860+
false ->
852861
Ex = rabbit_misc:amqp_error(precondition_failed,
853-
"basic.get ack timed out on channel ~w",
862+
"consumer ack timed out on channel ~w",
854863
[Channel], none),
855864
handle_exception(Ex, State0);
856-
% rabbit_misc:protocol_error(precondition_failed,
857-
% "basic.get ack timed out on channel ~w ",
858-
% [Channel]);
859-
_ ->
860-
rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out "
861-
"waiting on ack",
862-
[rabbit_data_coercion:to_binary(ConsumerTag),
863-
Channel]),
865+
true ->
864866
QRef = qpid_to_ref(QPid),
865867
QName = maps:get(QRef, QNames),
866868
%% cancel the consumer with the client
@@ -881,15 +883,14 @@ handle_info(queue_cleanup, State0 = #ch{cfg = #conf{channel = Channel,
881883
?QUEUE:to_list(UAMQ)),
882884
QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds},
883885
self(), QueueStates2),
884-
885886
State = State1#ch{queue_states = QueueStates,
886887
queue_consumers = maps:remove(QRef, QCons),
887888
unacked_message_q = Rem},
888-
noreply(init_queue_cleanup_timer(State))
889+
noreply(init_tick_timer(State))
889890
end;
890891
_ ->
891892
noreply(
892-
init_queue_cleanup_timer(
893+
init_tick_timer(
893894
State0#ch{queue_states = QueueStates1}))
894895
end;
895896
handle_info({channel_source, Source}, State = #ch{cfg = Cfg}) ->
@@ -1910,10 +1911,7 @@ cancel_consumer(CTag, QName,
19101911
consumer_mapping = CMap}) ->
19111912
case rabbit_misc:table_lookup(
19121913
Capabilities, <<"consumer_cancel_notify">>) of
1913-
{bool, true} -> ok =
1914-
1915-
rabbit_log:info("Consumer cancel notify suppoerted ~w", [CTag]),
1916-
send(#'basic.cancel'{consumer_tag = CTag,
1914+
{bool, true} -> ok = send(#'basic.cancel'{consumer_tag = CTag,
19171915
nowait = true}, State);
19181916
_ -> ok
19191917
end,
@@ -2692,9 +2690,9 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
26922690
State1 = track_delivering_queue(NoAck, QPid, QName, State),
26932691
{noreply, record_sent(get, DeliveryTag, not(NoAck), Msg, State1)}.
26942692

2695-
init_queue_cleanup_timer(State) ->
2696-
{ok, Interval} = application:get_env(rabbit, channel_queue_cleanup_interval),
2697-
State#ch{queue_cleanup_timer = erlang:send_after(Interval, self(), queue_cleanup)}.
2693+
init_tick_timer(State) ->
2694+
{ok, Interval} = application:get_env(rabbit, channel_tick_interval),
2695+
State#ch{tick_timer = erlang:send_after(Interval, self(), tick)}.
26982696

26992697
%% only classic queues need monitoring so rather than special casing
27002698
%% everywhere monitors are set up we wrap it here for this module

0 commit comments

Comments
 (0)