From a0a75053f85753bf6e13c026f4522f2035a09214 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Mon, 23 Jan 2023 15:37:40 +0100 Subject: [PATCH 1/9] Mark AMQP 1.0 properties chunk as binary It is marked as an UTF8 string, which is not, so strict AMQP 1.0 codecs can fail. --- deps/rabbit/src/rabbit_msg_record.erl | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/deps/rabbit/src/rabbit_msg_record.erl b/deps/rabbit/src/rabbit_msg_record.erl index bfa3eaa2b55b..21388cc945d4 100644 --- a/deps/rabbit/src/rabbit_msg_record.erl +++ b/deps/rabbit/src/rabbit_msg_record.erl @@ -190,7 +190,7 @@ from_amqp091(#'P_basic'{message_id = MsgId, content_encoding = wrap(symbol, ContentEncoding), creation_time = wrap(timestamp, ConvertedTs)}, - APC0 = [{wrap(utf8, K), from_091(T, V)} || {K, T, V} + APC0 = [from_amqp091_header(Type, K, T, V) || {K, T, V} <- case Headers of undefined -> []; _ -> Headers @@ -212,6 +212,11 @@ from_amqp091(#'P_basic'{message_id = MsgId, message_annotations = MA, data = #'v1_0.data'{content = Data}}}. +from_amqp091_header(<<"amqp-1.0">>, <<"x-amqp-1.0-properties">> = K, longstr, V) -> + {wrap(utf8, K), from_091(binary, V)}; +from_amqp091_header(_MessageType, K, T, V) -> + {wrap(utf8, K), from_091(T, V)}. + map_add(_T, _Key, _Type, undefined, Acc) -> Acc; map_add(KeyType, Key, Type, Value, Acc) -> From 9a629738ccb0ae53e8ec52642ac12675bd1378a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 25 Jan 2023 16:49:13 +0100 Subject: [PATCH 2/9] Re-use AMQP 1.0 binary chunks if available Instead of converting from AMQP 091 back to AMQP 1.0. This is for AMQP 1.0 properties, application properties, and message annotations. --- deps/rabbit/src/rabbit_msg_record.erl | 200 ++++++++++++++++++-------- 1 file changed, 137 insertions(+), 63 deletions(-) diff --git a/deps/rabbit/src/rabbit_msg_record.erl b/deps/rabbit/src/rabbit_msg_record.erl index 21388cc945d4..2b979ca38a8b 100644 --- a/deps/rabbit/src/rabbit_msg_record.erl +++ b/deps/rabbit/src/rabbit_msg_record.erl @@ -23,9 +23,9 @@ { % header :: maybe(#'v1_0.header'{}), % delivery_annotations :: maybe(#'v1_0.delivery_annotations'{}), - message_annotations :: maybe(#'v1_0.message_annotations'{}), - properties :: maybe(#'v1_0.properties'{}), - application_properties :: maybe(#'v1_0.application_properties'{}), + message_annotations :: maybe(#'v1_0.message_annotations'{}) | binary, + properties :: maybe(#'v1_0.properties'{}) | binary, + application_properties :: maybe(#'v1_0.application_properties'{}) | binary, data :: maybe(amqp10_data()) % footer :: maybe(#'v1_0.footer'{}) }). @@ -41,6 +41,11 @@ state/0 ]). +-define(AMQP10_TYPE, <<"amqp-1.0">>). +-define(AMQP10_PROPERTIES_HEADER, <<"x-amqp-1.0-properties">>). +-define(AMQP10_APP_PROPERTIES_HEADER, <<"x-amqp-1.0-app-properties">>). +-define(AMQP10_MESSAGE_ANNOTATIONS_HEADER, <<"x-amqp-1.0-message-annotations">>). + %% this module acts as a wrapper / converter for the internal binary storage format %% (AMQP 1.0) and any format it needs to be converted to / from. %% Efficiency is key. No unnecessary allocations or work should be done until it @@ -72,7 +77,6 @@ decode([#'v1_0.data'{} = D | Rem], {MA, P, AP, _}) -> amqp10_properties_empty(#'v1_0.properties'{message_id = undefined, user_id = undefined, to = undefined, - % subject = wrap(utf8, RKey), reply_to = undefined, correlation_id = undefined, content_type = undefined, @@ -90,21 +94,36 @@ to_iodata(#?MODULE{msg = #msg{properties = P, data = Data}}) -> [ case MA of - #'v1_0.message_annotations'{content = []} -> - <<>>; + MsgAnnotBin when is_binary(MA) -> + MsgAnnotBin; _ -> - amqp10_framing:encode_bin(MA) + case MA of + #'v1_0.message_annotations'{content = []} -> + <<>>; + _ -> + amqp10_framing:encode_bin(MA) + end end, - case amqp10_properties_empty(P) of - true -> <<>>; - false -> - amqp10_framing:encode_bin(P) + case P of + PropsBin when is_binary(P) -> + PropsBin; + _ -> + case amqp10_properties_empty(P) of + true -> <<>>; + false -> + amqp10_framing:encode_bin(P) + end end, case AP of - #'v1_0.application_properties'{content = []} -> - <<>>; + AppPropsBin when is_binary(AP) -> + AppPropsBin; _ -> - amqp10_framing:encode_bin(AP) + case AP of + #'v1_0.application_properties'{content = []} -> + <<>>; + _ -> + amqp10_framing:encode_bin(AP) + end end, amqp10_framing:encode_bin(Data) ]. @@ -112,6 +131,14 @@ to_iodata(#?MODULE{msg = #msg{properties = P, %% TODO: refine type spec here -spec add_message_annotations(#{binary() => {atom(), term()}}, state()) -> state(). +add_message_annotations(Anns, + #?MODULE{msg = + #msg{message_annotations = MABin} = Msg} = State0) + when is_binary(MABin) -> + [MA] = amqp10_framing:decode_bin(MABin), + State1 = State0#?MODULE{msg = + Msg#msg{message_annotations = MA}}, + add_message_annotations(Anns, State1); add_message_annotations(Anns, #?MODULE{msg = #msg{message_annotations = MA0} = Msg} = State) -> @@ -159,63 +186,101 @@ message_annotation(Key, %% parses it and returns the current parse state %% this is the input function from storage and from, e.g. socket input -spec from_amqp091(#'P_basic'{}, iodata()) -> state(). -from_amqp091(#'P_basic'{message_id = MsgId, - expiration = Expiration, - delivery_mode = DelMode, - headers = Headers, - user_id = UserId, - reply_to = ReplyTo, - type = Type, - priority = Priority, - app_id = AppId, - correlation_id = CorrId, - content_type = ContentType, - content_encoding = ContentEncoding, - timestamp = Timestamp - }, Data) -> - %% TODO: support parsing properties bin directly? +from_amqp091(PB, Data) -> + P = from_amqp091_to_amqp10_properties(PB), + AP = from_amqp091_to_amqp10_app_properties(PB), + MA = from_amqp091_to_amqp10_message_annotations(PB), + + #?MODULE{cfg = #cfg{}, + msg = #msg{properties = P, + application_properties = AP, + message_annotations = MA, + data = #'v1_0.data'{content = Data}}}. + +from_amqp091_to_amqp10_properties(#'P_basic'{type = ?AMQP10_TYPE, + headers = Headers} = P) -> + case proplists:lookup(?AMQP10_PROPERTIES_HEADER, Headers) of + none -> + convert_amqp091_to_amqp10_properties(P); + {_, _, PropsBin} -> + PropsBin + end; +from_amqp091_to_amqp10_properties(P) -> + convert_amqp091_to_amqp10_properties(P). + +convert_amqp091_to_amqp10_properties(#'P_basic'{message_id = MsgId, + user_id = UserId, + reply_to = ReplyTo, + correlation_id = CorrId, + content_type = ContentType, + content_encoding = ContentEncoding, + timestamp = Timestamp + }) -> ConvertedTs = case Timestamp of undefined -> undefined; _ -> Timestamp * 1000 end, - P = #'v1_0.properties'{message_id = wrap(utf8, MsgId), - user_id = wrap(binary, UserId), - to = undefined, - % subject = wrap(utf8, RKey), - reply_to = wrap(utf8, ReplyTo), - correlation_id = wrap(utf8, CorrId), - content_type = wrap(symbol, ContentType), - content_encoding = wrap(symbol, ContentEncoding), - creation_time = wrap(timestamp, ConvertedTs)}, - - APC0 = [from_amqp091_header(Type, K, T, V) || {K, T, V} + #'v1_0.properties'{message_id = wrap(utf8, MsgId), + user_id = wrap(binary, UserId), + to = undefined, + reply_to = wrap(utf8, ReplyTo), + correlation_id = wrap(utf8, CorrId), + content_type = wrap(symbol, ContentType), + content_encoding = wrap(symbol, ContentEncoding), + creation_time = wrap(timestamp, ConvertedTs)}. + +from_amqp091_to_amqp10_app_properties(#'P_basic'{type = ?AMQP10_TYPE, + headers = Headers} = P) -> + case proplists:lookup(?AMQP10_APP_PROPERTIES_HEADER, Headers) of + none -> + convert_amqp091_to_amqp10_app_properties(P); + {_, _, AppPropsBin} -> + AppPropsBin + end; +from_amqp091_to_amqp10_app_properties(P) -> + convert_amqp091_to_amqp10_app_properties(P). + +convert_amqp091_to_amqp10_app_properties(#'P_basic'{headers = Headers, + type = Type, + app_id = AppId}) -> + APC0 = [{wrap(utf8, K), from_091(T, V)} || {K, T, V} <- case Headers of undefined -> []; _ -> Headers - end, not unsupported_header_value_type(T)], - %% properties that do not map directly to AMQP 1.0 properties are stored - %% in application properties - APC = map_add(utf8, <<"x-basic-type">>, utf8, Type, - map_add(utf8, <<"x-basic-app-id">>, utf8, AppId, APC0)), + end, not unsupported_header_value_type(T), + not filtered_header(Type, K)], + APC1 = case Type of + ?AMQP10_TYPE -> + APC0; + _ -> + %% properties that do not map directly to AMQP 1.0 properties are stored + %% in application properties + map_add(utf8, <<"x-basic-type">>, utf8, Type, + map_add(utf8, <<"x-basic-app-id">>, utf8, AppId, APC0)) + end, + #'v1_0.application_properties'{content = APC1}. + +from_amqp091_to_amqp10_message_annotations(#'P_basic'{type = ?AMQP10_TYPE, + headers = Headers}) -> + case proplists:lookup(?AMQP10_MESSAGE_ANNOTATIONS_HEADER, Headers) of + none -> + #'v1_0.message_annotations'{content = []}; + {_, _, MessageAnnotationsBin} -> + MessageAnnotationsBin + end; +from_amqp091_to_amqp10_message_annotations(P) -> + convert_amqp091_to_amqp10_message_annotations(P). +convert_amqp091_to_amqp10_message_annotations(#'P_basic'{priority = Priority, + delivery_mode = DelMode, + expiration = Expiration}) -> MAC = map_add(symbol, <<"x-basic-priority">>, ubyte, Priority, map_add(symbol, <<"x-basic-delivery-mode">>, ubyte, DelMode, map_add(symbol, <<"x-basic-expiration">>, utf8, Expiration, []))), - AP = #'v1_0.application_properties'{content = APC}, - MA = #'v1_0.message_annotations'{content = MAC}, - #?MODULE{cfg = #cfg{}, - msg = #msg{properties = P, - application_properties = AP, - message_annotations = MA, - data = #'v1_0.data'{content = Data}}}. - -from_amqp091_header(<<"amqp-1.0">>, <<"x-amqp-1.0-properties">> = K, longstr, V) -> - {wrap(utf8, K), from_091(binary, V)}; -from_amqp091_header(_MessageType, K, T, V) -> - {wrap(utf8, K), from_091(T, V)}. + #'v1_0.message_annotations'{content = MAC}. map_add(_T, _Key, _Type, undefined, Acc) -> Acc; @@ -395,12 +460,21 @@ message_id({utf8, S}, HKey, H0) -> message_id(MsgId, _, H) -> {H, unwrap(MsgId)}. - unsupported_header_value_type(array) -> - true; - unsupported_header_value_type(table) -> - true; - unsupported_header_value_type(_) -> - false. +unsupported_header_value_type(array) -> + true; +unsupported_header_value_type(table) -> + true; +unsupported_header_value_type(_) -> + false. + +filtered_header(?AMQP10_TYPE, ?AMQP10_PROPERTIES_HEADER) -> + true; +filtered_header(?AMQP10_TYPE, ?AMQP10_APP_PROPERTIES_HEADER) -> + true; +filtered_header(?AMQP10_TYPE, ?AMQP10_MESSAGE_ANNOTATIONS_HEADER) -> + true; +filtered_header(_, _) -> + false. -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). From 220ca8ffa2ffcaec1a2913c5587945eaef4ceb21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Thu, 26 Jan 2023 10:58:42 +0100 Subject: [PATCH 3/9] Test AMQP 1.0 binary chunk reuse --- deps/rabbit/src/rabbit_msg_record.erl | 2 +- deps/rabbit/test/rabbit_msg_record_SUITE.erl | 31 ++++++++++++++++++-- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/deps/rabbit/src/rabbit_msg_record.erl b/deps/rabbit/src/rabbit_msg_record.erl index 2b979ca38a8b..b17482b176f2 100644 --- a/deps/rabbit/src/rabbit_msg_record.erl +++ b/deps/rabbit/src/rabbit_msg_record.erl @@ -187,9 +187,9 @@ message_annotation(Key, %% this is the input function from storage and from, e.g. socket input -spec from_amqp091(#'P_basic'{}, iodata()) -> state(). from_amqp091(PB, Data) -> + MA = from_amqp091_to_amqp10_message_annotations(PB), P = from_amqp091_to_amqp10_properties(PB), AP = from_amqp091_to_amqp10_app_properties(PB), - MA = from_amqp091_to_amqp10_message_annotations(PB), #?MODULE{cfg = #cfg{}, msg = #msg{properties = P, diff --git a/deps/rabbit/test/rabbit_msg_record_SUITE.erl b/deps/rabbit/test/rabbit_msg_record_SUITE.erl index 5b25677a86c2..b90a61483d1b 100644 --- a/deps/rabbit/test/rabbit_msg_record_SUITE.erl +++ b/deps/rabbit/test/rabbit_msg_record_SUITE.erl @@ -28,7 +28,8 @@ all_tests() -> message_id_uuid, message_id_binary, message_id_large_binary, - message_id_large_string + message_id_large_string, + reuse_amqp10_binary_chunks ]. groups() -> @@ -214,6 +215,32 @@ message_id_large_string(_Config) -> Props), ok. +reuse_amqp10_binary_chunks(_Config) -> + Amqp10MsgAnnotations = #'v1_0.message_annotations'{content = + [{{symbol, <<"x-route">>}, {utf8, <<"dummy">>}}]}, + Amqp10MsgAnnotationsBin = amqp10_encode_bin(Amqp10MsgAnnotations), + Amqp10Props = #'v1_0.properties'{group_id = {utf8, <<"my-group">>}, + group_sequence = {uint, 42}}, + Amqp10PropsBin = amqp10_encode_bin(Amqp10Props), + Amqp10AppProps = #'v1_0.application_properties'{content = [{{utf8, <<"foo">>}, {utf8, <<"bar">>}}]}, + Amqp10AppPropsBin = amqp10_encode_bin(Amqp10AppProps), + Amqp091Headers = [{<<"x-amqp-1.0-message-annotations">>, longstr, Amqp10MsgAnnotationsBin}, + {<<"x-amqp-1.0-properties">>, longstr, Amqp10PropsBin}, + {<<"x-amqp-1.0-app-properties">>, longstr, Amqp10AppPropsBin}], + Amqp091Props = #'P_basic'{type = <<"amqp-1.0">>, headers = Amqp091Headers}, + R = rabbit_msg_record:from_amqp091(Amqp091Props, <<"payload-does-not-matter">>), + RBin = rabbit_msg_record:to_iodata(R), + Amqp10DecodedMsg = amqp10_framing:decode_bin(list_to_binary(RBin)), + [Amqp10DecodedMsgAnnotations, Amqp10DecodedProps, + Amqp10DecodedAppProps, _] = Amqp10DecodedMsg, + ?assertEqual(Amqp10MsgAnnotations, Amqp10DecodedMsgAnnotations), + ?assertEqual(Amqp10Props, Amqp10DecodedProps), + ?assertEqual(Amqp10AppProps, Amqp10DecodedAppProps), + ok. + +amqp10_encode_bin(X) -> + list_to_binary(amqp10_framing:encode_bin(X)). + %% Utility test_amqp091_roundtrip(Props, Payload) -> @@ -226,5 +253,3 @@ test_amqp091_roundtrip(Props, Payload) -> ?assertEqual(iolist_to_binary(Payload), iolist_to_binary(PayloadOut)), ok. - - From 88ae346e0de3959e80550c61b0630d4272e170fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Mon, 30 Jan 2023 11:14:24 +0100 Subject: [PATCH 4/9] Support AMQP 1.0 multi-value body better In the rabbit_msg_record module, mostly. Before this commit, only one Data section was supported. Now multiple Data sections, multiple Sequence sections, and an AMQP value section are supported. --- deps/rabbit/src/rabbit_msg_record.erl | 104 ++++++++++++------ deps/rabbit/test/rabbit_msg_record_SUITE.erl | 2 +- .../src/rabbit_amqp1_0_message.erl | 10 +- 3 files changed, 82 insertions(+), 34 deletions(-) diff --git a/deps/rabbit/src/rabbit_msg_record.erl b/deps/rabbit/src/rabbit_msg_record.erl index b17482b176f2..64e36ec4b783 100644 --- a/deps/rabbit/src/rabbit_msg_record.erl +++ b/deps/rabbit/src/rabbit_msg_record.erl @@ -26,7 +26,7 @@ message_annotations :: maybe(#'v1_0.message_annotations'{}) | binary, properties :: maybe(#'v1_0.properties'{}) | binary, application_properties :: maybe(#'v1_0.application_properties'{}) | binary, - data :: maybe(amqp10_data()) + data :: maybe(amqp10_data()) | binary % footer :: maybe(#'v1_0.footer'{}) }). @@ -56,7 +56,10 @@ init(Bin) when is_binary(Bin) -> %% TODO: delay parsing until needed {MA, P, AP, D} = decode(amqp10_framing:decode_bin(Bin), - {undefined, undefined, undefined, undefined}), + {undefined, undefined, undefined, undefined}), + %% decoded body sections already in reverse order, + %% no need to reverse them after decoding + #?MODULE{cfg = #cfg{}, msg = #msg{properties = P, application_properties = AP, @@ -71,8 +74,19 @@ decode([#'v1_0.properties'{} = P | Rem], {MA, _, AP, D}) -> decode(Rem, {MA, P, AP, D}); decode([#'v1_0.application_properties'{} = AP | Rem], {MA, P, _, D}) -> decode(Rem, {MA, P, AP, D}); -decode([#'v1_0.data'{} = D | Rem], {MA, P, AP, _}) -> - decode(Rem, {MA, P, AP, D}). +decode([#'v1_0.amqp_value'{} = D | Rem], {MA, P, AP, _}) -> + decode(Rem, {MA, P, AP, D}); +decode([#'v1_0.data'{} = D | Rem], {MA, P, AP, undefined}) -> + decode(Rem, {MA, P, AP, D}); +decode([#'v1_0.data'{} = D | Rem], {MA, P, AP, B}) when is_list(B) -> + decode(Rem, {MA, P, AP, [D | B]}); +decode([#'v1_0.data'{} = D | Rem], {MA, P, AP, B}) -> + decode(Rem, {MA, P, AP, [D | [B]]}); +decode([#'v1_0.amqp_sequence'{} = D | Rem], {MA, P, AP, undefined}) -> + decode(Rem, {MA, P, AP, [D]}); +decode([#'v1_0.amqp_sequence'{} = D | Rem], {MA, P, AP, B}) when is_list(B) -> + decode(Rem, {MA, P, AP, [D | B]}). + amqp10_properties_empty(#'v1_0.properties'{message_id = undefined, user_id = undefined, @@ -125,7 +139,12 @@ to_iodata(#?MODULE{msg = #msg{properties = P, amqp10_framing:encode_bin(AP) end end, - amqp10_framing:encode_bin(Data) + case Data of + DataBin when is_binary(Data) orelse is_list(Data) -> + DataBin; + _ -> + amqp10_framing:encode_bin(Data) + end ]. %% TODO: refine type spec here @@ -186,19 +205,26 @@ message_annotation(Key, %% parses it and returns the current parse state %% this is the input function from storage and from, e.g. socket input -spec from_amqp091(#'P_basic'{}, iodata()) -> state(). -from_amqp091(PB, Data) -> +from_amqp091(#'P_basic'{type = T} = PB, Data) -> MA = from_amqp091_to_amqp10_message_annotations(PB), P = from_amqp091_to_amqp10_properties(PB), AP = from_amqp091_to_amqp10_app_properties(PB), + D = case T of + ?AMQP10_TYPE -> + %% the body is already AMQP 1.0 binary content, so leave as-is + Data; + _ -> + #'v1_0.data'{content = Data} + end, + #?MODULE{cfg = #cfg{}, msg = #msg{properties = P, application_properties = AP, message_annotations = MA, - data = #'v1_0.data'{content = Data}}}. + data = D}}. -from_amqp091_to_amqp10_properties(#'P_basic'{type = ?AMQP10_TYPE, - headers = Headers} = P) -> +from_amqp091_to_amqp10_properties(#'P_basic'{headers = Headers} = P) when is_list(Headers) -> case proplists:lookup(?AMQP10_PROPERTIES_HEADER, Headers) of none -> convert_amqp091_to_amqp10_properties(P); @@ -231,8 +257,8 @@ convert_amqp091_to_amqp10_properties(#'P_basic'{message_id = MsgId, content_encoding = wrap(symbol, ContentEncoding), creation_time = wrap(timestamp, ConvertedTs)}. -from_amqp091_to_amqp10_app_properties(#'P_basic'{type = ?AMQP10_TYPE, - headers = Headers} = P) -> +from_amqp091_to_amqp10_app_properties(#'P_basic'{headers = Headers} = P) + when is_list(Headers) -> case proplists:lookup(?AMQP10_APP_PROPERTIES_HEADER, Headers) of none -> convert_amqp091_to_amqp10_app_properties(P); @@ -250,23 +276,26 @@ convert_amqp091_to_amqp10_app_properties(#'P_basic'{headers = Headers, undefined -> []; _ -> Headers end, not unsupported_header_value_type(T), - not filtered_header(Type, K)], + not filtered_header(K)], + APC1 = case Type of ?AMQP10_TYPE -> + %% no need to modify the application properties for the type + %% this info will be restored on decoding if necessary APC0; _ -> - %% properties that do not map directly to AMQP 1.0 properties are stored - %% in application properties - map_add(utf8, <<"x-basic-type">>, utf8, Type, - map_add(utf8, <<"x-basic-app-id">>, utf8, AppId, APC0)) + map_add(utf8, <<"x-basic-type">>, utf8, Type, APC0) end, - #'v1_0.application_properties'{content = APC1}. -from_amqp091_to_amqp10_message_annotations(#'P_basic'{type = ?AMQP10_TYPE, - headers = Headers}) -> + %% properties that do not map directly to AMQP 1.0 properties are stored + %% in application properties + APC2 = map_add(utf8, <<"x-basic-app-id">>, utf8, AppId, APC1), + #'v1_0.application_properties'{content = APC2}. + +from_amqp091_to_amqp10_message_annotations(#'P_basic'{headers = Headers} = P) when is_list(Headers) -> case proplists:lookup(?AMQP10_MESSAGE_ANNOTATIONS_HEADER, Headers) of none -> - #'v1_0.message_annotations'{content = []}; + convert_amqp091_to_amqp10_message_annotations(P); {_, _, MessageAnnotationsBin} -> MessageAnnotationsBin end; @@ -293,12 +322,20 @@ to_amqp091(#?MODULE{msg = #msg{properties = P, message_annotations = MAR, data = Data}}) -> - Payload = case Data of - undefined -> - <<>>; - #'v1_0.data'{content = C} -> - C - end, + %% anything else than a single data section is expected to be AMQP 1.0 binary content + %% enforcing this convention + {Payload, IsAmqp10} = case Data of + undefined -> + {<<>>, false}; + #'v1_0.data'{content = C} -> + {C, false}; + Sections when is_list(Data)-> + B = [amqp10_framing:encode_bin(S) || S <- Sections], + {list_to_binary(B), + true}; + V -> + {amqp10_framing:encode_bin(V), true} + end, #'v1_0.properties'{message_id = MsgId, user_id = UserId, @@ -322,7 +359,12 @@ to_amqp091(#?MODULE{msg = #msg{properties = P, _ -> [] end, - {Type, AP1} = amqp10_map_get(utf8(<<"x-basic-type">>), AP0), + {Type, AP1} = case {amqp10_map_get(utf8(<<"x-basic-type">>), AP0), IsAmqp10} of + {{undefined, M}, true} -> + {?AMQP10_TYPE, M}; + {{T, M}, _} -> + {T, M} + end, {AppId, AP} = amqp10_map_get(utf8(<<"x-basic-app-id">>), AP1), {Priority, MA1} = amqp10_map_get(symbol(<<"x-basic-priority">>), MA0), @@ -467,13 +509,13 @@ unsupported_header_value_type(table) -> unsupported_header_value_type(_) -> false. -filtered_header(?AMQP10_TYPE, ?AMQP10_PROPERTIES_HEADER) -> +filtered_header(?AMQP10_PROPERTIES_HEADER) -> true; -filtered_header(?AMQP10_TYPE, ?AMQP10_APP_PROPERTIES_HEADER) -> +filtered_header(?AMQP10_APP_PROPERTIES_HEADER) -> true; -filtered_header(?AMQP10_TYPE, ?AMQP10_MESSAGE_ANNOTATIONS_HEADER) -> +filtered_header(?AMQP10_MESSAGE_ANNOTATIONS_HEADER) -> true; -filtered_header(_, _) -> +filtered_header(_) -> false. -ifdef(TEST). diff --git a/deps/rabbit/test/rabbit_msg_record_SUITE.erl b/deps/rabbit/test/rabbit_msg_record_SUITE.erl index b90a61483d1b..91d03be957c5 100644 --- a/deps/rabbit/test/rabbit_msg_record_SUITE.erl +++ b/deps/rabbit/test/rabbit_msg_record_SUITE.erl @@ -227,7 +227,7 @@ reuse_amqp10_binary_chunks(_Config) -> Amqp091Headers = [{<<"x-amqp-1.0-message-annotations">>, longstr, Amqp10MsgAnnotationsBin}, {<<"x-amqp-1.0-properties">>, longstr, Amqp10PropsBin}, {<<"x-amqp-1.0-app-properties">>, longstr, Amqp10AppPropsBin}], - Amqp091Props = #'P_basic'{type = <<"amqp-1.0">>, headers = Amqp091Headers}, + Amqp091Props = #'P_basic'{headers = Amqp091Headers}, R = rabbit_msg_record:from_amqp091(Amqp091Props, <<"payload-does-not-matter">>), RBin = rabbit_msg_record:to_iodata(R), Amqp10DecodedMsg = amqp10_framing:decode_bin(list_to_binary(RBin)), diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl index 97b893c17f7f..56aa3fd2f27b 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl @@ -23,9 +23,15 @@ -include("rabbit_amqp1_0.hrl"). assemble(MsgBin) -> - {RKey, Props, Content} = assemble(header, {<<"">>, #'P_basic'{}, []}, + {RKey, Props, Content0} = assemble(header, {<<"">>, #'P_basic'{}, []}, decode_section(MsgBin), MsgBin), - {RKey, #amqp_msg{props = Props, payload = Content}}. + Content1 = case Content0 of + Sections when is_list(Content0) -> + lists:reverse(Sections); + _ -> + Content0 + end, + {RKey, #amqp_msg{props = Props, payload = Content1}}. assemble(header, {R, P, C}, {H = #'v1_0.header'{}, Rest}, _Uneaten) -> assemble(message_annotations, {R, translate_header(H, P), C}, From a88d148687e97b23dc95a63449989680aa6a52ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Mon, 30 Jan 2023 16:56:10 +0100 Subject: [PATCH 5/9] Add test for non-single-data-section AMQP 1.0 message --- deps/rabbit/src/rabbit_msg_record.erl | 18 ++++--- deps/rabbit/test/rabbit_msg_record_SUITE.erl | 53 +++++++++++++++++-- .../src/rabbit_amqp1_0_message.erl | 1 + 3 files changed, 61 insertions(+), 11 deletions(-) diff --git a/deps/rabbit/src/rabbit_msg_record.erl b/deps/rabbit/src/rabbit_msg_record.erl index 64e36ec4b783..6d34e9fa60f5 100644 --- a/deps/rabbit/src/rabbit_msg_record.erl +++ b/deps/rabbit/src/rabbit_msg_record.erl @@ -55,16 +55,22 @@ -spec init(binary()) -> state(). init(Bin) when is_binary(Bin) -> %% TODO: delay parsing until needed - {MA, P, AP, D} = decode(amqp10_framing:decode_bin(Bin), + {MA, P, AP, D0} = decode(amqp10_framing:decode_bin(Bin), {undefined, undefined, undefined, undefined}), - %% decoded body sections already in reverse order, - %% no need to reverse them after decoding + + D1 = case D0 of + Sections when is_list(D0) -> + lists:reverse(Sections); + _ -> + D0 + end, + #?MODULE{cfg = #cfg{}, msg = #msg{properties = P, application_properties = AP, message_annotations = MA, - data = D}}. + data = D1}}. decode([], Acc) -> Acc; @@ -212,7 +218,7 @@ from_amqp091(#'P_basic'{type = T} = PB, Data) -> D = case T of ?AMQP10_TYPE -> - %% the body is already AMQP 1.0 binary content, so leave as-is + %% the body is already AMQP 1.0 binary content, so leaving it as-is Data; _ -> #'v1_0.data'{content = Data} @@ -331,7 +337,7 @@ to_amqp091(#?MODULE{msg = #msg{properties = P, {C, false}; Sections when is_list(Data)-> B = [amqp10_framing:encode_bin(S) || S <- Sections], - {list_to_binary(B), + {iolist_to_binary(B), true}; V -> {amqp10_framing:encode_bin(V), true} diff --git a/deps/rabbit/test/rabbit_msg_record_SUITE.erl b/deps/rabbit/test/rabbit_msg_record_SUITE.erl index 91d03be957c5..9337a126f8c7 100644 --- a/deps/rabbit/test/rabbit_msg_record_SUITE.erl +++ b/deps/rabbit/test/rabbit_msg_record_SUITE.erl @@ -23,6 +23,7 @@ all() -> all_tests() -> [ ampq091_roundtrip, + amqp10_non_single_data_bodies, unsupported_091_header_is_dropped, message_id_ulong, message_id_uuid, @@ -92,6 +93,43 @@ ampq091_roundtrip(_Config) -> test_amqp091_roundtrip(#'P_basic'{}, Payload), ok. +amqp10_non_single_data_bodies(_Config) -> + Props = #'P_basic'{type = <<"amqp-1.0">>}, + Payloads = [ + [#'v1_0.data'{content = <<"hello">>}, + #'v1_0.data'{content = <<"brave">>}, + #'v1_0.data'{content = <<"new">>}, + #'v1_0.data'{content = <<"world">>} + ], + #'v1_0.amqp_value'{content = {utf8, <<"hello world">>}}, + [#'v1_0.amqp_sequence'{content = [{utf8, <<"one">>}, + {utf8, <<"blah">>}]}, + #'v1_0.amqp_sequence'{content = [{utf8, <<"two">>}]} + ] + ], + + [begin + EncodedPayload = amqp10_encode_bin(Payload), + + MsgRecord0 = rabbit_msg_record:from_amqp091(Props, EncodedPayload), + MsgRecord = rabbit_msg_record:init( + iolist_to_binary(rabbit_msg_record:to_iodata(MsgRecord0))), + {PropsOut, PayloadEncodedOut} = rabbit_msg_record:to_amqp091(MsgRecord), + PayloadOut = case amqp10_framing:decode_bin(iolist_to_binary(PayloadEncodedOut)) of + L when length(L) =:= 1 -> + lists:nth(1, L); + L -> + L + end, + + ?assertEqual(Props, PropsOut), + ?assertEqual(iolist_to_binary(EncodedPayload), + iolist_to_binary(PayloadEncodedOut)), + ?assertEqual(Payload, PayloadOut) + + end || Payload <- Payloads], + ok. + unsupported_091_header_is_dropped(_Config) -> Props = #'P_basic'{ headers = [ @@ -227,19 +265,24 @@ reuse_amqp10_binary_chunks(_Config) -> Amqp091Headers = [{<<"x-amqp-1.0-message-annotations">>, longstr, Amqp10MsgAnnotationsBin}, {<<"x-amqp-1.0-properties">>, longstr, Amqp10PropsBin}, {<<"x-amqp-1.0-app-properties">>, longstr, Amqp10AppPropsBin}], - Amqp091Props = #'P_basic'{headers = Amqp091Headers}, - R = rabbit_msg_record:from_amqp091(Amqp091Props, <<"payload-does-not-matter">>), + Amqp091Props = #'P_basic'{type= <<"amqp-1.0">>, headers = Amqp091Headers}, + Body = #'v1_0.amqp_value'{content = {utf8, <<"hello world">>}}, + EncodedBody = amqp10_encode_bin(Body), + R = rabbit_msg_record:from_amqp091(Amqp091Props, EncodedBody), RBin = rabbit_msg_record:to_iodata(R), - Amqp10DecodedMsg = amqp10_framing:decode_bin(list_to_binary(RBin)), + Amqp10DecodedMsg = amqp10_framing:decode_bin(iolist_to_binary(RBin)), [Amqp10DecodedMsgAnnotations, Amqp10DecodedProps, - Amqp10DecodedAppProps, _] = Amqp10DecodedMsg, + Amqp10DecodedAppProps, DecodedBody] = Amqp10DecodedMsg, ?assertEqual(Amqp10MsgAnnotations, Amqp10DecodedMsgAnnotations), ?assertEqual(Amqp10Props, Amqp10DecodedProps), ?assertEqual(Amqp10AppProps, Amqp10DecodedAppProps), + ?assertEqual(Body, DecodedBody), ok. +amqp10_encode_bin(L) when is_list(L) -> + iolist_to_binary([amqp10_encode_bin(X) || X <- L]); amqp10_encode_bin(X) -> - list_to_binary(amqp10_framing:encode_bin(X)). + iolist_to_binary(amqp10_framing:encode_bin(X)). %% Utility diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl index 56aa3fd2f27b..c8c07af57a30 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl @@ -25,6 +25,7 @@ assemble(MsgBin) -> {RKey, Props, Content0} = assemble(header, {<<"">>, #'P_basic'{}, []}, decode_section(MsgBin), MsgBin), + Content1 = case Content0 of Sections when is_list(Content0) -> lists:reverse(Sections); From 189443e1347f29ff642dc55b99b9faaf7fb53395 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Mon, 30 Jan 2023 18:03:45 +0100 Subject: [PATCH 6/9] Squash some Dialyzer warnings --- deps/rabbit/src/rabbit_msg_record.erl | 73 +++++++++++++-------------- 1 file changed, 35 insertions(+), 38 deletions(-) diff --git a/deps/rabbit/src/rabbit_msg_record.erl b/deps/rabbit/src/rabbit_msg_record.erl index 6d34e9fa60f5..0358f7b78a39 100644 --- a/deps/rabbit/src/rabbit_msg_record.erl +++ b/deps/rabbit/src/rabbit_msg_record.erl @@ -23,10 +23,10 @@ { % header :: maybe(#'v1_0.header'{}), % delivery_annotations :: maybe(#'v1_0.delivery_annotations'{}), - message_annotations :: maybe(#'v1_0.message_annotations'{}) | binary, - properties :: maybe(#'v1_0.properties'{}) | binary, - application_properties :: maybe(#'v1_0.application_properties'{}) | binary, - data :: maybe(amqp10_data()) | binary + message_annotations :: maybe(#'v1_0.message_annotations'{}) | iodata, + properties :: maybe(#'v1_0.properties'{}) | iodata, + application_properties :: maybe(#'v1_0.application_properties'{}) | iodata, + data :: maybe(amqp10_data()) | iodata % footer :: maybe(#'v1_0.footer'{}) }). @@ -114,36 +114,30 @@ to_iodata(#?MODULE{msg = #msg{properties = P, data = Data}}) -> [ case MA of - MsgAnnotBin when is_binary(MA) -> - MsgAnnotBin; - _ -> - case MA of - #'v1_0.message_annotations'{content = []} -> - <<>>; - _ -> - amqp10_framing:encode_bin(MA) - end + #'v1_0.message_annotations'{content = []} -> + <<>>; + #'v1_0.message_annotations'{} -> + amqp10_framing:encode_bin(MA); + MsgAnnotBin -> + MsgAnnotBin end, case P of - PropsBin when is_binary(P) -> - PropsBin; - _ -> + #'v1_0.properties'{} -> case amqp10_properties_empty(P) of true -> <<>>; false -> amqp10_framing:encode_bin(P) - end + end; + PropsBin -> + PropsBin end, case AP of - AppPropsBin when is_binary(AP) -> - AppPropsBin; - _ -> - case AP of - #'v1_0.application_properties'{content = []} -> - <<>>; - _ -> - amqp10_framing:encode_bin(AP) - end + #'v1_0.application_properties'{content = []} -> + <<>>; + #'v1_0.application_properties'{} -> + amqp10_framing:encode_bin(AP); + AppPropsBin -> + AppPropsBin end, case Data of DataBin when is_binary(Data) orelse is_list(Data) -> @@ -158,28 +152,31 @@ to_iodata(#?MODULE{msg = #msg{properties = P, state(). add_message_annotations(Anns, #?MODULE{msg = - #msg{message_annotations = MABin} = Msg} = State0) - when is_binary(MABin) -> - [MA] = amqp10_framing:decode_bin(MABin), - State1 = State0#?MODULE{msg = - Msg#msg{message_annotations = MA}}, - add_message_annotations(Anns, State1); + #msg{message_annotations = undefined} = Msg} = State) -> + add_message_annotations(Anns, + State#?MODULE{msg = Msg#msg{message_annotations = + #'v1_0.message_annotations'{content = []}}}); add_message_annotations(Anns, #?MODULE{msg = - #msg{message_annotations = MA0} = Msg} = State) -> + #msg{message_annotations = + #'v1_0.message_annotations'{content = C}} = Msg} = State) -> Content = maps:fold( fun (K, {T, V}, Acc) -> map_add(symbol, K, T, V, Acc) end, - case MA0 of - undefined -> []; - #'v1_0.message_annotations'{content = C} -> C - end, + C, Anns), State#?MODULE{msg = Msg#msg{message_annotations = - #'v1_0.message_annotations'{content = Content}}}. + #'v1_0.message_annotations'{content = Content}}}; +add_message_annotations(Anns, + #?MODULE{msg = + #msg{message_annotations = MABin} = Msg} = State0) -> + [MA] = amqp10_framing:decode_bin(iolist_to_binary(MABin)), + State1 = State0#?MODULE{msg = + Msg#msg{message_annotations = MA}}, + add_message_annotations(Anns, State1). %% TODO: refine -type amqp10_term() :: {atom(), term()}. From 926c7ac53a45edaf3bb62cfee52e9e28753a234a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 31 Jan 2023 10:21:00 +0100 Subject: [PATCH 7/9] Silent dialyzer for a function for now --- deps/rabbit/src/rabbit_msg_record.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/deps/rabbit/src/rabbit_msg_record.erl b/deps/rabbit/src/rabbit_msg_record.erl index 0358f7b78a39..e822007d58a2 100644 --- a/deps/rabbit/src/rabbit_msg_record.erl +++ b/deps/rabbit/src/rabbit_msg_record.erl @@ -12,6 +12,8 @@ to_091/2 ]). +-dialyzer({nowarn_function, add_message_annotations/2}). + -include_lib("rabbit_common/include/rabbit_framing.hrl"). -include_lib("amqp10_common/include/amqp10_framing.hrl"). @@ -65,7 +67,6 @@ init(Bin) when is_binary(Bin) -> D0 end, - #?MODULE{cfg = #cfg{}, msg = #msg{properties = P, application_properties = AP, From f8cc9f94895757bb46d9955e412ba98b5aad4dfb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 31 Jan 2023 10:34:21 +0100 Subject: [PATCH 8/9] Fix type declaration, use type, not atom --- deps/rabbit/src/rabbit_msg_record.erl | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/deps/rabbit/src/rabbit_msg_record.erl b/deps/rabbit/src/rabbit_msg_record.erl index e822007d58a2..6148a943dfd6 100644 --- a/deps/rabbit/src/rabbit_msg_record.erl +++ b/deps/rabbit/src/rabbit_msg_record.erl @@ -12,8 +12,6 @@ to_091/2 ]). --dialyzer({nowarn_function, add_message_annotations/2}). - -include_lib("rabbit_common/include/rabbit_framing.hrl"). -include_lib("amqp10_common/include/amqp10_framing.hrl"). @@ -25,10 +23,10 @@ { % header :: maybe(#'v1_0.header'{}), % delivery_annotations :: maybe(#'v1_0.delivery_annotations'{}), - message_annotations :: maybe(#'v1_0.message_annotations'{}) | iodata, - properties :: maybe(#'v1_0.properties'{}) | iodata, - application_properties :: maybe(#'v1_0.application_properties'{}) | iodata, - data :: maybe(amqp10_data()) | iodata + message_annotations :: maybe(#'v1_0.message_annotations'{}) | iodata(), + properties :: maybe(#'v1_0.properties'{}) | iodata(), + application_properties :: maybe(#'v1_0.application_properties'{}) | iodata(), + data :: maybe(amqp10_data()) | iodata() % footer :: maybe(#'v1_0.footer'{}) }). From 6b770143210b3d056f0c8a4c7c36109066d2b0e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 31 Jan 2023 14:30:47 +0100 Subject: [PATCH 9/9] Address review comments --- deps/rabbit/src/rabbit_msg_record.erl | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/deps/rabbit/src/rabbit_msg_record.erl b/deps/rabbit/src/rabbit_msg_record.erl index 6148a943dfd6..568973034178 100644 --- a/deps/rabbit/src/rabbit_msg_record.erl +++ b/deps/rabbit/src/rabbit_msg_record.erl @@ -86,7 +86,7 @@ decode([#'v1_0.data'{} = D | Rem], {MA, P, AP, undefined}) -> decode([#'v1_0.data'{} = D | Rem], {MA, P, AP, B}) when is_list(B) -> decode(Rem, {MA, P, AP, [D | B]}); decode([#'v1_0.data'{} = D | Rem], {MA, P, AP, B}) -> - decode(Rem, {MA, P, AP, [D | [B]]}); + decode(Rem, {MA, P, AP, [D, B]}); decode([#'v1_0.amqp_sequence'{} = D | Rem], {MA, P, AP, undefined}) -> decode(Rem, {MA, P, AP, [D]}); decode([#'v1_0.amqp_sequence'{} = D | Rem], {MA, P, AP, B}) when is_list(B) -> @@ -281,13 +281,13 @@ convert_amqp091_to_amqp10_app_properties(#'P_basic'{headers = Headers, not filtered_header(K)], APC1 = case Type of - ?AMQP10_TYPE -> - %% no need to modify the application properties for the type - %% this info will be restored on decoding if necessary - APC0; - _ -> - map_add(utf8, <<"x-basic-type">>, utf8, Type, APC0) - end, + ?AMQP10_TYPE -> + %% no need to modify the application properties for the type + %% this info will be restored on decoding if necessary + APC0; + _ -> + map_add(utf8, <<"x-basic-type">>, utf8, Type, APC0) + end, %% properties that do not map directly to AMQP 1.0 properties are stored %% in application properties @@ -328,6 +328,8 @@ to_amqp091(#?MODULE{msg = #msg{properties = P, %% enforcing this convention {Payload, IsAmqp10} = case Data of undefined -> + %% not an expected value, + %% but handling it with an empty binary anyway {<<>>, false}; #'v1_0.data'{content = C} -> {C, false}; @@ -336,7 +338,7 @@ to_amqp091(#?MODULE{msg = #msg{properties = P, {iolist_to_binary(B), true}; V -> - {amqp10_framing:encode_bin(V), true} + {iolist_to_binary(amqp10_framing:encode_bin(V)), true} end, #'v1_0.properties'{message_id = MsgId,