Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions deps/amqp10_client/src/amqp10_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
detach_link/1,
send_msg/2,
accept_msg/2,
settle_msg/3,
flow_link_credit/3,
flow_link_credit/4,
echo/1,
Expand Down Expand Up @@ -335,11 +336,18 @@ send_msg(#link_ref{role = sender, session = Session,

%% @doc Accept a message on a the link referred to be the 'LinkRef'.
-spec accept_msg(link_ref(), amqp10_msg:amqp10_msg()) -> ok.
accept_msg(#link_ref{role = receiver, session = Session}, Msg) ->
accept_msg(LinkRef, Msg) ->
settle_msg(LinkRef, Msg, accepted).

%% @doc Settle a message on a the link referred to be the 'LinkRef' using
%% the chosen delivery state.
-spec settle_msg(link_ref(), amqp10_msg:amqp10_msg(),
amqp10_client_types:delivery_state()) -> ok.
settle_msg(#link_ref{role = receiver,
session = Session}, Msg, Settlement) ->
DeliveryId = amqp10_msg:delivery_id(Msg),
amqp10_client_session:disposition(Session, receiver, DeliveryId,
DeliveryId, true, accepted).

DeliveryId, true, Settlement).
%% @doc Get a single message from a link.
%% Flows a single link credit then awaits delivery or timeout.
-spec get_msg(link_ref()) -> {ok, amqp10_msg:amqp10_msg()} | {error, timeout}.
Expand Down
3 changes: 2 additions & 1 deletion deps/amqp10_client/src/amqp10_client.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@
-define(DBG(F, A), ok).
-endif.

-record(link_ref, {role :: sender | receiver, session :: pid(),
-record(link_ref, {role :: sender | receiver,
session :: pid(),
link_handle :: non_neg_integer()}).
5 changes: 1 addition & 4 deletions deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_link_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,11 @@

-define(EXCHANGE_SUB_LIFETIME, "delete-on-close").
-define(DEFAULT_OUTCOME, #'v1_0.released'{}).
-define(SUPPORTED_OUTCOMES, [?V_1_0_SYMBOL_ACCEPTED,
?V_1_0_SYMBOL_REJECTED,
?V_1_0_SYMBOL_RELEASED]).

-define(OUTCOMES, [?V_1_0_SYMBOL_ACCEPTED,
?V_1_0_SYMBOL_REJECTED,
?V_1_0_SYMBOL_RELEASED,
?V_1_0_SYMBOL_MODIFIED]).
-define(SUPPORTED_OUTCOMES, ?OUTCOMES).

outcomes(Source) ->
{DefaultOutcome, Outcomes} =
Expand Down
8 changes: 8 additions & 0 deletions deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,14 @@ handle_control(#'v1_0.disposition'{state = Outcome,
#'v1_0.accepted'{} ->
#'basic.ack'{delivery_tag = DeliveryTag,
multiple = false};
%% we don't care if the client modified the
%% so just treat it as accepted.
%% Some clients send modified instead of accepted
%% when e.g. a client
%% side message TTL expires.
#'v1_0.modified'{} ->
#'basic.ack'{delivery_tag = DeliveryTag,
multiple = false};
#'v1_0.rejected'{} ->
#'basic.reject'{delivery_tag = DeliveryTag,
requeue = false};
Expand Down
114 changes: 99 additions & 15 deletions deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").

-compile(nowarn_export_all).
-compile(export_all).

all() ->
Expand All @@ -22,6 +23,7 @@ all() ->
groups() ->
[
{tests, [], [
reliable_send_receive_with_outcomes,
roundtrip_quorum_queue_with_drain,
message_headers_conversion
]},
Expand Down Expand Up @@ -68,6 +70,75 @@ end_per_testcase(Testcase, Config) ->
%%% TESTS
%%%

reliable_send_receive_with_outcomes(Config) ->
Outcomes = [accepted,
modified,
rejected,
released],
[begin
ct:pal("~s testing ~s", [?FUNCTION_NAME, Outcome]),
reliable_send_receive(Config, Outcome)
end || Outcome <- Outcomes],
ok.

reliable_send_receive(Config, Outcome) ->
Container = atom_to_binary(?FUNCTION_NAME, utf8),
OutcomeBin = atom_to_binary(Outcome, utf8),
QName = <<Container/binary, OutcomeBin/binary>>,
%% declare a quorum queue
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
amqp_channel:call(Ch, #'queue.declare'{queue = QName,
durable = true,
arguments = [{<<"x-queue-type">>,
longstr, <<"quorum">>}]}),
rabbit_ct_client_helpers:close_channel(Ch),
%% reliable send and consume
Host = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
Address = <<"/amq/queue/", QName/binary>>,

OpnConf = #{address => Host,
port => Port,
container_id => Container,
sasl => {plain, <<"guest">>, <<"guest">>}},
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session(Connection),
SenderLinkName = <<"test-sender">>,
{ok, Sender} = amqp10_client:attach_sender_link(Session,
SenderLinkName,
Address),
ok = wait_for_credit(Sender),
DTag1 = <<"dtag-1">>,
%% create an unsettled message,
%% link will be in "mixed" mode by default
Msg1 = amqp10_msg:new(DTag1, <<"body-1">>, false),
ok = amqp10_client:send_msg(Sender, Msg1),
ok = wait_for_settlement(DTag1),

ok = amqp10_client:detach_link(Sender),
ok = amqp10_client:close_connection(Connection),
flush("post sender close"),

{ok, Connection2} = amqp10_client:open_connection(OpnConf),
{ok, Session2} = amqp10_client:begin_session(Connection2),
ReceiverLinkName = <<"test-receiver">>,
{ok, Receiver} = amqp10_client:attach_receiver_link(Session2,
ReceiverLinkName,
Address,
unsettled),
{ok, Msg} = amqp10_client:get_msg(Receiver),

ct:pal("got ~p", [amqp10_msg:body(Msg)]),

ok = amqp10_client:settle_msg(Receiver, Msg, Outcome),

flush("post accept"),

ok = amqp10_client:detach_link(Receiver),
ok = amqp10_client:close_connection(Connection2),

ok.

roundtrip_quorum_queue_with_drain(Config) ->
Host = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
Expand All @@ -83,7 +154,7 @@ roundtrip_quorum_queue_with_drain(Config) ->
port => Port,
container_id => atom_to_binary(?FUNCTION_NAME, utf8),
sasl => {plain, <<"guest">>, <<"guest">>}},

{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session(Connection),
SenderLinkName = <<"test-sender">>,
Expand Down Expand Up @@ -141,18 +212,18 @@ message_headers_conversion(Config) ->
amqp_channel:call(Ch, #'queue.declare'{queue = QName,
durable = true,
arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}),

rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,[rabbitmq_amqp1_0, convert_amqp091_headers_to_app_props, true]),
rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,[rabbitmq_amqp1_0, convert_app_props_to_amqp091_headers, true]),

OpnConf = #{address => Host,
port => Port,
container_id => atom_to_binary(?FUNCTION_NAME, utf8),
sasl => {plain, <<"guest">>, <<"guest">>}},

{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session(Connection),

amqp10_to_amqp091_header_conversion(Session, Ch, QName, Address),

amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address),
Expand All @@ -173,7 +244,7 @@ amqp10_to_amqp091_header_conversion(Session,Ch, QName, Address) ->
wait_for_accepts(1),

{ok, Headers} = amqp091_get_msg_headers(Ch, QName),

?assertEqual({bool, true}, rabbit_misc:table_lookup(Headers, <<"x-bool">>)),
?assertEqual({unsignedint, 3}, rabbit_misc:table_lookup(Headers, <<"x-int">>)),
?assertEqual({longstr, <<"string-value">>}, rabbit_misc:table_lookup(Headers, <<"x-string">>)).
Expand Down Expand Up @@ -252,22 +323,35 @@ open_and_close_connection(OpnConf) ->
ok = amqp10_client:close_connection(Connection).

% before we can send messages we have to wait for credit from the server
wait_for_credit(Sender) ->
wait_for_credit(Sender) ->
receive
{amqp10_event, {link, Sender, credited}} ->
flush(?FUNCTION_NAME),
ok
after 5000 ->
flush("wait_for_credit timed out"),
ct:fail(credited_timeout)
end.

wait_for_settlement(Tag) ->
receive
{amqp10_event, {link, Sender, credited}} ->
{amqp10_disposition, {accepted, Tag}} ->
flush(?FUNCTION_NAME),
ok
after 5000 ->
flush("Credit timed out"),
exit(credited_timeout)
flush("wait_for_settlement timed out"),
ct:fail(credited_timeout)
end.

wait_for_accepts(0) -> ok;
wait_for_accepts(N) ->
receive
{amqp10_disposition,{accepted,_}} -> wait_for_accepts(N -1)
after 250 ->
ok
wait_for_accepts(N) ->
receive
{amqp10_disposition,{accepted,_}} ->
wait_for_accepts(N -1)
after 250 ->
ok
end.

delete_queue(Config, QName) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
_ = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ module Test =
let q = "roundtrip-091-q"
let corr = "corrlation"
let sender = SenderLink(c.Session, q + "-sender" , q)

new Message("hi"B, Header = Header(),
Properties = new Properties(CorrelationId = corr))
|> sender.Send
Expand Down