@@ -66,6 +66,7 @@ groups() ->
6666 [
6767 {backing_queue_tests , [], [
6868 msg_store ,
69+ msg_store_file_scan ,
6970 {backing_queue_v2 , [], Common ++ V2Only },
7071 {backing_queue_v1 , [], Common }
7172 ]}
@@ -535,6 +536,80 @@ test_msg_store_client_delete_and_terminate(GenRef) ->
535536 ok = rabbit_msg_store :client_delete_and_terminate (MSCState ),
536537 passed .
537538
539+ % % -------------------------------------------------------------------
540+ % % Message store file scanning.
541+ % % -------------------------------------------------------------------
542+
543+ msg_store_file_scan (Config ) ->
544+ passed = rabbit_ct_broker_helpers :rpc (Config , 0 ,
545+ ? MODULE , msg_store_file_scan1 , [Config ]).
546+
547+ msg_store_file_scan1 (Config ) ->
548+ Scan = fun (Blocks ) ->
549+ Expected = gen_result (Blocks ),
550+ Path = gen_msg_file (Config , Blocks ),
551+ Result = rabbit_msg_store :scan_file_for_valid_messages (Path ),
552+ case Result of
553+ Expected -> ok ;
554+ _ -> {expected , Expected , got , Result }
555+ end
556+ end ,
557+ % % Empty files.
558+ ok = Scan ([]),
559+ ok = Scan ([{pad , 1024 }]),
560+ ok = Scan ([{pad , 1024 * 1024 }]),
561+ % % One-message files.
562+ ok = Scan ([{msg , gen_id (), gen_msg ()}]),
563+ ok = Scan ([{pad , 1024 }, {msg , gen_id (), gen_msg ()}]),
564+ ok = Scan ([{pad , 1024 * 1024 }, {msg , gen_id (), gen_msg ()}]),
565+ ok = Scan ([{msg , gen_id (), gen_msg ()}, {pad , 1024 }]),
566+ ok = Scan ([{msg , gen_id (), gen_msg ()}, {pad , 1024 * 1024 }]),
567+ % %
568+ passed .
569+
570+ gen_id () ->
571+ rand :bytes (16 ).
572+
573+ gen_msg () ->
574+ gen_msg (1024 * 1024 ).
575+
576+ gen_msg (MaxSize ) ->
577+ % % This might generate false positives but very rarely
578+ % % so we don't do anything to prevent them.
579+ rand :bytes (rand :uniform (MaxSize )).
580+
581+ gen_msg_file (Config , Blocks ) ->
582+ PrivDir = ? config (priv_dir , Config ),
583+ TmpFile = integer_to_list (erlang :unique_integer ([positive ])),
584+ Path = filename :join (PrivDir , TmpFile ),
585+ ok = file :write_file (Path , [case Block of
586+ % % @todo Have a {pad, Bin} to set an explicit value as padding to simulate moved messages.
587+ {pad , Size } ->
588+ % % @todo This might generate false positives although very unlikely.
589+ rand :bytes (Size );
590+ {msg , MsgId , Msg } ->
591+ Size = 16 + byte_size (Msg ),
592+ [<<Size :64 >>, MsgId , Msg , <<255 >>]
593+ end || Block <- Blocks ]),
594+ Path .
595+
596+ gen_result (Blocks ) ->
597+ Messages = gen_result (Blocks , 0 , []),
598+ case Messages of
599+ [] ->
600+ {ok , [], 0 };
601+ [{_ , TotalSize , Offset }|_ ] ->
602+ {ok , Messages , Offset + TotalSize }
603+ end .
604+
605+ gen_result ([], _ , Acc ) ->
606+ Acc ;
607+ gen_result ([{pad , Size }|Tail ], Offset , Acc ) ->
608+ gen_result (Tail , Offset + Size , Acc );
609+ gen_result ([{msg , MsgId , Msg }|Tail ], Offset , Acc ) ->
610+ Size = 9 + 16 + byte_size (Msg ),
611+ gen_result (Tail , Offset + Size , [{MsgId , Size , Offset }|Acc ]).
612+
538613% % -------------------------------------------------------------------
539614% % Backing queue.
540615% % -------------------------------------------------------------------
0 commit comments