1818
1919import com .rabbitmq .stream .*;
2020import io .netty .channel .EventLoopGroup ;
21+ import java .time .Duration ;
2122import java .util .*;
2223import java .util .concurrent .ConcurrentHashMap ;
2324import java .util .concurrent .CountDownLatch ;
3738@ ExtendWith (TestUtils .StreamTestInfrastructureExtension .class )
3839public class FilteringTest {
3940
41+ private static final Duration CONDITION_TIMEOUT = Duration .ofSeconds (5 );
42+
4043 static final int messageCount = 10_000 ;
4144
4245 EventLoopGroup eventLoopGroup ;
@@ -62,105 +65,122 @@ void tearDown() throws Exception {
6265 @ ValueSource (strings = "foo" )
6366 @ NullSource
6467 void publishConsume (String producerName ) throws Exception {
65- List <String > filterValues = new ArrayList <>(Arrays .asList ("apple" , "banana" , "pear" ));
66- Map <String , AtomicInteger > filterValueCount = new HashMap <>();
67- Random random = new Random ();
68-
69- Runnable insert =
70- () ->
71- publish (
72- messageCount ,
73- producerName ,
74- () -> {
75- String filterValue = filterValues .get (random .nextInt (filterValues .size ()));
76- filterValueCount
77- .computeIfAbsent (filterValue , k -> new AtomicInteger ())
78- .incrementAndGet ();
79- return filterValue ;
80- });
81- insert .run ();
82-
83- // second wave of messages, with only one, new filter value
84- String newFilterValue = "orange" ;
85- filterValues .clear ();
86- filterValues .add (newFilterValue );
87- insert .run ();
88-
89- AtomicInteger receivedMessageCount = new AtomicInteger (0 );
90- AtomicInteger filteredConsumedMessageCount = new AtomicInteger (0 );
91- consumerBuilder ()
92- .filter ()
93- .values (newFilterValue )
94- .postFilter (
95- m -> {
96- receivedMessageCount .incrementAndGet ();
97- return newFilterValue .equals (m .getProperties ().getGroupId ());
98- })
99- .builder ()
100- .messageHandler ((context , message ) -> filteredConsumedMessageCount .incrementAndGet ())
101- .build ();
102-
103- int expectedCount = filterValueCount .get (newFilterValue ).get ();
104- waitAtMost (() -> filteredConsumedMessageCount .get () == expectedCount );
105- assertThat (receivedMessageCount ).hasValueLessThan (messageCount * 2 );
68+ repeatIfFailure (
69+ () -> {
70+ List <String > filterValues = new ArrayList <>(Arrays .asList ("apple" , "banana" , "pear" ));
71+ Map <String , AtomicInteger > filterValueCount = new HashMap <>();
72+ Random random = new Random ();
73+
74+ Runnable insert =
75+ () ->
76+ publish (
77+ messageCount ,
78+ producerName ,
79+ () -> {
80+ String filterValue = filterValues .get (random .nextInt (filterValues .size ()));
81+ filterValueCount
82+ .computeIfAbsent (filterValue , k -> new AtomicInteger ())
83+ .incrementAndGet ();
84+ return filterValue ;
85+ });
86+ insert .run ();
87+
88+ // second wave of messages, with only one, new filter value
89+ String newFilterValue = "orange" ;
90+ filterValues .clear ();
91+ filterValues .add (newFilterValue );
92+ insert .run ();
93+
94+ AtomicInteger receivedMessageCount = new AtomicInteger (0 );
95+ AtomicInteger filteredConsumedMessageCount = new AtomicInteger (0 );
96+ try (Consumer ignored =
97+ consumerBuilder ()
98+ .filter ()
99+ .values (newFilterValue )
100+ .postFilter (
101+ m -> {
102+ receivedMessageCount .incrementAndGet ();
103+ return newFilterValue .equals (m .getProperties ().getGroupId ());
104+ })
105+ .builder ()
106+ .messageHandler (
107+ (context , message ) -> filteredConsumedMessageCount .incrementAndGet ())
108+ .build ()) {
109+ int expectedCount = filterValueCount .get (newFilterValue ).get ();
110+ waitAtMost (
111+ CONDITION_TIMEOUT , () -> filteredConsumedMessageCount .get () == expectedCount );
112+ assertThat (receivedMessageCount ).hasValueLessThan (messageCount * 2 );
113+ }
114+ });
106115 }
107116
108117 @ ParameterizedTest
109118 @ ValueSource (strings = "foo" )
110119 @ NullSource
111- void publishWithNullFilterValuesShouldBePossible (String producerName ) {
112- publish (messageCount , producerName , () -> null );
120+ void publishWithNullFilterValuesShouldBePossible (String producerName ) throws Exception {
121+ repeatIfFailure (
122+ () -> {
123+ publish (messageCount , producerName , () -> null );
113124
114- CountDownLatch consumeLatch = new CountDownLatch (messageCount );
115- consumerBuilder ().messageHandler ((ctx , msg ) -> consumeLatch .countDown ()).build ();
116- latchAssert (consumeLatch ).completes ();
125+ CountDownLatch consumeLatch = new CountDownLatch (messageCount );
126+ try (Consumer ignored =
127+ consumerBuilder ().messageHandler ((ctx , msg ) -> consumeLatch .countDown ()).build ()) {
128+ latchAssert (consumeLatch ).completes (CONDITION_TIMEOUT );
129+ }
130+ });
117131 }
118132
119133 @ ParameterizedTest
120134 @ CsvSource ({"foo,true" , "foo,false" , ",true" , ",false" })
121135 void matchUnfilteredShouldReturnNullFilteredValueAndFilteredValues (
122136 String producerName , boolean matchUnfiltered ) throws Exception {
123- publish (messageCount , producerName , () -> null );
124-
125- List <String > filterValues = new ArrayList <>(Arrays .asList ("apple" , "banana" , "pear" ));
126- Map <String , AtomicInteger > filterValueCount = new HashMap <>();
127- Random random = new Random ();
128- publish (
129- messageCount ,
130- producerName ,
137+ repeatIfFailure (
131138 () -> {
132- String filterValue = filterValues .get (random .nextInt (filterValues .size ()));
133- filterValueCount .computeIfAbsent (filterValue , k -> new AtomicInteger ()).incrementAndGet ();
134- return filterValue ;
139+ publish (messageCount , producerName , () -> null );
140+
141+ List <String > filterValues = new ArrayList <>(Arrays .asList ("apple" , "banana" , "pear" ));
142+ Map <String , AtomicInteger > filterValueCount = new HashMap <>();
143+ Random random = new Random ();
144+ publish (
145+ messageCount ,
146+ producerName ,
147+ () -> {
148+ String filterValue = filterValues .get (random .nextInt (filterValues .size ()));
149+ filterValueCount
150+ .computeIfAbsent (filterValue , k -> new AtomicInteger ())
151+ .incrementAndGet ();
152+ return filterValue ;
153+ });
154+
155+ publish (messageCount , producerName , () -> null );
156+
157+ AtomicInteger receivedMessageCount = new AtomicInteger (0 );
158+ Set <String > receivedFilterValues = ConcurrentHashMap .newKeySet ();
159+ try (Consumer ignored =
160+ consumerBuilder ()
161+ .filter ()
162+ .values (filterValues .get (0 ))
163+ .matchUnfiltered (matchUnfiltered )
164+ .postFilter (m -> true )
165+ .builder ()
166+ .messageHandler (
167+ (ctx , msg ) -> {
168+ receivedFilterValues .add (
169+ msg .getProperties ().getGroupId () == null
170+ ? "null"
171+ : msg .getProperties ().getGroupId ());
172+ receivedMessageCount .incrementAndGet ();
173+ })
174+ .build ()) {
175+ int expected ;
176+ if (matchUnfiltered ) {
177+ expected = messageCount * 2 ;
178+ } else {
179+ expected = messageCount ;
180+ }
181+ waitAtMost (CONDITION_TIMEOUT , () -> receivedMessageCount .get () >= expected );
182+ }
135183 });
136-
137- publish (messageCount , producerName , () -> null );
138-
139- AtomicInteger receivedMessageCount = new AtomicInteger (0 );
140- Set <String > receivedFilterValues = ConcurrentHashMap .newKeySet ();
141- consumerBuilder ()
142- .filter ()
143- .values (filterValues .get (0 ))
144- .matchUnfiltered (matchUnfiltered )
145- .postFilter (m -> true )
146- .builder ()
147- .messageHandler (
148- (ctx , msg ) -> {
149- receivedFilterValues .add (
150- msg .getProperties ().getGroupId () == null
151- ? "null"
152- : msg .getProperties ().getGroupId ());
153- receivedMessageCount .incrementAndGet ();
154- })
155- .build ();
156-
157- int expected ;
158- if (matchUnfiltered ) {
159- expected = messageCount * 2 ;
160- } else {
161- expected = messageCount ;
162- }
163- waitAtMost (() -> receivedMessageCount .get () >= expected );
164184 }
165185
166186 private ProducerBuilder producerBuilder () {
@@ -194,7 +214,22 @@ private void publish(
194214 .messageBuilder ()
195215 .build (),
196216 confirmationHandler ));
197- latchAssert (latch ).completes ();
217+ latchAssert (latch ).completes (CONDITION_TIMEOUT );
198218 producer .close ();
199219 }
220+
221+ private static void repeatIfFailure (RunnableWithException test ) throws Exception {
222+ int executionCount = 0 ;
223+ Exception lastException = null ;
224+ while (executionCount < 5 ) {
225+ try {
226+ test .run ();
227+ return ;
228+ } catch (Exception e ) {
229+ executionCount ++;
230+ lastException = e ;
231+ }
232+ }
233+ throw lastException ;
234+ }
200235}
0 commit comments