@@ -62,6 +62,7 @@ groups() ->
6262 [
6363 {backing_queue_tests , [], [
6464 msg_store ,
65+ msg_store_read_many_fanout ,
6566 msg_store_file_scan ,
6667 {backing_queue_v2 , [], Common ++ V2Only }
6768 ]}
@@ -320,6 +321,68 @@ msg_store1(_Config) ->
320321 restart_msg_store_empty (),
321322 passed .
322323
324+ msg_store_read_many_fanout (Config ) ->
325+ passed = rabbit_ct_broker_helpers :rpc (Config , 0 ,
326+ ? MODULE , msg_store_read_many_fanout1 , [Config ]).
327+
328+ msg_store_read_many_fanout1 (_Config ) ->
329+ GenRefFun = fun (Key ) -> V = case get (Key ) of undefined -> 0 ; V0 -> V0 end , put (Key , V + 1 ), V end ,
330+ GenRef = fun () -> GenRefFun (msc ) end ,
331+ % % We will fill the first message store file with random messages
332+ % % + 1 fanout message (written once for now). We will then write
333+ % % two messages from our queue, then the fanout message (to +1
334+ % % from our queue), and two more messages. We expect all messages
335+ % % from our queue to be in the current write file, except the
336+ % % fanout message. We then try to read the messages.
337+ restart_msg_store_empty (),
338+ CRef1 = rabbit_guid :gen (),
339+ CRef2 = rabbit_guid :gen (),
340+ {ok , FileSize } = application :get_env (rabbit , msg_store_file_size_limit ),
341+ PayloadSizeBits = 65536 ,
342+ Payload = <<0 :PayloadSizeBits >>,
343+ % % @todo -7 because -1 and -hd, fix better.
344+ NumRandomMsgs = (FileSize div (PayloadSizeBits div 8 )) - 1 ,
345+ RandomMsgIds = [{GenRef (), msg_id_bin (X )} || X <- lists :seq (1 , NumRandomMsgs )],
346+ FanoutMsgId = {GenRef (), msg_id_bin (NumRandomMsgs + 1 )},
347+ [Q1 , Q2 , Q3 , Q4 ] = [{GenRef (), msg_id_bin (X )} || X <- lists :seq (NumRandomMsgs + 2 , NumRandomMsgs + 5 )],
348+ QueueMsgIds0 = [Q1 , Q2 ] ++ [FanoutMsgId ] ++ [Q3 , Q4 ],
349+ QueueMsgIds = [{GenRef (), M } || {_ , M } <- QueueMsgIds0 ],
350+ BasicMsgFun = fun (MsgId ) ->
351+ Ex = rabbit_misc :r (<<>>, exchange , <<>>),
352+ BasicMsg = rabbit_basic :message (Ex , <<>>,
353+ # 'P_basic' {delivery_mode = 2 },
354+ Payload ),
355+ {ok , Msg0 } = mc_amqpl :message (Ex , <<>>, BasicMsg # basic_message .content ),
356+ mc :set_annotation (id , MsgId , Msg0 )
357+ end ,
358+ ok = with_msg_store_client (
359+ ? PERSISTENT_MSG_STORE , CRef1 ,
360+ fun (MSCStateM ) ->
361+ [begin
362+ Msg = BasicMsgFun (MsgId ),
363+ ok = rabbit_msg_store :write (SeqId , MsgId , Msg , MSCStateM )
364+ end || {SeqId , MsgId } <- [FanoutMsgId ] ++ RandomMsgIds ],
365+ MSCStateM
366+ end ),
367+ ok = with_msg_store_client (
368+ ? PERSISTENT_MSG_STORE , CRef2 ,
369+ fun (MSCStateM ) ->
370+ [begin
371+ Msg = BasicMsgFun (MsgId ),
372+ ok = rabbit_msg_store :write (SeqId , MsgId , Msg , MSCStateM )
373+ end || {SeqId , MsgId } <- QueueMsgIds ],
374+ MSCStateM
375+ end ),
376+ ok = with_msg_store_client (
377+ ? PERSISTENT_MSG_STORE , CRef2 ,
378+ fun (MSCStateM ) ->
379+ QueueOnlyMsgIds = [M || {_ , M } <- QueueMsgIds ],
380+ {#{}, MSCStateN } = rabbit_msg_store :read_many (
381+ QueueOnlyMsgIds , MSCStateM ),
382+ MSCStateN
383+ end ),
384+ passed .
385+
323386restart_msg_store_empty () ->
324387 ok = rabbit_variable_queue :stop_msg_store (? VHOST ),
325388 ok = rabbit_variable_queue :start_msg_store (? VHOST ,
0 commit comments