Skip to content

Commit 57b074f

Browse files
committed
Speed up exit when --time is used
We don't close the consumers, publishers, and environment, to be as close as possible to the target duration of the run. This should not be done in real applications, as closing steps are not executed, and some behavior like automatic offset tracking are not guaranted to work properly.
1 parent 4105266 commit 57b074f

File tree

4 files changed

+69
-33
lines changed

4 files changed

+69
-33
lines changed

src/main/java/com/rabbitmq/stream/impl/Utils.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,12 +184,18 @@ static <T> T callAndMaybeRetry(
184184
String description = format(format, args);
185185
int attempt = 0;
186186
Exception lastException = null;
187+
long startTime = System.nanoTime();
187188
boolean keepTrying = true;
188189
while (keepTrying) {
189190
try {
190191
attempt++;
191192
T result = operation.get();
192-
LOGGER.debug("Operation '{}' completed after {} attempt(s)", description, attempt);
193+
Duration operationDuration = Duration.ofNanos(System.nanoTime() - startTime);
194+
LOGGER.debug(
195+
"Operation '{}' completed in {} ms after {} attempt(s)",
196+
description,
197+
operationDuration.toMillis(),
198+
attempt);
193199
return result;
194200
} catch (Exception e) {
195201
lastException = e;

src/main/java/com/rabbitmq/stream/perf/MonitoringContext.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.sun.net.httpserver.HttpServer;
1919
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
2020
import java.io.OutputStream;
21+
import java.io.PrintWriter;
2122
import java.net.InetAddress;
2223
import java.net.InetSocketAddress;
2324
import java.nio.charset.StandardCharsets;
@@ -37,14 +38,19 @@ class MonitoringContext {
3738
private final Environment environment;
3839

3940
private final Collection<Endpoint> endpoints = Collections.synchronizedList(new ArrayList<>());
41+
private final PrintWriter out;
4042

4143
private volatile HttpServer server;
4244

4345
MonitoringContext(
44-
int monitoringPort, CompositeMeterRegistry meterRegistry, Environment environment) {
46+
int monitoringPort,
47+
CompositeMeterRegistry meterRegistry,
48+
Environment environment,
49+
PrintWriter out) {
4550
this.monitoringPort = monitoringPort;
4651
this.meterRegistry = meterRegistry;
4752
this.environment = environment;
53+
this.out = out;
4854
}
4955

5056
void addHttpEndpoint(String path, String description, HttpHandler handler) {
@@ -88,7 +94,7 @@ void start() throws Exception {
8894
});
8995

9096
server.start();
91-
System.out.println("Monitoring endpoints started on http://localhost:" + this.monitoringPort);
97+
this.out.println("Monitoring endpoints started on http://localhost:" + this.monitoringPort);
9298
}
9399
}
94100

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

Lines changed: 52 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@
5858
import io.netty.buffer.ByteBufAllocator;
5959
import io.netty.buffer.ByteBufAllocatorMetric;
6060
import io.netty.buffer.ByteBufAllocatorMetricProvider;
61+
import io.netty.channel.EventLoopGroup;
62+
import io.netty.channel.nio.NioEventLoopGroup;
6163
import io.netty.handler.ssl.SslContextBuilder;
6264
import io.netty.handler.ssl.SslHandler;
6365
import io.netty.util.internal.PlatformDependent;
@@ -491,7 +493,7 @@ static int run(
491493
return commandLine.execute(args);
492494
}
493495

494-
static void versionInformation(PrintStream out) {
496+
static void versionInformation(PrintWriter out) {
495497
String lineSeparator = System.getProperty("line.separator");
496498
String version =
497499
format(
@@ -661,13 +663,16 @@ public Integer call() throws Exception {
661663
}
662664
}
663665

666+
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
667+
664668
EnvironmentBuilder environmentBuilder =
665669
Environment.builder()
666670
.id("stream-perf-test")
667671
.uris(this.uris)
668672
.addressResolver(addrResolver)
669673
.scheduledExecutorService(envExecutor)
670674
.metricsCollector(metricsCollector)
675+
.eventLoopGroup(eventLoopGroup)
671676
.byteBufAllocator(byteBufAllocator)
672677
.codec(codec)
673678
.maxProducersByConnection(this.producersByConnection)
@@ -701,10 +706,20 @@ public Integer call() throws Exception {
701706
}
702707

703708
Environment environment = environmentBuilder.channelCustomizer(channelCustomizer).build();
704-
shutdownService.wrap(closeStep("Closing environment(s)", () -> environment.close()));
709+
if (!isRunTimeLimited()) {
710+
shutdownService.wrap(
711+
closeStep(
712+
"Closing Netty event loop group",
713+
() -> {
714+
if (!eventLoopGroup.isShuttingDown() || !eventLoopGroup.isShutdown()) {
715+
eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS);
716+
}
717+
}));
718+
shutdownService.wrap(closeStep("Closing environment", () -> environment.close()));
719+
}
705720

706721
MonitoringContext monitoringContext =
707-
new MonitoringContext(this.monitoringPort, meterRegistry, environment);
722+
new MonitoringContext(this.monitoringPort, meterRegistry, environment, this.out);
708723
this.monitorings.forEach(m -> m.configure(monitoringContext));
709724
monitoringContext.start();
710725

@@ -958,14 +973,16 @@ public Integer call() throws Exception {
958973
})
959974
.collect(Collectors.toList()));
960975

961-
shutdownService.wrap(
962-
closeStep(
963-
"Closing consumers",
964-
() -> {
965-
for (Consumer consumer : consumers) {
966-
consumer.close();
967-
}
968-
}));
976+
if (!isRunTimeLimited()) {
977+
shutdownService.wrap(
978+
closeStep(
979+
"Closing consumers",
980+
() -> {
981+
for (Consumer consumer : consumers) {
982+
consumer.close();
983+
}
984+
}));
985+
}
969986

970987
ExecutorService executorService;
971988
if (this.producers > 0) {
@@ -980,23 +997,25 @@ public Integer call() throws Exception {
980997
executorService = null;
981998
}
982999

983-
shutdownService.wrap(
984-
closeStep(
985-
"Closing producers",
986-
() -> {
987-
for (Producer p : producers) {
988-
p.close();
989-
}
990-
}));
1000+
if (!isRunTimeLimited()) {
1001+
shutdownService.wrap(
1002+
closeStep(
1003+
"Closing producers",
1004+
() -> {
1005+
for (Producer p : producers) {
1006+
p.close();
1007+
}
1008+
}));
9911009

992-
shutdownService.wrap(
993-
closeStep(
994-
"Closing producers executor service",
995-
() -> {
996-
if (executorService != null) {
997-
executorService.shutdownNow();
998-
}
999-
}));
1010+
shutdownService.wrap(
1011+
closeStep(
1012+
"Closing producers executor service",
1013+
() -> {
1014+
if (executorService != null) {
1015+
executorService.shutdownNow();
1016+
}
1017+
}));
1018+
}
10001019

10011020
String metricsHeader = "Arguments: " + String.join(" ", arguments);
10021021

@@ -1008,7 +1027,7 @@ public Integer call() throws Exception {
10081027
Thread shutdownHook = new Thread(() -> latch.countDown());
10091028
Runtime.getRuntime().addShutdownHook(shutdownHook);
10101029
try {
1011-
if (this.time > 0) {
1030+
if (isRunTimeLimited()) {
10121031
latch.await(this.time, TimeUnit.SECONDS);
10131032
} else {
10141033
latch.await();
@@ -1091,7 +1110,7 @@ private void maybeDisplayEnvironmentVariablesHelp() {
10911110

10921111
private void maybeDisplayVersion() {
10931112
if (this.version) {
1094-
versionInformation(System.out);
1113+
versionInformation(this.out);
10951114
System.exit(0);
10961115
}
10971116
}
@@ -1112,6 +1131,10 @@ public String toString() {
11121131
};
11131132
}
11141133

1134+
private boolean isRunTimeLimited() {
1135+
return this.time > 0;
1136+
}
1137+
11151138
public void monitorings(List<Monitoring> monitorings) {
11161139
this.monitorings = monitorings;
11171140
}

src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.io.IOException;
3737
import java.io.InputStreamReader;
3838
import java.io.PrintStream;
39+
import java.io.PrintWriter;
3940
import java.net.HttpURLConnection;
4041
import java.net.ServerSocket;
4142
import java.net.URL;
@@ -165,7 +166,7 @@ void helpShouldReturnImmediately() throws Exception {
165166

166167
@Test
167168
void versionShouldReturnAppropriateInformation() {
168-
StreamPerfTest.versionInformation(new PrintStream(out, true));
169+
StreamPerfTest.versionInformation(new PrintWriter(out, true));
169170
assertThat(consoleOutput()).contains("RabbitMQ Stream Perf Test");
170171
}
171172

0 commit comments

Comments
 (0)