Skip to content

Commit b594070

Browse files
Merge pull request #6302 from rabbitmq/mergify/bp/v3.10.x/pr-6299
Refine and fix AMQP 1.0 modified outcome behaviour (backport #6292) (backport #6299)
2 parents d4e3544 + 0905f5f commit b594070

File tree

5 files changed

+92
-13
lines changed

5 files changed

+92
-13
lines changed

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -381,8 +381,11 @@ mapped(cast, {#'v1_0.transfer'{handle = {uint, InHandle},
381381

382382
% role=true indicates the disposition is from a `receiver`. i.e. from the
383383
% clients point of view these are dispositions relating to `sender`links
384-
mapped(cast, #'v1_0.disposition'{role = true, settled = true, first = {uint, First},
385-
last = Last0, state = DeliveryState},
384+
mapped(cast, #'v1_0.disposition'{role = true,
385+
settled = true,
386+
first = {uint, First},
387+
last = Last0,
388+
state = DeliveryState},
386389
#state{unsettled = Unsettled0} = State) ->
387390
Last = case Last0 of
388391
undefined -> First;
@@ -393,6 +396,10 @@ mapped(cast, #'v1_0.disposition'{role = true, settled = true, first = {uint, Fir
393396
lists:foldl(fun(Id, Acc) ->
394397
case Acc of
395398
#{Id := {DeliveryTag, Receiver}} ->
399+
%% TODO: currently all modified delivery states
400+
%% will be translated to the old, `modified` atom.
401+
%% At some point we should translate into the
402+
%% full {modified, bool, bool, map) tuple.
396403
S = translate_delivery_state(DeliveryState),
397404
ok = notify_disposition(Receiver,
398405
{S, DeliveryTag}),
@@ -833,6 +840,14 @@ translate_delivery_state(#'v1_0.received'{}) -> received;
833840
translate_delivery_state(accepted) -> #'v1_0.accepted'{};
834841
translate_delivery_state(rejected) -> #'v1_0.rejected'{};
835842
translate_delivery_state(modified) -> #'v1_0.modified'{};
843+
translate_delivery_state({modified,
844+
DeliveryFailed,
845+
UndeliverableHere,
846+
MessageAnnotations}) ->
847+
MA = translate_message_annotations(MessageAnnotations),
848+
#'v1_0.modified'{delivery_failed = DeliveryFailed,
849+
undeliverable_here = UndeliverableHere,
850+
message_annotations = MA};
836851
translate_delivery_state(released) -> #'v1_0.released'{};
837852
translate_delivery_state(received) -> #'v1_0.received'{}.
838853

@@ -971,6 +986,34 @@ socket_send0({ssl, Socket}, Data) ->
971986
make_link_ref(Role, Session, Handle) ->
972987
#link_ref{role = Role, session = Session, link_handle = Handle}.
973988

989+
translate_message_annotations(MA)
990+
when is_map(MA) andalso
991+
map_size(MA) > 0 ->
992+
Content = maps:fold(fun (K, V, Acc) ->
993+
[{sym(K), wrap_map_value(V)} | Acc]
994+
end, [], MA),
995+
#'v1_0.message_annotations'{content = Content};
996+
translate_message_annotations(_MA) ->
997+
undefined.
998+
999+
1000+
wrap_map_value(true) ->
1001+
{boolean, true};
1002+
wrap_map_value(false) ->
1003+
{boolean, false};
1004+
wrap_map_value(V) when is_integer(V) ->
1005+
{uint, V};
1006+
wrap_map_value(V) when is_binary(V) ->
1007+
utf8(V);
1008+
wrap_map_value(V) when is_list(V) ->
1009+
utf8(list_to_binary(V));
1010+
wrap_map_value(V) when is_atom(V) ->
1011+
utf8(atom_to_list(V)).
1012+
1013+
utf8(V) -> amqp10_client_types:utf8(V).
1014+
sym(B) when is_list(B) -> {symbol, list_to_binary(B)};
1015+
sym(B) when is_binary(B) -> {symbol, B}.
1016+
9741017
-ifdef(TEST).
9751018
-include_lib("eunit/include/eunit.hrl").
9761019

deps/amqp10_client/src/amqp10_client_types.erl

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,17 @@
3030
-type source() :: #'v1_0.source'{}.
3131
-type target() :: #'v1_0.target'{}.
3232

33-
-type delivery_state() :: accepted | rejected | modified | received | released.
33+
-type delivery_state() :: accepted |
34+
rejected |
35+
modified |
36+
%% the "full" modified outcome
37+
{modified,
38+
DeliveryFailed :: boolean(),
39+
UndeliverableHere :: boolean(),
40+
MessageAnnotations :: #{amqp10_msg:annotations_key() => term()}
41+
} |
42+
received |
43+
released.
3444

3545
-type amqp_error() :: internal_error | not_found | unauthorized_access |
3646
decode_error | resource_limit_exceeded |

deps/amqp10_client/src/amqp10_msg.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@
9393
amqp10_header/0,
9494
amqp10_properties/0,
9595
amqp10_body/0,
96-
delivery_tag/0
96+
delivery_tag/0,
97+
annotations_key/0
9798
]).
9899

99100
-define(record_to_tuplelist(Rec, Ref),

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -277,14 +277,29 @@ handle_control(#'v1_0.disposition'{state = Outcome,
277277
#'v1_0.accepted'{} ->
278278
#'basic.ack'{delivery_tag = DeliveryTag,
279279
multiple = false};
280-
%% we don't care if the client modified the
281-
%% so just treat it as accepted.
282-
%% Some clients send modified instead of accepted
283-
%% when e.g. a client
284-
%% side message TTL expires.
280+
#'v1_0.modified'{delivery_failed = true,
281+
undeliverable_here = UndelHere} ->
282+
%% NB: this is not quite correct.
283+
%% `undeliverable_here' refers to the link
284+
%% not the message in general but we cannot
285+
%% filter messages from being assigned to
286+
%% individual consumers
287+
%% so will have to reject it without requeue
288+
%% in this case.
289+
Requeue = case UndelHere of
290+
true ->
291+
false;
292+
_ ->
293+
true
294+
end,
295+
#'basic.reject'{delivery_tag = DeliveryTag,
296+
requeue = Requeue};
285297
#'v1_0.modified'{} ->
286-
#'basic.ack'{delivery_tag = DeliveryTag,
287-
multiple = false};
298+
%% if delivery_failed is not true, treat we
299+
%% can't increment its' delivery_count so
300+
%% will have to reject without requeue
301+
#'basic.reject'{delivery_tag = DeliveryTag,
302+
requeue = false};
288303
#'v1_0.rejected'{} ->
289304
#'basic.reject'{delivery_tag = DeliveryTag,
290305
requeue = false};

deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,17 +88,27 @@ end_per_testcase(Testcase, Config) ->
8888
reliable_send_receive_with_outcomes(Config) ->
8989
Outcomes = [accepted,
9090
modified,
91+
{modified, true, false, #{<<"fruit">> => <<"banana">>}},
92+
{modified, false, true, #{}},
9193
rejected,
9294
released],
9395
[begin
94-
ct:pal("~s testing ~s", [?FUNCTION_NAME, Outcome]),
9596
reliable_send_receive(Config, Outcome)
9697
end || Outcome <- Outcomes],
9798
ok.
9899

99100
reliable_send_receive(Config, Outcome) ->
100101
Container = atom_to_binary(?FUNCTION_NAME, utf8),
101-
OutcomeBin = atom_to_binary(Outcome, utf8),
102+
OutcomeBin = case is_atom(Outcome) of
103+
true ->
104+
atom_to_binary(Outcome, utf8);
105+
false ->
106+
O1 = atom_to_binary(element(1, Outcome), utf8),
107+
O2 = atom_to_binary(element(2, Outcome), utf8),
108+
<<O1/binary, "_", O2/binary>>
109+
end,
110+
111+
ct:pal("~s testing ~s", [?FUNCTION_NAME, OutcomeBin]),
102112
QName = <<Container/binary, OutcomeBin/binary>>,
103113
%% declare a quorum queue
104114
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),

0 commit comments

Comments
 (0)