From d4c5a37e9c8b18e69a6bffb1c1a0626650668bb6 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Mon, 4 Sep 2023 12:43:42 +0100 Subject: [PATCH 1/4] AMQP encoded bodies should be converted to amqp correctly Fix for AMQP encoded amqpl payloads. Also removing some headers added during amqpl->amqpl conversions that duplicate information in the amqp header. --- deps/rabbit/src/mc_amqpl.erl | 30 +++++++---- deps/rabbit/src/rabbit_fifo_client.erl | 3 +- deps/rabbit/test/mc_SUITE.erl | 71 ++++++++++++++------------ 3 files changed, 61 insertions(+), 43 deletions(-) diff --git a/deps/rabbit/src/mc_amqpl.erl b/deps/rabbit/src/mc_amqpl.erl index 18b5800e49a9..a6a768e69705 100644 --- a/deps/rabbit/src/mc_amqpl.erl +++ b/deps/rabbit/src/mc_amqpl.erl @@ -282,7 +282,15 @@ convert_to(mc_amqp, #content{payload_fragments_rev = Payload} = Content) -> end, %% TODO: only add header section if at least one of the fields %% needs to be set + Ttl = case Expiration of + undefined -> + undefined; + _ -> + binary_to_integer(Expiration) + end, + H = #'v1_0.header'{durable = DelMode =:= 2, + ttl = wrap(uint, Ttl), %% TODO: check Priority is a ubyte? priority = wrap(ubyte, Priority)}, P = case amqp10_section_header(?AMQP10_PROPERTIES_HEADER, Headers) of @@ -327,21 +335,23 @@ convert_to(mc_amqp, #content{payload_fragments_rev = Payload} = Content) -> is_binary(K) ], - %% properties that _are_ potentially used by the broker - %% are stored as message annotations - %% an alternative woud be to store priority and delivery mode in - %% the amqp (1.0) header section using the dura - MAC = map_add(symbol, <<"x-basic-type">>, utf8, Type, - map_add(symbol, <<"x-basic-priority">>, ubyte, Priority, - map_add(symbol, <<"x-basic-delivery-mode">>, ubyte, DelMode, - map_add(symbol, <<"x-basic-expiration">>, utf8, Expiration, - MAC0)))), + %% `type' doesn't have a direct equivalent so adding as + %% a message annotation here + MAC = map_add(symbol, <<"x-basic-type">>, utf8, Type, MAC0), #'v1_0.message_annotations'{content = MAC}; Section -> Section end, - Sections = [H, P, AP, MA, #'v1_0.data'{content = lists:reverse(Payload)}], + BodySections = case Type of + ?AMQP10_TYPE -> + amqp10_framing:decode_bin( + iolist_to_binary(lists:reverse(Payload))); + _ -> + [#'v1_0.data'{content = lists:reverse(Payload)}] + end, + + Sections = [H, MA, P, AP | BodySections], mc_amqp:convert_from(mc_amqp, Sections); convert_to(_TargetProto, _Content) -> not_implemented. diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl index 8e19e6a29303..b8da822ac40d 100644 --- a/deps/rabbit/src/rabbit_fifo_client.erl +++ b/deps/rabbit/src/rabbit_fifo_client.erl @@ -228,7 +228,8 @@ add_delivery_count_header(Msg, Count) -> case mc:is(Msg) of true when is_integer(Count) andalso Count > 0 -> - mc:set_annotation(<<"x-delivery-count">>, Count, Msg); + mc:set_annotation(<<"x-delivery-count">>, Count, + mc:prepare(read, Msg)); _ -> Msg end. diff --git a/deps/rabbit/test/mc_SUITE.erl b/deps/rabbit/test/mc_SUITE.erl index ff1d3adec2de..4c91c8d641b6 100644 --- a/deps/rabbit/test/mc_SUITE.erl +++ b/deps/rabbit/test/mc_SUITE.erl @@ -27,7 +27,8 @@ all_tests() -> amqpl_death_records, amqpl_amqp_bin_amqpl, amqp_amqpl, - amqp_to_amqpl_data_body + amqp_to_amqpl_data_body, + amqp_amqpl_amqp_bodies ]. groups() -> @@ -443,41 +444,47 @@ amqp_to_amqpl_data_body(_Config) -> iolist_to_binary(PayFrag)) end, Cases). -amqp10_non_single_data_bodies(_Config) -> +amqp_amqpl_amqp_bodies(_Config) -> Props = #'P_basic'{type = <<"amqp-1.0">>}, - Payloads = [ - [#'v1_0.data'{content = <<"hello">>}, - #'v1_0.data'{content = <<"brave">>}, - #'v1_0.data'{content = <<"new">>}, - #'v1_0.data'{content = <<"world">>} - ], - #'v1_0.amqp_value'{content = {utf8, <<"hello world">>}}, - [#'v1_0.amqp_sequence'{content = [{utf8, <<"one">>}, - {utf8, <<"blah">>}]}, - #'v1_0.amqp_sequence'{content = [{utf8, <<"two">>}]} - ] - ], + Bodies = [ + #'v1_0.data'{content = <<"helo world">>}, + [#'v1_0.data'{content = <<"hello">>}, + #'v1_0.data'{content = <<"brave">>}, + #'v1_0.data'{content = <<"new">>}, + #'v1_0.data'{content = <<"world">>} + ], + #'v1_0.amqp_value'{content = {utf8, <<"hello world">>}}, + [#'v1_0.amqp_sequence'{content = [{utf8, <<"one">>}, + {utf8, <<"blah">>}]}, + #'v1_0.amqp_sequence'{content = [{utf8, <<"two">>}]} + ] + ], [begin EncodedPayload = amqp10_encode_bin(Payload), - MsgRecord0 = rabbit_msg_record:from_amqp091(Props, EncodedPayload), - MsgRecord = rabbit_msg_record:init( - iolist_to_binary(rabbit_msg_record:to_iodata(MsgRecord0))), - {PropsOut, PayloadEncodedOut} = rabbit_msg_record:to_amqp091(MsgRecord), - PayloadOut = case amqp10_framing:decode_bin(iolist_to_binary(PayloadEncodedOut)) of - L when length(L) =:= 1 -> - lists:nth(1, L); - L -> - L + Ex = #resource{virtual_host = <<"/">>, + kind = exchange, + name = <<"ex">>}, + LegacyMsg = mc_amqpl:message(Ex, <<"rkey">>, + #content{payload_fragments_rev = + lists:reverse(EncodedPayload), + properties = Props}, + #{}, true), + + AmqpMsg = mc:convert(mc_amqp, LegacyMsg), + %% drop any non body sections + BodySections = lists:nthtail(3, mc:protocol_state(AmqpMsg)), + + AssertBody = case is_list(Payload) of + true -> + Payload; + false -> + [Payload] end, - - ?assertEqual(Props, PropsOut), - ?assertEqual(iolist_to_binary(EncodedPayload), - iolist_to_binary(PayloadEncodedOut)), - ?assertEqual(Payload, PayloadOut) - - end || Payload <- Payloads], + % ct:pal("ProtoState ~p", [BodySections]), + ?assertEqual(AssertBody, BodySections) + end || Payload <- Bodies], ok. unsupported_091_header_is_dropped(_Config) -> @@ -630,9 +637,9 @@ reuse_amqp10_binary_chunks(_Config) -> ok. amqp10_encode_bin(L) when is_list(L) -> - iolist_to_binary([amqp10_encode_bin(X) || X <- L]); + [iolist_to_binary(amqp10_framing:encode_bin(X)) || X <- L]; amqp10_encode_bin(X) -> - iolist_to_binary(amqp10_framing:encode_bin(X)). + [iolist_to_binary(amqp10_framing:encode_bin(X))]. %% Utility From 68b14fbb5bf8f43faf199154b2194d91f19e469d Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Mon, 4 Sep 2023 14:12:40 +0100 Subject: [PATCH 2/4] we should not need to prepre for read toset annotations --- deps/rabbit/src/mc_compat.erl | 7 ++++--- deps/rabbit/src/rabbit_fifo_client.erl | 3 +-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/deps/rabbit/src/mc_compat.erl b/deps/rabbit/src/mc_compat.erl index 1c0fde932b49..571acc4a8675 100644 --- a/deps/rabbit/src/mc_compat.erl +++ b/deps/rabbit/src/mc_compat.erl @@ -63,9 +63,10 @@ set_annotation(routing_keys, Value, #basic_message{} = Msg) -> set_annotation(exchange, Value, #basic_message{exchange_name = Ex} = Msg) -> Msg#basic_message{exchange_name = Ex#resource{name = Value}}; set_annotation(<<"x-", _/binary>> = Key, Value, - #basic_message{content = - #content{properties = - #'P_basic'{headers = H0} = B} = C0} = Msg) -> + #basic_message{content = Content0} = Msg) -> + #content{properties = + #'P_basic'{headers = H0} = B} = C0 = + rabbit_binary_parser:ensure_content_decoded(Content0), T = case Value of _ when is_integer(Value) -> long; diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl index b8da822ac40d..8e19e6a29303 100644 --- a/deps/rabbit/src/rabbit_fifo_client.erl +++ b/deps/rabbit/src/rabbit_fifo_client.erl @@ -228,8 +228,7 @@ add_delivery_count_header(Msg, Count) -> case mc:is(Msg) of true when is_integer(Count) andalso Count > 0 -> - mc:set_annotation(<<"x-delivery-count">>, Count, - mc:prepare(read, Msg)); + mc:set_annotation(<<"x-delivery-count">>, Count, Msg); _ -> Msg end. From 4abb80e165e2e2b2f57766af1f74ff3e887e022f Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Mon, 4 Sep 2023 15:02:52 +0100 Subject: [PATCH 3/4] fix tagged_prop() type spec --- deps/rabbit/src/mc.erl | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/deps/rabbit/src/mc.erl b/deps/rabbit/src/mc.erl index 340a7da72dda..b4bb389db17a 100644 --- a/deps/rabbit/src/mc.erl +++ b/deps/rabbit/src/mc.erl @@ -68,10 +68,13 @@ {utf8, binary()} | {binary, binary()} | {boolean, boolean()} | - {long, integer()} | - {ulong, non_neg_integer() } | + {double | float, float()} | + {long | int | short | byte, integer()} | + {ulong | uint | ushort | ubyte, non_neg_integer()} | + {timestamp, non_neg_integer()} | {list, [tagged_prop()]} | {map, [{tagged_prop(), tagged_prop()}]} | + null | undefined. %% behaviour callbacks for protocol specific implementation From ad2c7486e2ad5f2b540414095bae78eef41ddfad Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Mon, 4 Sep 2023 15:18:57 +0100 Subject: [PATCH 4/4] tagged_prop() -> tagged_value() --- deps/rabbit/src/mc.erl | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/deps/rabbit/src/mc.erl b/deps/rabbit/src/mc.erl index b4bb389db17a..25c2f5fa9830 100644 --- a/deps/rabbit/src/mc.erl +++ b/deps/rabbit/src/mc.erl @@ -64,18 +64,18 @@ integer() | float() | boolean(). --type tagged_prop() :: {uuid, binary()} | - {utf8, binary()} | - {binary, binary()} | - {boolean, boolean()} | - {double | float, float()} | - {long | int | short | byte, integer()} | - {ulong | uint | ushort | ubyte, non_neg_integer()} | - {timestamp, non_neg_integer()} | - {list, [tagged_prop()]} | - {map, [{tagged_prop(), tagged_prop()}]} | - null | - undefined. +-type tagged_value() :: {uuid, binary()} | + {utf8, binary()} | + {binary, binary()} | + {boolean, boolean()} | + {double | float, float()} | + {long | int | short | byte, integer()} | + {ulong | uint | ushort | ubyte, non_neg_integer()} | + {timestamp, non_neg_integer()} | + {list, [tagged_value()]} | + {map, [{tagged_value(), tagged_value()}]} | + null | + undefined. %% behaviour callbacks for protocol specific implementation @@ -93,12 +93,12 @@ %% retrieve and x- header from the protocol data %% the return value should be tagged with an AMQP 1.0 type -callback x_header(binary(), proto_state()) -> - tagged_prop(). + tagged_value(). %% retrieve a property field from the protocol data %% e.g. message_id, correlation_id -callback property(atom(), proto_state()) -> - tagged_prop(). + tagged_value(). %% return a map of header values used for message routing, %% optionally include x- headers and / or complex types (i.e. tables, arrays etc) @@ -176,7 +176,7 @@ set_annotation(Key, Value, BasicMessage) -> mc_compat:set_annotation(Key, Value, BasicMessage). -spec x_header(Key :: binary(), state()) -> - tagged_prop(). + tagged_value(). x_header(Key, #?MODULE{protocol = Proto, annotations = Anns, data = Data}) ->