Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 197 additions & 72 deletions deps/rabbit/src/rabbit_msg_record.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
{
% header :: maybe(#'v1_0.header'{}),
% delivery_annotations :: maybe(#'v1_0.delivery_annotations'{}),
message_annotations :: maybe(#'v1_0.message_annotations'{}),
properties :: maybe(#'v1_0.properties'{}),
application_properties :: maybe(#'v1_0.application_properties'{}),
data :: maybe(amqp10_data())
message_annotations :: maybe(#'v1_0.message_annotations'{}) | iodata(),
properties :: maybe(#'v1_0.properties'{}) | iodata(),
application_properties :: maybe(#'v1_0.application_properties'{}) | iodata(),
data :: maybe(amqp10_data()) | iodata()
% footer :: maybe(#'v1_0.footer'{})
}).

Expand All @@ -41,6 +41,11 @@
state/0
]).

-define(AMQP10_TYPE, <<"amqp-1.0">>).
-define(AMQP10_PROPERTIES_HEADER, <<"x-amqp-1.0-properties">>).
-define(AMQP10_APP_PROPERTIES_HEADER, <<"x-amqp-1.0-app-properties">>).
-define(AMQP10_MESSAGE_ANNOTATIONS_HEADER, <<"x-amqp-1.0-message-annotations">>).

%% this module acts as a wrapper / converter for the internal binary storage format
%% (AMQP 1.0) and any format it needs to be converted to / from.
%% Efficiency is key. No unnecessary allocations or work should be done until it
Expand All @@ -50,13 +55,21 @@
-spec init(binary()) -> state().
init(Bin) when is_binary(Bin) ->
%% TODO: delay parsing until needed
{MA, P, AP, D} = decode(amqp10_framing:decode_bin(Bin),
{undefined, undefined, undefined, undefined}),
{MA, P, AP, D0} = decode(amqp10_framing:decode_bin(Bin),
{undefined, undefined, undefined, undefined}),

D1 = case D0 of
Sections when is_list(D0) ->
lists:reverse(Sections);
_ ->
D0
end,

#?MODULE{cfg = #cfg{},
msg = #msg{properties = P,
application_properties = AP,
message_annotations = MA,
data = D}}.
data = D1}}.

decode([], Acc) ->
Acc;
Expand All @@ -66,13 +79,23 @@ decode([#'v1_0.properties'{} = P | Rem], {MA, _, AP, D}) ->
decode(Rem, {MA, P, AP, D});
decode([#'v1_0.application_properties'{} = AP | Rem], {MA, P, _, D}) ->
decode(Rem, {MA, P, AP, D});
decode([#'v1_0.data'{} = D | Rem], {MA, P, AP, _}) ->
decode(Rem, {MA, P, AP, D}).
decode([#'v1_0.amqp_value'{} = D | Rem], {MA, P, AP, _}) ->
decode(Rem, {MA, P, AP, D});
decode([#'v1_0.data'{} = D | Rem], {MA, P, AP, undefined}) ->
decode(Rem, {MA, P, AP, D});
decode([#'v1_0.data'{} = D | Rem], {MA, P, AP, B}) when is_list(B) ->
decode(Rem, {MA, P, AP, [D | B]});
decode([#'v1_0.data'{} = D | Rem], {MA, P, AP, B}) ->
decode(Rem, {MA, P, AP, [D, B]});
decode([#'v1_0.amqp_sequence'{} = D | Rem], {MA, P, AP, undefined}) ->
decode(Rem, {MA, P, AP, [D]});
decode([#'v1_0.amqp_sequence'{} = D | Rem], {MA, P, AP, B}) when is_list(B) ->
decode(Rem, {MA, P, AP, [D | B]}).


amqp10_properties_empty(#'v1_0.properties'{message_id = undefined,
user_id = undefined,
to = undefined,
% subject = wrap(utf8, RKey),
reply_to = undefined,
correlation_id = undefined,
content_type = undefined,
Expand All @@ -92,42 +115,67 @@ to_iodata(#?MODULE{msg = #msg{properties = P,
case MA of
#'v1_0.message_annotations'{content = []} ->
<<>>;
_ ->
amqp10_framing:encode_bin(MA)
#'v1_0.message_annotations'{} ->
amqp10_framing:encode_bin(MA);
MsgAnnotBin ->
MsgAnnotBin
end,
case amqp10_properties_empty(P) of
true -> <<>>;
false ->
amqp10_framing:encode_bin(P)
case P of
#'v1_0.properties'{} ->
case amqp10_properties_empty(P) of
true -> <<>>;
false ->
amqp10_framing:encode_bin(P)
end;
PropsBin ->
PropsBin
end,
case AP of
#'v1_0.application_properties'{content = []} ->
<<>>;
_ ->
amqp10_framing:encode_bin(AP)
#'v1_0.application_properties'{} ->
amqp10_framing:encode_bin(AP);
AppPropsBin ->
AppPropsBin
end,
amqp10_framing:encode_bin(Data)
case Data of
DataBin when is_binary(Data) orelse is_list(Data) ->
DataBin;
_ ->
amqp10_framing:encode_bin(Data)
end
].

%% TODO: refine type spec here
-spec add_message_annotations(#{binary() => {atom(), term()}}, state()) ->
state().
add_message_annotations(Anns,
#?MODULE{msg =
#msg{message_annotations = MA0} = Msg} = State) ->
#msg{message_annotations = undefined} = Msg} = State) ->
add_message_annotations(Anns,
State#?MODULE{msg = Msg#msg{message_annotations =
#'v1_0.message_annotations'{content = []}}});
add_message_annotations(Anns,
#?MODULE{msg =
#msg{message_annotations =
#'v1_0.message_annotations'{content = C}} = Msg} = State) ->
Content = maps:fold(
fun (K, {T, V}, Acc) ->
map_add(symbol, K, T, V, Acc)
end,
case MA0 of
undefined -> [];
#'v1_0.message_annotations'{content = C} -> C
end,
C,
Anns),

State#?MODULE{msg =
Msg#msg{message_annotations =
#'v1_0.message_annotations'{content = Content}}}.
#'v1_0.message_annotations'{content = Content}}};
add_message_annotations(Anns,
#?MODULE{msg =
#msg{message_annotations = MABin} = Msg} = State0) ->
[MA] = amqp10_framing:decode_bin(iolist_to_binary(MABin)),
State1 = State0#?MODULE{msg =
Msg#msg{message_annotations = MA}},
add_message_annotations(Anns, State1).

%% TODO: refine
-type amqp10_term() :: {atom(), term()}.
Expand Down Expand Up @@ -159,58 +207,111 @@ message_annotation(Key,
%% parses it and returns the current parse state
%% this is the input function from storage and from, e.g. socket input
-spec from_amqp091(#'P_basic'{}, iodata()) -> state().
from_amqp091(#'P_basic'{message_id = MsgId,
expiration = Expiration,
delivery_mode = DelMode,
headers = Headers,
user_id = UserId,
reply_to = ReplyTo,
type = Type,
priority = Priority,
app_id = AppId,
correlation_id = CorrId,
content_type = ContentType,
content_encoding = ContentEncoding,
timestamp = Timestamp
}, Data) ->
%% TODO: support parsing properties bin directly?
from_amqp091(#'P_basic'{type = T} = PB, Data) ->
MA = from_amqp091_to_amqp10_message_annotations(PB),
P = from_amqp091_to_amqp10_properties(PB),
AP = from_amqp091_to_amqp10_app_properties(PB),

D = case T of
?AMQP10_TYPE ->
%% the body is already AMQP 1.0 binary content, so leaving it as-is
Data;
_ ->
#'v1_0.data'{content = Data}
end,

#?MODULE{cfg = #cfg{},
msg = #msg{properties = P,
application_properties = AP,
message_annotations = MA,
data = D}}.

from_amqp091_to_amqp10_properties(#'P_basic'{headers = Headers} = P) when is_list(Headers) ->
case proplists:lookup(?AMQP10_PROPERTIES_HEADER, Headers) of
none ->
convert_amqp091_to_amqp10_properties(P);
{_, _, PropsBin} ->
PropsBin
end;
from_amqp091_to_amqp10_properties(P) ->
convert_amqp091_to_amqp10_properties(P).

convert_amqp091_to_amqp10_properties(#'P_basic'{message_id = MsgId,
user_id = UserId,
reply_to = ReplyTo,
correlation_id = CorrId,
content_type = ContentType,
content_encoding = ContentEncoding,
timestamp = Timestamp
}) ->
ConvertedTs = case Timestamp of
undefined ->
undefined;
_ ->
Timestamp * 1000
end,
P = #'v1_0.properties'{message_id = wrap(utf8, MsgId),
user_id = wrap(binary, UserId),
to = undefined,
% subject = wrap(utf8, RKey),
reply_to = wrap(utf8, ReplyTo),
correlation_id = wrap(utf8, CorrId),
content_type = wrap(symbol, ContentType),
content_encoding = wrap(symbol, ContentEncoding),
creation_time = wrap(timestamp, ConvertedTs)},
#'v1_0.properties'{message_id = wrap(utf8, MsgId),
user_id = wrap(binary, UserId),
to = undefined,
reply_to = wrap(utf8, ReplyTo),
correlation_id = wrap(utf8, CorrId),
content_type = wrap(symbol, ContentType),
content_encoding = wrap(symbol, ContentEncoding),
creation_time = wrap(timestamp, ConvertedTs)}.

from_amqp091_to_amqp10_app_properties(#'P_basic'{headers = Headers} = P)
when is_list(Headers) ->
case proplists:lookup(?AMQP10_APP_PROPERTIES_HEADER, Headers) of
none ->
convert_amqp091_to_amqp10_app_properties(P);
{_, _, AppPropsBin} ->
AppPropsBin
end;
from_amqp091_to_amqp10_app_properties(P) ->
convert_amqp091_to_amqp10_app_properties(P).

convert_amqp091_to_amqp10_app_properties(#'P_basic'{headers = Headers,
type = Type,
app_id = AppId}) ->
APC0 = [{wrap(utf8, K), from_091(T, V)} || {K, T, V}
<- case Headers of
undefined -> [];
_ -> Headers
end, not unsupported_header_value_type(T)],
end, not unsupported_header_value_type(T),
not filtered_header(K)],

APC1 = case Type of
?AMQP10_TYPE ->
%% no need to modify the application properties for the type
%% this info will be restored on decoding if necessary
APC0;
_ ->
map_add(utf8, <<"x-basic-type">>, utf8, Type, APC0)
end,

%% properties that do not map directly to AMQP 1.0 properties are stored
%% in application properties
APC = map_add(utf8, <<"x-basic-type">>, utf8, Type,
map_add(utf8, <<"x-basic-app-id">>, utf8, AppId, APC0)),
APC2 = map_add(utf8, <<"x-basic-app-id">>, utf8, AppId, APC1),
#'v1_0.application_properties'{content = APC2}.

from_amqp091_to_amqp10_message_annotations(#'P_basic'{headers = Headers} = P) when is_list(Headers) ->
case proplists:lookup(?AMQP10_MESSAGE_ANNOTATIONS_HEADER, Headers) of
none ->
convert_amqp091_to_amqp10_message_annotations(P);
{_, _, MessageAnnotationsBin} ->
MessageAnnotationsBin
end;
from_amqp091_to_amqp10_message_annotations(P) ->
convert_amqp091_to_amqp10_message_annotations(P).

convert_amqp091_to_amqp10_message_annotations(#'P_basic'{priority = Priority,
delivery_mode = DelMode,
expiration = Expiration}) ->
MAC = map_add(symbol, <<"x-basic-priority">>, ubyte, Priority,
map_add(symbol, <<"x-basic-delivery-mode">>, ubyte, DelMode,
map_add(symbol, <<"x-basic-expiration">>, utf8, Expiration, []))),

AP = #'v1_0.application_properties'{content = APC},
MA = #'v1_0.message_annotations'{content = MAC},
#?MODULE{cfg = #cfg{},
msg = #msg{properties = P,
application_properties = AP,
message_annotations = MA,
data = #'v1_0.data'{content = Data}}}.
#'v1_0.message_annotations'{content = MAC}.

map_add(_T, _Key, _Type, undefined, Acc) ->
Acc;
Expand All @@ -223,12 +324,22 @@ to_amqp091(#?MODULE{msg = #msg{properties = P,
message_annotations = MAR,
data = Data}}) ->

Payload = case Data of
undefined ->
<<>>;
#'v1_0.data'{content = C} ->
C
end,
%% anything else than a single data section is expected to be AMQP 1.0 binary content
%% enforcing this convention
{Payload, IsAmqp10} = case Data of
undefined ->
%% not an expected value,
%% but handling it with an empty binary anyway
{<<>>, false};
#'v1_0.data'{content = C} ->
{C, false};
Sections when is_list(Data)->
B = [amqp10_framing:encode_bin(S) || S <- Sections],
{iolist_to_binary(B),
true};
V ->
{iolist_to_binary(amqp10_framing:encode_bin(V)), true}
end,

#'v1_0.properties'{message_id = MsgId,
user_id = UserId,
Expand All @@ -252,7 +363,12 @@ to_amqp091(#?MODULE{msg = #msg{properties = P,
_ -> []
end,

{Type, AP1} = amqp10_map_get(utf8(<<"x-basic-type">>), AP0),
{Type, AP1} = case {amqp10_map_get(utf8(<<"x-basic-type">>), AP0), IsAmqp10} of
{{undefined, M}, true} ->
{?AMQP10_TYPE, M};
{{T, M}, _} ->
{T, M}
end,
{AppId, AP} = amqp10_map_get(utf8(<<"x-basic-app-id">>), AP1),

{Priority, MA1} = amqp10_map_get(symbol(<<"x-basic-priority">>), MA0),
Expand Down Expand Up @@ -390,12 +506,21 @@ message_id({utf8, S}, HKey, H0) ->
message_id(MsgId, _, H) ->
{H, unwrap(MsgId)}.

unsupported_header_value_type(array) ->
true;
unsupported_header_value_type(table) ->
true;
unsupported_header_value_type(_) ->
false.
unsupported_header_value_type(array) ->
true;
unsupported_header_value_type(table) ->
true;
unsupported_header_value_type(_) ->
false.

filtered_header(?AMQP10_PROPERTIES_HEADER) ->
true;
filtered_header(?AMQP10_APP_PROPERTIES_HEADER) ->
true;
filtered_header(?AMQP10_MESSAGE_ANNOTATIONS_HEADER) ->
true;
filtered_header(_) ->
false.

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
Expand Down
Loading