2020import static org .assertj .core .api .Assertions .assertThat ;
2121import static org .assertj .core .api .Assertions .assertThatThrownBy ;
2222
23- import com .rabbitmq .stream .Environment ;
24- import com .rabbitmq .stream .EnvironmentBuilder ;
25- import com .rabbitmq .stream .Message ;
26- import com .rabbitmq .stream .OffsetSpecification ;
27- import com .rabbitmq .stream .Producer ;
23+ import com .rabbitmq .stream .*;
2824import io .netty .channel .EventLoopGroup ;
2925import io .netty .channel .nio .NioEventLoopGroup ;
3026import java .io .BufferedReader ;
3430import java .net .Socket ;
3531import java .nio .charset .StandardCharsets ;
3632import java .time .Duration ;
37- import java .util .HashMap ;
38- import java .util .List ;
39- import java .util .Map ;
40- import java .util .Set ;
41- import java .util .UUID ;
42- import java .util .concurrent .ConcurrentHashMap ;
43- import java .util .concurrent .CopyOnWriteArrayList ;
44- import java .util .concurrent .CountDownLatch ;
45- import java .util .concurrent .ExecutorService ;
46- import java .util .concurrent .Executors ;
47- import java .util .concurrent .Future ;
48- import java .util .concurrent .TimeoutException ;
33+ import java .util .*;
34+ import java .util .concurrent .*;
4935import java .util .concurrent .atomic .AtomicBoolean ;
5036import java .util .concurrent .atomic .AtomicInteger ;
5137import java .util .concurrent .atomic .AtomicLong ;
5238import java .util .concurrent .atomic .AtomicReference ;
5339import java .util .function .Predicate ;
54- import org .junit .jupiter .api .AfterAll ;
55- import org .junit .jupiter .api .AfterEach ;
56- import org .junit .jupiter .api .BeforeAll ;
57- import org .junit .jupiter .api .BeforeEach ;
58- import org .junit .jupiter .api .Test ;
40+ import java .util .function .Supplier ;
41+ import org .junit .jupiter .api .*;
5942import org .junit .jupiter .api .extension .ExtendWith ;
6043
6144@ ExtendWith (TestUtils .StreamTestInfrastructureExtension .class )
@@ -64,6 +47,8 @@ public class StompInteroperabilityTest {
6447
6548 public static final String MESSAGE_ID = "message-id" ;
6649 public static final String X_STREAM_OFFSET = "x-stream-offset" ;
50+ public static final String X_STREAM_FILTER_SIZE_BYTES = "x-stream-filter-size-bytes" ;
51+ public static final String X_STREAM_FILTER_VALUE = "x-stream-filter-value" ;
6752 public static final String MESSAGE_COMMAND = "MESSAGE" ;
6853 public static final String ACK_COMMAND = "ACK" ;
6954 private static final String NEW_LINE = "\n " ;
@@ -109,20 +94,19 @@ static long offset(String line) {
10994 void init () throws Exception {
11095 environmentBuilder = Environment .builder ();
11196 env = environmentBuilder .netty ().eventLoopGroup (eventLoopGroup ).environmentBuilder ().build ();
112- socket = new Socket ("localhost" , 61613 );
113- out = socket .getOutputStream ();
114- in = new BufferedReader (new InputStreamReader (socket .getInputStream ()));
11597 executorService = Executors .newSingleThreadExecutor ();
11698 }
11799
118100 @ AfterEach
119101 void tearDown () throws Exception {
120102 env .close ();
121- socket .close ();
122103 executorService .shutdownNow ();
123104 }
124105
125106 void stompConnect () throws Exception {
107+ socket = new Socket ("localhost" , 61613 );
108+ out = socket .getOutputStream ();
109+ in = new BufferedReader (new InputStreamReader (socket .getInputStream ()));
126110 byte [] connect =
127111 frameBuilder ()
128112 .command ("CONNECT" )
@@ -139,6 +123,28 @@ void stompConnect() throws Exception {
139123 }
140124 }
141125
126+ void stompDisconnect () throws Exception {
127+ String receipt = UUID .randomUUID ().toString ();
128+ byte [] connect =
129+ frameBuilder ()
130+ .command ("DISCONNECT" )
131+ .header ("receipt" , receipt )
132+ .header ("passcode" , "guest" )
133+ .build ();
134+ out .write (connect );
135+ waitForReceipt (receipt );
136+ socket .close ();
137+ }
138+
139+ void waitForReceipt (String receipt ) throws Exception {
140+ AtomicBoolean gotReceipt = new AtomicBoolean (false );
141+ read (
142+ line -> {
143+ gotReceipt .compareAndSet (false , line .contains (receipt ));
144+ return line .equals (NULL ) && gotReceipt .get ();
145+ });
146+ }
147+
142148 void read (Predicate <String > condition ) throws Exception {
143149 read (condition , Duration .ofSeconds (10 ));
144150 }
@@ -181,13 +187,7 @@ void publishToStompDestinationConsumeFromStream() throws Exception {
181187 .build ();
182188
183189 out .write (frame );
184-
185- AtomicBoolean gotReceipt = new AtomicBoolean (false );
186- read (
187- line -> {
188- gotReceipt .compareAndSet (false , line .contains (receipt ));
189- return line .equals (NULL ) && gotReceipt .get ();
190- });
190+ waitForReceipt (receipt );
191191
192192 CountDownLatch latch = new CountDownLatch (1 );
193193 AtomicReference <Message > messageReference = new AtomicReference <>();
@@ -215,34 +215,39 @@ void publishToStompDestinationConsumeFromStream() throws Exception {
215215
216216 assertThat (message .getMessageAnnotations ().get ("x-routing-key" )).isEqualTo (stream );
217217 assertThat (message .getMessageAnnotations ().get ("x-exchange" )).isEqualTo ("" );
218+ stompDisconnect ();
218219 }
219220
220221 void stompSubscribe (String stream , String ack , int prefetchCount ) throws Exception {
221- stompSubscribe (stream , ack , prefetchCount , null );
222+ stompSubscribe (stream , ack , prefetchCount , null , Collections . emptyMap () );
222223 }
223224
224225 void stompSubscribe (String stream , String ack , int prefetchCount , String offset )
225226 throws Exception {
226- String receipt = UUID .randomUUID ().toString ();
227- FrameBuilder builder =
228- frameBuilder ()
229- .command ("SUBSCRIBE" )
230- .header ("id" , "0" )
231- .header ("destination" , "/amq/queue/" + stream )
232- .header ("ack" , ack )
233- .header ("prefetch-count" , String .valueOf (prefetchCount ))
234- .header ("receipt" , receipt );
227+ stompSubscribe (stream , ack , prefetchCount , offset , Collections .emptyMap ());
228+ }
235229
230+ void stompSubscribe (
231+ String stream , String ack , int prefetchCount , String offset , Map <String , String > headers )
232+ throws Exception {
233+ String receipt = UUID .randomUUID ().toString ();
234+ Map <String , String > defaultHeaders = new LinkedHashMap <>();
235+ defaultHeaders .put ("id" , "0" );
236+ defaultHeaders .put ("destination" , "/amq/queue/" + stream );
237+ defaultHeaders .put ("ack" , ack );
238+ defaultHeaders .put ("prefetch-count" , String .valueOf (prefetchCount ));
239+ defaultHeaders .put ("receipt" , receipt );
236240 if (offset != null ) {
237- builder . header ("x-stream-offset" , offset );
241+ defaultHeaders . put ("x-stream-offset" , offset );
238242 }
243+ defaultHeaders .putAll (headers );
244+
245+ FrameBuilder builder = frameBuilder ().command ("SUBSCRIBE" );
246+
247+ defaultHeaders .forEach (builder ::header );
248+
239249 out .write (builder .build ());
240- AtomicBoolean gotReceipt = new AtomicBoolean (false );
241- read (
242- line -> {
243- gotReceipt .compareAndSet (false , line .contains (receipt ));
244- return line .equals (NULL ) && gotReceipt .get ();
245- });
250+ waitForReceipt (receipt );
246251 }
247252
248253 @ Test
@@ -309,6 +314,8 @@ void publishToStreamConsumeFromStomp() throws Exception {
309314 assertThat (headers .get ("timestamp" )).isEqualTo ("1000" ); // in seconds
310315 assertThat (headers .get ("some-header" )).isEqualTo ("some header value" );
311316 assertThat (headers .get ("x-stream-offset" )).isNotNull ().isEqualTo ("0" );
317+
318+ stompDisconnect ();
312319 }
313320
314321 @ Test
@@ -350,6 +357,122 @@ void offsetTypeFirstShouldStartConsumingFromBeginning() throws Exception {
350357
351358 assertThat (first .get ()).isEqualTo (0 );
352359 assertThat (last .get ()).isEqualTo (messageCount - 1 );
360+ stompDisconnect ();
361+ }
362+
363+ @ Test
364+ @ DisabledIfFilteringNotSupported
365+ void filtering (TestInfo info ) throws Exception {
366+ int messageCount = 1000 ;
367+ repeatIfFailure (
368+ () -> {
369+ String s = TestUtils .streamName (info );
370+
371+ stompConnect ();
372+ Map <String , String > headers = new LinkedHashMap <>();
373+ headers .put ("destination" , "/topic/stream-queue-test" );
374+ headers .put ("x-queue-name" , s );
375+ headers .put ("x-queue-type" , "stream" );
376+ headers .put (X_STREAM_FILTER_SIZE_BYTES , "32" );
377+ headers .put ("durable" , "true" );
378+ headers .put ("auto-delete" , "false" );
379+ stompSubscribe ("does not matter" , "client" , 10 , "first" , headers );
380+ stompDisconnect ();
381+
382+ List <String > filterValues = new ArrayList <>(Arrays .asList ("apple" , "banana" , "pear" ));
383+ Map <String , AtomicInteger > filterValueCount = new HashMap <>();
384+ Random random = new Random ();
385+
386+ Callable <Void > insert =
387+ () -> {
388+ stompConnect ();
389+ publishStomp (
390+ messageCount ,
391+ "/amq/queue/" + s ,
392+ () -> {
393+ String filterValue = filterValues .get (random .nextInt (filterValues .size ()));
394+ filterValueCount
395+ .computeIfAbsent (filterValue , k -> new AtomicInteger ())
396+ .incrementAndGet ();
397+ return filterValue ;
398+ });
399+ stompDisconnect ();
400+ return null ;
401+ };
402+ insert .call ();
403+
404+ // second wave of messages, with only one, new filter value
405+ String newFilterValue = "orange" ;
406+ filterValues .clear ();
407+ filterValues .add (newFilterValue );
408+ insert .call ();
409+
410+ try {
411+ stompConnect ();
412+ int prefetchCount = 10 ;
413+ headers .clear ();
414+ headers .put ("destination" , "/amq/queue/" + s );
415+ headers .put ("x-stream-filter" , newFilterValue );
416+ headers .put ("x-stream-match-unfiltered" , "true" );
417+ stompSubscribe ("does not matter" , "client" , prefetchCount , "first" , headers );
418+
419+ int expectedCount = filterValueCount .get (newFilterValue ).get ();
420+
421+ AtomicInteger receivedMessageCount = new AtomicInteger (0 );
422+ AtomicInteger filteredConsumedMessageCount = new AtomicInteger (0 );
423+ AtomicReference <String > lastMessageId = new AtomicReference <>();
424+ read (
425+ line -> {
426+ if (line .contains (MESSAGE_COMMAND )) {
427+ receivedMessageCount .incrementAndGet ();
428+ }
429+ if (hasHeader (line , MESSAGE_ID )) {
430+ lastMessageId .set (header (line ));
431+ }
432+ if (hasHeader (line , X_STREAM_FILTER_VALUE )) {
433+ String filterValue = header (line );
434+ if (newFilterValue .equals (filterValue )) {
435+ filteredConsumedMessageCount .incrementAndGet ();
436+ }
437+ }
438+ if (line .contains (NULL ) && receivedMessageCount .get () % prefetchCount == 0 ) {
439+ write (
440+ frameBuilder ()
441+ .command (ACK_COMMAND )
442+ .header (MESSAGE_ID , lastMessageId .get ())
443+ .build ());
444+ }
445+
446+ return line .contains (NULL ) && filteredConsumedMessageCount .get () == expectedCount ;
447+ });
448+ assertThat (filteredConsumedMessageCount ).hasValue (expectedCount );
449+ assertThat (receivedMessageCount ).hasValueLessThan (messageCount * 2 );
450+ } finally {
451+ stompDisconnect ();
452+ env .deleteStream (s );
453+ }
454+ });
455+ }
456+
457+ private void publishStomp (
458+ int messageCount , String destination , Supplier <String > filterValueSupplier ) throws Exception {
459+ String messageBody = UUID .randomUUID ().toString ();
460+ for (int i = 0 ; i < messageCount ; i ++) {
461+ String receipt = UUID .randomUUID ().toString ();
462+ byte [] frame =
463+ frameBuilder ()
464+ .command ("SEND" )
465+ .header ("destination" , destination )
466+ .header ("content-type" , "text/plain" )
467+ .header ("content-length" , String .valueOf (messageBody .length ()))
468+ .header ("some-header" , "some header value" )
469+ .header ("receipt" , receipt )
470+ .header (X_STREAM_FILTER_VALUE , filterValueSupplier .get ())
471+ .body (messageBody )
472+ .build ();
473+ out .write (frame );
474+ waitForReceipt (receipt );
475+ }
353476 }
354477
355478 void write (byte [] content ) {
@@ -413,6 +536,7 @@ void offsetTypeLastShouldStartConsumingFromTheLastChunk() throws Exception {
413536
414537 assertThat (first .get ()).isEqualTo (chunkOffset .get ());
415538 assertThat (last .get ()).isEqualTo (lastOffset );
539+ stompDisconnect ();
416540 }
417541
418542 @ Test
@@ -470,6 +594,7 @@ void offsetTypeNextShouldReturnNewPublishedMessages() throws Exception {
470594
471595 assertThat (first .get ()).isEqualTo (firstWaveMessageCount );
472596 assertThat (last .get ()).isEqualTo (lastOffset );
597+ stompDisconnect ();
473598 }
474599
475600 @ Test
@@ -512,6 +637,7 @@ void offsetTypeOffsetShouldStartConsumingFromOffset() throws Exception {
512637
513638 assertThat (first .get ()).isEqualTo (offset );
514639 assertThat (last .get ()).isEqualTo (messageCount - 1 );
640+ stompDisconnect ();
515641 }
516642
517643 @ Test
@@ -570,6 +696,7 @@ void offsetTypeTimestampShouldStartConsumingFromTimestamp() throws Exception {
570696 assertThat (last .get ()).isEqualTo (lastOffset );
571697 consumed .stream ()
572698 .forEach (v -> assertThat (v ).startsWith ("second wave" ).doesNotStartWith ("first wave" ));
699+ stompDisconnect ();
573700 }
574701
575702 private static class FrameBuilder {
0 commit comments