Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,18 @@
integer() |
float() |
boolean().
-type tagged_prop() :: {uuid, binary()} |
{utf8, binary()} |
{binary, binary()} |
{boolean, boolean()} |
{long, integer()} |
{ulong, non_neg_integer() } |
{list, [tagged_prop()]} |
{map, [{tagged_prop(), tagged_prop()}]} |
undefined.
-type tagged_value() :: {uuid, binary()} |
{utf8, binary()} |
{binary, binary()} |
{boolean, boolean()} |
{double | float, float()} |
{long | int | short | byte, integer()} |
{ulong | uint | ushort | ubyte, non_neg_integer()} |
{timestamp, non_neg_integer()} |
{list, [tagged_value()]} |
{map, [{tagged_value(), tagged_value()}]} |
null |
undefined.

%% behaviour callbacks for protocol specific implementation

Expand All @@ -90,12 +93,12 @@
%% retrieve and x- header from the protocol data
%% the return value should be tagged with an AMQP 1.0 type
-callback x_header(binary(), proto_state()) ->
tagged_prop().
tagged_value().

%% retrieve a property field from the protocol data
%% e.g. message_id, correlation_id
-callback property(atom(), proto_state()) ->
tagged_prop().
tagged_value().

%% return a map of header values used for message routing,
%% optionally include x- headers and / or complex types (i.e. tables, arrays etc)
Expand Down Expand Up @@ -173,7 +176,7 @@ set_annotation(Key, Value, BasicMessage) ->
mc_compat:set_annotation(Key, Value, BasicMessage).

-spec x_header(Key :: binary(), state()) ->
tagged_prop().
tagged_value().
x_header(Key, #?MODULE{protocol = Proto,
annotations = Anns,
data = Data}) ->
Expand Down
30 changes: 20 additions & 10 deletions deps/rabbit/src/mc_amqpl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,15 @@ convert_to(mc_amqp, #content{payload_fragments_rev = Payload} = Content) ->
end,
%% TODO: only add header section if at least one of the fields
%% needs to be set
Ttl = case Expiration of
undefined ->
undefined;
_ ->
binary_to_integer(Expiration)
end,

H = #'v1_0.header'{durable = DelMode =:= 2,
ttl = wrap(uint, Ttl),
%% TODO: check Priority is a ubyte?
priority = wrap(ubyte, Priority)},
P = case amqp10_section_header(?AMQP10_PROPERTIES_HEADER, Headers) of
Expand Down Expand Up @@ -327,21 +335,23 @@ convert_to(mc_amqp, #content{payload_fragments_rev = Payload} = Content) ->
is_binary(K)
],

%% properties that _are_ potentially used by the broker
%% are stored as message annotations
%% an alternative woud be to store priority and delivery mode in
%% the amqp (1.0) header section using the dura
MAC = map_add(symbol, <<"x-basic-type">>, utf8, Type,
map_add(symbol, <<"x-basic-priority">>, ubyte, Priority,
map_add(symbol, <<"x-basic-delivery-mode">>, ubyte, DelMode,
map_add(symbol, <<"x-basic-expiration">>, utf8, Expiration,
MAC0)))),
%% `type' doesn't have a direct equivalent so adding as
%% a message annotation here
MAC = map_add(symbol, <<"x-basic-type">>, utf8, Type, MAC0),
#'v1_0.message_annotations'{content = MAC};
Section ->
Section
end,

Sections = [H, P, AP, MA, #'v1_0.data'{content = lists:reverse(Payload)}],
BodySections = case Type of
?AMQP10_TYPE ->
amqp10_framing:decode_bin(
iolist_to_binary(lists:reverse(Payload)));
_ ->
[#'v1_0.data'{content = lists:reverse(Payload)}]
end,

Sections = [H, MA, P, AP | BodySections],
mc_amqp:convert_from(mc_amqp, Sections);
convert_to(_TargetProto, _Content) ->
not_implemented.
Expand Down
7 changes: 4 additions & 3 deletions deps/rabbit/src/mc_compat.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ set_annotation(routing_keys, Value, #basic_message{} = Msg) ->
set_annotation(exchange, Value, #basic_message{exchange_name = Ex} = Msg) ->
Msg#basic_message{exchange_name = Ex#resource{name = Value}};
set_annotation(<<"x-", _/binary>> = Key, Value,
#basic_message{content =
#content{properties =
#'P_basic'{headers = H0} = B} = C0} = Msg) ->
#basic_message{content = Content0} = Msg) ->
#content{properties =
#'P_basic'{headers = H0} = B} = C0 =
rabbit_binary_parser:ensure_content_decoded(Content0),
T = case Value of
_ when is_integer(Value) ->
long;
Expand Down
71 changes: 39 additions & 32 deletions deps/rabbit/test/mc_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ all_tests() ->
amqpl_death_records,
amqpl_amqp_bin_amqpl,
amqp_amqpl,
amqp_to_amqpl_data_body
amqp_to_amqpl_data_body,
amqp_amqpl_amqp_bodies
].

groups() ->
Expand Down Expand Up @@ -443,41 +444,47 @@ amqp_to_amqpl_data_body(_Config) ->
iolist_to_binary(PayFrag))
end, Cases).

amqp10_non_single_data_bodies(_Config) ->
amqp_amqpl_amqp_bodies(_Config) ->
Props = #'P_basic'{type = <<"amqp-1.0">>},
Payloads = [
[#'v1_0.data'{content = <<"hello">>},
#'v1_0.data'{content = <<"brave">>},
#'v1_0.data'{content = <<"new">>},
#'v1_0.data'{content = <<"world">>}
],
#'v1_0.amqp_value'{content = {utf8, <<"hello world">>}},
[#'v1_0.amqp_sequence'{content = [{utf8, <<"one">>},
{utf8, <<"blah">>}]},
#'v1_0.amqp_sequence'{content = [{utf8, <<"two">>}]}
]
],
Bodies = [
#'v1_0.data'{content = <<"helo world">>},
[#'v1_0.data'{content = <<"hello">>},
#'v1_0.data'{content = <<"brave">>},
#'v1_0.data'{content = <<"new">>},
#'v1_0.data'{content = <<"world">>}
],
#'v1_0.amqp_value'{content = {utf8, <<"hello world">>}},
[#'v1_0.amqp_sequence'{content = [{utf8, <<"one">>},
{utf8, <<"blah">>}]},
#'v1_0.amqp_sequence'{content = [{utf8, <<"two">>}]}
]
],

[begin
EncodedPayload = amqp10_encode_bin(Payload),

MsgRecord0 = rabbit_msg_record:from_amqp091(Props, EncodedPayload),
MsgRecord = rabbit_msg_record:init(
iolist_to_binary(rabbit_msg_record:to_iodata(MsgRecord0))),
{PropsOut, PayloadEncodedOut} = rabbit_msg_record:to_amqp091(MsgRecord),
PayloadOut = case amqp10_framing:decode_bin(iolist_to_binary(PayloadEncodedOut)) of
L when length(L) =:= 1 ->
lists:nth(1, L);
L ->
L
Ex = #resource{virtual_host = <<"/">>,
kind = exchange,
name = <<"ex">>},
LegacyMsg = mc_amqpl:message(Ex, <<"rkey">>,
#content{payload_fragments_rev =
lists:reverse(EncodedPayload),
properties = Props},
#{}, true),

AmqpMsg = mc:convert(mc_amqp, LegacyMsg),
%% drop any non body sections
BodySections = lists:nthtail(3, mc:protocol_state(AmqpMsg)),

AssertBody = case is_list(Payload) of
true ->
Payload;
false ->
[Payload]
end,

?assertEqual(Props, PropsOut),
?assertEqual(iolist_to_binary(EncodedPayload),
iolist_to_binary(PayloadEncodedOut)),
?assertEqual(Payload, PayloadOut)

end || Payload <- Payloads],
% ct:pal("ProtoState ~p", [BodySections]),
?assertEqual(AssertBody, BodySections)
end || Payload <- Bodies],
ok.

unsupported_091_header_is_dropped(_Config) ->
Expand Down Expand Up @@ -630,9 +637,9 @@ reuse_amqp10_binary_chunks(_Config) ->
ok.

amqp10_encode_bin(L) when is_list(L) ->
iolist_to_binary([amqp10_encode_bin(X) || X <- L]);
[iolist_to_binary(amqp10_framing:encode_bin(X)) || X <- L];
amqp10_encode_bin(X) ->
iolist_to_binary(amqp10_framing:encode_bin(X)).
[iolist_to_binary(amqp10_framing:encode_bin(X))].

%% Utility

Expand Down