@@ -66,6 +66,7 @@ groups() ->
6666 [
6767 {backing_queue_tests , [], [
6868 msg_store ,
69+ msg_store_file_scan ,
6970 {backing_queue_v2 , [], Common ++ V2Only },
7071 {backing_queue_v1 , [], Common }
7172 ]}
@@ -535,6 +536,191 @@ test_msg_store_client_delete_and_terminate(GenRef) ->
535536 ok = rabbit_msg_store :client_delete_and_terminate (MSCState ),
536537 passed .
537538
539+ % % -------------------------------------------------------------------
540+ % % Message store file scanning.
541+ % % -------------------------------------------------------------------
542+
543+ % % While it is possible although very unlikely that this test case
544+ % % produces false positives, all failures of this test case should
545+ % % be investigated thoroughly as they test an algorithm that is
546+ % % central to the reliability of the data in the shared message store.
547+ % % Failing files can be found in the CT private data.
548+ msg_store_file_scan (Config ) ->
549+ passed = rabbit_ct_broker_helpers :rpc (Config , 0 ,
550+ ? MODULE , msg_store_file_scan1 , [Config ]).
551+
552+ msg_store_file_scan1 (Config ) ->
553+ Scan = fun (Blocks ) ->
554+ Expected = gen_result (Blocks ),
555+ Path = gen_msg_file (Config , Blocks ),
556+ Result = rabbit_msg_store :scan_file_for_valid_messages (Path ),
557+ case Result of
558+ Expected -> ok ;
559+ _ -> {expected , Expected , got , Result }
560+ end
561+ end ,
562+ % % Empty files.
563+ ok = Scan ([]),
564+ ok = Scan ([{pad , 1024 }]),
565+ ok = Scan ([{pad , 1024 * 1024 }]),
566+ % % One-message files.
567+ ok = Scan ([{msg , gen_id (), <<0 >>}]),
568+ ok = Scan ([{msg , gen_id (), <<255 >>}]),
569+ ok = Scan ([{msg , gen_id (), gen_msg ()}]),
570+ ok = Scan ([{pad , 1024 }, {msg , gen_id (), gen_msg ()}]),
571+ ok = Scan ([{pad , 1024 * 1024 }, {msg , gen_id (), gen_msg ()}]),
572+ ok = Scan ([{msg , gen_id (), gen_msg ()}, {pad , 1024 }]),
573+ ok = Scan ([{msg , gen_id (), gen_msg ()}, {pad , 1024 * 1024 }]),
574+ % % Multiple messages.
575+ ok = Scan ([{msg , gen_id (), gen_msg ()} || _ <- lists :seq (1 , 2 )]),
576+ ok = Scan ([{msg , gen_id (), gen_msg ()} || _ <- lists :seq (1 , 5 )]),
577+ ok = Scan ([{msg , gen_id (), gen_msg ()} || _ <- lists :seq (1 , 20 )]),
578+ ok = Scan ([{msg , gen_id (), gen_msg ()} || _ <- lists :seq (1 , 100 )]),
579+ % % Multiple messages with padding.
580+ ok = Scan ([
581+ {pad , 1024 },
582+ {msg , gen_id (), gen_msg ()},
583+ {msg , gen_id (), gen_msg ()}
584+ ]),
585+ ok = Scan ([
586+ {msg , gen_id (), gen_msg ()},
587+ {pad , 1024 },
588+ {msg , gen_id (), gen_msg ()}
589+ ]),
590+ ok = Scan ([
591+ {msg , gen_id (), gen_msg ()},
592+ {msg , gen_id (), gen_msg ()},
593+ {pad , 1024 }
594+ ]),
595+ ok = Scan ([
596+ {pad , 1024 },
597+ {msg , gen_id (), gen_msg ()},
598+ {pad , 1024 },
599+ {msg , gen_id (), gen_msg ()}
600+ ]),
601+ ok = Scan ([
602+ {msg , gen_id (), gen_msg ()},
603+ {pad , 1024 },
604+ {msg , gen_id (), gen_msg ()},
605+ {pad , 1024 }
606+ ]),
607+ ok = Scan ([
608+ {pad , 1024 },
609+ {msg , gen_id (), gen_msg ()},
610+ {msg , gen_id (), gen_msg ()},
611+ {pad , 1024 }
612+ ]),
613+ ok = Scan ([
614+ {pad , 1024 },
615+ {msg , gen_id (), gen_msg ()},
616+ {pad , 1024 },
617+ {msg , gen_id (), gen_msg ()},
618+ {pad , 1024 }
619+ ]),
620+ OneOf = fun (A , B ) ->
621+ case rand :uniform () of
622+ F when F < + 0.5 -> A ;
623+ _ -> B
624+ end
625+ end ,
626+ ok = Scan ([OneOf ({msg , gen_id (), gen_msg ()}, {pad , 1024 }) || _ <- lists :seq (1 , 2 )]),
627+ ok = Scan ([OneOf ({msg , gen_id (), gen_msg ()}, {pad , 1024 }) || _ <- lists :seq (1 , 5 )]),
628+ ok = Scan ([OneOf ({msg , gen_id (), gen_msg ()}, {pad , 1024 }) || _ <- lists :seq (1 , 20 )]),
629+ ok = Scan ([OneOf ({msg , gen_id (), gen_msg ()}, {pad , 1024 }) || _ <- lists :seq (1 , 100 )]),
630+ % % Duplicate messages.
631+ Msg = {msg , gen_id (), gen_msg ()},
632+ ok = Scan ([Msg , Msg ]),
633+ ok = Scan ([Msg , Msg , Msg , Msg , Msg ]),
634+ ok = Scan ([Msg , {pad , 1024 }, Msg ]),
635+ ok = Scan ([Msg ]
636+ ++ [OneOf ({msg , gen_id (), gen_msg ()}, {pad , 1024 }) || _ <- lists :seq (1 , 100 )]
637+ ++ [Msg ]),
638+ % % Truncated start of message.
639+ ok = Scan ([{bin , <<21 :56 , " deadbeefdeadbeef" , " hello" , 255 >>}]),
640+ ok = Scan ([{bin , <<21 :48 , " deadbeefdeadbeef" , " hello" , 255 >>}]),
641+ ok = Scan ([{bin , <<21 :40 , " deadbeefdeadbeef" , " hello" , 255 >>}]),
642+ ok = Scan ([{bin , <<21 :32 , " deadbeefdeadbeef" , " hello" , 255 >>}]),
643+ ok = Scan ([{bin , <<21 :24 , " deadbeefdeadbeef" , " hello" , 255 >>}]),
644+ ok = Scan ([{bin , <<21 :16 , " deadbeefdeadbeef" , " hello" , 255 >>}]),
645+ ok = Scan ([{bin , <<21 :8 , " deadbeefdeadbeef" , " hello" , 255 >>}]),
646+ ok = Scan ([{bin , <<" deadbeefdeadbeef" , " hello" , 255 >>}]),
647+ ok = Scan ([{bin , <<" beefdeadbeef" , " hello" , 255 >>}]),
648+ ok = Scan ([{bin , <<" deadbeef" , " hello" , 255 >>}]),
649+ ok = Scan ([{bin , <<" beef" , " hello" , 255 >>}]),
650+ ok = Scan ([{bin , <<" hello" , 255 >>}]),
651+ ok = Scan ([{bin , <<255 >>}]),
652+ % % Truncated end of message (unlikely).
653+ ok = Scan ([{bin , <<255 >>}]),
654+ ok = Scan ([{bin , <<255 , 255 >>}]),
655+ ok = Scan ([{bin , <<255 , 255 , 255 >>}]),
656+ ok = Scan ([{bin , <<255 , 255 , 255 , 255 >>}]),
657+ ok = Scan ([{bin , <<255 , 255 , 255 , 255 , 255 >>}]),
658+ ok = Scan ([{bin , <<255 , 255 , 255 , 255 , 255 , 255 >>}]),
659+ ok = Scan ([{bin , <<255 , 255 , 255 , 255 , 255 , 255 , 255 >>}]),
660+ ok = Scan ([{bin , <<255 , 255 , 255 , 255 , 255 , 255 , 255 , 255 >>}]),
661+ ok = Scan ([{bin , <<15 :64 , " deadbeefdeadbee" >>}]),
662+ ok = Scan ([{bin , <<16 :64 , " deadbeefdeadbeef" >>}]),
663+ ok = Scan ([{bin , <<17 :64 , " deadbeefdeadbeef" , 0 >>}]),
664+ ok = Scan ([{bin , <<17 :64 , " deadbeefdeadbeef" , 255 >>}]),
665+ ok = Scan ([{bin , <<17 :64 , " deadbeefdeadbeef" , 255 , 254 >>}]),
666+ % % Messages with no content.
667+ ok = Scan ([{bin , <<0 :64 , " deadbeefdeadbeef" , 255 >>}]),
668+ ok = Scan ([{msg , gen_id (), <<>>}]),
669+ % % All good!!
670+ passed .
671+
672+ gen_id () ->
673+ rand :bytes (16 ).
674+
675+ gen_msg () ->
676+ gen_msg (1024 * 1024 ).
677+
678+ gen_msg (MaxSize ) ->
679+ % % This might generate false positives but very rarely
680+ % % so we don't do anything to prevent them.
681+ rand :bytes (rand :uniform (MaxSize )).
682+
683+ gen_msg_file (Config , Blocks ) ->
684+ PrivDir = ? config (priv_dir , Config ),
685+ TmpFile = integer_to_list (erlang :unique_integer ([positive ])),
686+ Path = filename :join (PrivDir , TmpFile ),
687+ ok = file :write_file (Path , [case Block of
688+ {bin , Bin } ->
689+ Bin ;
690+ {pad , Size } ->
691+ % % This might generate false positives although very unlikely.
692+ rand :bytes (Size );
693+ {msg , MsgId , Msg } ->
694+ Size = 16 + byte_size (Msg ),
695+ [<<Size :64 >>, MsgId , Msg , <<255 >>]
696+ end || Block <- Blocks ]),
697+ Path .
698+
699+ gen_result (Blocks ) ->
700+ Messages = gen_result (Blocks , 0 , []),
701+ case Messages of
702+ [] ->
703+ {ok , [], 0 };
704+ [{_ , TotalSize , Offset }|_ ] ->
705+ {ok , Messages , Offset + TotalSize }
706+ end .
707+
708+ gen_result ([], _ , Acc ) ->
709+ Acc ;
710+ gen_result ([{bin , Bin }|Tail ], Offset , Acc ) ->
711+ gen_result (Tail , Offset + byte_size (Bin ), Acc );
712+ gen_result ([{pad , Size }|Tail ], Offset , Acc ) ->
713+ gen_result (Tail , Offset + Size , Acc );
714+ gen_result ([{msg , MsgId , Msg }|Tail ], Offset , Acc ) ->
715+ Size = 9 + 16 + byte_size (Msg ),
716+ % % Only the first MsgId found is returned when duplicates exist.
717+ case lists :keymember (MsgId , 1 , Acc ) of
718+ false ->
719+ gen_result (Tail , Offset + Size , [{MsgId , Size , Offset }|Acc ]);
720+ true ->
721+ gen_result (Tail , Offset + Size , Acc )
722+ end .
723+
538724% % -------------------------------------------------------------------
539725% % Backing queue.
540726% % -------------------------------------------------------------------
0 commit comments