diff --git a/deps/rabbit/src/mc.erl b/deps/rabbit/src/mc.erl index 340a7da72dda..25c2f5fa9830 100644 --- a/deps/rabbit/src/mc.erl +++ b/deps/rabbit/src/mc.erl @@ -64,15 +64,18 @@ integer() | float() | boolean(). --type tagged_prop() :: {uuid, binary()} | - {utf8, binary()} | - {binary, binary()} | - {boolean, boolean()} | - {long, integer()} | - {ulong, non_neg_integer() } | - {list, [tagged_prop()]} | - {map, [{tagged_prop(), tagged_prop()}]} | - undefined. +-type tagged_value() :: {uuid, binary()} | + {utf8, binary()} | + {binary, binary()} | + {boolean, boolean()} | + {double | float, float()} | + {long | int | short | byte, integer()} | + {ulong | uint | ushort | ubyte, non_neg_integer()} | + {timestamp, non_neg_integer()} | + {list, [tagged_value()]} | + {map, [{tagged_value(), tagged_value()}]} | + null | + undefined. %% behaviour callbacks for protocol specific implementation @@ -90,12 +93,12 @@ %% retrieve and x- header from the protocol data %% the return value should be tagged with an AMQP 1.0 type -callback x_header(binary(), proto_state()) -> - tagged_prop(). + tagged_value(). %% retrieve a property field from the protocol data %% e.g. message_id, correlation_id -callback property(atom(), proto_state()) -> - tagged_prop(). + tagged_value(). %% return a map of header values used for message routing, %% optionally include x- headers and / or complex types (i.e. tables, arrays etc) @@ -173,7 +176,7 @@ set_annotation(Key, Value, BasicMessage) -> mc_compat:set_annotation(Key, Value, BasicMessage). -spec x_header(Key :: binary(), state()) -> - tagged_prop(). + tagged_value(). x_header(Key, #?MODULE{protocol = Proto, annotations = Anns, data = Data}) -> diff --git a/deps/rabbit/src/mc_amqpl.erl b/deps/rabbit/src/mc_amqpl.erl index 18b5800e49a9..a6a768e69705 100644 --- a/deps/rabbit/src/mc_amqpl.erl +++ b/deps/rabbit/src/mc_amqpl.erl @@ -282,7 +282,15 @@ convert_to(mc_amqp, #content{payload_fragments_rev = Payload} = Content) -> end, %% TODO: only add header section if at least one of the fields %% needs to be set + Ttl = case Expiration of + undefined -> + undefined; + _ -> + binary_to_integer(Expiration) + end, + H = #'v1_0.header'{durable = DelMode =:= 2, + ttl = wrap(uint, Ttl), %% TODO: check Priority is a ubyte? priority = wrap(ubyte, Priority)}, P = case amqp10_section_header(?AMQP10_PROPERTIES_HEADER, Headers) of @@ -327,21 +335,23 @@ convert_to(mc_amqp, #content{payload_fragments_rev = Payload} = Content) -> is_binary(K) ], - %% properties that _are_ potentially used by the broker - %% are stored as message annotations - %% an alternative woud be to store priority and delivery mode in - %% the amqp (1.0) header section using the dura - MAC = map_add(symbol, <<"x-basic-type">>, utf8, Type, - map_add(symbol, <<"x-basic-priority">>, ubyte, Priority, - map_add(symbol, <<"x-basic-delivery-mode">>, ubyte, DelMode, - map_add(symbol, <<"x-basic-expiration">>, utf8, Expiration, - MAC0)))), + %% `type' doesn't have a direct equivalent so adding as + %% a message annotation here + MAC = map_add(symbol, <<"x-basic-type">>, utf8, Type, MAC0), #'v1_0.message_annotations'{content = MAC}; Section -> Section end, - Sections = [H, P, AP, MA, #'v1_0.data'{content = lists:reverse(Payload)}], + BodySections = case Type of + ?AMQP10_TYPE -> + amqp10_framing:decode_bin( + iolist_to_binary(lists:reverse(Payload))); + _ -> + [#'v1_0.data'{content = lists:reverse(Payload)}] + end, + + Sections = [H, MA, P, AP | BodySections], mc_amqp:convert_from(mc_amqp, Sections); convert_to(_TargetProto, _Content) -> not_implemented. diff --git a/deps/rabbit/src/mc_compat.erl b/deps/rabbit/src/mc_compat.erl index 1c0fde932b49..571acc4a8675 100644 --- a/deps/rabbit/src/mc_compat.erl +++ b/deps/rabbit/src/mc_compat.erl @@ -63,9 +63,10 @@ set_annotation(routing_keys, Value, #basic_message{} = Msg) -> set_annotation(exchange, Value, #basic_message{exchange_name = Ex} = Msg) -> Msg#basic_message{exchange_name = Ex#resource{name = Value}}; set_annotation(<<"x-", _/binary>> = Key, Value, - #basic_message{content = - #content{properties = - #'P_basic'{headers = H0} = B} = C0} = Msg) -> + #basic_message{content = Content0} = Msg) -> + #content{properties = + #'P_basic'{headers = H0} = B} = C0 = + rabbit_binary_parser:ensure_content_decoded(Content0), T = case Value of _ when is_integer(Value) -> long; diff --git a/deps/rabbit/test/mc_SUITE.erl b/deps/rabbit/test/mc_SUITE.erl index ff1d3adec2de..4c91c8d641b6 100644 --- a/deps/rabbit/test/mc_SUITE.erl +++ b/deps/rabbit/test/mc_SUITE.erl @@ -27,7 +27,8 @@ all_tests() -> amqpl_death_records, amqpl_amqp_bin_amqpl, amqp_amqpl, - amqp_to_amqpl_data_body + amqp_to_amqpl_data_body, + amqp_amqpl_amqp_bodies ]. groups() -> @@ -443,41 +444,47 @@ amqp_to_amqpl_data_body(_Config) -> iolist_to_binary(PayFrag)) end, Cases). -amqp10_non_single_data_bodies(_Config) -> +amqp_amqpl_amqp_bodies(_Config) -> Props = #'P_basic'{type = <<"amqp-1.0">>}, - Payloads = [ - [#'v1_0.data'{content = <<"hello">>}, - #'v1_0.data'{content = <<"brave">>}, - #'v1_0.data'{content = <<"new">>}, - #'v1_0.data'{content = <<"world">>} - ], - #'v1_0.amqp_value'{content = {utf8, <<"hello world">>}}, - [#'v1_0.amqp_sequence'{content = [{utf8, <<"one">>}, - {utf8, <<"blah">>}]}, - #'v1_0.amqp_sequence'{content = [{utf8, <<"two">>}]} - ] - ], + Bodies = [ + #'v1_0.data'{content = <<"helo world">>}, + [#'v1_0.data'{content = <<"hello">>}, + #'v1_0.data'{content = <<"brave">>}, + #'v1_0.data'{content = <<"new">>}, + #'v1_0.data'{content = <<"world">>} + ], + #'v1_0.amqp_value'{content = {utf8, <<"hello world">>}}, + [#'v1_0.amqp_sequence'{content = [{utf8, <<"one">>}, + {utf8, <<"blah">>}]}, + #'v1_0.amqp_sequence'{content = [{utf8, <<"two">>}]} + ] + ], [begin EncodedPayload = amqp10_encode_bin(Payload), - MsgRecord0 = rabbit_msg_record:from_amqp091(Props, EncodedPayload), - MsgRecord = rabbit_msg_record:init( - iolist_to_binary(rabbit_msg_record:to_iodata(MsgRecord0))), - {PropsOut, PayloadEncodedOut} = rabbit_msg_record:to_amqp091(MsgRecord), - PayloadOut = case amqp10_framing:decode_bin(iolist_to_binary(PayloadEncodedOut)) of - L when length(L) =:= 1 -> - lists:nth(1, L); - L -> - L + Ex = #resource{virtual_host = <<"/">>, + kind = exchange, + name = <<"ex">>}, + LegacyMsg = mc_amqpl:message(Ex, <<"rkey">>, + #content{payload_fragments_rev = + lists:reverse(EncodedPayload), + properties = Props}, + #{}, true), + + AmqpMsg = mc:convert(mc_amqp, LegacyMsg), + %% drop any non body sections + BodySections = lists:nthtail(3, mc:protocol_state(AmqpMsg)), + + AssertBody = case is_list(Payload) of + true -> + Payload; + false -> + [Payload] end, - - ?assertEqual(Props, PropsOut), - ?assertEqual(iolist_to_binary(EncodedPayload), - iolist_to_binary(PayloadEncodedOut)), - ?assertEqual(Payload, PayloadOut) - - end || Payload <- Payloads], + % ct:pal("ProtoState ~p", [BodySections]), + ?assertEqual(AssertBody, BodySections) + end || Payload <- Bodies], ok. unsupported_091_header_is_dropped(_Config) -> @@ -630,9 +637,9 @@ reuse_amqp10_binary_chunks(_Config) -> ok. amqp10_encode_bin(L) when is_list(L) -> - iolist_to_binary([amqp10_encode_bin(X) || X <- L]); + [iolist_to_binary(amqp10_framing:encode_bin(X)) || X <- L]; amqp10_encode_bin(X) -> - iolist_to_binary(amqp10_framing:encode_bin(X)). + [iolist_to_binary(amqp10_framing:encode_bin(X))]. %% Utility