@@ -439,6 +439,18 @@ public class StreamPerfTest implements Callable<Integer> {
439439 @ ArgGroup (exclusive = false , multiplicity = "0..1" )
440440 InstanceSyncOptions instanceSyncOptions ;
441441
442+ @ CommandLine .Option (
443+ names = {"--filter-value-set" , "-fvs" },
444+ description = "filter value set for publishers, range (e.g. 1..15) are accepted" ,
445+ converter = Utils .FilterValueSetConverter .class )
446+ private List <String > filterValueSet ;
447+
448+ @ CommandLine .Option (
449+ names = {"--filter-values" , "-fv" },
450+ description = "filter values for consumers" ,
451+ split = "," )
452+ private List <String > filterValues ;
453+
442454 static class InstanceSyncOptions {
443455
444456 @ CommandLine .Option (
@@ -589,7 +601,6 @@ public Integer call() throws Exception {
589601 maybeDisplayVersion ();
590602 maybeDisplayEnvironmentVariablesHelp ();
591603 overridePropertiesWithEnvironmentVariables ();
592-
593604 Codec codec = createCodec (this .codecClass );
594605
595606 ByteBufAllocator byteBufAllocator = ByteBufAllocator .DEFAULT ;
@@ -876,19 +887,43 @@ public Integer call() throws Exception {
876887 producerBuilder .name (producerName ).confirmTimeout (Duration .ZERO );
877888 }
878889
879- java .util .function .Consumer <MessageBuilder > messageBuilderConsumer ;
890+ java .util .function .Consumer <MessageBuilder > messageBuilderConsumerTemp ;
880891 if (this .superStreams ) {
881892 producerBuilder
882893 .superStream (stream )
883894 .routing (msg -> msg .getProperties ().getMessageIdAsString ());
884895 AtomicLong messageIdSequence = new AtomicLong (0 );
885- messageBuilderConsumer =
896+ messageBuilderConsumerTemp =
886897 mg -> mg .properties ().messageId (messageIdSequence .getAndIncrement ());
887898 } else {
888- messageBuilderConsumer = mg -> {};
899+ messageBuilderConsumerTemp = mg -> {};
889900 producerBuilder .stream (stream );
890901 }
891902
903+ if (this .filterValueSet != null && this .filterValueSet .size () > 0 ) {
904+ producerBuilder =
905+ producerBuilder .filterValue (msg -> msg .getProperties ().getTo ());
906+ List <String > values = new ArrayList <>(this .filterValueSet );
907+ AtomicInteger count = new AtomicInteger ();
908+ int subSetSize = Utils .filteringSubSetSize (values .size ());
909+ int messageCountCycle = Utils .filteringPublishingCycle (this .rate );
910+ List <String > subSet = new ArrayList <>(subSetSize );
911+ java .util .function .Consumer <MessageBuilder > filteringMessageBuilderConsumer =
912+ b -> {
913+ if (Integer .remainderUnsigned (
914+ count .getAndIncrement (), messageCountCycle )
915+ == 0 ) {
916+ Collections .shuffle (values );
917+ subSet .clear ();
918+ subSet .addAll (values .subList (0 , subSetSize ));
919+ }
920+ b .properties ()
921+ .to (subSet .get (Integer .remainderUnsigned (count .get (), subSetSize )));
922+ };
923+ messageBuilderConsumerTemp =
924+ messageBuilderConsumerTemp .andThen (filteringMessageBuilderConsumer );
925+ }
926+
892927 Producer producer =
893928 producerBuilder
894929 .subEntrySize (this .subEntrySize )
@@ -898,9 +933,9 @@ public Integer call() throws Exception {
898933 .maxUnconfirmedMessages (this .confirms )
899934 .build ();
900935
901- AtomicLong messageCount = new AtomicLong (0 );
902936 ConfirmationHandler confirmationHandler ;
903937 if (this .confirmLatency ) {
938+ AtomicLong messageCount = new AtomicLong (0 );
904939 final PerformanceMetrics metrics = this .performanceMetrics ;
905940 final int divisor = Utils .downSamplingDivisor (this .rate );
906941 confirmationHandler =
@@ -936,6 +971,8 @@ public Integer call() throws Exception {
936971
937972 producers .add (producer );
938973
974+ java .util .function .Consumer <MessageBuilder > messageBuilderConsumer =
975+ messageBuilderConsumerTemp ;
939976 return (Runnable )
940977 () -> {
941978 final int msgSize = this .messageSize ;
@@ -1046,6 +1083,8 @@ public Integer call() throws Exception {
10461083 }
10471084 });
10481085
1086+ consumerBuilder = maybeConfigureForFiltering (consumerBuilder );
1087+
10491088 Consumer consumer = consumerBuilder .build ();
10501089 return consumer ;
10511090 })
@@ -1122,6 +1161,37 @@ public Integer call() throws Exception {
11221161 return 0 ;
11231162 }
11241163
1164+ private ConsumerBuilder maybeConfigureForFiltering (ConsumerBuilder consumerBuilder ) {
1165+ if (this .filterValues != null && this .filterValues .size () > 0 ) {
1166+ consumerBuilder =
1167+ consumerBuilder .filter ().values (this .filterValues .toArray (new String [0 ])).builder ();
1168+
1169+ if (this .filterValues .size () == 1 ) {
1170+ String filterValue = filterValues .get (0 );
1171+ consumerBuilder =
1172+ consumerBuilder
1173+ .filter ()
1174+ .postFilter (msg -> filterValue .equals (msg .getProperties ().getTo ()))
1175+ .builder ();
1176+ } else {
1177+ consumerBuilder =
1178+ consumerBuilder
1179+ .filter ()
1180+ .postFilter (
1181+ msg -> {
1182+ for (String filterValue : this .filterValues ) {
1183+ if (filterValue .equals (msg .getProperties ().getTo ())) {
1184+ return true ;
1185+ }
1186+ }
1187+ return false ;
1188+ })
1189+ .builder ();
1190+ }
1191+ }
1192+ return consumerBuilder ;
1193+ }
1194+
11251195 private void createStream (Environment environment , String stream ) {
11261196 StreamCreator streamCreator =
11271197 environment .streamCreator ().stream (stream )
0 commit comments