Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 31 additions & 13 deletions deps/rabbitmq_stream/src/rabbit_stream_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2008,11 +2008,14 @@ handle_frame_post_auth(Transport,
#stream_connection_state{consumers = Consumers} = State,
{credit, SubscriptionId, Credit}) ->
case Consumers of
#{SubscriptionId := #consumer{log = undefined}} ->
#{SubscriptionId := #consumer{log = undefined} = Consumer} ->
%% the consumer is not active, it's likely to be credit leftovers
%% from a formerly active consumer, just logging and send an error
%% from a formerly active consumer. Taking the credits,
%% logging and sending an error
rabbit_log:debug("Giving credit to an inactive consumer: ~tp",
[SubscriptionId]),
#consumer{credit = AvailableCredit} = Consumer,
Consumer1 = Consumer#consumer{credit = AvailableCredit + Credit},

Code = ?RESPONSE_CODE_PRECONDITION_FAILED,
Frame =
Expand All @@ -2022,7 +2025,9 @@ handle_frame_post_auth(Transport,
rabbit_global_counters:increase_protocol_counter(stream,
?PRECONDITION_FAILED,
1),
{Connection, State};
{Connection,
State#stream_connection_state{consumers =
Consumers#{SubscriptionId => Consumer1}}};
#{SubscriptionId := Consumer} ->
#consumer{credit = AvailableCredit, last_listener_offset = LLO} =
Consumer,
Expand Down Expand Up @@ -2519,9 +2524,10 @@ handle_frame_post_auth(Transport,
ROS
end,

rabbit_log:debug("Initializing reader for active consumer, offset "
rabbit_log:debug("Initializing reader for active consumer "
"(subscription ~tp, stream ~tp), offset "
"spec is ~tp",
[OffsetSpec]),
[SubscriptionId, Stream, OffsetSpec]),
QueueResource =
#resource{name = Stream,
kind = queue,
Expand All @@ -2535,6 +2541,19 @@ handle_frame_post_auth(Transport,
Properties,
OffsetSpec),
Consumer1 = Consumer#consumer{log = Segment},
#consumer{credit = Crdt,
send_limit = SndLmt,
configuration = #consumer_configuration{counters = ConsumerCounters}} = Consumer1,

rabbit_log:debug("Dispatching to subscription ~tp (stream ~tp), "
"credit(s) ~tp, send limit ~tp",
[SubscriptionId,
Stream,
Crdt,
SndLmt]),

ConsumedMessagesBefore = messages_consumed(ConsumerCounters),

Consumer2 =
case send_chunks(DeliverVersion,
Transport,
Expand All @@ -2554,17 +2573,16 @@ handle_frame_post_auth(Transport,
{ok, Csmr} ->
Csmr
end,
#consumer{configuration =
#consumer_configuration{counters =
ConsumerCounters},
log = Log2} =
Consumer2,
#consumer{log = Log2} = Consumer2,
ConsumerOffset = osiris_log:next_offset(Log2),

rabbit_log:debug("Subscription ~tp is now at offset ~tp with ~tp "
ConsumedMessagesAfter = messages_consumed(ConsumerCounters),
rabbit_log:debug("Subscription ~tp (stream ~tp) is now at offset ~tp with ~tp "
"message(s) distributed after subscription",
[SubscriptionId, ConsumerOffset,
messages_consumed(ConsumerCounters)]),
[SubscriptionId,
Stream,
ConsumerOffset,
ConsumedMessagesAfter - ConsumedMessagesBefore]),

Consumers#{SubscriptionId => Consumer2};
#{SubscriptionId :=
Expand Down