Skip to content

Commit f15e2e4

Browse files
Merge pull request #6228 from rabbitmq/amqp10-modified-outcome
AMQP 1.0: Support the modified outcome
2 parents fac5a9a + 802688a commit f15e2e4

File tree

6 files changed

+122
-23
lines changed

6 files changed

+122
-23
lines changed

deps/amqp10_client/src/amqp10_client.erl

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
detach_link/1,
3333
send_msg/2,
3434
accept_msg/2,
35+
settle_msg/3,
3536
flow_link_credit/3,
3637
flow_link_credit/4,
3738
echo/1,
@@ -335,11 +336,18 @@ send_msg(#link_ref{role = sender, session = Session,
335336

336337
%% @doc Accept a message on a the link referred to be the 'LinkRef'.
337338
-spec accept_msg(link_ref(), amqp10_msg:amqp10_msg()) -> ok.
338-
accept_msg(#link_ref{role = receiver, session = Session}, Msg) ->
339+
accept_msg(LinkRef, Msg) ->
340+
settle_msg(LinkRef, Msg, accepted).
341+
342+
%% @doc Settle a message on a the link referred to be the 'LinkRef' using
343+
%% the chosen delivery state.
344+
-spec settle_msg(link_ref(), amqp10_msg:amqp10_msg(),
345+
amqp10_client_types:delivery_state()) -> ok.
346+
settle_msg(#link_ref{role = receiver,
347+
session = Session}, Msg, Settlement) ->
339348
DeliveryId = amqp10_msg:delivery_id(Msg),
340349
amqp10_client_session:disposition(Session, receiver, DeliveryId,
341-
DeliveryId, true, accepted).
342-
350+
DeliveryId, true, Settlement).
343351
%% @doc Get a single message from a link.
344352
%% Flows a single link credit then awaits delivery or timeout.
345353
-spec get_msg(link_ref()) -> {ok, amqp10_msg:amqp10_msg()} | {error, timeout}.

deps/amqp10_client/src/amqp10_client.hrl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,6 @@
2020
-define(DBG(F, A), ok).
2121
-endif.
2222

23-
-record(link_ref, {role :: sender | receiver, session :: pid(),
23+
-record(link_ref, {role :: sender | receiver,
24+
session :: pid(),
2425
link_handle :: non_neg_integer()}).

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_link_util.erl

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,11 @@
1414

1515
-define(EXCHANGE_SUB_LIFETIME, "delete-on-close").
1616
-define(DEFAULT_OUTCOME, #'v1_0.released'{}).
17-
-define(SUPPORTED_OUTCOMES, [?V_1_0_SYMBOL_ACCEPTED,
18-
?V_1_0_SYMBOL_REJECTED,
19-
?V_1_0_SYMBOL_RELEASED]).
20-
2117
-define(OUTCOMES, [?V_1_0_SYMBOL_ACCEPTED,
2218
?V_1_0_SYMBOL_REJECTED,
2319
?V_1_0_SYMBOL_RELEASED,
2420
?V_1_0_SYMBOL_MODIFIED]).
21+
-define(SUPPORTED_OUTCOMES, ?OUTCOMES).
2522

2623
outcomes(Source) ->
2724
{DefaultOutcome, Outcomes} =

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,14 @@ handle_control(#'v1_0.disposition'{state = Outcome,
277277
#'v1_0.accepted'{} ->
278278
#'basic.ack'{delivery_tag = DeliveryTag,
279279
multiple = false};
280+
%% we don't care if the client modified the
281+
%% so just treat it as accepted.
282+
%% Some clients send modified instead of accepted
283+
%% when e.g. a client
284+
%% side message TTL expires.
285+
#'v1_0.modified'{} ->
286+
#'basic.ack'{delivery_tag = DeliveryTag,
287+
multiple = false};
280288
#'v1_0.rejected'{} ->
281289
#'basic.reject'{delivery_tag = DeliveryTag,
282290
requeue = false};

deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl

Lines changed: 99 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
-include_lib("eunit/include/eunit.hrl").
1212
-include_lib("amqp_client/include/amqp_client.hrl").
1313

14+
-compile(nowarn_export_all).
1415
-compile(export_all).
1516

1617
all() ->
@@ -22,6 +23,7 @@ all() ->
2223
groups() ->
2324
[
2425
{tests, [], [
26+
reliable_send_receive_with_outcomes,
2527
roundtrip_quorum_queue_with_drain,
2628
message_headers_conversion
2729
]},
@@ -68,6 +70,75 @@ end_per_testcase(Testcase, Config) ->
6870
%%% TESTS
6971
%%%
7072

73+
reliable_send_receive_with_outcomes(Config) ->
74+
Outcomes = [accepted,
75+
modified,
76+
rejected,
77+
released],
78+
[begin
79+
ct:pal("~s testing ~s", [?FUNCTION_NAME, Outcome]),
80+
reliable_send_receive(Config, Outcome)
81+
end || Outcome <- Outcomes],
82+
ok.
83+
84+
reliable_send_receive(Config, Outcome) ->
85+
Container = atom_to_binary(?FUNCTION_NAME, utf8),
86+
OutcomeBin = atom_to_binary(Outcome, utf8),
87+
QName = <<Container/binary, OutcomeBin/binary>>,
88+
%% declare a quorum queue
89+
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
90+
amqp_channel:call(Ch, #'queue.declare'{queue = QName,
91+
durable = true,
92+
arguments = [{<<"x-queue-type">>,
93+
longstr, <<"quorum">>}]}),
94+
rabbit_ct_client_helpers:close_channel(Ch),
95+
%% reliable send and consume
96+
Host = ?config(rmq_hostname, Config),
97+
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
98+
Address = <<"/amq/queue/", QName/binary>>,
99+
100+
OpnConf = #{address => Host,
101+
port => Port,
102+
container_id => Container,
103+
sasl => {plain, <<"guest">>, <<"guest">>}},
104+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
105+
{ok, Session} = amqp10_client:begin_session(Connection),
106+
SenderLinkName = <<"test-sender">>,
107+
{ok, Sender} = amqp10_client:attach_sender_link(Session,
108+
SenderLinkName,
109+
Address),
110+
ok = wait_for_credit(Sender),
111+
DTag1 = <<"dtag-1">>,
112+
%% create an unsettled message,
113+
%% link will be in "mixed" mode by default
114+
Msg1 = amqp10_msg:new(DTag1, <<"body-1">>, false),
115+
ok = amqp10_client:send_msg(Sender, Msg1),
116+
ok = wait_for_settlement(DTag1),
117+
118+
ok = amqp10_client:detach_link(Sender),
119+
ok = amqp10_client:close_connection(Connection),
120+
flush("post sender close"),
121+
122+
{ok, Connection2} = amqp10_client:open_connection(OpnConf),
123+
{ok, Session2} = amqp10_client:begin_session(Connection2),
124+
ReceiverLinkName = <<"test-receiver">>,
125+
{ok, Receiver} = amqp10_client:attach_receiver_link(Session2,
126+
ReceiverLinkName,
127+
Address,
128+
unsettled),
129+
{ok, Msg} = amqp10_client:get_msg(Receiver),
130+
131+
ct:pal("got ~p", [amqp10_msg:body(Msg)]),
132+
133+
ok = amqp10_client:settle_msg(Receiver, Msg, Outcome),
134+
135+
flush("post accept"),
136+
137+
ok = amqp10_client:detach_link(Receiver),
138+
ok = amqp10_client:close_connection(Connection2),
139+
140+
ok.
141+
71142
roundtrip_quorum_queue_with_drain(Config) ->
72143
Host = ?config(rmq_hostname, Config),
73144
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
@@ -83,7 +154,7 @@ roundtrip_quorum_queue_with_drain(Config) ->
83154
port => Port,
84155
container_id => atom_to_binary(?FUNCTION_NAME, utf8),
85156
sasl => {plain, <<"guest">>, <<"guest">>}},
86-
157+
87158
{ok, Connection} = amqp10_client:open_connection(OpnConf),
88159
{ok, Session} = amqp10_client:begin_session(Connection),
89160
SenderLinkName = <<"test-sender">>,
@@ -141,18 +212,18 @@ message_headers_conversion(Config) ->
141212
amqp_channel:call(Ch, #'queue.declare'{queue = QName,
142213
durable = true,
143214
arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}),
144-
215+
145216
rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,[rabbitmq_amqp1_0, convert_amqp091_headers_to_app_props, true]),
146217
rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,[rabbitmq_amqp1_0, convert_app_props_to_amqp091_headers, true]),
147-
218+
148219
OpnConf = #{address => Host,
149220
port => Port,
150221
container_id => atom_to_binary(?FUNCTION_NAME, utf8),
151222
sasl => {plain, <<"guest">>, <<"guest">>}},
152-
223+
153224
{ok, Connection} = amqp10_client:open_connection(OpnConf),
154225
{ok, Session} = amqp10_client:begin_session(Connection),
155-
226+
156227
amqp10_to_amqp091_header_conversion(Session, Ch, QName, Address),
157228

158229
amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address),
@@ -173,7 +244,7 @@ amqp10_to_amqp091_header_conversion(Session,Ch, QName, Address) ->
173244
wait_for_accepts(1),
174245

175246
{ok, Headers} = amqp091_get_msg_headers(Ch, QName),
176-
247+
177248
?assertEqual({bool, true}, rabbit_misc:table_lookup(Headers, <<"x-bool">>)),
178249
?assertEqual({unsignedint, 3}, rabbit_misc:table_lookup(Headers, <<"x-int">>)),
179250
?assertEqual({longstr, <<"string-value">>}, rabbit_misc:table_lookup(Headers, <<"x-string">>)).
@@ -252,22 +323,35 @@ open_and_close_connection(OpnConf) ->
252323
ok = amqp10_client:close_connection(Connection).
253324

254325
% before we can send messages we have to wait for credit from the server
255-
wait_for_credit(Sender) ->
326+
wait_for_credit(Sender) ->
327+
receive
328+
{amqp10_event, {link, Sender, credited}} ->
329+
flush(?FUNCTION_NAME),
330+
ok
331+
after 5000 ->
332+
flush("wait_for_credit timed out"),
333+
ct:fail(credited_timeout)
334+
end.
335+
336+
wait_for_settlement(Tag) ->
256337
receive
257-
{amqp10_event, {link, Sender, credited}} ->
338+
{amqp10_disposition, {accepted, Tag}} ->
339+
flush(?FUNCTION_NAME),
258340
ok
259341
after 5000 ->
260-
flush("Credit timed out"),
261-
exit(credited_timeout)
342+
flush("wait_for_settlement timed out"),
343+
ct:fail(credited_timeout)
262344
end.
263345

264346
wait_for_accepts(0) -> ok;
265-
wait_for_accepts(N) ->
266-
receive
267-
{amqp10_disposition,{accepted,_}} -> wait_for_accepts(N -1)
268-
after 250 ->
269-
ok
347+
wait_for_accepts(N) ->
348+
receive
349+
{amqp10_disposition,{accepted,_}} ->
350+
wait_for_accepts(N -1)
351+
after 250 ->
352+
ok
270353
end.
354+
271355
delete_queue(Config, QName) ->
272356
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
273357
_ = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),

deps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ module Test =
190190
let q = "roundtrip-091-q"
191191
let corr = "corrlation"
192192
let sender = SenderLink(c.Session, q + "-sender" , q)
193+
193194
new Message("hi"B, Header = Header(),
194195
Properties = new Properties(CorrelationId = corr))
195196
|> sender.Send

0 commit comments

Comments
 (0)