Skip to content

Commit d5cd977

Browse files
committed
Refine and fix AMQP 1.0 modified outcome behaviour
The commit in f15e2e4 that effectively maps the modified outcome to accepted is not correct. Modified is a lot more like the released outcome. This commit refines the behaviour of the modified in the following ways: 1. A modified outcome with delivery_failed=true, undeliverable_here=false|undefined will be rejected with requeue=true; 2. Any other modified outcome will be rejected with requeue=false. It is worth noting that this isn't completely correct but probably the closest approximation we can achieve at the moment. The undeliverable_here field refers to the current link, not the message itself but as we have no means of filtering which messages get assigned to which consumer links we instead avoid requeuing it. Also the case where delivery_failed=false|undefined requires the release of the message _without_ incrementing the delivery_count. Again this is not something that our queues are able to do so again we have to reject without requeue.
1 parent b733774 commit d5cd977

File tree

5 files changed

+92
-13
lines changed

5 files changed

+92
-13
lines changed

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -381,8 +381,11 @@ mapped(cast, {#'v1_0.transfer'{handle = {uint, InHandle},
381381

382382
% role=true indicates the disposition is from a `receiver`. i.e. from the
383383
% clients point of view these are dispositions relating to `sender`links
384-
mapped(cast, #'v1_0.disposition'{role = true, settled = true, first = {uint, First},
385-
last = Last0, state = DeliveryState},
384+
mapped(cast, #'v1_0.disposition'{role = true,
385+
settled = true,
386+
first = {uint, First},
387+
last = Last0,
388+
state = DeliveryState},
386389
#state{unsettled = Unsettled0} = State) ->
387390
Last = case Last0 of
388391
undefined -> First;
@@ -393,6 +396,10 @@ mapped(cast, #'v1_0.disposition'{role = true, settled = true, first = {uint, Fir
393396
lists:foldl(fun(Id, Acc) ->
394397
case Acc of
395398
#{Id := {DeliveryTag, Receiver}} ->
399+
%% TODO: currently all modified delivery states
400+
%% will be translated to the old, `modified` atom.
401+
%% At some point we should translate into the
402+
%% full {modified, bool, bool, map) tuple.
396403
S = translate_delivery_state(DeliveryState),
397404
ok = notify_disposition(Receiver,
398405
{S, DeliveryTag}),
@@ -833,6 +840,14 @@ translate_delivery_state(#'v1_0.received'{}) -> received;
833840
translate_delivery_state(accepted) -> #'v1_0.accepted'{};
834841
translate_delivery_state(rejected) -> #'v1_0.rejected'{};
835842
translate_delivery_state(modified) -> #'v1_0.modified'{};
843+
translate_delivery_state({modified,
844+
DeliveryFailed,
845+
UndeliverableHere,
846+
MessageAnnotations}) ->
847+
MA = translate_message_annotations(MessageAnnotations),
848+
#'v1_0.modified'{delivery_failed = DeliveryFailed,
849+
undeliverable_here = UndeliverableHere,
850+
message_annotations = MA};
836851
translate_delivery_state(released) -> #'v1_0.released'{};
837852
translate_delivery_state(received) -> #'v1_0.received'{}.
838853

@@ -971,6 +986,34 @@ socket_send0({ssl, Socket}, Data) ->
971986
make_link_ref(Role, Session, Handle) ->
972987
#link_ref{role = Role, session = Session, link_handle = Handle}.
973988

989+
translate_message_annotations(MA)
990+
when is_map(MA) andalso
991+
map_size(MA) > 0 ->
992+
Content = maps:fold(fun (K, V, Acc) ->
993+
[{sym(K), wrap_map_value(V)} | Acc]
994+
end, [], MA),
995+
#'v1_0.message_annotations'{content = Content};
996+
translate_message_annotations(_MA) ->
997+
undefined.
998+
999+
1000+
wrap_map_value(true) ->
1001+
{boolean, true};
1002+
wrap_map_value(false) ->
1003+
{boolean, false};
1004+
wrap_map_value(V) when is_integer(V) ->
1005+
{uint, V};
1006+
wrap_map_value(V) when is_binary(V) ->
1007+
utf8(V);
1008+
wrap_map_value(V) when is_list(V) ->
1009+
utf8(list_to_binary(V));
1010+
wrap_map_value(V) when is_atom(V) ->
1011+
utf8(atom_to_list(V)).
1012+
1013+
utf8(V) -> amqp10_client_types:utf8(V).
1014+
sym(B) when is_list(B) -> {symbol, list_to_binary(B)};
1015+
sym(B) when is_binary(B) -> {symbol, B}.
1016+
9741017
-ifdef(TEST).
9751018
-include_lib("eunit/include/eunit.hrl").
9761019

deps/amqp10_client/src/amqp10_client_types.erl

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,17 @@
3030
-type source() :: #'v1_0.source'{}.
3131
-type target() :: #'v1_0.target'{}.
3232

33-
-type delivery_state() :: accepted | rejected | modified | received | released.
33+
-type delivery_state() :: accepted |
34+
rejected |
35+
modified |
36+
%% the "full" modified outcome
37+
{modified,
38+
DeliveryFailed :: boolean(),
39+
UndeliverableHere :: boolean(),
40+
MessageAnnotations :: #{amqp10_msg:annotations_key() => term()}
41+
} |
42+
received |
43+
released.
3444

3545
-type amqp_error() :: internal_error | not_found | unauthorized_access |
3646
decode_error | resource_limit_exceeded |

deps/amqp10_client/src/amqp10_msg.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@
9393
amqp10_header/0,
9494
amqp10_properties/0,
9595
amqp10_body/0,
96-
delivery_tag/0
96+
delivery_tag/0,
97+
annotations_key/0
9798
]).
9899

99100
-define(record_to_tuplelist(Rec, Ref),

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -277,14 +277,29 @@ handle_control(#'v1_0.disposition'{state = Outcome,
277277
#'v1_0.accepted'{} ->
278278
#'basic.ack'{delivery_tag = DeliveryTag,
279279
multiple = false};
280-
%% we don't care if the client modified the
281-
%% so just treat it as accepted.
282-
%% Some clients send modified instead of accepted
283-
%% when e.g. a client
284-
%% side message TTL expires.
280+
#'v1_0.modified'{delivery_failed = true,
281+
undeliverable_here = UndelHere} ->
282+
%% NB: this is not quite correct.
283+
%% `undeliverable_here' refers to the link
284+
%% not the message in general but we cannot
285+
%% filter messages from being assigned to
286+
%% individual consumers
287+
%% so will have to reject it without requeue
288+
%% in this case.
289+
Requeue = case UndelHere of
290+
true ->
291+
false;
292+
_ ->
293+
true
294+
end,
295+
#'basic.reject'{delivery_tag = DeliveryTag,
296+
requeue = Requeue};
285297
#'v1_0.modified'{} ->
286-
#'basic.ack'{delivery_tag = DeliveryTag,
287-
multiple = false};
298+
%% if delivery_failed is not true, treat we
299+
%% can't increment its' delivery_count so
300+
%% will have to reject without requeue
301+
#'basic.reject'{delivery_tag = DeliveryTag,
302+
requeue = false};
288303
#'v1_0.rejected'{} ->
289304
#'basic.reject'{delivery_tag = DeliveryTag,
290305
requeue = false};

deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,17 +73,27 @@ end_per_testcase(Testcase, Config) ->
7373
reliable_send_receive_with_outcomes(Config) ->
7474
Outcomes = [accepted,
7575
modified,
76+
{modified, true, false, #{<<"fruit">> => <<"banana">>}},
77+
{modified, false, true, #{}},
7678
rejected,
7779
released],
7880
[begin
79-
ct:pal("~s testing ~s", [?FUNCTION_NAME, Outcome]),
8081
reliable_send_receive(Config, Outcome)
8182
end || Outcome <- Outcomes],
8283
ok.
8384

8485
reliable_send_receive(Config, Outcome) ->
8586
Container = atom_to_binary(?FUNCTION_NAME, utf8),
86-
OutcomeBin = atom_to_binary(Outcome, utf8),
87+
OutcomeBin = case is_atom(Outcome) of
88+
true ->
89+
atom_to_binary(Outcome, utf8);
90+
false ->
91+
O1 = atom_to_binary(element(1, Outcome), utf8),
92+
O2 = atom_to_binary(element(2, Outcome), utf8),
93+
<<O1/binary, "_", O2/binary>>
94+
end,
95+
96+
ct:pal("~s testing ~s", [?FUNCTION_NAME, OutcomeBin]),
8797
QName = <<Container/binary, OutcomeBin/binary>>,
8898
%% declare a quorum queue
8999
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),

0 commit comments

Comments
 (0)