From 410526640033b15f06cc9a709aaedb831787e426 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Thu, 19 Jan 2023 15:48:06 +0100 Subject: [PATCH 1/2] Do not record trackers It's just for debugging. --- .../java/com/rabbitmq/stream/impl/ConsumersCoordinator.java | 2 +- .../java/com/rabbitmq/stream/impl/ProducersCoordinator.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java index 5603fac54f..6025b13fcb 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java +++ b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java @@ -84,7 +84,7 @@ class ConsumersCoordinator { private final NavigableSet managers = new ConcurrentSkipListSet<>(); private final AtomicLong trackerIdSequence = new AtomicLong(0); - private final boolean debug = true; + private final boolean debug = false; private final List trackers = new CopyOnWriteArrayList<>(); ConsumersCoordinator( diff --git a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java index 9eda0b91d0..109fc434bc 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java +++ b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java @@ -68,7 +68,7 @@ class ProducersCoordinator { private final AtomicLong managerIdSequence = new AtomicLong(0); private final NavigableSet managers = new ConcurrentSkipListSet<>(); private final AtomicLong trackerIdSequence = new AtomicLong(0); - private final boolean debug = true; + private final boolean debug = false; private final List producerTrackers = new CopyOnWriteArrayList<>(); ProducersCoordinator( From 57b074fad0ddccd999cdfc46dbf565957581f770 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Thu, 19 Jan 2023 16:18:34 +0100 Subject: [PATCH 2/2] Speed up exit when --time is used We don't close the consumers, publishers, and environment, to be as close as possible to the target duration of the run. This should not be done in real applications, as closing steps are not executed, and some behavior like automatic offset tracking are not guaranted to work properly. --- .../java/com/rabbitmq/stream/impl/Utils.java | 8 +- .../stream/perf/MonitoringContext.java | 10 ++- .../rabbitmq/stream/perf/StreamPerfTest.java | 81 ++++++++++++------- .../stream/perf/StreamPerfTestTest.java | 3 +- 4 files changed, 69 insertions(+), 33 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/Utils.java b/src/main/java/com/rabbitmq/stream/impl/Utils.java index 1f4e742e84..a8a47730f4 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Utils.java +++ b/src/main/java/com/rabbitmq/stream/impl/Utils.java @@ -184,12 +184,18 @@ static T callAndMaybeRetry( String description = format(format, args); int attempt = 0; Exception lastException = null; + long startTime = System.nanoTime(); boolean keepTrying = true; while (keepTrying) { try { attempt++; T result = operation.get(); - LOGGER.debug("Operation '{}' completed after {} attempt(s)", description, attempt); + Duration operationDuration = Duration.ofNanos(System.nanoTime() - startTime); + LOGGER.debug( + "Operation '{}' completed in {} ms after {} attempt(s)", + description, + operationDuration.toMillis(), + attempt); return result; } catch (Exception e) { lastException = e; diff --git a/src/main/java/com/rabbitmq/stream/perf/MonitoringContext.java b/src/main/java/com/rabbitmq/stream/perf/MonitoringContext.java index d40208ab88..54ab38b020 100644 --- a/src/main/java/com/rabbitmq/stream/perf/MonitoringContext.java +++ b/src/main/java/com/rabbitmq/stream/perf/MonitoringContext.java @@ -18,6 +18,7 @@ import com.sun.net.httpserver.HttpServer; import io.micrometer.core.instrument.composite.CompositeMeterRegistry; import java.io.OutputStream; +import java.io.PrintWriter; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; @@ -37,14 +38,19 @@ class MonitoringContext { private final Environment environment; private final Collection endpoints = Collections.synchronizedList(new ArrayList<>()); + private final PrintWriter out; private volatile HttpServer server; MonitoringContext( - int monitoringPort, CompositeMeterRegistry meterRegistry, Environment environment) { + int monitoringPort, + CompositeMeterRegistry meterRegistry, + Environment environment, + PrintWriter out) { this.monitoringPort = monitoringPort; this.meterRegistry = meterRegistry; this.environment = environment; + this.out = out; } void addHttpEndpoint(String path, String description, HttpHandler handler) { @@ -88,7 +94,7 @@ void start() throws Exception { }); server.start(); - System.out.println("Monitoring endpoints started on http://localhost:" + this.monitoringPort); + this.out.println("Monitoring endpoints started on http://localhost:" + this.monitoringPort); } } diff --git a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java index 803e1cb209..f8a1ddafe4 100644 --- a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java +++ b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java @@ -58,6 +58,8 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocatorMetric; import io.netty.buffer.ByteBufAllocatorMetricProvider; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslHandler; import io.netty.util.internal.PlatformDependent; @@ -491,7 +493,7 @@ static int run( return commandLine.execute(args); } - static void versionInformation(PrintStream out) { + static void versionInformation(PrintWriter out) { String lineSeparator = System.getProperty("line.separator"); String version = format( @@ -661,6 +663,8 @@ public Integer call() throws Exception { } } + EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); + EnvironmentBuilder environmentBuilder = Environment.builder() .id("stream-perf-test") @@ -668,6 +672,7 @@ public Integer call() throws Exception { .addressResolver(addrResolver) .scheduledExecutorService(envExecutor) .metricsCollector(metricsCollector) + .eventLoopGroup(eventLoopGroup) .byteBufAllocator(byteBufAllocator) .codec(codec) .maxProducersByConnection(this.producersByConnection) @@ -701,10 +706,20 @@ public Integer call() throws Exception { } Environment environment = environmentBuilder.channelCustomizer(channelCustomizer).build(); - shutdownService.wrap(closeStep("Closing environment(s)", () -> environment.close())); + if (!isRunTimeLimited()) { + shutdownService.wrap( + closeStep( + "Closing Netty event loop group", + () -> { + if (!eventLoopGroup.isShuttingDown() || !eventLoopGroup.isShutdown()) { + eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS); + } + })); + shutdownService.wrap(closeStep("Closing environment", () -> environment.close())); + } MonitoringContext monitoringContext = - new MonitoringContext(this.monitoringPort, meterRegistry, environment); + new MonitoringContext(this.monitoringPort, meterRegistry, environment, this.out); this.monitorings.forEach(m -> m.configure(monitoringContext)); monitoringContext.start(); @@ -958,14 +973,16 @@ public Integer call() throws Exception { }) .collect(Collectors.toList())); - shutdownService.wrap( - closeStep( - "Closing consumers", - () -> { - for (Consumer consumer : consumers) { - consumer.close(); - } - })); + if (!isRunTimeLimited()) { + shutdownService.wrap( + closeStep( + "Closing consumers", + () -> { + for (Consumer consumer : consumers) { + consumer.close(); + } + })); + } ExecutorService executorService; if (this.producers > 0) { @@ -980,23 +997,25 @@ public Integer call() throws Exception { executorService = null; } - shutdownService.wrap( - closeStep( - "Closing producers", - () -> { - for (Producer p : producers) { - p.close(); - } - })); + if (!isRunTimeLimited()) { + shutdownService.wrap( + closeStep( + "Closing producers", + () -> { + for (Producer p : producers) { + p.close(); + } + })); - shutdownService.wrap( - closeStep( - "Closing producers executor service", - () -> { - if (executorService != null) { - executorService.shutdownNow(); - } - })); + shutdownService.wrap( + closeStep( + "Closing producers executor service", + () -> { + if (executorService != null) { + executorService.shutdownNow(); + } + })); + } String metricsHeader = "Arguments: " + String.join(" ", arguments); @@ -1008,7 +1027,7 @@ public Integer call() throws Exception { Thread shutdownHook = new Thread(() -> latch.countDown()); Runtime.getRuntime().addShutdownHook(shutdownHook); try { - if (this.time > 0) { + if (isRunTimeLimited()) { latch.await(this.time, TimeUnit.SECONDS); } else { latch.await(); @@ -1091,7 +1110,7 @@ private void maybeDisplayEnvironmentVariablesHelp() { private void maybeDisplayVersion() { if (this.version) { - versionInformation(System.out); + versionInformation(this.out); System.exit(0); } } @@ -1112,6 +1131,10 @@ public String toString() { }; } + private boolean isRunTimeLimited() { + return this.time > 0; + } + public void monitorings(List monitorings) { this.monitorings = monitorings; } diff --git a/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java b/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java index 779ccfbff0..da536f2a91 100644 --- a/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java +++ b/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java @@ -36,6 +36,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintStream; +import java.io.PrintWriter; import java.net.HttpURLConnection; import java.net.ServerSocket; import java.net.URL; @@ -165,7 +166,7 @@ void helpShouldReturnImmediately() throws Exception { @Test void versionShouldReturnAppropriateInformation() { - StreamPerfTest.versionInformation(new PrintStream(out, true)); + StreamPerfTest.versionInformation(new PrintWriter(out, true)); assertThat(consoleOutput()).contains("RabbitMQ Stream Perf Test"); }