Skip to content

Commit b4e9250

Browse files
committed
Add filtering benchmark
1 parent 0c5e392 commit b4e9250

File tree

1 file changed

+192
-0
lines changed

1 file changed

+192
-0
lines changed
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream.benchmark;
15+
16+
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
17+
18+
import com.codahale.metrics.MetricRegistry;
19+
import com.google.common.util.concurrent.RateLimiter;
20+
import com.rabbitmq.stream.*;
21+
import com.rabbitmq.stream.metrics.DropwizardMetricsCollector;
22+
import java.time.Duration;
23+
import java.util.*;
24+
import java.util.concurrent.Executors;
25+
import java.util.concurrent.ScheduledExecutorService;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.atomic.AtomicBoolean;
28+
import java.util.concurrent.atomic.AtomicLong;
29+
import java.util.stream.IntStream;
30+
31+
public class FilteringBenchmark {
32+
33+
static final String stream = "filtering";
34+
35+
public static void main(String[] args) throws Exception {
36+
int filterValueCount = 100;
37+
int filterValueSubsetCount = 40;
38+
int rate = 100_000;
39+
int filterSize = 255;
40+
int batchSize = 1;
41+
int maxUnconfirmedMessages = 1;
42+
43+
Duration publishingDuration = Duration.ofSeconds(10);
44+
Duration publishingCycle = Duration.ofSeconds(1);
45+
46+
ScheduledExecutorService scheduledExecutorService =
47+
Executors.newSingleThreadScheduledExecutor();
48+
try (Environment env = Environment.builder().build()) {
49+
try {
50+
env.deleteStream(stream);
51+
} catch (StreamException e) {
52+
// OK
53+
}
54+
env.streamCreator().stream(stream).filterSize(filterSize).create();
55+
56+
List<String> filterValues = new ArrayList<>(filterValueCount);
57+
IntStream.range(0, filterValueCount)
58+
.forEach(i -> filterValues.add(UUID.randomUUID().toString()));
59+
60+
AtomicLong publishedCount = new AtomicLong(0);
61+
AtomicLong confirmedCount = new AtomicLong(0);
62+
63+
Producer producer =
64+
env.producerBuilder().stream(stream)
65+
.batchSize(batchSize)
66+
.maxUnconfirmedMessages(maxUnconfirmedMessages)
67+
.filterValue(msg -> msg.getProperties().getTo())
68+
.build();
69+
70+
AtomicBoolean keepPublishing = new AtomicBoolean(true);
71+
scheduledExecutorService.schedule(
72+
() -> keepPublishing.set(false), publishingDuration.toMillis(), TimeUnit.MILLISECONDS);
73+
74+
RateLimiter rateLimiter = RateLimiter.create(rate);
75+
76+
Random random = new Random();
77+
ConfirmationHandler confirmationHandler = status -> confirmedCount.getAndIncrement();
78+
System.out.printf(
79+
"Starting test, filter values %s, subset %s, filter size %d%n",
80+
filterValueCount, filterValueSubsetCount, filterSize);
81+
System.out.printf(
82+
"Starting publishing for %d second(s) at rate %d, batch size %d, max unconfirmed messages %d...%n",
83+
publishingDuration.getSeconds(), rate, batchSize, maxUnconfirmedMessages);
84+
while (keepPublishing.get()) {
85+
AtomicBoolean keepPublishingInCycle = new AtomicBoolean(true);
86+
scheduledExecutorService.schedule(
87+
() -> keepPublishingInCycle.set(false),
88+
publishingCycle.toMillis(),
89+
TimeUnit.MILLISECONDS);
90+
Collections.shuffle(filterValues);
91+
List<String> filterValueSubset = filterValues.subList(0, filterValueSubsetCount);
92+
System.out.printf(
93+
"Starting publishing cycle for %d second(s)...%n", publishingCycle.getSeconds());
94+
while (keepPublishingInCycle.get()) {
95+
rateLimiter.acquire(1);
96+
String filterValue = filterValueSubset.get(random.nextInt(filterValueSubsetCount));
97+
producer.send(
98+
producer.messageBuilder().properties().to(filterValue).messageBuilder().build(),
99+
confirmationHandler);
100+
publishedCount.getAndIncrement();
101+
}
102+
}
103+
System.out.println("Done publishing, waiting for all confirmations...");
104+
waitAtMost(() -> publishedCount.get() == confirmedCount.get());
105+
106+
System.out.println("Starting consuming...");
107+
108+
List<String> values = filterValues.subList(0, 10);
109+
for (String filterValue : values) {
110+
Duration timeout = Duration.ofSeconds(30);
111+
long start = System.nanoTime();
112+
System.out.printf("For filter value %s%n", filterValue);
113+
MetricRegistry registry = new MetricRegistry();
114+
DropwizardMetricsCollector collector = new DropwizardMetricsCollector(registry);
115+
AtomicLong unfilteredTargetMessageCount = new AtomicLong(0);
116+
Duration unfilteredDuration;
117+
try (Environment e = Environment.builder().metricsCollector(collector).build()) {
118+
AtomicBoolean hasReceivedSomething = new AtomicBoolean(false);
119+
AtomicLong lastReceived = new AtomicLong(0);
120+
long s = System.nanoTime();
121+
e.consumerBuilder().stream(stream)
122+
.offset(OffsetSpecification.first())
123+
.messageHandler(
124+
(ctx, msg) -> {
125+
hasReceivedSomething.set(true);
126+
lastReceived.set(System.nanoTime());
127+
if (filterValue.equals(msg.getProperties().getTo())) {
128+
unfilteredTargetMessageCount.getAndIncrement();
129+
}
130+
})
131+
.build();
132+
waitAtMost(
133+
timeout,
134+
() ->
135+
hasReceivedSomething.get()
136+
&& System.nanoTime() - lastReceived.get() > Duration.ofSeconds(1).toNanos());
137+
unfilteredDuration = Duration.ofNanos(System.nanoTime() - s);
138+
}
139+
140+
long unfilteredChunkCount = registry.getMeters().get("rabbitmq.stream.chunk").getCount();
141+
long unfilteredMessageCount =
142+
registry.getMeters().get("rabbitmq.stream.consumed").getCount();
143+
144+
registry = new MetricRegistry();
145+
collector = new DropwizardMetricsCollector(registry);
146+
AtomicLong filteredTargetMessageCount = new AtomicLong(0);
147+
Duration filteredDuration;
148+
try (Environment e = Environment.builder().metricsCollector(collector).build()) {
149+
AtomicBoolean hasReceivedSomething = new AtomicBoolean(false);
150+
AtomicLong lastReceived = new AtomicLong(0);
151+
long s = System.nanoTime();
152+
e.consumerBuilder().stream(stream)
153+
.offset(OffsetSpecification.first())
154+
.filter()
155+
.values(filterValue)
156+
.postFilter(msg -> filterValue.equals(msg.getProperties().getTo()))
157+
.builder()
158+
.messageHandler(
159+
(ctx, msg) -> {
160+
hasReceivedSomething.set(true);
161+
lastReceived.set(System.nanoTime());
162+
filteredTargetMessageCount.getAndIncrement();
163+
})
164+
.build();
165+
waitAtMost(
166+
timeout,
167+
() ->
168+
hasReceivedSomething.get()
169+
&& System.nanoTime() - lastReceived.get() > Duration.ofSeconds(1).toNanos());
170+
filteredDuration = Duration.ofNanos(System.nanoTime() - s);
171+
}
172+
long filteredChunkCount = registry.getMeters().get("rabbitmq.stream.chunk").getCount();
173+
long filteredMessageCount = registry.getMeters().get("rabbitmq.stream.consumed").getCount();
174+
System.out.printf(
175+
"consumed in %d / %d ms, target messages %d / %d, chunk count %d / %d (%d %%), messages %d / %d (%d %%)%n",
176+
unfilteredDuration.toMillis(),
177+
filteredDuration.toMillis(),
178+
unfilteredTargetMessageCount.get(),
179+
filteredTargetMessageCount.get(),
180+
unfilteredChunkCount,
181+
filteredChunkCount,
182+
(unfilteredChunkCount - filteredChunkCount) * 100 / unfilteredChunkCount,
183+
unfilteredMessageCount,
184+
filteredMessageCount,
185+
(unfilteredMessageCount - filteredMessageCount) * 100 / unfilteredMessageCount);
186+
}
187+
188+
} finally {
189+
scheduledExecutorService.shutdownNow();
190+
}
191+
}
192+
}

0 commit comments

Comments
 (0)