From 2e5484cb9af24476bb0bc1bf4a777a7f1ef71097 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Mon, 24 Oct 2022 15:50:32 +0100 Subject: [PATCH 1/2] AMQP 1.0: Support the modified outcome Some client libraries (QPid) will automatically send a disposition with the 'modified' outcome in response to a client local message TTL expiry. To support this case and others we treat 'modified' the same as 'accepted' and simply ack the message back to the queue. This change also contains some API extensions to the amqp10_client to better support sending the various delivery states (outcomes). (cherry picked from commit 802688a8ab6c3a0d55576e68a3605ed6541025f7) (cherry picked from commit 6a42c8a34a02326b5fc650d1f97033c867e3a760) # Conflicts: # deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl --- deps/amqp10_client/src/amqp10_client.erl | 14 +- deps/amqp10_client/src/amqp10_client.hrl | 3 +- .../src/rabbit_amqp1_0_link_util.erl | 5 +- .../src/rabbit_amqp1_0_session_process.erl | 8 + .../test/amqp10_client_SUITE.erl | 227 ++++++++++++++++++ .../system_SUITE_data/fsharp-tests/Program.fs | 1 + 6 files changed, 250 insertions(+), 8 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_client.erl b/deps/amqp10_client/src/amqp10_client.erl index 785f44f00570..a8488185544c 100644 --- a/deps/amqp10_client/src/amqp10_client.erl +++ b/deps/amqp10_client/src/amqp10_client.erl @@ -32,6 +32,7 @@ detach_link/1, send_msg/2, accept_msg/2, + settle_msg/3, flow_link_credit/3, flow_link_credit/4, echo/1, @@ -335,11 +336,18 @@ send_msg(#link_ref{role = sender, session = Session, %% @doc Accept a message on a the link referred to be the 'LinkRef'. -spec accept_msg(link_ref(), amqp10_msg:amqp10_msg()) -> ok. -accept_msg(#link_ref{role = receiver, session = Session}, Msg) -> +accept_msg(LinkRef, Msg) -> + settle_msg(LinkRef, Msg, accepted). + +%% @doc Settle a message on a the link referred to be the 'LinkRef' using +%% the chosen delivery state. +-spec settle_msg(link_ref(), amqp10_msg:amqp10_msg(), + amqp10_client_types:delivery_state()) -> ok. +settle_msg(#link_ref{role = receiver, + session = Session}, Msg, Settlement) -> DeliveryId = amqp10_msg:delivery_id(Msg), amqp10_client_session:disposition(Session, receiver, DeliveryId, - DeliveryId, true, accepted). - + DeliveryId, true, Settlement). %% @doc Get a single message from a link. %% Flows a single link credit then awaits delivery or timeout. -spec get_msg(link_ref()) -> {ok, amqp10_msg:amqp10_msg()} | {error, timeout}. diff --git a/deps/amqp10_client/src/amqp10_client.hrl b/deps/amqp10_client/src/amqp10_client.hrl index 38627f80c5c9..becbd758615a 100644 --- a/deps/amqp10_client/src/amqp10_client.hrl +++ b/deps/amqp10_client/src/amqp10_client.hrl @@ -20,5 +20,6 @@ -define(DBG(F, A), ok). -endif. --record(link_ref, {role :: sender | receiver, session :: pid(), +-record(link_ref, {role :: sender | receiver, + session :: pid(), link_handle :: non_neg_integer()}). diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_link_util.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_link_util.erl index 48bc4ba9321c..82838a9ba108 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_link_util.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_link_util.erl @@ -14,14 +14,11 @@ -define(EXCHANGE_SUB_LIFETIME, "delete-on-close"). -define(DEFAULT_OUTCOME, #'v1_0.released'{}). --define(SUPPORTED_OUTCOMES, [?V_1_0_SYMBOL_ACCEPTED, - ?V_1_0_SYMBOL_REJECTED, - ?V_1_0_SYMBOL_RELEASED]). - -define(OUTCOMES, [?V_1_0_SYMBOL_ACCEPTED, ?V_1_0_SYMBOL_REJECTED, ?V_1_0_SYMBOL_RELEASED, ?V_1_0_SYMBOL_MODIFIED]). +-define(SUPPORTED_OUTCOMES, ?OUTCOMES). outcomes(Source) -> {DefaultOutcome, Outcomes} = diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl index 71bba9ce5d4f..8c69effe8486 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl @@ -277,6 +277,14 @@ handle_control(#'v1_0.disposition'{state = Outcome, #'v1_0.accepted'{} -> #'basic.ack'{delivery_tag = DeliveryTag, multiple = false}; + %% we don't care if the client modified the + %% so just treat it as accepted. + %% Some clients send modified instead of accepted + %% when e.g. a client + %% side message TTL expires. + #'v1_0.modified'{} -> + #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}; #'v1_0.rejected'{} -> #'basic.reject'{delivery_tag = DeliveryTag, requeue = false}; diff --git a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl index 3946183bb227..2a5eccd8aa75 100644 --- a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl +++ b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl @@ -11,6 +11,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("rabbit_common/include/rabbit_framing.hrl"). +-compile(nowarn_export_all). -compile(export_all). all() -> @@ -22,7 +23,13 @@ all() -> groups() -> [ {tests, [], [ +<<<<<<< HEAD roundtrip_quorum_queue_with_drain +======= + reliable_send_receive_with_outcomes, + roundtrip_quorum_queue_with_drain, + message_headers_conversion +>>>>>>> 6a42c8a34a (AMQP 1.0: Support the modified outcome) ]}, {metrics, [], [ auth_attempt_metrics @@ -83,6 +90,75 @@ end_per_testcase(Testcase, Config) -> %%% TESTS %%% +reliable_send_receive_with_outcomes(Config) -> + Outcomes = [accepted, + modified, + rejected, + released], + [begin + ct:pal("~s testing ~s", [?FUNCTION_NAME, Outcome]), + reliable_send_receive(Config, Outcome) + end || Outcome <- Outcomes], + ok. + +reliable_send_receive(Config, Outcome) -> + Container = atom_to_binary(?FUNCTION_NAME, utf8), + OutcomeBin = atom_to_binary(Outcome, utf8), + QName = <>, + %% declare a quorum queue + Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + amqp_channel:call(Ch, #'queue.declare'{queue = QName, + durable = true, + arguments = [{<<"x-queue-type">>, + longstr, <<"quorum">>}]}), + rabbit_ct_client_helpers:close_channel(Ch), + %% reliable send and consume + Host = ?config(rmq_hostname, Config), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + Address = <<"/amq/queue/", QName/binary>>, + + OpnConf = #{address => Host, + port => Port, + container_id => Container, + sasl => {plain, <<"guest">>, <<"guest">>}}, + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session(Connection), + SenderLinkName = <<"test-sender">>, + {ok, Sender} = amqp10_client:attach_sender_link(Session, + SenderLinkName, + Address), + ok = wait_for_credit(Sender), + DTag1 = <<"dtag-1">>, + %% create an unsettled message, + %% link will be in "mixed" mode by default + Msg1 = amqp10_msg:new(DTag1, <<"body-1">>, false), + ok = amqp10_client:send_msg(Sender, Msg1), + ok = wait_for_settlement(DTag1), + + ok = amqp10_client:detach_link(Sender), + ok = amqp10_client:close_connection(Connection), + flush("post sender close"), + + {ok, Connection2} = amqp10_client:open_connection(OpnConf), + {ok, Session2} = amqp10_client:begin_session(Connection2), + ReceiverLinkName = <<"test-receiver">>, + {ok, Receiver} = amqp10_client:attach_receiver_link(Session2, + ReceiverLinkName, + Address, + unsettled), + {ok, Msg} = amqp10_client:get_msg(Receiver), + + ct:pal("got ~p", [amqp10_msg:body(Msg)]), + + ok = amqp10_client:settle_msg(Receiver, Msg, Outcome), + + flush("post accept"), + + ok = amqp10_client:detach_link(Receiver), + ok = amqp10_client:close_connection(Connection2), + + ok. + roundtrip_quorum_queue_with_drain(Config) -> Host = ?config(rmq_hostname, Config), Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), @@ -98,7 +174,11 @@ roundtrip_quorum_queue_with_drain(Config) -> port => Port, container_id => atom_to_binary(?FUNCTION_NAME, utf8), sasl => {plain, <<"guest">>, <<"guest">>}}, +<<<<<<< HEAD % ct:pal("opening connectoin with ~p", [OpnConf]), +======= + +>>>>>>> 6a42c8a34a (AMQP 1.0: Support the modified outcome) {ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Session} = amqp10_client:begin_session(Connection), SenderLinkName = <<"test-sender">>, @@ -151,6 +231,78 @@ roundtrip_quorum_queue_with_drain(Config) -> ok = amqp10_client:close_connection(Connection), ok. +<<<<<<< HEAD +======= +message_headers_conversion(Config) -> + Host = ?config(rmq_hostname, Config), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + QName = atom_to_binary(?FUNCTION_NAME, utf8), + Address = <<"/amq/queue/", QName/binary>>, + %% declare a quorum queue + Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + amqp_channel:call(Ch, #'queue.declare'{queue = QName, + durable = true, + arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}), + + rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,[rabbitmq_amqp1_0, convert_amqp091_headers_to_app_props, true]), + rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,[rabbitmq_amqp1_0, convert_app_props_to_amqp091_headers, true]), + + OpnConf = #{address => Host, + port => Port, + container_id => atom_to_binary(?FUNCTION_NAME, utf8), + sasl => {plain, <<"guest">>, <<"guest">>}}, + + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session(Connection), + + amqp10_to_amqp091_header_conversion(Session, Ch, QName, Address), + + amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address), + delete_queue(Config, QName), + ok = amqp10_client:close_connection(Connection), + ok. + +amqp10_to_amqp091_header_conversion(Session,Ch, QName, Address) -> + {ok, Sender} = create_amqp10_sender(Session, Address), + + OutMsg = amqp10_msg:new(<<"my-tag">>, <<"my-body">>, true), + OutMsg2 = amqp10_msg:set_application_properties(#{ + "x-string" => "string-value", + "x-int" => 3, + "x-bool" => true + }, OutMsg), + ok = amqp10_client:send_msg(Sender, OutMsg2), + wait_for_accepts(1), + + {ok, Headers} = amqp091_get_msg_headers(Ch, QName), + + ?assertEqual({bool, true}, rabbit_misc:table_lookup(Headers, <<"x-bool">>)), + ?assertEqual({unsignedint, 3}, rabbit_misc:table_lookup(Headers, <<"x-int">>)), + ?assertEqual({longstr, <<"string-value">>}, rabbit_misc:table_lookup(Headers, <<"x-string">>)). + + +amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address) -> + Amqp091Headers = [{<<"x-forwarding">>, array, + [{table, [{<<"uri">>, longstr, + <<"amqp://localhost/%2F/upstream">>}]}]}, + {<<"x-string">>, longstr, "my-string"}, + {<<"x-int">>, long, 92}, + {<<"x-bool">>, bool, true}], + + amqp_channel:cast(Ch, + #'basic.publish'{exchange = <<"">>, routing_key = QName}, + #amqp_msg{props = #'P_basic'{ + headers = Amqp091Headers}, + payload = <<"foobar">> } + ), + + {ok, [Msg]} = drain_queue(Session, Address, 1), + Amqp10Props = amqp10_msg:application_properties(Msg), + ?assertEqual(true, maps:get(<<"x-bool">>, Amqp10Props, undefined)), + ?assertEqual(92, maps:get(<<"x-int">>, Amqp10Props, undefined)), + ?assertEqual(<<"my-string">>, maps:get(<<"x-string">>, Amqp10Props, undefined)). + +>>>>>>> 6a42c8a34a (AMQP 1.0: Support the modified outcome) auth_attempt_metrics(Config) -> Host = ?config(rmq_hostname, Config), Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), @@ -201,3 +353,78 @@ open_and_close_connection(OpnConf) -> {ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, _} = amqp10_client:begin_session(Connection), ok = amqp10_client:close_connection(Connection). +<<<<<<< HEAD +======= + +% before we can send messages we have to wait for credit from the server +wait_for_credit(Sender) -> + receive + {amqp10_event, {link, Sender, credited}} -> + flush(?FUNCTION_NAME), + ok + after 5000 -> + flush("wait_for_credit timed out"), + ct:fail(credited_timeout) + end. + +wait_for_settlement(Tag) -> + receive + {amqp10_disposition, {accepted, Tag}} -> + flush(?FUNCTION_NAME), + ok + after 5000 -> + flush("wait_for_settlement timed out"), + ct:fail(credited_timeout) + end. + +wait_for_accepts(0) -> ok; +wait_for_accepts(N) -> + receive + {amqp10_disposition,{accepted,_}} -> + wait_for_accepts(N -1) + after 250 -> + ok + end. + +delete_queue(Config, QName) -> + Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + _ = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), + rabbit_ct_client_helpers:close_channel(Ch). + + +amqp091_get_msg_headers(Channel, QName) -> + {#'basic.get_ok'{}, #amqp_msg{props = #'P_basic'{ headers= Headers}}} + = amqp_channel:call(Channel, #'basic.get'{queue = QName, no_ack = true}), + {ok, Headers}. + +create_amqp10_sender(Session, Address) -> + SenderLinkName = <<"test-sender">>, + {ok, Sender} = amqp10_client:attach_sender_link(Session, + SenderLinkName, + Address), + wait_for_credit(Sender), + {ok, Sender}. + + drain_queue(Session, Address, N) -> + flush("Before drain_queue"), + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, + <<"test-receiver">>, + Address, + settled, + configuration), + + ok = amqp10_client:flow_link_credit(Receiver, 1000, never, true), + Msgs = receive_message(Receiver, N, []), + flush("after drain"), + ok = amqp10_client:detach_link(Receiver), + {ok, Msgs}. + +receive_message(_Receiver, 0, Acc) -> lists:reverse(Acc); +receive_message(Receiver, N, Acc) -> + receive + {amqp10_msg, Receiver, Msg} -> + receive_message(Receiver, N-1, [Msg | Acc]) + after 5000 -> + exit(receive_timed_out) + end. +>>>>>>> 6a42c8a34a (AMQP 1.0: Support the modified outcome) diff --git a/deps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs b/deps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs index 6f083c23097b..84ed881cc2d3 100755 --- a/deps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs +++ b/deps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs @@ -190,6 +190,7 @@ module Test = let q = "roundtrip-091-q" let corr = "corrlation" let sender = SenderLink(c.Session, q + "-sender" , q) + new Message("hi"B, Header = Header(), Properties = new Properties(CorrelationId = corr)) |> sender.Send From 7849983a32ffb821b34db9bdb6c12d6da357b66d Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Tue, 25 Oct 2022 15:36:26 +0400 Subject: [PATCH 2/2] Resolve a conflict --- .../test/amqp10_client_SUITE.erl | 89 +------------------ 1 file changed, 2 insertions(+), 87 deletions(-) diff --git a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl index 2a5eccd8aa75..7b8f756fca1e 100644 --- a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl +++ b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl @@ -9,7 +9,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). --include_lib("rabbit_common/include/rabbit_framing.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). -compile(nowarn_export_all). -compile(export_all). @@ -23,13 +23,8 @@ all() -> groups() -> [ {tests, [], [ -<<<<<<< HEAD - roundtrip_quorum_queue_with_drain -======= reliable_send_receive_with_outcomes, - roundtrip_quorum_queue_with_drain, - message_headers_conversion ->>>>>>> 6a42c8a34a (AMQP 1.0: Support the modified outcome) + roundtrip_quorum_queue_with_drain ]}, {metrics, [], [ auth_attempt_metrics @@ -174,11 +169,6 @@ roundtrip_quorum_queue_with_drain(Config) -> port => Port, container_id => atom_to_binary(?FUNCTION_NAME, utf8), sasl => {plain, <<"guest">>, <<"guest">>}}, -<<<<<<< HEAD - % ct:pal("opening connectoin with ~p", [OpnConf]), -======= - ->>>>>>> 6a42c8a34a (AMQP 1.0: Support the modified outcome) {ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Session} = amqp10_client:begin_session(Connection), SenderLinkName = <<"test-sender">>, @@ -231,78 +221,6 @@ roundtrip_quorum_queue_with_drain(Config) -> ok = amqp10_client:close_connection(Connection), ok. -<<<<<<< HEAD -======= -message_headers_conversion(Config) -> - Host = ?config(rmq_hostname, Config), - Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), - QName = atom_to_binary(?FUNCTION_NAME, utf8), - Address = <<"/amq/queue/", QName/binary>>, - %% declare a quorum queue - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), - amqp_channel:call(Ch, #'queue.declare'{queue = QName, - durable = true, - arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}), - - rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,[rabbitmq_amqp1_0, convert_amqp091_headers_to_app_props, true]), - rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,[rabbitmq_amqp1_0, convert_app_props_to_amqp091_headers, true]), - - OpnConf = #{address => Host, - port => Port, - container_id => atom_to_binary(?FUNCTION_NAME, utf8), - sasl => {plain, <<"guest">>, <<"guest">>}}, - - {ok, Connection} = amqp10_client:open_connection(OpnConf), - {ok, Session} = amqp10_client:begin_session(Connection), - - amqp10_to_amqp091_header_conversion(Session, Ch, QName, Address), - - amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address), - delete_queue(Config, QName), - ok = amqp10_client:close_connection(Connection), - ok. - -amqp10_to_amqp091_header_conversion(Session,Ch, QName, Address) -> - {ok, Sender} = create_amqp10_sender(Session, Address), - - OutMsg = amqp10_msg:new(<<"my-tag">>, <<"my-body">>, true), - OutMsg2 = amqp10_msg:set_application_properties(#{ - "x-string" => "string-value", - "x-int" => 3, - "x-bool" => true - }, OutMsg), - ok = amqp10_client:send_msg(Sender, OutMsg2), - wait_for_accepts(1), - - {ok, Headers} = amqp091_get_msg_headers(Ch, QName), - - ?assertEqual({bool, true}, rabbit_misc:table_lookup(Headers, <<"x-bool">>)), - ?assertEqual({unsignedint, 3}, rabbit_misc:table_lookup(Headers, <<"x-int">>)), - ?assertEqual({longstr, <<"string-value">>}, rabbit_misc:table_lookup(Headers, <<"x-string">>)). - - -amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address) -> - Amqp091Headers = [{<<"x-forwarding">>, array, - [{table, [{<<"uri">>, longstr, - <<"amqp://localhost/%2F/upstream">>}]}]}, - {<<"x-string">>, longstr, "my-string"}, - {<<"x-int">>, long, 92}, - {<<"x-bool">>, bool, true}], - - amqp_channel:cast(Ch, - #'basic.publish'{exchange = <<"">>, routing_key = QName}, - #amqp_msg{props = #'P_basic'{ - headers = Amqp091Headers}, - payload = <<"foobar">> } - ), - - {ok, [Msg]} = drain_queue(Session, Address, 1), - Amqp10Props = amqp10_msg:application_properties(Msg), - ?assertEqual(true, maps:get(<<"x-bool">>, Amqp10Props, undefined)), - ?assertEqual(92, maps:get(<<"x-int">>, Amqp10Props, undefined)), - ?assertEqual(<<"my-string">>, maps:get(<<"x-string">>, Amqp10Props, undefined)). - ->>>>>>> 6a42c8a34a (AMQP 1.0: Support the modified outcome) auth_attempt_metrics(Config) -> Host = ?config(rmq_hostname, Config), Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), @@ -353,8 +271,6 @@ open_and_close_connection(OpnConf) -> {ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, _} = amqp10_client:begin_session(Connection), ok = amqp10_client:close_connection(Connection). -<<<<<<< HEAD -======= % before we can send messages we have to wait for credit from the server wait_for_credit(Sender) -> @@ -427,4 +343,3 @@ receive_message(Receiver, N, Acc) -> after 5000 -> exit(receive_timed_out) end. ->>>>>>> 6a42c8a34a (AMQP 1.0: Support the modified outcome)