@@ -727,33 +727,25 @@ cancel_consumer(ConsumerId,
727
727
waiting_consumers = WaitingConsumers0 } = State0 }) ->
728
728
% % single active consumer on, consumers are waiting
729
729
case maps :take (ConsumerId , Cons0 ) of
730
- {_CurrentActiveConsumer , _ } ->
730
+ {_CurrentActiveConsumer = # consumer { checked_out = Checked0 } , _ } ->
731
731
% The active consumer is to be removed
732
732
% Cancel it
733
- {Effects1 , State1 } = case maps :take (ConsumerId , Cons0 ) of
734
- {# consumer {checked_out = Checked0 }, _ } ->
735
- S = return_all (State0 , Checked0 ),
736
- Effects = cancel_consumer_effects (ConsumerId , S , Effects0 ),
737
- {Effects , State0 };
738
- error ->
739
- {Effects0 , State0 }
740
- end ,
733
+ S = return_all (State0 , Checked0 ),
734
+ Effects = cancel_consumer_effects (ConsumerId , S , Effects0 ),
741
735
% Take another one from the waiting consumers and put it in consumers
742
736
[{NewActiveConsumerId , NewActiveConsumer } | RemainingWaitingConsumers ] = WaitingConsumers0 ,
743
737
# state {service_queue = ServiceQueue } = State0 ,
744
738
ServiceQueue1 = maybe_queue_consumer (NewActiveConsumerId , NewActiveConsumer , ServiceQueue ),
745
- State2 = State1 # state {consumers = #{NewActiveConsumerId => NewActiveConsumer },
739
+ State1 = State0 # state {consumers = #{NewActiveConsumerId => NewActiveConsumer },
746
740
service_queue = ServiceQueue1 ,
747
741
waiting_consumers = RemainingWaitingConsumers },
748
- {Effects1 , State2 };
742
+ {Effects , State1 };
749
743
error ->
750
744
% The cancelled consumer is not the active one
751
745
% Just remove it from idle_consumers
752
- {value , {ConsumerId , # consumer {checked_out = Checked0 }}, WaitingConsumers1 } =
753
- lists :keytake (ConsumerId , 1 , WaitingConsumers0 ),
754
- S = return_all (State0 , Checked0 ),
755
- Effects = cancel_consumer_effects (ConsumerId , S , Effects0 ),
756
- {Effects , State0 # state {waiting_consumers = WaitingConsumers1 }}
746
+ {value , _Consumer , WaitingConsumers1 } = lists :keytake (ConsumerId , 1 , WaitingConsumers0 ),
747
+ % A waiting consumer isn't supposed to have any checked out messages, so nothing special to do here
748
+ {Effects0 , State0 # state {waiting_consumers = WaitingConsumers1 }}
757
749
end .
758
750
759
751
cancel_consumer0 (ConsumerId ,
0 commit comments