diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index 27391897b968..28680149a93d 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -381,8 +381,11 @@ mapped(cast, {#'v1_0.transfer'{handle = {uint, InHandle}, % role=true indicates the disposition is from a `receiver`. i.e. from the % clients point of view these are dispositions relating to `sender`links -mapped(cast, #'v1_0.disposition'{role = true, settled = true, first = {uint, First}, - last = Last0, state = DeliveryState}, +mapped(cast, #'v1_0.disposition'{role = true, + settled = true, + first = {uint, First}, + last = Last0, + state = DeliveryState}, #state{unsettled = Unsettled0} = State) -> Last = case Last0 of undefined -> First; @@ -393,6 +396,10 @@ mapped(cast, #'v1_0.disposition'{role = true, settled = true, first = {uint, Fir lists:foldl(fun(Id, Acc) -> case Acc of #{Id := {DeliveryTag, Receiver}} -> + %% TODO: currently all modified delivery states + %% will be translated to the old, `modified` atom. + %% At some point we should translate into the + %% full {modified, bool, bool, map) tuple. S = translate_delivery_state(DeliveryState), ok = notify_disposition(Receiver, {S, DeliveryTag}), @@ -833,6 +840,14 @@ translate_delivery_state(#'v1_0.received'{}) -> received; translate_delivery_state(accepted) -> #'v1_0.accepted'{}; translate_delivery_state(rejected) -> #'v1_0.rejected'{}; translate_delivery_state(modified) -> #'v1_0.modified'{}; +translate_delivery_state({modified, + DeliveryFailed, + UndeliverableHere, + MessageAnnotations}) -> + MA = translate_message_annotations(MessageAnnotations), + #'v1_0.modified'{delivery_failed = DeliveryFailed, + undeliverable_here = UndeliverableHere, + message_annotations = MA}; translate_delivery_state(released) -> #'v1_0.released'{}; translate_delivery_state(received) -> #'v1_0.received'{}. @@ -971,6 +986,34 @@ socket_send0({ssl, Socket}, Data) -> make_link_ref(Role, Session, Handle) -> #link_ref{role = Role, session = Session, link_handle = Handle}. +translate_message_annotations(MA) + when is_map(MA) andalso + map_size(MA) > 0 -> + Content = maps:fold(fun (K, V, Acc) -> + [{sym(K), wrap_map_value(V)} | Acc] + end, [], MA), + #'v1_0.message_annotations'{content = Content}; +translate_message_annotations(_MA) -> + undefined. + + +wrap_map_value(true) -> + {boolean, true}; +wrap_map_value(false) -> + {boolean, false}; +wrap_map_value(V) when is_integer(V) -> + {uint, V}; +wrap_map_value(V) when is_binary(V) -> + utf8(V); +wrap_map_value(V) when is_list(V) -> + utf8(list_to_binary(V)); +wrap_map_value(V) when is_atom(V) -> + utf8(atom_to_list(V)). + +utf8(V) -> amqp10_client_types:utf8(V). +sym(B) when is_list(B) -> {symbol, list_to_binary(B)}; +sym(B) when is_binary(B) -> {symbol, B}. + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/deps/amqp10_client/src/amqp10_client_types.erl b/deps/amqp10_client/src/amqp10_client_types.erl index fece01451dc2..efa5a67588fd 100644 --- a/deps/amqp10_client/src/amqp10_client_types.erl +++ b/deps/amqp10_client/src/amqp10_client_types.erl @@ -30,7 +30,17 @@ -type source() :: #'v1_0.source'{}. -type target() :: #'v1_0.target'{}. --type delivery_state() :: accepted | rejected | modified | received | released. +-type delivery_state() :: accepted | + rejected | + modified | + %% the "full" modified outcome + {modified, + DeliveryFailed :: boolean(), + UndeliverableHere :: boolean(), + MessageAnnotations :: #{amqp10_msg:annotations_key() => term()} + } | + received | + released. -type amqp_error() :: internal_error | not_found | unauthorized_access | decode_error | resource_limit_exceeded | diff --git a/deps/amqp10_client/src/amqp10_msg.erl b/deps/amqp10_client/src/amqp10_msg.erl index b4df7e6f4bab..97f856cf015b 100644 --- a/deps/amqp10_client/src/amqp10_msg.erl +++ b/deps/amqp10_client/src/amqp10_msg.erl @@ -93,7 +93,8 @@ amqp10_header/0, amqp10_properties/0, amqp10_body/0, - delivery_tag/0 + delivery_tag/0, + annotations_key/0 ]). -define(record_to_tuplelist(Rec, Ref), diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl index 8c69effe8486..2c169f870258 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl @@ -277,14 +277,29 @@ handle_control(#'v1_0.disposition'{state = Outcome, #'v1_0.accepted'{} -> #'basic.ack'{delivery_tag = DeliveryTag, multiple = false}; - %% we don't care if the client modified the - %% so just treat it as accepted. - %% Some clients send modified instead of accepted - %% when e.g. a client - %% side message TTL expires. + #'v1_0.modified'{delivery_failed = true, + undeliverable_here = UndelHere} -> + %% NB: this is not quite correct. + %% `undeliverable_here' refers to the link + %% not the message in general but we cannot + %% filter messages from being assigned to + %% individual consumers + %% so will have to reject it without requeue + %% in this case. + Requeue = case UndelHere of + true -> + false; + _ -> + true + end, + #'basic.reject'{delivery_tag = DeliveryTag, + requeue = Requeue}; #'v1_0.modified'{} -> - #'basic.ack'{delivery_tag = DeliveryTag, - multiple = false}; + %% if delivery_failed is not true, treat we + %% can't increment its' delivery_count so + %% will have to reject without requeue + #'basic.reject'{delivery_tag = DeliveryTag, + requeue = false}; #'v1_0.rejected'{} -> #'basic.reject'{delivery_tag = DeliveryTag, requeue = false}; diff --git a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl index 7b8f756fca1e..38eb21d5a3be 100644 --- a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl +++ b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl @@ -88,17 +88,27 @@ end_per_testcase(Testcase, Config) -> reliable_send_receive_with_outcomes(Config) -> Outcomes = [accepted, modified, + {modified, true, false, #{<<"fruit">> => <<"banana">>}}, + {modified, false, true, #{}}, rejected, released], [begin - ct:pal("~s testing ~s", [?FUNCTION_NAME, Outcome]), reliable_send_receive(Config, Outcome) end || Outcome <- Outcomes], ok. reliable_send_receive(Config, Outcome) -> Container = atom_to_binary(?FUNCTION_NAME, utf8), - OutcomeBin = atom_to_binary(Outcome, utf8), + OutcomeBin = case is_atom(Outcome) of + true -> + atom_to_binary(Outcome, utf8); + false -> + O1 = atom_to_binary(element(1, Outcome), utf8), + O2 = atom_to_binary(element(2, Outcome), utf8), + <> + end, + + ct:pal("~s testing ~s", [?FUNCTION_NAME, OutcomeBin]), QName = <>, %% declare a quorum queue Ch = rabbit_ct_client_helpers:open_channel(Config, 0),