Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 23 additions & 35 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@
-define(INITIAL_OUTGOING_DELIVERY_ID, 0).
-define(DEFAULT_MAX_HANDLE, ?UINT_MAX).
%% [3.4]
%% RabbitMQ does not support the modified outcome.
-define(OUTCOMES, [?V_1_0_SYMBOL_ACCEPTED,
?V_1_0_SYMBOL_REJECTED,
?V_1_0_SYMBOL_RELEASED,
?V_1_0_SYMBOL_MODIFIED]).
?V_1_0_SYMBOL_RELEASED]).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
-define(PROCESS_GROUP_NAME, amqp_sessions).
-define(UINT(N), {uint, N}).
Expand Down Expand Up @@ -954,7 +954,7 @@ handle_control(#'v1_0.attach'{
handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
name = LinkName,
handle = Handle = ?UINT(HandleInt),
source = Source,
source = Source0,
snd_settle_mode = SndSettleMode,
target = Target,
initial_delivery_count = DeliveryCount = ?UINT(DeliveryCountInt)
Expand All @@ -972,7 +972,8 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
queue_name_bin = QNameBin,
delivery_count = DeliveryCountInt,
credit = ?LINK_CREDIT_RCV},
_Outcomes = outcomes(Source),
Source1 = default(Source0, #'v1_0.source'{}),
Source = Source1#'v1_0.source'{outcomes = {array, symbol, ?OUTCOMES}},
Reply = #'v1_0.attach'{
name = LinkName,
handle = Handle,
Expand Down Expand Up @@ -1097,12 +1098,14 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
initial_delivery_count = ?UINT(?INITIAL_DELIVERY_COUNT),
snd_settle_mode = EffectiveSndSettleMode,
rcv_settle_mode = RcvSettleMode,
%% The queue process monitors our session process. When our session process
%% terminates (abnormally) any messages checked out to our session process
%% will be requeued. That's why the we only support RELEASED as the default outcome.
source = Source#'v1_0.source'{
%% The queue process monitors our session process.
%% When our session process terminates (abnormally)
%% any message checked out to our session process
%% will be requeued. That's why we only support
%% RELEASED as the default outcome.
default_outcome = #'v1_0.released'{},
outcomes = outcomes(Source)},
outcomes = outcomes(Source#'v1_0.source'.outcomes)},
role = ?AMQP_ROLE_SENDER,
%% Echo back that we will respect the client's requested max-message-size.
max_message_size = MaybeMaxMessageSize},
Expand Down Expand Up @@ -1834,26 +1837,25 @@ settle_op_from_outcome(#'v1_0.rejected'{}) ->
discard;
settle_op_from_outcome(#'v1_0.released'{}) ->
requeue;
%% Keep the same Modified behaviour as in RabbitMQ 3.x
settle_op_from_outcome(#'v1_0.modified'{delivery_failed = true,
undeliverable_here = UndelHere})
when UndelHere =/= true ->
requeue;
settle_op_from_outcome(#'v1_0.modified'{}) ->
%% If delivery_failed is not true, we can't increment its delivery_count.
%% So, we will have to reject without requeue.
%%
%% If undeliverable_here is true, this is not quite correct because
%% undeliverable_here refers to the link, and not the message in general.
%% However, we cannot filter messages from being assigned to individual consumers.
%% That's why we will have to reject it without requeue.
discard;
protocol_error(
?V_1_0_AMQP_ERROR_NOT_IMPLEMENTED,
"modified outcome not implemented",
[]);
settle_op_from_outcome(Outcome) ->
protocol_error(
?V_1_0_AMQP_ERROR_INVALID_FIELD,
"Unrecognised state: ~tp in DISPOSITION",
[Outcome]).

outcomes({array, symbol, Syms}) ->
Outcomes = lists:filter(fun(O) ->
lists:member(O, ?OUTCOMES)
end, Syms),
{array, symbol, Outcomes};
outcomes(_) ->
{array, symbol, ?OUTCOMES}.

-spec flow({uint, link_handle()}, sequence_no()) -> #'v1_0.flow'{}.
flow(Handle, DeliveryCount) ->
flow(Handle, DeliveryCount, ?LINK_CREDIT_RCV).
Expand Down Expand Up @@ -3093,20 +3095,6 @@ declare_queue(QNameBin,
end,
{QNameBin, PermCache}.

outcomes(#'v1_0.source'{outcomes = undefined}) ->
{array, symbol, ?OUTCOMES};
outcomes(#'v1_0.source'{outcomes = {array, symbol, Syms} = Outcomes}) ->
case lists:filter(fun(O) -> not lists:member(O, ?OUTCOMES) end, Syms) of
[] ->
Outcomes;
Unsupported ->
exit_not_implemented("Outcomes not supported: ~tp", [Unsupported])
end;
outcomes(#'v1_0.source'{outcomes = Unsupported}) ->
exit_not_implemented("Outcomes not supported: ~tp", [Unsupported]);
outcomes(_) ->
{array, symbol, ?OUTCOMES}.

-spec handle_to_ctag(link_handle()) -> rabbit_types:ctag().
handle_to_ctag(Handle) ->
integer_to_binary(Handle).
Expand Down
9 changes: 2 additions & 7 deletions deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -300,14 +300,9 @@ reliable_send_receive_with_outcomes_quorum_queue(Config) ->
reliable_send_receive_with_outcomes(<<"quorum">>, Config).

reliable_send_receive_with_outcomes(QType, Config) ->
Outcomes = [
accepted,
modified,
{modified, true, false, #{<<"fruit">> => <<"banana">>}},
{modified, false, true, #{}},
Outcomes = [accepted,
rejected,
released
],
released],
[ok = reliable_send_receive(QType, Outcome, Config) || Outcome <- Outcomes].

reliable_send_receive(QType, Outcome, Config) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ module Test =
"amqp:rejected:list", null
"amqp:released:list", null
"amqp:modified:list", null
"amqp:madeup:list", "amqp:not-implemented"] do
"amqp:madeup:list", null] do

let source = new Source(Address = "outcomes_q",
Outcomes = [| Symbol outcome |])
Expand Down