1919import com .google .common .util .concurrent .RateLimiter ;
2020import com .rabbitmq .stream .*;
2121import com .rabbitmq .stream .metrics .DropwizardMetricsCollector ;
22+ import com .rabbitmq .stream .metrics .MetricsCollector ;
2223import java .time .Duration ;
2324import java .util .*;
2425import java .util .concurrent .Executors ;
2526import java .util .concurrent .ScheduledExecutorService ;
2627import java .util .concurrent .TimeUnit ;
2728import java .util .concurrent .atomic .AtomicBoolean ;
29+ import java .util .concurrent .atomic .AtomicInteger ;
2830import java .util .concurrent .atomic .AtomicLong ;
2931import java .util .stream .IntStream ;
3032
@@ -33,12 +35,12 @@ public class FilteringBenchmark {
3335 static final String stream = "filtering" ;
3436
3537 public static void main (String [] args ) throws Exception {
36- int filterValueCount = 100 ;
37- int filterValueSubsetCount = 40 ;
38+ int filterValueCount = 200 ;
39+ int filterValueSubsetCount = 80 ;
3840 int rate = 100_000 ;
39- int filterSize = 255 ;
40- int batchSize = 1 ;
41- int maxUnconfirmedMessages = 1 ;
41+ int filterSize = 64 ;
42+ int batchSize = 100 ;
43+ int maxUnconfirmedMessages = 10_000 ;
4244
4345 Duration publishingDuration = Duration .ofSeconds (10 );
4446 Duration publishingCycle = Duration .ofSeconds (1 );
@@ -108,10 +110,9 @@ public static void main(String[] args) throws Exception {
108110 List <String > values = filterValues .subList (0 , 10 );
109111 for (String filterValue : values ) {
110112 Duration timeout = Duration .ofSeconds (30 );
111- long start = System .nanoTime ();
112113 System .out .printf ("For filter value %s%n" , filterValue );
113114 MetricRegistry registry = new MetricRegistry ();
114- DropwizardMetricsCollector collector = new DropwizardMetricsCollector (registry );
115+ MetricsCollector collector = new DropwizardMetricsCollector (registry );
115116 AtomicLong unfilteredTargetMessageCount = new AtomicLong (0 );
116117 Duration unfilteredDuration ;
117118 try (Environment e = Environment .builder ().metricsCollector (collector ).build ()) {
@@ -141,22 +142,55 @@ public static void main(String[] args) throws Exception {
141142 long unfilteredMessageCount =
142143 registry .getMeters ().get ("rabbitmq.stream.consumed" ).getCount ();
143144
145+ AtomicInteger chunkFilteredMessages = new AtomicInteger (0 );
146+ AtomicInteger chunkMessageCount = new AtomicInteger (0 );
147+ AtomicInteger chunkWithNoMessagesCount = new AtomicInteger (0 );
148+ AtomicBoolean firstChunk = new AtomicBoolean (true );
149+ AtomicLong droppedMessages = new AtomicLong (0 );
144150 registry = new MetricRegistry ();
145151 collector = new DropwizardMetricsCollector (registry );
152+ collector =
153+ new DelegatingMetricsCollector (collector ) {
154+
155+ @ Override
156+ public void chunk (int entriesCount ) {
157+ if (firstChunk .get ()) {
158+ firstChunk .set (false );
159+ } else {
160+ if (chunkMessageCount .get () == chunkFilteredMessages .get ()) {
161+ chunkWithNoMessagesCount .incrementAndGet ();
162+ }
163+ chunkFilteredMessages .set (0 );
164+ chunkMessageCount .set (entriesCount );
165+ }
166+ super .chunk (entriesCount );
167+ }
168+ };
146169 AtomicLong filteredTargetMessageCount = new AtomicLong (0 );
147170 Duration filteredDuration ;
148171 try (Environment e = Environment .builder ().metricsCollector (collector ).build ()) {
149172 AtomicBoolean hasReceivedSomething = new AtomicBoolean (false );
150173 AtomicLong lastReceived = new AtomicLong (0 );
151174 long s = System .nanoTime ();
175+ AtomicLong chunkId = new AtomicLong (-1 );
152176 e .consumerBuilder ().stream (stream )
153177 .offset (OffsetSpecification .first ())
154178 .filter ()
155179 .values (filterValue )
156- .postFilter (msg -> filterValue .equals (msg .getProperties ().getTo ()))
180+ .postFilter (
181+ msg -> {
182+ boolean shouldPass = filterValue .equals (msg .getProperties ().getTo ());
183+ if (!shouldPass ) {
184+ droppedMessages .getAndIncrement ();
185+ chunkFilteredMessages .getAndIncrement ();
186+ }
187+ return shouldPass ;
188+ })
157189 .builder ()
158190 .messageHandler (
159191 (ctx , msg ) -> {
192+ if (chunkId .get () == -1 || chunkId .get () != ctx .committedChunkId ()) {}
193+
160194 hasReceivedSomething .set (true );
161195 lastReceived .set (System .nanoTime ());
162196 filteredTargetMessageCount .getAndIncrement ();
@@ -183,10 +217,70 @@ public static void main(String[] args) throws Exception {
183217 unfilteredMessageCount ,
184218 filteredMessageCount ,
185219 (unfilteredMessageCount - filteredMessageCount ) * 100 / unfilteredMessageCount );
220+ System .out .printf (
221+ "chunk without matching messages %d / %d, dropped messages %d / %d%n" ,
222+ chunkWithNoMessagesCount .get (),
223+ filteredChunkCount ,
224+ droppedMessages .getAndIncrement (),
225+ filteredMessageCount );
186226 }
187227
188228 } finally {
189229 scheduledExecutorService .shutdownNow ();
190230 }
191231 }
232+
233+ private static class DelegatingMetricsCollector implements MetricsCollector {
234+
235+ private final MetricsCollector delegate ;
236+
237+ private DelegatingMetricsCollector (MetricsCollector delegate ) {
238+ this .delegate = delegate ;
239+ }
240+
241+ @ Override
242+ public void openConnection () {
243+ this .delegate .openConnection ();
244+ }
245+
246+ @ Override
247+ public void closeConnection () {
248+ this .delegate .closeConnection ();
249+ }
250+
251+ @ Override
252+ public void publish (int count ) {
253+ this .delegate .publish (count );
254+ }
255+
256+ @ Override
257+ public void publishConfirm (int count ) {
258+ this .delegate .publishConfirm (count );
259+ }
260+
261+ @ Override
262+ public void publishError (int count ) {
263+ this .delegate .publishError (count );
264+ }
265+
266+ @ Override
267+ public void chunk (int entriesCount ) {
268+ this .delegate .chunk (entriesCount );
269+ }
270+
271+ @ Override
272+ public void consume (long count ) {
273+ this .delegate .consume (count );
274+ }
275+
276+ @ Override
277+ public void writtenBytes (int writtenBytes ) {
278+ this .delegate .writtenBytes (writtenBytes );
279+ }
280+
281+ @ Override
282+ public void readBytes (int readBytes ) {
283+ this .delegate .readBytes (readBytes );
284+ }
285+ }
192286}
0 commit comments