Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion deps/rabbit/src/rabbit_msg_store.erl
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,19 @@ consolidate_reads([], Acc) ->
read_many_file3(MsgIds, CState = #client_msstate{ file_handles_ets = FileHandlesEts,
client_ref = Ref }, Acc, File) ->
mark_handle_closed(FileHandlesEts, File, Ref),
read_many_disk(MsgIds, CState, Acc).
%% We go back to reading from the cache rather than from disk
%% because it is possible that messages are not in a perfect
%% order of cache->disk. For example, a fanout message written
%% to a previous file by another queue, but then referenced by
%% our main queue in between newly written messages: our main
%% queue would write MsgA, MsgB, MsgFanout, MsgC, MsgD to the
%% current file, then when trying to read from that same current
%% file, it would get MsgA and MsgB from the cache; MsgFanout
%% from the previous file; and MsgC and MsgD from the cache
%% again. So the correct action here is not to continue reading
%% from disk but instead to go back to the cache to get MsgC
%% and MsgD.
read_many_cache(MsgIds, CState, Acc).

-spec contains(rabbit_types:msg_id(), client_msstate()) -> boolean().

Expand Down
63 changes: 63 additions & 0 deletions deps/rabbit/test/backing_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ groups() ->
[
{backing_queue_tests, [], [
msg_store,
msg_store_read_many_fanout,
msg_store_file_scan,
{backing_queue_v2, [], Common ++ V2Only}
]}
Expand Down Expand Up @@ -320,6 +321,68 @@ msg_store1(_Config) ->
restart_msg_store_empty(),
passed.

msg_store_read_many_fanout(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, msg_store_read_many_fanout1, [Config]).

msg_store_read_many_fanout1(_Config) ->
GenRefFun = fun(Key) -> V = case get(Key) of undefined -> 0; V0 -> V0 end, put(Key, V + 1), V end,
GenRef = fun() -> GenRefFun(msc) end,
%% We will fill the first message store file with random messages
%% + 1 fanout message (written once for now). We will then write
%% two messages from our queue, then the fanout message (to +1
%% from our queue), and two more messages. We expect all messages
%% from our queue to be in the current write file, except the
%% fanout message. We then try to read the messages.
restart_msg_store_empty(),
CRef1 = rabbit_guid:gen(),
CRef2 = rabbit_guid:gen(),
{ok, FileSize} = application:get_env(rabbit, msg_store_file_size_limit),
PayloadSizeBits = 65536,
Payload = <<0:PayloadSizeBits>>,
%% @todo -7 because -1 and -hd, fix better.
NumRandomMsgs = (FileSize div (PayloadSizeBits div 8)) - 1,
RandomMsgIds = [{GenRef(), msg_id_bin(X)} || X <- lists:seq(1, NumRandomMsgs)],
FanoutMsgId = {GenRef(), msg_id_bin(NumRandomMsgs + 1)},
[Q1, Q2, Q3, Q4] = [{GenRef(), msg_id_bin(X)} || X <- lists:seq(NumRandomMsgs + 2, NumRandomMsgs + 5)],
QueueMsgIds0 = [Q1, Q2] ++ [FanoutMsgId] ++ [Q3, Q4],
QueueMsgIds = [{GenRef(), M} || {_, M} <- QueueMsgIds0],
BasicMsgFun = fun(MsgId) ->
Ex = rabbit_misc:r(<<>>, exchange, <<>>),
BasicMsg = rabbit_basic:message(Ex, <<>>,
#'P_basic'{delivery_mode = 2},
Payload),
{ok, Msg0} = mc_amqpl:message(Ex, <<>>, BasicMsg#basic_message.content),
mc:set_annotation(id, MsgId, Msg0)
end,
ok = with_msg_store_client(
?PERSISTENT_MSG_STORE, CRef1,
fun (MSCStateM) ->
[begin
Msg = BasicMsgFun(MsgId),
ok = rabbit_msg_store:write(SeqId, MsgId, Msg, MSCStateM)
end || {SeqId, MsgId} <- [FanoutMsgId] ++ RandomMsgIds],
MSCStateM
end),
ok = with_msg_store_client(
?PERSISTENT_MSG_STORE, CRef2,
fun (MSCStateM) ->
[begin
Msg = BasicMsgFun(MsgId),
ok = rabbit_msg_store:write(SeqId, MsgId, Msg, MSCStateM)
end || {SeqId, MsgId} <- QueueMsgIds],
MSCStateM
end),
ok = with_msg_store_client(
?PERSISTENT_MSG_STORE, CRef2,
fun (MSCStateM) ->
QueueOnlyMsgIds = [M || {_, M} <- QueueMsgIds],
{#{}, MSCStateN} = rabbit_msg_store:read_many(
QueueOnlyMsgIds, MSCStateM),
MSCStateN
end),
passed.

restart_msg_store_empty() ->
ok = rabbit_variable_queue:stop_msg_store(?VHOST),
ok = rabbit_variable_queue:start_msg_store(?VHOST,
Expand Down
Loading