diff --git a/deps/amqp10_client/src/amqp10_msg.erl b/deps/amqp10_client/src/amqp10_msg.erl index 8633854878b7..ac8b9f2a4ba9 100644 --- a/deps/amqp10_client/src/amqp10_msg.erl +++ b/deps/amqp10_client/src/amqp10_msg.erl @@ -266,16 +266,18 @@ body_bin(#amqp10_msg{body = #'v1_0.amqp_value'{} = Body}) -> %% following stucture: %% {amqp10_disposition, {accepted | rejected, DeliveryTag}} -spec new(delivery_tag(), amqp10_body() | binary(), boolean()) -> amqp10_msg(). -new(DeliveryTag, Body, Settled) when is_binary(Body) -> - #amqp10_msg{transfer = #'v1_0.transfer'{delivery_tag = {binary, DeliveryTag}, - settled = Settled, - message_format = {uint, ?MESSAGE_FORMAT}}, - body = [#'v1_0.data'{content = Body}]}; +new(DeliveryTag, Bin, Settled) when is_binary(Bin) -> + Body = [#'v1_0.data'{content = Bin}], + new(DeliveryTag, Body, Settled); new(DeliveryTag, Body, Settled) -> % TODO: constrain to amqp types - #amqp10_msg{transfer = #'v1_0.transfer'{delivery_tag = {binary, DeliveryTag}, - settled = Settled, - message_format = {uint, ?MESSAGE_FORMAT}}, - body = Body}. + #amqp10_msg{ + transfer = #'v1_0.transfer'{ + delivery_tag = {binary, DeliveryTag}, + settled = Settled, + message_format = {uint, ?MESSAGE_FORMAT}}, + %% This lib is safe by default. + header = #'v1_0.header'{durable = true}, + body = Body}. %% @doc Create a new settled amqp10 message using the specified delivery tag %% and body. @@ -283,7 +285,6 @@ new(DeliveryTag, Body, Settled) -> % TODO: constrain to amqp types new(DeliveryTag, Body) -> new(DeliveryTag, Body, false). - % First 3 octets are the format % the last 1 octet is the version % See 2.8.11 in the spec diff --git a/deps/rabbit/src/mc_amqp.erl b/deps/rabbit/src/mc_amqp.erl index 63f6e37e5eb9..1c1c3b9d7f22 100644 --- a/deps/rabbit/src/mc_amqp.erl +++ b/deps/rabbit/src/mc_amqp.erl @@ -251,30 +251,29 @@ routing_headers(Msg, Opts) -> List = application_properties_as_simple_map(Msg, X), maps:from_list(List). -get_property(durable, Msg) -> - case Msg of - #msg_body_encoded{header = #'v1_0.header'{durable = Durable}} - when is_boolean(Durable) -> - Durable; +get_property(durable, #msg_body_encoded{header = Header} = Msg) -> + case Header of + #'v1_0.header'{durable = D} when is_boolean(D) -> + D; _ -> %% fallback in case the source protocol was old AMQP 0.9.1 case message_annotation(<<"x-basic-delivery-mode">>, Msg, undefined) of - {ubyte, 1} -> - false; + {ubyte, 2} -> + true; _ -> - true + false end end; -get_property(timestamp, Msg) -> - case Msg of - #msg_body_encoded{properties = #'v1_0.properties'{creation_time = {timestamp, Ts}}} -> +get_property(timestamp, #msg_body_encoded{properties = Properties}) -> + case Properties of + #'v1_0.properties'{creation_time = {timestamp, Ts}} -> Ts; _ -> undefined end; -get_property(ttl, Msg) -> - case Msg of - #msg_body_encoded{header = #'v1_0.header'{ttl = {uint, Ttl}}} -> +get_property(ttl, #msg_body_encoded{header = Header} = Msg) -> + case Header of + #'v1_0.header'{ttl = {uint, Ttl}} -> Ttl; _ -> %% fallback in case the source protocol was AMQP 0.9.1 @@ -286,9 +285,9 @@ get_property(ttl, Msg) -> undefined end end; -get_property(priority, Msg) -> - case Msg of - #msg_body_encoded{header = #'v1_0.header'{priority = {ubyte, Priority}}} -> +get_property(priority, #msg_body_encoded{header = Header} = Msg) -> + case Header of + #'v1_0.header'{priority = {ubyte, Priority}} -> Priority; _ -> %% fallback in case the source protocol was AMQP 0.9.1 @@ -319,10 +318,7 @@ protocol_state(#msg_body_encoded{header = Header0, [encode(Sections), BareAndFooter]; protocol_state(#v1{message_annotations = MA0, bare_and_footer = BareAndFooter}, Anns) -> - Durable = case Anns of - #{?ANN_DURABLE := D} -> D; - _ -> true - end, + Durable = maps:get(?ANN_DURABLE, Anns, true), Priority = case Anns of #{?ANN_PRIORITY := P} when is_integer(P) -> @@ -667,7 +663,9 @@ binary_part_bare_and_footer(Payload, Start) -> binary_part(Payload, Start, byte_size(Payload) - Start). update_header_from_anns(undefined, Anns) -> - update_header_from_anns(#'v1_0.header'{durable = true}, Anns); + Durable = maps:get(?ANN_DURABLE, Anns, true), + Header = #'v1_0.header'{durable = Durable}, + update_header_from_anns(Header, Anns); update_header_from_anns(Header, Anns) -> DeliveryCount = case Anns of #{delivery_count := C} -> C; diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 27a6f357d027..6c9e26bd3995 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -58,6 +58,9 @@ groups() -> sender_settle_mode_unsettled, sender_settle_mode_unsettled_fanout, sender_settle_mode_mixed, + durable_field_classic_queue, + durable_field_quorum_queue, + durable_field_stream, invalid_transfer_settled_flag, quorum_queue_rejects, receiver_settle_mode_first, @@ -916,6 +919,77 @@ sender_settle_mode_mixed(Config) -> rabbitmq_amqp_client:delete_queue(LinkPair, QName)), ok = close(Init). +durable_field_classic_queue(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + durable_field(Config, <<"classic">>, QName). + +durable_field_quorum_queue(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + durable_field(Config, <<"quorum">>, QName). + +durable_field_stream(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + durable_field(Config, <<"stream">>, QName). + +durable_field(Config, QueueType, QName) + when is_binary(QueueType) -> + Address = rabbitmq_amqp_address:queue(QName), + {_Connection, Session, LinkPair} = Init = init(Config), + QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QueueType}}}, + {ok, #{type := QueueType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), + {ok, Sender} = amqp10_client:attach_sender_link( + Session, <<"test-sender">>, Address, unsettled), + wait_for_credit(Sender), + + ok = amqp10_client:send_msg(Sender, + amqp10_msg:set_headers( + #{durable => true}, + amqp10_msg:new(<<"t1">>, <<"durable">>))), + ok = amqp10_client:send_msg(Sender, + amqp10_msg:set_headers( + #{durable => false}, + amqp10_msg:new(<<"t2">>, <<"non-durable">>))), + %% Even though the AMQP spec defines durable=false as default + %% (i.e. durable is false if the field is omitted on the wire), + %% we expect our AMQP Erlang library to be safe by default, + %% and therefore send the message as durable=true on behalf of us. + ok = amqp10_client:send_msg( + Sender, amqp10_msg:new(<<"t3">>, <<"lib publishes as durable by default">>)), + %% When we expliclitly publish without a header section, RabbitMQ should interpret + %% durable as false according to the AMQP spec. + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:from_amqp_records( + [#'v1_0.transfer'{delivery_tag = {binary, <<"t4">>}, + settled = false, + message_format = {uint, 0}}, + #'v1_0.data'{content = <<"publish without header section">>}])), + + ok = wait_for_accepts(4), + ok = detach_link_sync(Sender), + flush(sent), + + Filter = consume_from_first(QueueType), + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"test-receiver">>, Address, unsettled, + none, Filter), + + ok = amqp10_client:flow_link_credit(Receiver, 4, never), + [M1, M2, M3, M4] = Msgs = receive_messages(Receiver, 4), + ?assertEqual(<<"durable">>, amqp10_msg:body_bin(M1)), + ?assertMatch(#{durable := true}, amqp10_msg:headers(M1)), + ?assertEqual(<<"non-durable">>, amqp10_msg:body_bin(M2)), + ?assertMatch(#{durable := false}, amqp10_msg:headers(M2)), + ?assertEqual(<<"lib publishes as durable by default">>, amqp10_msg:body_bin(M3)), + ?assertMatch(#{durable := true}, amqp10_msg:headers(M3)), + ?assertEqual(<<"publish without header section">>, amqp10_msg:body_bin(M4)), + ?assertMatch(#{durable := false}, amqp10_msg:headers(M4)), + [ok = amqp10_client:accept_msg(Receiver, M) || M <- Msgs], + + ok = detach_link_sync(Receiver), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + close(Init). + invalid_transfer_settled_flag(Config) -> OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), @@ -1301,7 +1375,7 @@ amqp_amqpl(QType, Config) -> Body6 = [#'v1_0.data'{content = <<0, 1>>}, #'v1_0.data'{content = <<2, 3>>}], - %% Send only body sections + %% Send only header and body sections [ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<>>, Body, true)) || Body <- [Body1, Body2, Body3, Body4, Body5, Body6]], %% Send with application-properties @@ -1342,6 +1416,11 @@ amqp_amqpl(QType, Config) -> #{<<"x-array">> => {array, utf8, [{utf8, <<"e1">>}, {utf8, <<"e2">>}]}}, amqp10_msg:new(<<>>, Body1, true))), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_headers( + #{durable => false}, + amqp10_msg:new(<<>>, Body1, true))), ok = amqp10_client:detach_link(Sender), flush(detached), @@ -1365,8 +1444,10 @@ amqp_amqpl(QType, Config) -> receive {#'basic.deliver'{consumer_tag = CTag, redelivered = false}, #amqp_msg{payload = Payload1, - props = #'P_basic'{type = <<"amqp-1.0">>}}} -> - ?assertEqual([Body1], amqp10_framing:decode_bin(Payload1)) + props = #'P_basic'{delivery_mode = DelMode2, + type = <<"amqp-1.0">>}}} -> + ?assertEqual([Body1], amqp10_framing:decode_bin(Payload1)), + ?assertEqual(2, DelMode2) after 30000 -> ct:fail({missing_deliver, ?LINE}) end, receive {_, #amqp_msg{payload = Payload2, @@ -1428,6 +1509,12 @@ amqp_amqpl(QType, Config) -> rabbit_misc:table_lookup(Headers11, <<"x-array">>)) after 30000 -> ct:fail({missing_deliver, ?LINE}) end, + receive {_, #amqp_msg{payload = Payload12, + props = #'P_basic'{delivery_mode = DelMode1}}} -> + ?assertEqual([Body1], amqp10_framing:decode_bin(Payload12)), + ?assertNotEqual(2, DelMode1) + after 30000 -> ct:fail({missing_deliver, ?LINE}) + end, ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), @@ -1514,10 +1601,17 @@ amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address) -> amqp_channel:cast( Ch, #'basic.publish'{routing_key = QName}, - #amqp_msg{props = #'P_basic'{headers = Amqp091Headers}, + #amqp_msg{props = #'P_basic'{delivery_mode = 2, + priority = 5, + headers = Amqp091Headers}, payload = <<"foobar">>}), {ok, [Msg]} = drain_queue(Session, Address, 1), + + ?assertMatch(#{durable := true, + priority := 5}, + amqp10_msg:headers(Msg)), + Amqp10MA = amqp10_msg:message_annotations(Msg), ?assertEqual(<<"my-string">>, maps:get(<<"x-string">>, Amqp10MA, undefined)), ?assertEqual(92, maps:get(<<"x-int">>, Amqp10MA, undefined)), @@ -3278,7 +3372,7 @@ max_message_size_client_to_server(Config) -> {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address, mixed), ok = wait_for_credit(Sender), - PayloadSmallEnough = binary:copy(<<0>>, MaxMessageSize - 10), + PayloadSmallEnough = binary:copy(<<0>>, MaxMessageSize - 20), ?assertEqual(ok, amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, PayloadSmallEnough, false))), ok = wait_for_accepted(<<"t1">>), diff --git a/release-notes/4.2.0.md b/release-notes/4.2.0.md index 2534cd59214c..e797c31175f4 100644 --- a/release-notes/4.2.0.md +++ b/release-notes/4.2.0.md @@ -3,6 +3,21 @@ RabbitMQ 4.2.0 is a new feature release. +## Breaking Changes and Compatibility Notes + +### Default value for AMQP 1.0 `durable` field. + +Starting with RabbitMQ 4.2, if a sending client omits the header section, RabbitMQ [assumes](https://github.com/rabbitmq/rabbitmq-server/pull/13918) the `durable` field to be `false` complying with the AMQP 1.0 spec: +``` + +``` + +AMQP 1.0 apps or client libraries must set the `durable` field of the header section to `true` to mark the message as durable. + +Team RabbitMQ recommends client libraries to send messages as durable by default. +All AMQP 1.0 client libraries [maintained by Team RabbitMQ](https://www.rabbitmq.com/client-libraries/amqp-client-libraries) send messages as durable by default. + + ## Features ### Incoming and Outgoing Message Interceptors for native protocols