Skip to content

Commit ae9219a

Browse files
authored
Merge pull request #7641 from rabbitmq/mergify/bp/v3.12.x/pr-7638
Take credits for inactive stream subscription (backport #7638)
2 parents aef9f6d + 986e72e commit ae9219a

File tree

1 file changed

+31
-13
lines changed

1 file changed

+31
-13
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2008,11 +2008,14 @@ handle_frame_post_auth(Transport,
20082008
#stream_connection_state{consumers = Consumers} = State,
20092009
{credit, SubscriptionId, Credit}) ->
20102010
case Consumers of
2011-
#{SubscriptionId := #consumer{log = undefined}} ->
2011+
#{SubscriptionId := #consumer{log = undefined} = Consumer} ->
20122012
%% the consumer is not active, it's likely to be credit leftovers
2013-
%% from a formerly active consumer, just logging and send an error
2013+
%% from a formerly active consumer. Taking the credits,
2014+
%% logging and sending an error
20142015
rabbit_log:debug("Giving credit to an inactive consumer: ~tp",
20152016
[SubscriptionId]),
2017+
#consumer{credit = AvailableCredit} = Consumer,
2018+
Consumer1 = Consumer#consumer{credit = AvailableCredit + Credit},
20162019

20172020
Code = ?RESPONSE_CODE_PRECONDITION_FAILED,
20182021
Frame =
@@ -2022,7 +2025,9 @@ handle_frame_post_auth(Transport,
20222025
rabbit_global_counters:increase_protocol_counter(stream,
20232026
?PRECONDITION_FAILED,
20242027
1),
2025-
{Connection, State};
2028+
{Connection,
2029+
State#stream_connection_state{consumers =
2030+
Consumers#{SubscriptionId => Consumer1}}};
20262031
#{SubscriptionId := Consumer} ->
20272032
#consumer{credit = AvailableCredit, last_listener_offset = LLO} =
20282033
Consumer,
@@ -2519,9 +2524,10 @@ handle_frame_post_auth(Transport,
25192524
ROS
25202525
end,
25212526

2522-
rabbit_log:debug("Initializing reader for active consumer, offset "
2527+
rabbit_log:debug("Initializing reader for active consumer "
2528+
"(subscription ~tp, stream ~tp), offset "
25232529
"spec is ~tp",
2524-
[OffsetSpec]),
2530+
[SubscriptionId, Stream, OffsetSpec]),
25252531
QueueResource =
25262532
#resource{name = Stream,
25272533
kind = queue,
@@ -2535,6 +2541,19 @@ handle_frame_post_auth(Transport,
25352541
Properties,
25362542
OffsetSpec),
25372543
Consumer1 = Consumer#consumer{log = Segment},
2544+
#consumer{credit = Crdt,
2545+
send_limit = SndLmt,
2546+
configuration = #consumer_configuration{counters = ConsumerCounters}} = Consumer1,
2547+
2548+
rabbit_log:debug("Dispatching to subscription ~tp (stream ~tp), "
2549+
"credit(s) ~tp, send limit ~tp",
2550+
[SubscriptionId,
2551+
Stream,
2552+
Crdt,
2553+
SndLmt]),
2554+
2555+
ConsumedMessagesBefore = messages_consumed(ConsumerCounters),
2556+
25382557
Consumer2 =
25392558
case send_chunks(DeliverVersion,
25402559
Transport,
@@ -2554,17 +2573,16 @@ handle_frame_post_auth(Transport,
25542573
{ok, Csmr} ->
25552574
Csmr
25562575
end,
2557-
#consumer{configuration =
2558-
#consumer_configuration{counters =
2559-
ConsumerCounters},
2560-
log = Log2} =
2561-
Consumer2,
2576+
#consumer{log = Log2} = Consumer2,
25622577
ConsumerOffset = osiris_log:next_offset(Log2),
25632578

2564-
rabbit_log:debug("Subscription ~tp is now at offset ~tp with ~tp "
2579+
ConsumedMessagesAfter = messages_consumed(ConsumerCounters),
2580+
rabbit_log:debug("Subscription ~tp (stream ~tp) is now at offset ~tp with ~tp "
25652581
"message(s) distributed after subscription",
2566-
[SubscriptionId, ConsumerOffset,
2567-
messages_consumed(ConsumerCounters)]),
2582+
[SubscriptionId,
2583+
Stream,
2584+
ConsumerOffset,
2585+
ConsumedMessagesAfter - ConsumedMessagesBefore]),
25682586

25692587
Consumers#{SubscriptionId => Consumer2};
25702588
#{SubscriptionId :=

0 commit comments

Comments
 (0)