1414-dialyzer (no_improper_lists ).
1515
1616-include (" rabbit_fifo.hrl" ).
17- -include_lib (" rabbit_common/include/rabbit.hrl" ).
1817
1918-define (STATE , ? MODULE ).
2019
@@ -739,8 +738,9 @@ apply(_Meta, Cmd, State) ->
739738 {State , ok , []}.
740739
741740convert_v3_to_v4 (#{system_time := Ts },
742- # rabbit_fifo {messages = Messages0 ,
743- consumers = Consumers0 } = StateV3 ) ->
741+ StateV3 ) ->
742+ Messages0 = rabbit_fifo_v3 :get_field (messages , StateV3 ),
743+ Consumers0 = rabbit_fifo_v3 :get_field (consumers , StateV3 ),
744744 Messages = rabbit_fifo_q :from_lqueue (Messages0 ),
745745 Consumers = maps :map (
746746 fun (_CKey , # consumer {checked_out = Ch0 } = C ) ->
@@ -750,8 +750,23 @@ convert_v3_to_v4(#{system_time := Ts},
750750 end , Ch0 ),
751751 C # consumer {checked_out = Ch }
752752 end , Consumers0 ),
753- StateV3 #? MODULE {messages = Messages ,
754- consumers = Consumers }.
753+ #? MODULE {cfg = rabbit_fifo_v3 :get_field (cfg , StateV3 ),
754+ messages = Messages ,
755+ messages_total = rabbit_fifo_v3 :get_field (messages_total , StateV3 ),
756+ returns = rabbit_fifo_v3 :get_field (returns , StateV3 ),
757+ enqueue_count = rabbit_fifo_v3 :get_field (enqueue_count , StateV3 ),
758+ enqueuers = rabbit_fifo_v3 :get_field (enqueuers , StateV3 ),
759+ ra_indexes = rabbit_fifo_v3 :get_field (ra_indexes , StateV3 ),
760+ release_cursors = rabbit_fifo_v3 :get_field (release_cursors , StateV3 ),
761+ consumers = Consumers ,
762+ % consumers that require further service are queued here
763+ service_queue = rabbit_fifo_v3 :get_field (service_queue , StateV3 ),
764+ dlx = rabbit_fifo_v3 :get_field (dlx , StateV3 ),
765+ msg_bytes_enqueue = rabbit_fifo_v3 :get_field (msg_bytes_enqueue , StateV3 ),
766+ msg_bytes_checkout = rabbit_fifo_v3 :get_field (msg_bytes_checkout , StateV3 ),
767+ waiting_consumers = rabbit_fifo_v3 :get_field (waiting_consumers , StateV3 ),
768+ last_active = rabbit_fifo_v3 :get_field (last_active , StateV3 ),
769+ msg_cache = rabbit_fifo_v3 :get_field (msg_cache , StateV3 )}.
755770
756771purge_node (Meta , Node , State , Effects ) ->
757772 lists :foldl (fun (Pid , {S0 , E0 }) ->
@@ -1605,19 +1620,6 @@ drop_head(#?STATE{ra_indexes = Indexes0} = State0, Effects) ->
16051620 {State0 , Effects }
16061621 end .
16071622
1608- maybe_set_msg_ttl (# basic_message {content = # content {properties = none }},
1609- RaCmdTs , Header ,
1610- #? STATE {cfg = # cfg {msg_ttl = PerQueueMsgTTL }}) ->
1611- update_expiry_header (RaCmdTs , PerQueueMsgTTL , Header );
1612- maybe_set_msg_ttl (# basic_message {content = # content {properties = Props }},
1613- RaCmdTs , Header ,
1614- #? STATE {cfg = # cfg {msg_ttl = PerQueueMsgTTL }}) ->
1615- % % rabbit_quorum_queue will leave the properties decoded if and only if
1616- % % per message message TTL is set.
1617- % % We already check in the channel that expiration must be valid.
1618- {ok , PerMsgMsgTTL } = rabbit_basic :parse_expiration (Props ),
1619- TTL = min (PerMsgMsgTTL , PerQueueMsgTTL ),
1620- update_expiry_header (RaCmdTs , TTL , Header );
16211623maybe_set_msg_ttl (Msg , RaCmdTs , Header ,
16221624 #? STATE {cfg = # cfg {msg_ttl = MsgTTL }}) ->
16231625 case mc :is (Msg ) of
@@ -1832,10 +1834,10 @@ update_smallest_raft_index(IncomingRaftIdx, Reply,
18321834 #? STATE {cfg = Cfg ,
18331835 release_cursors = Cursors0 } = State0 ,
18341836 Effects ) ->
1835- Total = messages_total (State0 ),
1837+ % Total = messages_total(State0),
18361838 % % TODO: optimise
18371839 case smallest_raft_index (State0 ) of
1838- undefined when Total == 0 ->
1840+ undefined ->
18391841 % there are no messages on queue anymore and no pending enqueues
18401842 % we can forward release_cursor all the way until
18411843 % the last received command, hooray
@@ -1846,8 +1848,8 @@ update_smallest_raft_index(IncomingRaftIdx, Reply,
18461848 release_cursors = lqueue :new (),
18471849 enqueue_count = 0 },
18481850 {State , Reply , Effects ++ [{release_cursor , IncomingRaftIdx , State }]};
1849- undefined ->
1850- {State0 , Reply , Effects };
1851+ % undefined ->
1852+ % {State0, Reply, Effects};
18511853 Smallest when is_integer (Smallest ) ->
18521854 case find_next_cursor (Smallest , Cursors0 ) of
18531855 empty ->
@@ -2077,7 +2079,7 @@ take_next_msg(#?STATE{returns = Returns0,
20772079 case rabbit_fifo_q :out (Messages0 ) of
20782080 {empty , _ } ->
20792081 empty ;
2080- {{ value , ? MSG (RaftIdx , _ ) = Msg } , Messages } ->
2082+ {_P , ? MSG (RaftIdx , _ ) = Msg , Messages } ->
20812083 % % add index here
20822084 Indexes = rabbit_fifo_index :append (RaftIdx , Indexes0 ),
20832085 {Msg , State #? STATE {messages = Messages ,
@@ -2418,7 +2420,8 @@ normalize(#?STATE{ra_indexes = _Indexes,
24182420 release_cursors = Cursors ,
24192421 dlx = DlxState } = State ) ->
24202422 State #? STATE {returns = lqueue :from_list (lqueue :to_list (Returns )),
2421- messages = rabbit_fifo_q :from_lqueue (Messages ),
2423+ messages = rabbit_fifo_q :normalize (Messages ,
2424+ rabbit_fifo_q :new ()),
24222425 release_cursors = lqueue :from_list (lqueue :to_list (Cursors )),
24232426 dlx = rabbit_fifo_dlx :normalize (DlxState )}.
24242427
@@ -2528,9 +2531,6 @@ add_bytes_return(Header,
25282531 State #? STATE {msg_bytes_checkout = Checkout - Size ,
25292532 msg_bytes_enqueue = Enqueue + Size }.
25302533
2531- message_size (# basic_message {content = Content }) ->
2532- # content {payload_fragments_rev = PFR } = Content ,
2533- iolist_size (PFR );
25342534message_size (B ) when is_binary (B ) ->
25352535 byte_size (B );
25362536message_size (Msg ) ->
@@ -2652,12 +2652,7 @@ smallest_raft_index(#?STATE{messages = Messages,
26522652 ra_indexes = Indexes ,
26532653 dlx = DlxState }) ->
26542654 SmallestDlxRaIdx = rabbit_fifo_dlx :smallest_raft_index (DlxState ),
2655- SmallestMsgsRaIdx = case rabbit_fifo_q :get (Messages ) of
2656- ? MSG (I , _ ) when is_integer (I ) ->
2657- I ;
2658- _ ->
2659- undefined
2660- end ,
2655+ SmallestMsgsRaIdx = rabbit_fifo_q :get_lowest_index (Messages ),
26612656 SmallestRaIdx = rabbit_fifo_index :smallest (Indexes ),
26622657 lists :min ([SmallestDlxRaIdx , SmallestMsgsRaIdx , SmallestRaIdx ]).
26632658
0 commit comments