From 3390582f7d3afb5c8937129ec2c1ad101dc6abd3 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 16 Jul 2024 18:25:34 +0200 Subject: [PATCH] Fix unupported modified outcome behaviour The modified outcome has not been supported in RabbitMQ 3.13 and won't be supported in RabbitMQ 4.0. Specifically, neither in 3.13 nor in 4.0 are any of the modified fields implemented correctly: ``` ``` However in 3.13, RabbitMQ pretends to support the modified outcome by including the modified outcome in the `outcomes` field of the `Source`. When the receiver settles with the modified outcome, RabbitMQ either requeues or discards the message depending on some specific modified field combinations. To follow the principle of least surprise, RabbitMQ 4.0 will make it clear in the `outcomes` field of the `Source` in the replying `Attach` frame, that RabbitMQ won't support the modified outcome. If a receiver nevertheless settles with the modified outcome, RabbitMQ will close the session. This is a breaking change in RabbitMQ 4.0. RabbitMQ 3.13 already documents in https://github.com/rabbitmq/rabbitmq-server/tree/v3.13.x/deps/rabbitmq_amqp1_0 that the modified outcome is unsupported. The same should be documented in RabbitMQ 4.0. --- deps/rabbit/src/rabbit_amqp_session.erl | 58 ++++++++----------- deps/rabbit/test/amqp_client_SUITE.erl | 9 +-- .../fsharp-tests/Program.fs | 2 +- 3 files changed, 26 insertions(+), 43 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 932eb24ca2a2..636da345a749 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -43,10 +43,10 @@ -define(INITIAL_OUTGOING_DELIVERY_ID, 0). -define(DEFAULT_MAX_HANDLE, ?UINT_MAX). %% [3.4] +%% RabbitMQ does not support the modified outcome. -define(OUTCOMES, [?V_1_0_SYMBOL_ACCEPTED, ?V_1_0_SYMBOL_REJECTED, - ?V_1_0_SYMBOL_RELEASED, - ?V_1_0_SYMBOL_MODIFIED]). + ?V_1_0_SYMBOL_RELEASED]). -define(MAX_PERMISSION_CACHE_SIZE, 12). -define(PROCESS_GROUP_NAME, amqp_sessions). -define(UINT(N), {uint, N}). @@ -954,7 +954,7 @@ handle_control(#'v1_0.attach'{ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER, name = LinkName, handle = Handle = ?UINT(HandleInt), - source = Source, + source = Source0, snd_settle_mode = SndSettleMode, target = Target, initial_delivery_count = DeliveryCount = ?UINT(DeliveryCountInt) @@ -972,7 +972,8 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER, queue_name_bin = QNameBin, delivery_count = DeliveryCountInt, credit = ?LINK_CREDIT_RCV}, - _Outcomes = outcomes(Source), + Source1 = default(Source0, #'v1_0.source'{}), + Source = Source1#'v1_0.source'{outcomes = {array, symbol, ?OUTCOMES}}, Reply = #'v1_0.attach'{ name = LinkName, handle = Handle, @@ -1097,12 +1098,14 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, initial_delivery_count = ?UINT(?INITIAL_DELIVERY_COUNT), snd_settle_mode = EffectiveSndSettleMode, rcv_settle_mode = RcvSettleMode, - %% The queue process monitors our session process. When our session process - %% terminates (abnormally) any messages checked out to our session process - %% will be requeued. That's why the we only support RELEASED as the default outcome. source = Source#'v1_0.source'{ + %% The queue process monitors our session process. + %% When our session process terminates (abnormally) + %% any message checked out to our session process + %% will be requeued. That's why we only support + %% RELEASED as the default outcome. default_outcome = #'v1_0.released'{}, - outcomes = outcomes(Source)}, + outcomes = outcomes(Source#'v1_0.source'.outcomes)}, role = ?AMQP_ROLE_SENDER, %% Echo back that we will respect the client's requested max-message-size. max_message_size = MaybeMaxMessageSize}, @@ -1834,26 +1837,25 @@ settle_op_from_outcome(#'v1_0.rejected'{}) -> discard; settle_op_from_outcome(#'v1_0.released'{}) -> requeue; -%% Keep the same Modified behaviour as in RabbitMQ 3.x -settle_op_from_outcome(#'v1_0.modified'{delivery_failed = true, - undeliverable_here = UndelHere}) - when UndelHere =/= true -> - requeue; settle_op_from_outcome(#'v1_0.modified'{}) -> - %% If delivery_failed is not true, we can't increment its delivery_count. - %% So, we will have to reject without requeue. - %% - %% If undeliverable_here is true, this is not quite correct because - %% undeliverable_here refers to the link, and not the message in general. - %% However, we cannot filter messages from being assigned to individual consumers. - %% That's why we will have to reject it without requeue. - discard; + protocol_error( + ?V_1_0_AMQP_ERROR_NOT_IMPLEMENTED, + "modified outcome not implemented", + []); settle_op_from_outcome(Outcome) -> protocol_error( ?V_1_0_AMQP_ERROR_INVALID_FIELD, "Unrecognised state: ~tp in DISPOSITION", [Outcome]). +outcomes({array, symbol, Syms}) -> + Outcomes = lists:filter(fun(O) -> + lists:member(O, ?OUTCOMES) + end, Syms), + {array, symbol, Outcomes}; +outcomes(_) -> + {array, symbol, ?OUTCOMES}. + -spec flow({uint, link_handle()}, sequence_no()) -> #'v1_0.flow'{}. flow(Handle, DeliveryCount) -> flow(Handle, DeliveryCount, ?LINK_CREDIT_RCV). @@ -3093,20 +3095,6 @@ declare_queue(QNameBin, end, {QNameBin, PermCache}. -outcomes(#'v1_0.source'{outcomes = undefined}) -> - {array, symbol, ?OUTCOMES}; -outcomes(#'v1_0.source'{outcomes = {array, symbol, Syms} = Outcomes}) -> - case lists:filter(fun(O) -> not lists:member(O, ?OUTCOMES) end, Syms) of - [] -> - Outcomes; - Unsupported -> - exit_not_implemented("Outcomes not supported: ~tp", [Unsupported]) - end; -outcomes(#'v1_0.source'{outcomes = Unsupported}) -> - exit_not_implemented("Outcomes not supported: ~tp", [Unsupported]); -outcomes(_) -> - {array, symbol, ?OUTCOMES}. - -spec handle_to_ctag(link_handle()) -> rabbit_types:ctag(). handle_to_ctag(Handle) -> integer_to_binary(Handle). diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index f48c6dcc8862..178198603b1c 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -300,14 +300,9 @@ reliable_send_receive_with_outcomes_quorum_queue(Config) -> reliable_send_receive_with_outcomes(<<"quorum">>, Config). reliable_send_receive_with_outcomes(QType, Config) -> - Outcomes = [ - accepted, - modified, - {modified, true, false, #{<<"fruit">> => <<"banana">>}}, - {modified, false, true, #{}}, + Outcomes = [accepted, rejected, - released - ], + released], [ok = reliable_send_receive(QType, Outcome, Config) || Outcome <- Outcomes]. reliable_send_receive(QType, Outcome, Config) -> diff --git a/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs b/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs index 7ed91f388f70..ace95b5b5908 100755 --- a/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs +++ b/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs @@ -261,7 +261,7 @@ module Test = "amqp:rejected:list", null "amqp:released:list", null "amqp:modified:list", null - "amqp:madeup:list", "amqp:not-implemented"] do + "amqp:madeup:list", null] do let source = new Source(Address = "outcomes_q", Outcomes = [| Symbol outcome |])