From d5cd97784bd3faa7e61d1a242e6c75b4aa90f553 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Tue, 1 Nov 2022 11:16:41 +0000 Subject: [PATCH] Refine and fix AMQP 1.0 modified outcome behaviour The commit in f15e2e42f7a0adba49c325061481b0e29a6b50fd that effectively maps the modified outcome to accepted is not correct. Modified is a lot more like the released outcome. This commit refines the behaviour of the modified in the following ways: 1. A modified outcome with delivery_failed=true, undeliverable_here=false|undefined will be rejected with requeue=true; 2. Any other modified outcome will be rejected with requeue=false. It is worth noting that this isn't completely correct but probably the closest approximation we can achieve at the moment. The undeliverable_here field refers to the current link, not the message itself but as we have no means of filtering which messages get assigned to which consumer links we instead avoid requeuing it. Also the case where delivery_failed=false|undefined requires the release of the message _without_ incrementing the delivery_count. Again this is not something that our queues are able to do so again we have to reject without requeue. --- .../src/amqp10_client_session.erl | 47 ++++++++++++++++++- .../amqp10_client/src/amqp10_client_types.erl | 12 ++++- deps/amqp10_client/src/amqp10_msg.erl | 3 +- .../src/rabbit_amqp1_0_session_process.erl | 29 +++++++++--- .../test/amqp10_client_SUITE.erl | 14 +++++- 5 files changed, 92 insertions(+), 13 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index 4ed61cca1b4c..40e33f5bef1e 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -381,8 +381,11 @@ mapped(cast, {#'v1_0.transfer'{handle = {uint, InHandle}, % role=true indicates the disposition is from a `receiver`. i.e. from the % clients point of view these are dispositions relating to `sender`links -mapped(cast, #'v1_0.disposition'{role = true, settled = true, first = {uint, First}, - last = Last0, state = DeliveryState}, +mapped(cast, #'v1_0.disposition'{role = true, + settled = true, + first = {uint, First}, + last = Last0, + state = DeliveryState}, #state{unsettled = Unsettled0} = State) -> Last = case Last0 of undefined -> First; @@ -393,6 +396,10 @@ mapped(cast, #'v1_0.disposition'{role = true, settled = true, first = {uint, Fir lists:foldl(fun(Id, Acc) -> case Acc of #{Id := {DeliveryTag, Receiver}} -> + %% TODO: currently all modified delivery states + %% will be translated to the old, `modified` atom. + %% At some point we should translate into the + %% full {modified, bool, bool, map) tuple. S = translate_delivery_state(DeliveryState), ok = notify_disposition(Receiver, {S, DeliveryTag}), @@ -833,6 +840,14 @@ translate_delivery_state(#'v1_0.received'{}) -> received; translate_delivery_state(accepted) -> #'v1_0.accepted'{}; translate_delivery_state(rejected) -> #'v1_0.rejected'{}; translate_delivery_state(modified) -> #'v1_0.modified'{}; +translate_delivery_state({modified, + DeliveryFailed, + UndeliverableHere, + MessageAnnotations}) -> + MA = translate_message_annotations(MessageAnnotations), + #'v1_0.modified'{delivery_failed = DeliveryFailed, + undeliverable_here = UndeliverableHere, + message_annotations = MA}; translate_delivery_state(released) -> #'v1_0.released'{}; translate_delivery_state(received) -> #'v1_0.received'{}. @@ -971,6 +986,34 @@ socket_send0({ssl, Socket}, Data) -> make_link_ref(Role, Session, Handle) -> #link_ref{role = Role, session = Session, link_handle = Handle}. +translate_message_annotations(MA) + when is_map(MA) andalso + map_size(MA) > 0 -> + Content = maps:fold(fun (K, V, Acc) -> + [{sym(K), wrap_map_value(V)} | Acc] + end, [], MA), + #'v1_0.message_annotations'{content = Content}; +translate_message_annotations(_MA) -> + undefined. + + +wrap_map_value(true) -> + {boolean, true}; +wrap_map_value(false) -> + {boolean, false}; +wrap_map_value(V) when is_integer(V) -> + {uint, V}; +wrap_map_value(V) when is_binary(V) -> + utf8(V); +wrap_map_value(V) when is_list(V) -> + utf8(list_to_binary(V)); +wrap_map_value(V) when is_atom(V) -> + utf8(atom_to_list(V)). + +utf8(V) -> amqp10_client_types:utf8(V). +sym(B) when is_list(B) -> {symbol, list_to_binary(B)}; +sym(B) when is_binary(B) -> {symbol, B}. + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/deps/amqp10_client/src/amqp10_client_types.erl b/deps/amqp10_client/src/amqp10_client_types.erl index fece01451dc2..efa5a67588fd 100644 --- a/deps/amqp10_client/src/amqp10_client_types.erl +++ b/deps/amqp10_client/src/amqp10_client_types.erl @@ -30,7 +30,17 @@ -type source() :: #'v1_0.source'{}. -type target() :: #'v1_0.target'{}. --type delivery_state() :: accepted | rejected | modified | received | released. +-type delivery_state() :: accepted | + rejected | + modified | + %% the "full" modified outcome + {modified, + DeliveryFailed :: boolean(), + UndeliverableHere :: boolean(), + MessageAnnotations :: #{amqp10_msg:annotations_key() => term()} + } | + received | + released. -type amqp_error() :: internal_error | not_found | unauthorized_access | decode_error | resource_limit_exceeded | diff --git a/deps/amqp10_client/src/amqp10_msg.erl b/deps/amqp10_client/src/amqp10_msg.erl index b4df7e6f4bab..97f856cf015b 100644 --- a/deps/amqp10_client/src/amqp10_msg.erl +++ b/deps/amqp10_client/src/amqp10_msg.erl @@ -93,7 +93,8 @@ amqp10_header/0, amqp10_properties/0, amqp10_body/0, - delivery_tag/0 + delivery_tag/0, + annotations_key/0 ]). -define(record_to_tuplelist(Rec, Ref), diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl index 4ff63ffa5d8e..916c42dd5969 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl @@ -277,14 +277,29 @@ handle_control(#'v1_0.disposition'{state = Outcome, #'v1_0.accepted'{} -> #'basic.ack'{delivery_tag = DeliveryTag, multiple = false}; - %% we don't care if the client modified the - %% so just treat it as accepted. - %% Some clients send modified instead of accepted - %% when e.g. a client - %% side message TTL expires. + #'v1_0.modified'{delivery_failed = true, + undeliverable_here = UndelHere} -> + %% NB: this is not quite correct. + %% `undeliverable_here' refers to the link + %% not the message in general but we cannot + %% filter messages from being assigned to + %% individual consumers + %% so will have to reject it without requeue + %% in this case. + Requeue = case UndelHere of + true -> + false; + _ -> + true + end, + #'basic.reject'{delivery_tag = DeliveryTag, + requeue = Requeue}; #'v1_0.modified'{} -> - #'basic.ack'{delivery_tag = DeliveryTag, - multiple = false}; + %% if delivery_failed is not true, treat we + %% can't increment its' delivery_count so + %% will have to reject without requeue + #'basic.reject'{delivery_tag = DeliveryTag, + requeue = false}; #'v1_0.rejected'{} -> #'basic.reject'{delivery_tag = DeliveryTag, requeue = false}; diff --git a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl index 1109a336e3b6..ef9e998be8e6 100644 --- a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl +++ b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl @@ -73,17 +73,27 @@ end_per_testcase(Testcase, Config) -> reliable_send_receive_with_outcomes(Config) -> Outcomes = [accepted, modified, + {modified, true, false, #{<<"fruit">> => <<"banana">>}}, + {modified, false, true, #{}}, rejected, released], [begin - ct:pal("~s testing ~s", [?FUNCTION_NAME, Outcome]), reliable_send_receive(Config, Outcome) end || Outcome <- Outcomes], ok. reliable_send_receive(Config, Outcome) -> Container = atom_to_binary(?FUNCTION_NAME, utf8), - OutcomeBin = atom_to_binary(Outcome, utf8), + OutcomeBin = case is_atom(Outcome) of + true -> + atom_to_binary(Outcome, utf8); + false -> + O1 = atom_to_binary(element(1, Outcome), utf8), + O2 = atom_to_binary(element(2, Outcome), utf8), + <> + end, + + ct:pal("~s testing ~s", [?FUNCTION_NAME, OutcomeBin]), QName = <>, %% declare a quorum queue Ch = rabbit_ct_client_helpers:open_channel(Config, 0),