diff --git a/deps/rabbit/src/rabbit_msg_record.erl b/deps/rabbit/src/rabbit_msg_record.erl index bfa3eaa2b55b..568973034178 100644 --- a/deps/rabbit/src/rabbit_msg_record.erl +++ b/deps/rabbit/src/rabbit_msg_record.erl @@ -23,10 +23,10 @@ { % header :: maybe(#'v1_0.header'{}), % delivery_annotations :: maybe(#'v1_0.delivery_annotations'{}), - message_annotations :: maybe(#'v1_0.message_annotations'{}), - properties :: maybe(#'v1_0.properties'{}), - application_properties :: maybe(#'v1_0.application_properties'{}), - data :: maybe(amqp10_data()) + message_annotations :: maybe(#'v1_0.message_annotations'{}) | iodata(), + properties :: maybe(#'v1_0.properties'{}) | iodata(), + application_properties :: maybe(#'v1_0.application_properties'{}) | iodata(), + data :: maybe(amqp10_data()) | iodata() % footer :: maybe(#'v1_0.footer'{}) }). @@ -41,6 +41,11 @@ state/0 ]). +-define(AMQP10_TYPE, <<"amqp-1.0">>). +-define(AMQP10_PROPERTIES_HEADER, <<"x-amqp-1.0-properties">>). +-define(AMQP10_APP_PROPERTIES_HEADER, <<"x-amqp-1.0-app-properties">>). +-define(AMQP10_MESSAGE_ANNOTATIONS_HEADER, <<"x-amqp-1.0-message-annotations">>). + %% this module acts as a wrapper / converter for the internal binary storage format %% (AMQP 1.0) and any format it needs to be converted to / from. %% Efficiency is key. No unnecessary allocations or work should be done until it @@ -50,13 +55,21 @@ -spec init(binary()) -> state(). init(Bin) when is_binary(Bin) -> %% TODO: delay parsing until needed - {MA, P, AP, D} = decode(amqp10_framing:decode_bin(Bin), - {undefined, undefined, undefined, undefined}), + {MA, P, AP, D0} = decode(amqp10_framing:decode_bin(Bin), + {undefined, undefined, undefined, undefined}), + + D1 = case D0 of + Sections when is_list(D0) -> + lists:reverse(Sections); + _ -> + D0 + end, + #?MODULE{cfg = #cfg{}, msg = #msg{properties = P, application_properties = AP, message_annotations = MA, - data = D}}. + data = D1}}. decode([], Acc) -> Acc; @@ -66,13 +79,23 @@ decode([#'v1_0.properties'{} = P | Rem], {MA, _, AP, D}) -> decode(Rem, {MA, P, AP, D}); decode([#'v1_0.application_properties'{} = AP | Rem], {MA, P, _, D}) -> decode(Rem, {MA, P, AP, D}); -decode([#'v1_0.data'{} = D | Rem], {MA, P, AP, _}) -> - decode(Rem, {MA, P, AP, D}). +decode([#'v1_0.amqp_value'{} = D | Rem], {MA, P, AP, _}) -> + decode(Rem, {MA, P, AP, D}); +decode([#'v1_0.data'{} = D | Rem], {MA, P, AP, undefined}) -> + decode(Rem, {MA, P, AP, D}); +decode([#'v1_0.data'{} = D | Rem], {MA, P, AP, B}) when is_list(B) -> + decode(Rem, {MA, P, AP, [D | B]}); +decode([#'v1_0.data'{} = D | Rem], {MA, P, AP, B}) -> + decode(Rem, {MA, P, AP, [D, B]}); +decode([#'v1_0.amqp_sequence'{} = D | Rem], {MA, P, AP, undefined}) -> + decode(Rem, {MA, P, AP, [D]}); +decode([#'v1_0.amqp_sequence'{} = D | Rem], {MA, P, AP, B}) when is_list(B) -> + decode(Rem, {MA, P, AP, [D | B]}). + amqp10_properties_empty(#'v1_0.properties'{message_id = undefined, user_id = undefined, to = undefined, - % subject = wrap(utf8, RKey), reply_to = undefined, correlation_id = undefined, content_type = undefined, @@ -92,21 +115,35 @@ to_iodata(#?MODULE{msg = #msg{properties = P, case MA of #'v1_0.message_annotations'{content = []} -> <<>>; - _ -> - amqp10_framing:encode_bin(MA) + #'v1_0.message_annotations'{} -> + amqp10_framing:encode_bin(MA); + MsgAnnotBin -> + MsgAnnotBin end, - case amqp10_properties_empty(P) of - true -> <<>>; - false -> - amqp10_framing:encode_bin(P) + case P of + #'v1_0.properties'{} -> + case amqp10_properties_empty(P) of + true -> <<>>; + false -> + amqp10_framing:encode_bin(P) + end; + PropsBin -> + PropsBin end, case AP of #'v1_0.application_properties'{content = []} -> <<>>; - _ -> - amqp10_framing:encode_bin(AP) + #'v1_0.application_properties'{} -> + amqp10_framing:encode_bin(AP); + AppPropsBin -> + AppPropsBin end, - amqp10_framing:encode_bin(Data) + case Data of + DataBin when is_binary(Data) orelse is_list(Data) -> + DataBin; + _ -> + amqp10_framing:encode_bin(Data) + end ]. %% TODO: refine type spec here @@ -114,20 +151,31 @@ to_iodata(#?MODULE{msg = #msg{properties = P, state(). add_message_annotations(Anns, #?MODULE{msg = - #msg{message_annotations = MA0} = Msg} = State) -> + #msg{message_annotations = undefined} = Msg} = State) -> + add_message_annotations(Anns, + State#?MODULE{msg = Msg#msg{message_annotations = + #'v1_0.message_annotations'{content = []}}}); +add_message_annotations(Anns, + #?MODULE{msg = + #msg{message_annotations = + #'v1_0.message_annotations'{content = C}} = Msg} = State) -> Content = maps:fold( fun (K, {T, V}, Acc) -> map_add(symbol, K, T, V, Acc) end, - case MA0 of - undefined -> []; - #'v1_0.message_annotations'{content = C} -> C - end, + C, Anns), State#?MODULE{msg = Msg#msg{message_annotations = - #'v1_0.message_annotations'{content = Content}}}. + #'v1_0.message_annotations'{content = Content}}}; +add_message_annotations(Anns, + #?MODULE{msg = + #msg{message_annotations = MABin} = Msg} = State0) -> + [MA] = amqp10_framing:decode_bin(iolist_to_binary(MABin)), + State1 = State0#?MODULE{msg = + Msg#msg{message_annotations = MA}}, + add_message_annotations(Anns, State1). %% TODO: refine -type amqp10_term() :: {atom(), term()}. @@ -159,58 +207,111 @@ message_annotation(Key, %% parses it and returns the current parse state %% this is the input function from storage and from, e.g. socket input -spec from_amqp091(#'P_basic'{}, iodata()) -> state(). -from_amqp091(#'P_basic'{message_id = MsgId, - expiration = Expiration, - delivery_mode = DelMode, - headers = Headers, - user_id = UserId, - reply_to = ReplyTo, - type = Type, - priority = Priority, - app_id = AppId, - correlation_id = CorrId, - content_type = ContentType, - content_encoding = ContentEncoding, - timestamp = Timestamp - }, Data) -> - %% TODO: support parsing properties bin directly? +from_amqp091(#'P_basic'{type = T} = PB, Data) -> + MA = from_amqp091_to_amqp10_message_annotations(PB), + P = from_amqp091_to_amqp10_properties(PB), + AP = from_amqp091_to_amqp10_app_properties(PB), + + D = case T of + ?AMQP10_TYPE -> + %% the body is already AMQP 1.0 binary content, so leaving it as-is + Data; + _ -> + #'v1_0.data'{content = Data} + end, + + #?MODULE{cfg = #cfg{}, + msg = #msg{properties = P, + application_properties = AP, + message_annotations = MA, + data = D}}. + +from_amqp091_to_amqp10_properties(#'P_basic'{headers = Headers} = P) when is_list(Headers) -> + case proplists:lookup(?AMQP10_PROPERTIES_HEADER, Headers) of + none -> + convert_amqp091_to_amqp10_properties(P); + {_, _, PropsBin} -> + PropsBin + end; +from_amqp091_to_amqp10_properties(P) -> + convert_amqp091_to_amqp10_properties(P). + +convert_amqp091_to_amqp10_properties(#'P_basic'{message_id = MsgId, + user_id = UserId, + reply_to = ReplyTo, + correlation_id = CorrId, + content_type = ContentType, + content_encoding = ContentEncoding, + timestamp = Timestamp + }) -> ConvertedTs = case Timestamp of undefined -> undefined; _ -> Timestamp * 1000 end, - P = #'v1_0.properties'{message_id = wrap(utf8, MsgId), - user_id = wrap(binary, UserId), - to = undefined, - % subject = wrap(utf8, RKey), - reply_to = wrap(utf8, ReplyTo), - correlation_id = wrap(utf8, CorrId), - content_type = wrap(symbol, ContentType), - content_encoding = wrap(symbol, ContentEncoding), - creation_time = wrap(timestamp, ConvertedTs)}, + #'v1_0.properties'{message_id = wrap(utf8, MsgId), + user_id = wrap(binary, UserId), + to = undefined, + reply_to = wrap(utf8, ReplyTo), + correlation_id = wrap(utf8, CorrId), + content_type = wrap(symbol, ContentType), + content_encoding = wrap(symbol, ContentEncoding), + creation_time = wrap(timestamp, ConvertedTs)}. + +from_amqp091_to_amqp10_app_properties(#'P_basic'{headers = Headers} = P) + when is_list(Headers) -> + case proplists:lookup(?AMQP10_APP_PROPERTIES_HEADER, Headers) of + none -> + convert_amqp091_to_amqp10_app_properties(P); + {_, _, AppPropsBin} -> + AppPropsBin + end; +from_amqp091_to_amqp10_app_properties(P) -> + convert_amqp091_to_amqp10_app_properties(P). +convert_amqp091_to_amqp10_app_properties(#'P_basic'{headers = Headers, + type = Type, + app_id = AppId}) -> APC0 = [{wrap(utf8, K), from_091(T, V)} || {K, T, V} <- case Headers of undefined -> []; _ -> Headers - end, not unsupported_header_value_type(T)], + end, not unsupported_header_value_type(T), + not filtered_header(K)], + + APC1 = case Type of + ?AMQP10_TYPE -> + %% no need to modify the application properties for the type + %% this info will be restored on decoding if necessary + APC0; + _ -> + map_add(utf8, <<"x-basic-type">>, utf8, Type, APC0) + end, + %% properties that do not map directly to AMQP 1.0 properties are stored %% in application properties - APC = map_add(utf8, <<"x-basic-type">>, utf8, Type, - map_add(utf8, <<"x-basic-app-id">>, utf8, AppId, APC0)), + APC2 = map_add(utf8, <<"x-basic-app-id">>, utf8, AppId, APC1), + #'v1_0.application_properties'{content = APC2}. + +from_amqp091_to_amqp10_message_annotations(#'P_basic'{headers = Headers} = P) when is_list(Headers) -> + case proplists:lookup(?AMQP10_MESSAGE_ANNOTATIONS_HEADER, Headers) of + none -> + convert_amqp091_to_amqp10_message_annotations(P); + {_, _, MessageAnnotationsBin} -> + MessageAnnotationsBin + end; +from_amqp091_to_amqp10_message_annotations(P) -> + convert_amqp091_to_amqp10_message_annotations(P). +convert_amqp091_to_amqp10_message_annotations(#'P_basic'{priority = Priority, + delivery_mode = DelMode, + expiration = Expiration}) -> MAC = map_add(symbol, <<"x-basic-priority">>, ubyte, Priority, map_add(symbol, <<"x-basic-delivery-mode">>, ubyte, DelMode, map_add(symbol, <<"x-basic-expiration">>, utf8, Expiration, []))), - AP = #'v1_0.application_properties'{content = APC}, - MA = #'v1_0.message_annotations'{content = MAC}, - #?MODULE{cfg = #cfg{}, - msg = #msg{properties = P, - application_properties = AP, - message_annotations = MA, - data = #'v1_0.data'{content = Data}}}. + #'v1_0.message_annotations'{content = MAC}. map_add(_T, _Key, _Type, undefined, Acc) -> Acc; @@ -223,12 +324,22 @@ to_amqp091(#?MODULE{msg = #msg{properties = P, message_annotations = MAR, data = Data}}) -> - Payload = case Data of - undefined -> - <<>>; - #'v1_0.data'{content = C} -> - C - end, + %% anything else than a single data section is expected to be AMQP 1.0 binary content + %% enforcing this convention + {Payload, IsAmqp10} = case Data of + undefined -> + %% not an expected value, + %% but handling it with an empty binary anyway + {<<>>, false}; + #'v1_0.data'{content = C} -> + {C, false}; + Sections when is_list(Data)-> + B = [amqp10_framing:encode_bin(S) || S <- Sections], + {iolist_to_binary(B), + true}; + V -> + {iolist_to_binary(amqp10_framing:encode_bin(V)), true} + end, #'v1_0.properties'{message_id = MsgId, user_id = UserId, @@ -252,7 +363,12 @@ to_amqp091(#?MODULE{msg = #msg{properties = P, _ -> [] end, - {Type, AP1} = amqp10_map_get(utf8(<<"x-basic-type">>), AP0), + {Type, AP1} = case {amqp10_map_get(utf8(<<"x-basic-type">>), AP0), IsAmqp10} of + {{undefined, M}, true} -> + {?AMQP10_TYPE, M}; + {{T, M}, _} -> + {T, M} + end, {AppId, AP} = amqp10_map_get(utf8(<<"x-basic-app-id">>), AP1), {Priority, MA1} = amqp10_map_get(symbol(<<"x-basic-priority">>), MA0), @@ -390,12 +506,21 @@ message_id({utf8, S}, HKey, H0) -> message_id(MsgId, _, H) -> {H, unwrap(MsgId)}. - unsupported_header_value_type(array) -> - true; - unsupported_header_value_type(table) -> - true; - unsupported_header_value_type(_) -> - false. +unsupported_header_value_type(array) -> + true; +unsupported_header_value_type(table) -> + true; +unsupported_header_value_type(_) -> + false. + +filtered_header(?AMQP10_PROPERTIES_HEADER) -> + true; +filtered_header(?AMQP10_APP_PROPERTIES_HEADER) -> + true; +filtered_header(?AMQP10_MESSAGE_ANNOTATIONS_HEADER) -> + true; +filtered_header(_) -> + false. -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/deps/rabbit/test/rabbit_msg_record_SUITE.erl b/deps/rabbit/test/rabbit_msg_record_SUITE.erl index 5b25677a86c2..9337a126f8c7 100644 --- a/deps/rabbit/test/rabbit_msg_record_SUITE.erl +++ b/deps/rabbit/test/rabbit_msg_record_SUITE.erl @@ -23,12 +23,14 @@ all() -> all_tests() -> [ ampq091_roundtrip, + amqp10_non_single_data_bodies, unsupported_091_header_is_dropped, message_id_ulong, message_id_uuid, message_id_binary, message_id_large_binary, - message_id_large_string + message_id_large_string, + reuse_amqp10_binary_chunks ]. groups() -> @@ -91,6 +93,43 @@ ampq091_roundtrip(_Config) -> test_amqp091_roundtrip(#'P_basic'{}, Payload), ok. +amqp10_non_single_data_bodies(_Config) -> + Props = #'P_basic'{type = <<"amqp-1.0">>}, + Payloads = [ + [#'v1_0.data'{content = <<"hello">>}, + #'v1_0.data'{content = <<"brave">>}, + #'v1_0.data'{content = <<"new">>}, + #'v1_0.data'{content = <<"world">>} + ], + #'v1_0.amqp_value'{content = {utf8, <<"hello world">>}}, + [#'v1_0.amqp_sequence'{content = [{utf8, <<"one">>}, + {utf8, <<"blah">>}]}, + #'v1_0.amqp_sequence'{content = [{utf8, <<"two">>}]} + ] + ], + + [begin + EncodedPayload = amqp10_encode_bin(Payload), + + MsgRecord0 = rabbit_msg_record:from_amqp091(Props, EncodedPayload), + MsgRecord = rabbit_msg_record:init( + iolist_to_binary(rabbit_msg_record:to_iodata(MsgRecord0))), + {PropsOut, PayloadEncodedOut} = rabbit_msg_record:to_amqp091(MsgRecord), + PayloadOut = case amqp10_framing:decode_bin(iolist_to_binary(PayloadEncodedOut)) of + L when length(L) =:= 1 -> + lists:nth(1, L); + L -> + L + end, + + ?assertEqual(Props, PropsOut), + ?assertEqual(iolist_to_binary(EncodedPayload), + iolist_to_binary(PayloadEncodedOut)), + ?assertEqual(Payload, PayloadOut) + + end || Payload <- Payloads], + ok. + unsupported_091_header_is_dropped(_Config) -> Props = #'P_basic'{ headers = [ @@ -214,6 +253,37 @@ message_id_large_string(_Config) -> Props), ok. +reuse_amqp10_binary_chunks(_Config) -> + Amqp10MsgAnnotations = #'v1_0.message_annotations'{content = + [{{symbol, <<"x-route">>}, {utf8, <<"dummy">>}}]}, + Amqp10MsgAnnotationsBin = amqp10_encode_bin(Amqp10MsgAnnotations), + Amqp10Props = #'v1_0.properties'{group_id = {utf8, <<"my-group">>}, + group_sequence = {uint, 42}}, + Amqp10PropsBin = amqp10_encode_bin(Amqp10Props), + Amqp10AppProps = #'v1_0.application_properties'{content = [{{utf8, <<"foo">>}, {utf8, <<"bar">>}}]}, + Amqp10AppPropsBin = amqp10_encode_bin(Amqp10AppProps), + Amqp091Headers = [{<<"x-amqp-1.0-message-annotations">>, longstr, Amqp10MsgAnnotationsBin}, + {<<"x-amqp-1.0-properties">>, longstr, Amqp10PropsBin}, + {<<"x-amqp-1.0-app-properties">>, longstr, Amqp10AppPropsBin}], + Amqp091Props = #'P_basic'{type= <<"amqp-1.0">>, headers = Amqp091Headers}, + Body = #'v1_0.amqp_value'{content = {utf8, <<"hello world">>}}, + EncodedBody = amqp10_encode_bin(Body), + R = rabbit_msg_record:from_amqp091(Amqp091Props, EncodedBody), + RBin = rabbit_msg_record:to_iodata(R), + Amqp10DecodedMsg = amqp10_framing:decode_bin(iolist_to_binary(RBin)), + [Amqp10DecodedMsgAnnotations, Amqp10DecodedProps, + Amqp10DecodedAppProps, DecodedBody] = Amqp10DecodedMsg, + ?assertEqual(Amqp10MsgAnnotations, Amqp10DecodedMsgAnnotations), + ?assertEqual(Amqp10Props, Amqp10DecodedProps), + ?assertEqual(Amqp10AppProps, Amqp10DecodedAppProps), + ?assertEqual(Body, DecodedBody), + ok. + +amqp10_encode_bin(L) when is_list(L) -> + iolist_to_binary([amqp10_encode_bin(X) || X <- L]); +amqp10_encode_bin(X) -> + iolist_to_binary(amqp10_framing:encode_bin(X)). + %% Utility test_amqp091_roundtrip(Props, Payload) -> @@ -226,5 +296,3 @@ test_amqp091_roundtrip(Props, Payload) -> ?assertEqual(iolist_to_binary(Payload), iolist_to_binary(PayloadOut)), ok. - - diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl index 97b893c17f7f..c8c07af57a30 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl @@ -23,9 +23,16 @@ -include("rabbit_amqp1_0.hrl"). assemble(MsgBin) -> - {RKey, Props, Content} = assemble(header, {<<"">>, #'P_basic'{}, []}, + {RKey, Props, Content0} = assemble(header, {<<"">>, #'P_basic'{}, []}, decode_section(MsgBin), MsgBin), - {RKey, #amqp_msg{props = Props, payload = Content}}. + + Content1 = case Content0 of + Sections when is_list(Content0) -> + lists:reverse(Sections); + _ -> + Content0 + end, + {RKey, #amqp_msg{props = Props, payload = Content1}}. assemble(header, {R, P, C}, {H = #'v1_0.header'{}, Rest}, _Uneaten) -> assemble(message_annotations, {R, translate_header(H, P), C},