From f8f1396c90748a8fcce9d59f54ccd63db7a22280 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Mon, 19 May 2025 14:44:04 +0200 Subject: [PATCH 1/2] Follow AMQP spec for durable field The AMQP spec defines: ``` ``` RabbitMQ 4.0 and 4.1 interpret the durable field as true if not set. The idea was to favour safety over performance. This complies with the AMQP spec because the spec allows other target or node specific defaults for the durable field: > If the header section is omitted the receiver MUST assume the appropriate > default values (or the meaning implied by no value being set) for the fields > within the header unless other target or node specific defaults have otherwise > been set. However, some client libraries completely omit the header section if the app expliclity sets durable=false. This complies with the spec, but it means that RabbitMQ cannot diffentiate between "client app forgot to set the durable field" vs "client lib opted in for an optimisation omitting the header section". This is problematic with JMS message selectors where JMS apps can filter on JMSDeliveryMode. To be able to correctly filter on JMSDeliveryMode, RabbitMQ needs to know whether the JMS app sent the message as PERSISTENT or NON_PERSISTENT. Rather than relying on client libs to always send the header section including the durable field, this commit makes RabbitMQ comply with the default value for durable in the AMQP spec. Some client lib maintainers accepted to send the header section, while other maintainers refused to do so: https://github.com/Azure/go-amqp/issues/330 https://issues.apache.org/jira/browse/QPIDJMS-608 Likely the AMQP spec was designed to omit the header section when performance is important, as is the case with durable=false. Omitting the header section means saving a few bytes per message on the wire and some marshalling and unmarshalling overhead on both client and server. Therefore, it's better to push the "safe by default" behaviour from the broker back to the client libs. Client libs should send messages as durable by default unless the client app expliclity opts in to send messages as non-durable. This is also what JMS does: By default JMS apps send messages as PERSISTENT: > The message producer's default delivery mode is PERSISTENT. Therefore, this commit also makes the AMQP Erlang client send messages as durable, by default. This commit will apply to RabbitMQ 4.2. It's arguably not a breaking change because in RabbitMQ, message durability is actually more determined by the queue type the message is sent to rather than the durable field of the message: * Quroum queues and streams store messages durably (fsync or replicate) no matter what the durable field is * MQTT QoS 0 queues hold messages in memory no matter what the durable field is * Classic queues do not fsync even if the durable field is set to true In addition, the RabbitMQ AMQP Java library introduced in RabbitMQ 4.0 sends messages with durable=true: https://github.com/rabbitmq/rabbitmq-amqp-java-client/blob/53e3dd6abbcbce8ca4f2257da56b314786b037cc/src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java#L91 The tests for selecting messages by JMSDeliveryMode relying on the behaviour in this commit can be found on the `jms` branch. --- deps/amqp10_client/src/amqp10_msg.erl | 21 ++--- deps/rabbit/src/mc_amqp.erl | 42 +++++----- deps/rabbit/test/amqp_client_SUITE.erl | 104 +++++++++++++++++++++++-- 3 files changed, 130 insertions(+), 37 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_msg.erl b/deps/amqp10_client/src/amqp10_msg.erl index 8633854878b7..ac8b9f2a4ba9 100644 --- a/deps/amqp10_client/src/amqp10_msg.erl +++ b/deps/amqp10_client/src/amqp10_msg.erl @@ -266,16 +266,18 @@ body_bin(#amqp10_msg{body = #'v1_0.amqp_value'{} = Body}) -> %% following stucture: %% {amqp10_disposition, {accepted | rejected, DeliveryTag}} -spec new(delivery_tag(), amqp10_body() | binary(), boolean()) -> amqp10_msg(). -new(DeliveryTag, Body, Settled) when is_binary(Body) -> - #amqp10_msg{transfer = #'v1_0.transfer'{delivery_tag = {binary, DeliveryTag}, - settled = Settled, - message_format = {uint, ?MESSAGE_FORMAT}}, - body = [#'v1_0.data'{content = Body}]}; +new(DeliveryTag, Bin, Settled) when is_binary(Bin) -> + Body = [#'v1_0.data'{content = Bin}], + new(DeliveryTag, Body, Settled); new(DeliveryTag, Body, Settled) -> % TODO: constrain to amqp types - #amqp10_msg{transfer = #'v1_0.transfer'{delivery_tag = {binary, DeliveryTag}, - settled = Settled, - message_format = {uint, ?MESSAGE_FORMAT}}, - body = Body}. + #amqp10_msg{ + transfer = #'v1_0.transfer'{ + delivery_tag = {binary, DeliveryTag}, + settled = Settled, + message_format = {uint, ?MESSAGE_FORMAT}}, + %% This lib is safe by default. + header = #'v1_0.header'{durable = true}, + body = Body}. %% @doc Create a new settled amqp10 message using the specified delivery tag %% and body. @@ -283,7 +285,6 @@ new(DeliveryTag, Body, Settled) -> % TODO: constrain to amqp types new(DeliveryTag, Body) -> new(DeliveryTag, Body, false). - % First 3 octets are the format % the last 1 octet is the version % See 2.8.11 in the spec diff --git a/deps/rabbit/src/mc_amqp.erl b/deps/rabbit/src/mc_amqp.erl index 63f6e37e5eb9..1c1c3b9d7f22 100644 --- a/deps/rabbit/src/mc_amqp.erl +++ b/deps/rabbit/src/mc_amqp.erl @@ -251,30 +251,29 @@ routing_headers(Msg, Opts) -> List = application_properties_as_simple_map(Msg, X), maps:from_list(List). -get_property(durable, Msg) -> - case Msg of - #msg_body_encoded{header = #'v1_0.header'{durable = Durable}} - when is_boolean(Durable) -> - Durable; +get_property(durable, #msg_body_encoded{header = Header} = Msg) -> + case Header of + #'v1_0.header'{durable = D} when is_boolean(D) -> + D; _ -> %% fallback in case the source protocol was old AMQP 0.9.1 case message_annotation(<<"x-basic-delivery-mode">>, Msg, undefined) of - {ubyte, 1} -> - false; + {ubyte, 2} -> + true; _ -> - true + false end end; -get_property(timestamp, Msg) -> - case Msg of - #msg_body_encoded{properties = #'v1_0.properties'{creation_time = {timestamp, Ts}}} -> +get_property(timestamp, #msg_body_encoded{properties = Properties}) -> + case Properties of + #'v1_0.properties'{creation_time = {timestamp, Ts}} -> Ts; _ -> undefined end; -get_property(ttl, Msg) -> - case Msg of - #msg_body_encoded{header = #'v1_0.header'{ttl = {uint, Ttl}}} -> +get_property(ttl, #msg_body_encoded{header = Header} = Msg) -> + case Header of + #'v1_0.header'{ttl = {uint, Ttl}} -> Ttl; _ -> %% fallback in case the source protocol was AMQP 0.9.1 @@ -286,9 +285,9 @@ get_property(ttl, Msg) -> undefined end end; -get_property(priority, Msg) -> - case Msg of - #msg_body_encoded{header = #'v1_0.header'{priority = {ubyte, Priority}}} -> +get_property(priority, #msg_body_encoded{header = Header} = Msg) -> + case Header of + #'v1_0.header'{priority = {ubyte, Priority}} -> Priority; _ -> %% fallback in case the source protocol was AMQP 0.9.1 @@ -319,10 +318,7 @@ protocol_state(#msg_body_encoded{header = Header0, [encode(Sections), BareAndFooter]; protocol_state(#v1{message_annotations = MA0, bare_and_footer = BareAndFooter}, Anns) -> - Durable = case Anns of - #{?ANN_DURABLE := D} -> D; - _ -> true - end, + Durable = maps:get(?ANN_DURABLE, Anns, true), Priority = case Anns of #{?ANN_PRIORITY := P} when is_integer(P) -> @@ -667,7 +663,9 @@ binary_part_bare_and_footer(Payload, Start) -> binary_part(Payload, Start, byte_size(Payload) - Start). update_header_from_anns(undefined, Anns) -> - update_header_from_anns(#'v1_0.header'{durable = true}, Anns); + Durable = maps:get(?ANN_DURABLE, Anns, true), + Header = #'v1_0.header'{durable = Durable}, + update_header_from_anns(Header, Anns); update_header_from_anns(Header, Anns) -> DeliveryCount = case Anns of #{delivery_count := C} -> C; diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 27a6f357d027..6c9e26bd3995 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -58,6 +58,9 @@ groups() -> sender_settle_mode_unsettled, sender_settle_mode_unsettled_fanout, sender_settle_mode_mixed, + durable_field_classic_queue, + durable_field_quorum_queue, + durable_field_stream, invalid_transfer_settled_flag, quorum_queue_rejects, receiver_settle_mode_first, @@ -916,6 +919,77 @@ sender_settle_mode_mixed(Config) -> rabbitmq_amqp_client:delete_queue(LinkPair, QName)), ok = close(Init). +durable_field_classic_queue(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + durable_field(Config, <<"classic">>, QName). + +durable_field_quorum_queue(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + durable_field(Config, <<"quorum">>, QName). + +durable_field_stream(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + durable_field(Config, <<"stream">>, QName). + +durable_field(Config, QueueType, QName) + when is_binary(QueueType) -> + Address = rabbitmq_amqp_address:queue(QName), + {_Connection, Session, LinkPair} = Init = init(Config), + QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QueueType}}}, + {ok, #{type := QueueType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), + {ok, Sender} = amqp10_client:attach_sender_link( + Session, <<"test-sender">>, Address, unsettled), + wait_for_credit(Sender), + + ok = amqp10_client:send_msg(Sender, + amqp10_msg:set_headers( + #{durable => true}, + amqp10_msg:new(<<"t1">>, <<"durable">>))), + ok = amqp10_client:send_msg(Sender, + amqp10_msg:set_headers( + #{durable => false}, + amqp10_msg:new(<<"t2">>, <<"non-durable">>))), + %% Even though the AMQP spec defines durable=false as default + %% (i.e. durable is false if the field is omitted on the wire), + %% we expect our AMQP Erlang library to be safe by default, + %% and therefore send the message as durable=true on behalf of us. + ok = amqp10_client:send_msg( + Sender, amqp10_msg:new(<<"t3">>, <<"lib publishes as durable by default">>)), + %% When we expliclitly publish without a header section, RabbitMQ should interpret + %% durable as false according to the AMQP spec. + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:from_amqp_records( + [#'v1_0.transfer'{delivery_tag = {binary, <<"t4">>}, + settled = false, + message_format = {uint, 0}}, + #'v1_0.data'{content = <<"publish without header section">>}])), + + ok = wait_for_accepts(4), + ok = detach_link_sync(Sender), + flush(sent), + + Filter = consume_from_first(QueueType), + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"test-receiver">>, Address, unsettled, + none, Filter), + + ok = amqp10_client:flow_link_credit(Receiver, 4, never), + [M1, M2, M3, M4] = Msgs = receive_messages(Receiver, 4), + ?assertEqual(<<"durable">>, amqp10_msg:body_bin(M1)), + ?assertMatch(#{durable := true}, amqp10_msg:headers(M1)), + ?assertEqual(<<"non-durable">>, amqp10_msg:body_bin(M2)), + ?assertMatch(#{durable := false}, amqp10_msg:headers(M2)), + ?assertEqual(<<"lib publishes as durable by default">>, amqp10_msg:body_bin(M3)), + ?assertMatch(#{durable := true}, amqp10_msg:headers(M3)), + ?assertEqual(<<"publish without header section">>, amqp10_msg:body_bin(M4)), + ?assertMatch(#{durable := false}, amqp10_msg:headers(M4)), + [ok = amqp10_client:accept_msg(Receiver, M) || M <- Msgs], + + ok = detach_link_sync(Receiver), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + close(Init). + invalid_transfer_settled_flag(Config) -> OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), @@ -1301,7 +1375,7 @@ amqp_amqpl(QType, Config) -> Body6 = [#'v1_0.data'{content = <<0, 1>>}, #'v1_0.data'{content = <<2, 3>>}], - %% Send only body sections + %% Send only header and body sections [ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<>>, Body, true)) || Body <- [Body1, Body2, Body3, Body4, Body5, Body6]], %% Send with application-properties @@ -1342,6 +1416,11 @@ amqp_amqpl(QType, Config) -> #{<<"x-array">> => {array, utf8, [{utf8, <<"e1">>}, {utf8, <<"e2">>}]}}, amqp10_msg:new(<<>>, Body1, true))), + ok = amqp10_client:send_msg( + Sender, + amqp10_msg:set_headers( + #{durable => false}, + amqp10_msg:new(<<>>, Body1, true))), ok = amqp10_client:detach_link(Sender), flush(detached), @@ -1365,8 +1444,10 @@ amqp_amqpl(QType, Config) -> receive {#'basic.deliver'{consumer_tag = CTag, redelivered = false}, #amqp_msg{payload = Payload1, - props = #'P_basic'{type = <<"amqp-1.0">>}}} -> - ?assertEqual([Body1], amqp10_framing:decode_bin(Payload1)) + props = #'P_basic'{delivery_mode = DelMode2, + type = <<"amqp-1.0">>}}} -> + ?assertEqual([Body1], amqp10_framing:decode_bin(Payload1)), + ?assertEqual(2, DelMode2) after 30000 -> ct:fail({missing_deliver, ?LINE}) end, receive {_, #amqp_msg{payload = Payload2, @@ -1428,6 +1509,12 @@ amqp_amqpl(QType, Config) -> rabbit_misc:table_lookup(Headers11, <<"x-array">>)) after 30000 -> ct:fail({missing_deliver, ?LINE}) end, + receive {_, #amqp_msg{payload = Payload12, + props = #'P_basic'{delivery_mode = DelMode1}}} -> + ?assertEqual([Body1], amqp10_framing:decode_bin(Payload12)), + ?assertNotEqual(2, DelMode1) + after 30000 -> ct:fail({missing_deliver, ?LINE}) + end, ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), @@ -1514,10 +1601,17 @@ amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address) -> amqp_channel:cast( Ch, #'basic.publish'{routing_key = QName}, - #amqp_msg{props = #'P_basic'{headers = Amqp091Headers}, + #amqp_msg{props = #'P_basic'{delivery_mode = 2, + priority = 5, + headers = Amqp091Headers}, payload = <<"foobar">>}), {ok, [Msg]} = drain_queue(Session, Address, 1), + + ?assertMatch(#{durable := true, + priority := 5}, + amqp10_msg:headers(Msg)), + Amqp10MA = amqp10_msg:message_annotations(Msg), ?assertEqual(<<"my-string">>, maps:get(<<"x-string">>, Amqp10MA, undefined)), ?assertEqual(92, maps:get(<<"x-int">>, Amqp10MA, undefined)), @@ -3278,7 +3372,7 @@ max_message_size_client_to_server(Config) -> {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address, mixed), ok = wait_for_credit(Sender), - PayloadSmallEnough = binary:copy(<<0>>, MaxMessageSize - 10), + PayloadSmallEnough = binary:copy(<<0>>, MaxMessageSize - 20), ?assertEqual(ok, amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, PayloadSmallEnough, false))), ok = wait_for_accepted(<<"t1">>), From 67895da04d9b1ae2e56348b6adefe774ec0a51e2 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 20 May 2025 08:13:30 +0200 Subject: [PATCH 2/2] Mention AMQP durable field in 4.2 release notes --- release-notes/4.2.0.md | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/release-notes/4.2.0.md b/release-notes/4.2.0.md index 2534cd59214c..e797c31175f4 100644 --- a/release-notes/4.2.0.md +++ b/release-notes/4.2.0.md @@ -3,6 +3,21 @@ RabbitMQ 4.2.0 is a new feature release. +## Breaking Changes and Compatibility Notes + +### Default value for AMQP 1.0 `durable` field. + +Starting with RabbitMQ 4.2, if a sending client omits the header section, RabbitMQ [assumes](https://github.com/rabbitmq/rabbitmq-server/pull/13918) the `durable` field to be `false` complying with the AMQP 1.0 spec: +``` + +``` + +AMQP 1.0 apps or client libraries must set the `durable` field of the header section to `true` to mark the message as durable. + +Team RabbitMQ recommends client libraries to send messages as durable by default. +All AMQP 1.0 client libraries [maintained by Team RabbitMQ](https://www.rabbitmq.com/client-libraries/amqp-client-libraries) send messages as durable by default. + + ## Features ### Incoming and Outgoing Message Interceptors for native protocols