@@ -81,10 +81,13 @@ groups() ->
8181 stop_classic_queue ,
8282 stop_quorum_queue ,
8383 stop_stream ,
84+ priority_classic_queue ,
85+ priority_quorum_queue ,
8486 consumer_priority_classic_queue ,
8587 consumer_priority_quorum_queue ,
8688 single_active_consumer_classic_queue ,
8789 single_active_consumer_quorum_queue ,
90+ single_active_consumer_priority_quorum_queue ,
8891 single_active_consumer_drain_classic_queue ,
8992 single_active_consumer_drain_quorum_queue ,
9093 detach_requeues_one_session_classic_queue ,
@@ -111,8 +114,6 @@ groups() ->
111114 handshake_timeout ,
112115 credential_expires ,
113116 attach_to_exclusive_queue ,
114- priority_classic_queue ,
115- priority_quorum_queue ,
116117 dead_letter_headers_exchange ,
117118 dead_letter_reject ,
118119 dead_letter_reject_message_order_classic_queue ,
@@ -1965,6 +1966,142 @@ consumer_priority(QType, Config) ->
19651966 ok = end_session_sync (Session ),
19661967 ok = amqp10_client :close_connection (Connection ).
19671968
1969+ single_active_consumer_priority_quorum_queue (Config ) ->
1970+ QType = <<" quorum" >>,
1971+ QName = atom_to_binary (? FUNCTION_NAME ),
1972+ {Connection , Session1 , LinkPair } = init (Config ),
1973+ QProps = #{arguments => #{<<" x-queue-type" >> => {utf8 , QType },
1974+ <<" x-single-active-consumer" >> => true }},
1975+ {ok , #{type := QType }} = rabbitmq_amqp_client :declare_queue (LinkPair , QName , QProps ),
1976+
1977+ % % Send 6 messages.
1978+ Address = rabbitmq_amqp_address :queue (QName ),
1979+ {ok , Sender } = amqp10_client :attach_sender_link (Session1 , <<" test-sender" >>, Address ),
1980+ ok = wait_for_credit (Sender ),
1981+ NumMsgs = 6 ,
1982+ [begin
1983+ Bin = integer_to_binary (N ),
1984+ ok = amqp10_client :send_msg (Sender , amqp10_msg :new (Bin , Bin , true ))
1985+ end || N <- lists :seq (1 , NumMsgs )],
1986+ ok = amqp10_client :detach_link (Sender ),
1987+
1988+ % % The 1st consumer (with default prio 0) will become active.
1989+ {ok , Recv1 } = amqp10_client :attach_receiver_link (
1990+ Session1 , <<" receiver 1" >>, Address , unsettled ),
1991+ receive {amqp10_event , {link , Recv1 , attached }} -> ok
1992+ after 5000 -> ct :fail ({missing_event , ? LINE })
1993+ end ,
1994+
1995+ {ok , Msg1 } = amqp10_client :get_msg (Recv1 ),
1996+ ? assertEqual ([<<" 1" >>], amqp10_msg :body (Msg1 )),
1997+
1998+ % % The 2nd consumer should take over thanks to higher prio.
1999+ {ok , Recv2 } = amqp10_client :attach_receiver_link (
2000+ Session1 , <<" receiver 2" >>, Address , unsettled , none , #{},
2001+ #{<<" rabbitmq:priority" >> => {int , 1 }}),
2002+ receive {amqp10_event , {link , Recv2 , attached }} -> ok
2003+ after 5000 -> ct :fail ({missing_event , ? LINE })
2004+ end ,
2005+ flush (" attched receiver 2" ),
2006+
2007+ % % To ensure in-order processing and to avoid interrupting the 1st consumer during
2008+ % % its long running task processing, neither of the 2 consumers should receive more
2009+ % % messages until the 1st consumer settles all outstanding messages.
2010+ ? assertEqual ({error , timeout }, amqp10_client :get_msg (Recv1 , 5 )),
2011+ ? assertEqual ({error , timeout }, amqp10_client :get_msg (Recv2 , 5 )),
2012+ ok = amqp10_client :accept_msg (Recv1 , Msg1 ),
2013+ receive {amqp10_msg , R1 , Msg2 } ->
2014+ ? assertEqual ([<<" 2" >>], amqp10_msg :body (Msg2 )),
2015+ ? assertEqual (Recv2 , R1 ),
2016+ ok = amqp10_client :accept_msg (Recv2 , Msg2 )
2017+ after 5000 -> ct :fail ({missing_msg , ? LINE })
2018+ end ,
2019+
2020+ % % Attaching with same prio should not take over.
2021+ {ok , Session2 } = amqp10_client :begin_session_sync (Connection ),
2022+ {ok , Recv3 } = amqp10_client :attach_receiver_link (
2023+ Session2 , <<" receiver 3" >>, Address , unsettled , none , #{},
2024+ #{<<" rabbitmq:priority" >> => {int , 1 }}),
2025+ receive {amqp10_event , {link , Recv3 , attached }} -> ok
2026+ after 5000 -> ct :fail ({missing_event , ? LINE })
2027+ end ,
2028+ ? assertEqual ({error , timeout }, amqp10_client :get_msg (Recv3 , 5 )),
2029+ ok = end_session_sync (Session2 ),
2030+
2031+ {ok , Recv4 } = amqp10_client :attach_receiver_link (
2032+ Session1 , <<" receiver 4" >>, Address , unsettled , none , #{},
2033+ #{<<" rabbitmq:priority" >> => {int , 1 }}),
2034+ receive {amqp10_event , {link , Recv4 , attached }} -> ok
2035+ after 5000 -> ct :fail ({missing_event , ? LINE })
2036+ end ,
2037+
2038+ {ok , Recv5 } = amqp10_client :attach_receiver_link (
2039+ Session1 , <<" receiver 5" >>, Address , unsettled , none , #{},
2040+ #{<<" rabbitmq:priority" >> => {int , 1 }}),
2041+ receive {amqp10_event , {link , Recv5 , attached }} -> ok
2042+ after 5000 -> ct :fail ({missing_event , ? LINE })
2043+ end ,
2044+ flush (" attched receivers 4 and 5" ),
2045+
2046+ ok = amqp10_client :flow_link_credit (Recv4 , 1 , never ),
2047+ ok = amqp10_client :flow_link_credit (Recv5 , 2 , never ),
2048+
2049+ % % Stop the active consumer.
2050+ ok = amqp10_client :detach_link (Recv2 ),
2051+ receive {amqp10_event , {link , Recv2 , {detached , normal }}} -> ok
2052+ after 5000 -> ct :fail ({missing_event , ? LINE })
2053+ end ,
2054+
2055+ % % The 5th consumer should become the active one because it is up,
2056+ % % has highest prio (1), and most credits (2).
2057+ receive {amqp10_msg , R2 , Msg3 } ->
2058+ ? assertEqual ([<<" 3" >>], amqp10_msg :body (Msg3 )),
2059+ ? assertEqual (Recv5 , R2 ),
2060+ ok = amqp10_client :accept_msg (Recv5 , Msg3 )
2061+ after 5000 -> ct :fail ({missing_msg , ? LINE })
2062+ end ,
2063+ receive {amqp10_msg , R3 , Msg4 } ->
2064+ ? assertEqual ([<<" 4" >>], amqp10_msg :body (Msg4 )),
2065+ ? assertEqual (Recv5 , R3 ),
2066+ ok = amqp10_client :accept_msg (Recv5 , Msg4 )
2067+ after 5000 -> ct :fail ({missing_msg , ? LINE })
2068+ end ,
2069+
2070+ % % Stop the active consumer.
2071+ ok = amqp10_client :detach_link (Recv5 ),
2072+ receive {amqp10_event , {link , Recv5 , {detached , normal }}} -> ok
2073+ after 5000 -> ct :fail ({missing_event , ? LINE })
2074+ end ,
2075+
2076+ % % The 4th consumer should become the active one because it is up,
2077+ % % has highest prio (1), and most credits (1).
2078+ receive {amqp10_msg , R4 , Msg5 } ->
2079+ ? assertEqual ([<<" 5" >>], amqp10_msg :body (Msg5 )),
2080+ ? assertEqual (Recv4 , R4 ),
2081+ ok = amqp10_client :accept_msg (Recv4 , Msg5 )
2082+ after 5000 -> ct :fail ({missing_msg , ? LINE })
2083+ end ,
2084+
2085+ % % Stop the active consumer.
2086+ ok = amqp10_client :detach_link (Recv4 ),
2087+ receive {amqp10_event , {link , Recv4 , {detached , normal }}} -> ok
2088+ after 5000 -> ct :fail ({missing_event , ? LINE })
2089+ end ,
2090+
2091+ % % The only up consumer left is the 1st one (prio 0) which still has 1 credit.
2092+ receive {amqp10_msg , R5 , Msg6 } ->
2093+ ? assertEqual ([<<" 6" >>], amqp10_msg :body (Msg6 )),
2094+ ? assertEqual (Recv1 , R5 ),
2095+ ok = amqp10_client :accept_msg (Recv1 , Msg6 )
2096+ after 5000 -> ct :fail ({missing_msg , ? LINE })
2097+ end ,
2098+
2099+ ok = amqp10_client :detach_link (Recv1 ),
2100+ {ok , #{message_count := 0 }} = rabbitmq_amqp_client :delete_queue (LinkPair , QName ),
2101+ ok = rabbitmq_amqp_client :detach_management_link_pair_sync (LinkPair ),
2102+ ok = end_session_sync (Session1 ),
2103+ ok = amqp10_client :close_connection (Connection ).
2104+
19682105single_active_consumer_classic_queue (Config ) ->
19692106 single_active_consumer (<<" classic" >>, Config ).
19702107
0 commit comments