Skip to content

Commit c318bb9

Browse files
dcorbachokjnilsson
authored andcommitted
Return delivered but unack messages to the queeu for noconnection reason
[#161679638]
1 parent 0c7c7d9 commit c318bb9

File tree

1 file changed

+30
-8
lines changed

1 file changed

+30
-8
lines changed

src/rabbit_fifo.erl

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -374,10 +374,16 @@ apply(_, {down, ConsumerPid, noconnection},
374374
Node = node(ConsumerPid),
375375
% mark all consumers and enqueuers as suspect
376376
% and monitor the node
377-
Cons = maps:map(fun({_, P}, C) when node(P) =:= Node ->
378-
C#consumer{suspected_down = true};
379-
(_, C) -> C
380-
end, Cons0),
377+
{Cons, State} = maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0} = C,
378+
{Co, St0}) when node(P) =:= Node ->
379+
St = return_all(St0, Checked0),
380+
{maps:put(K, C#consumer{suspected_down = true,
381+
checked_out = #{}},
382+
Co),
383+
St};
384+
(K, C, {Co, St}) ->
385+
{maps:put(K, C, Co), St}
386+
end, {#{}, State0}, Cons0),
381387
Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
382388
E#enqueuer{suspected_down = true};
383389
(_, E) -> E
@@ -388,7 +394,7 @@ apply(_, {down, ConsumerPid, noconnection},
388394
_ ->
389395
[{monitor, node, Node} | Effects0]
390396
end,
391-
{State0#state{consumers = Cons, enqueuers = Enqs}, Effects, ok};
397+
{State#state{consumers = Cons, enqueuers = Enqs}, Effects, ok};
392398
apply(_, {down, Pid, _Info}, Effects0,
393399
#state{consumers = Cons0,
394400
enqueuers = Enqs0} = State0) ->
@@ -583,9 +589,7 @@ cancel_consumer(ConsumerId,
583589
{Effects0, #state{consumers = C0, name = Name} = S0}) ->
584590
case maps:take(ConsumerId, C0) of
585591
{#consumer{checked_out = Checked0}, Cons} ->
586-
S = maps:fold(fun (_, {MsgNum, Msg}, S) ->
587-
return_one(MsgNum, Msg, S)
588-
end, S0, Checked0),
592+
S = return_all(S0, Checked0),
589593
Effects = cancel_consumer_effects(ConsumerId, Name, S, Effects0),
590594
case maps:size(Cons) of
591595
0 ->
@@ -788,6 +792,10 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
788792
State0#state{messages = maps:put(MsgNum, Msg, Messages),
789793
returns = queue:in(MsgNum, Returns)}.
790794

795+
return_all(State, Checked) ->
796+
maps:fold(fun (_, {MsgNum, Msg}, S) ->
797+
return_one(MsgNum, Msg, S)
798+
end, State, Checked).
791799

792800
checkout(State, Effects) ->
793801
checkout0(checkout_one(State), Effects, #{}).
@@ -1289,6 +1297,20 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test() ->
12891297
?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3),
12901298
ok.
12911299

1300+
down_with_noconnection_returns_unack_test() ->
1301+
Pid = spawn(fun() -> ok end),
1302+
Cid = {<<"down_with_noconnect">>, Pid},
1303+
{State0, _} = enq(1, 1, second, test_init(test)),
1304+
?assertEqual(1, maps:size(State0#state.messages)),
1305+
?assertEqual(0, queue:len(State0#state.returns)),
1306+
{State1, {_, _}} = deq(2, Cid, unsettled, State0),
1307+
?assertEqual(0, maps:size(State1#state.messages)),
1308+
?assertEqual(0, queue:len(State1#state.returns)),
1309+
{State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, [], State1),
1310+
?assertEqual(1, maps:size(State2a#state.messages)),
1311+
?assertEqual(1, queue:len(State2a#state.returns)),
1312+
ok.
1313+
12921314
down_with_noproc_enqueuer_is_cleaned_up_test() ->
12931315
State00 = test_init(test),
12941316
Pid = spawn(fun() -> ok end),

0 commit comments

Comments
 (0)