diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 932eb24ca2a2..636da345a749 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -43,10 +43,10 @@ -define(INITIAL_OUTGOING_DELIVERY_ID, 0). -define(DEFAULT_MAX_HANDLE, ?UINT_MAX). %% [3.4] +%% RabbitMQ does not support the modified outcome. -define(OUTCOMES, [?V_1_0_SYMBOL_ACCEPTED, ?V_1_0_SYMBOL_REJECTED, - ?V_1_0_SYMBOL_RELEASED, - ?V_1_0_SYMBOL_MODIFIED]). + ?V_1_0_SYMBOL_RELEASED]). -define(MAX_PERMISSION_CACHE_SIZE, 12). -define(PROCESS_GROUP_NAME, amqp_sessions). -define(UINT(N), {uint, N}). @@ -954,7 +954,7 @@ handle_control(#'v1_0.attach'{ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER, name = LinkName, handle = Handle = ?UINT(HandleInt), - source = Source, + source = Source0, snd_settle_mode = SndSettleMode, target = Target, initial_delivery_count = DeliveryCount = ?UINT(DeliveryCountInt) @@ -972,7 +972,8 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER, queue_name_bin = QNameBin, delivery_count = DeliveryCountInt, credit = ?LINK_CREDIT_RCV}, - _Outcomes = outcomes(Source), + Source1 = default(Source0, #'v1_0.source'{}), + Source = Source1#'v1_0.source'{outcomes = {array, symbol, ?OUTCOMES}}, Reply = #'v1_0.attach'{ name = LinkName, handle = Handle, @@ -1097,12 +1098,14 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, initial_delivery_count = ?UINT(?INITIAL_DELIVERY_COUNT), snd_settle_mode = EffectiveSndSettleMode, rcv_settle_mode = RcvSettleMode, - %% The queue process monitors our session process. When our session process - %% terminates (abnormally) any messages checked out to our session process - %% will be requeued. That's why the we only support RELEASED as the default outcome. source = Source#'v1_0.source'{ + %% The queue process monitors our session process. + %% When our session process terminates (abnormally) + %% any message checked out to our session process + %% will be requeued. That's why we only support + %% RELEASED as the default outcome. default_outcome = #'v1_0.released'{}, - outcomes = outcomes(Source)}, + outcomes = outcomes(Source#'v1_0.source'.outcomes)}, role = ?AMQP_ROLE_SENDER, %% Echo back that we will respect the client's requested max-message-size. max_message_size = MaybeMaxMessageSize}, @@ -1834,26 +1837,25 @@ settle_op_from_outcome(#'v1_0.rejected'{}) -> discard; settle_op_from_outcome(#'v1_0.released'{}) -> requeue; -%% Keep the same Modified behaviour as in RabbitMQ 3.x -settle_op_from_outcome(#'v1_0.modified'{delivery_failed = true, - undeliverable_here = UndelHere}) - when UndelHere =/= true -> - requeue; settle_op_from_outcome(#'v1_0.modified'{}) -> - %% If delivery_failed is not true, we can't increment its delivery_count. - %% So, we will have to reject without requeue. - %% - %% If undeliverable_here is true, this is not quite correct because - %% undeliverable_here refers to the link, and not the message in general. - %% However, we cannot filter messages from being assigned to individual consumers. - %% That's why we will have to reject it without requeue. - discard; + protocol_error( + ?V_1_0_AMQP_ERROR_NOT_IMPLEMENTED, + "modified outcome not implemented", + []); settle_op_from_outcome(Outcome) -> protocol_error( ?V_1_0_AMQP_ERROR_INVALID_FIELD, "Unrecognised state: ~tp in DISPOSITION", [Outcome]). +outcomes({array, symbol, Syms}) -> + Outcomes = lists:filter(fun(O) -> + lists:member(O, ?OUTCOMES) + end, Syms), + {array, symbol, Outcomes}; +outcomes(_) -> + {array, symbol, ?OUTCOMES}. + -spec flow({uint, link_handle()}, sequence_no()) -> #'v1_0.flow'{}. flow(Handle, DeliveryCount) -> flow(Handle, DeliveryCount, ?LINK_CREDIT_RCV). @@ -3093,20 +3095,6 @@ declare_queue(QNameBin, end, {QNameBin, PermCache}. -outcomes(#'v1_0.source'{outcomes = undefined}) -> - {array, symbol, ?OUTCOMES}; -outcomes(#'v1_0.source'{outcomes = {array, symbol, Syms} = Outcomes}) -> - case lists:filter(fun(O) -> not lists:member(O, ?OUTCOMES) end, Syms) of - [] -> - Outcomes; - Unsupported -> - exit_not_implemented("Outcomes not supported: ~tp", [Unsupported]) - end; -outcomes(#'v1_0.source'{outcomes = Unsupported}) -> - exit_not_implemented("Outcomes not supported: ~tp", [Unsupported]); -outcomes(_) -> - {array, symbol, ?OUTCOMES}. - -spec handle_to_ctag(link_handle()) -> rabbit_types:ctag(). handle_to_ctag(Handle) -> integer_to_binary(Handle). diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index f48c6dcc8862..178198603b1c 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -300,14 +300,9 @@ reliable_send_receive_with_outcomes_quorum_queue(Config) -> reliable_send_receive_with_outcomes(<<"quorum">>, Config). reliable_send_receive_with_outcomes(QType, Config) -> - Outcomes = [ - accepted, - modified, - {modified, true, false, #{<<"fruit">> => <<"banana">>}}, - {modified, false, true, #{}}, + Outcomes = [accepted, rejected, - released - ], + released], [ok = reliable_send_receive(QType, Outcome, Config) || Outcome <- Outcomes]. reliable_send_receive(QType, Outcome, Config) -> diff --git a/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs b/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs index 7ed91f388f70..ace95b5b5908 100755 --- a/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs +++ b/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs @@ -261,7 +261,7 @@ module Test = "amqp:rejected:list", null "amqp:released:list", null "amqp:modified:list", null - "amqp:madeup:list", "amqp:not-implemented"] do + "amqp:madeup:list", null] do let source = new Source(Address = "outcomes_q", Outcomes = [| Symbol outcome |])