diff --git a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java index 5603fac54f..6025b13fcb 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java +++ b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java @@ -84,7 +84,7 @@ class ConsumersCoordinator { private final NavigableSet managers = new ConcurrentSkipListSet<>(); private final AtomicLong trackerIdSequence = new AtomicLong(0); - private final boolean debug = true; + private final boolean debug = false; private final List trackers = new CopyOnWriteArrayList<>(); ConsumersCoordinator( diff --git a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java index 9eda0b91d0..109fc434bc 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java +++ b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java @@ -68,7 +68,7 @@ class ProducersCoordinator { private final AtomicLong managerIdSequence = new AtomicLong(0); private final NavigableSet managers = new ConcurrentSkipListSet<>(); private final AtomicLong trackerIdSequence = new AtomicLong(0); - private final boolean debug = true; + private final boolean debug = false; private final List producerTrackers = new CopyOnWriteArrayList<>(); ProducersCoordinator( diff --git a/src/main/java/com/rabbitmq/stream/impl/Utils.java b/src/main/java/com/rabbitmq/stream/impl/Utils.java index 1f4e742e84..a8a47730f4 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Utils.java +++ b/src/main/java/com/rabbitmq/stream/impl/Utils.java @@ -184,12 +184,18 @@ static T callAndMaybeRetry( String description = format(format, args); int attempt = 0; Exception lastException = null; + long startTime = System.nanoTime(); boolean keepTrying = true; while (keepTrying) { try { attempt++; T result = operation.get(); - LOGGER.debug("Operation '{}' completed after {} attempt(s)", description, attempt); + Duration operationDuration = Duration.ofNanos(System.nanoTime() - startTime); + LOGGER.debug( + "Operation '{}' completed in {} ms after {} attempt(s)", + description, + operationDuration.toMillis(), + attempt); return result; } catch (Exception e) { lastException = e; diff --git a/src/main/java/com/rabbitmq/stream/perf/MonitoringContext.java b/src/main/java/com/rabbitmq/stream/perf/MonitoringContext.java index d40208ab88..54ab38b020 100644 --- a/src/main/java/com/rabbitmq/stream/perf/MonitoringContext.java +++ b/src/main/java/com/rabbitmq/stream/perf/MonitoringContext.java @@ -18,6 +18,7 @@ import com.sun.net.httpserver.HttpServer; import io.micrometer.core.instrument.composite.CompositeMeterRegistry; import java.io.OutputStream; +import java.io.PrintWriter; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; @@ -37,14 +38,19 @@ class MonitoringContext { private final Environment environment; private final Collection endpoints = Collections.synchronizedList(new ArrayList<>()); + private final PrintWriter out; private volatile HttpServer server; MonitoringContext( - int monitoringPort, CompositeMeterRegistry meterRegistry, Environment environment) { + int monitoringPort, + CompositeMeterRegistry meterRegistry, + Environment environment, + PrintWriter out) { this.monitoringPort = monitoringPort; this.meterRegistry = meterRegistry; this.environment = environment; + this.out = out; } void addHttpEndpoint(String path, String description, HttpHandler handler) { @@ -88,7 +94,7 @@ void start() throws Exception { }); server.start(); - System.out.println("Monitoring endpoints started on http://localhost:" + this.monitoringPort); + this.out.println("Monitoring endpoints started on http://localhost:" + this.monitoringPort); } } diff --git a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java index 803e1cb209..f8a1ddafe4 100644 --- a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java +++ b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java @@ -58,6 +58,8 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocatorMetric; import io.netty.buffer.ByteBufAllocatorMetricProvider; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslHandler; import io.netty.util.internal.PlatformDependent; @@ -491,7 +493,7 @@ static int run( return commandLine.execute(args); } - static void versionInformation(PrintStream out) { + static void versionInformation(PrintWriter out) { String lineSeparator = System.getProperty("line.separator"); String version = format( @@ -661,6 +663,8 @@ public Integer call() throws Exception { } } + EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); + EnvironmentBuilder environmentBuilder = Environment.builder() .id("stream-perf-test") @@ -668,6 +672,7 @@ public Integer call() throws Exception { .addressResolver(addrResolver) .scheduledExecutorService(envExecutor) .metricsCollector(metricsCollector) + .eventLoopGroup(eventLoopGroup) .byteBufAllocator(byteBufAllocator) .codec(codec) .maxProducersByConnection(this.producersByConnection) @@ -701,10 +706,20 @@ public Integer call() throws Exception { } Environment environment = environmentBuilder.channelCustomizer(channelCustomizer).build(); - shutdownService.wrap(closeStep("Closing environment(s)", () -> environment.close())); + if (!isRunTimeLimited()) { + shutdownService.wrap( + closeStep( + "Closing Netty event loop group", + () -> { + if (!eventLoopGroup.isShuttingDown() || !eventLoopGroup.isShutdown()) { + eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS); + } + })); + shutdownService.wrap(closeStep("Closing environment", () -> environment.close())); + } MonitoringContext monitoringContext = - new MonitoringContext(this.monitoringPort, meterRegistry, environment); + new MonitoringContext(this.monitoringPort, meterRegistry, environment, this.out); this.monitorings.forEach(m -> m.configure(monitoringContext)); monitoringContext.start(); @@ -958,14 +973,16 @@ public Integer call() throws Exception { }) .collect(Collectors.toList())); - shutdownService.wrap( - closeStep( - "Closing consumers", - () -> { - for (Consumer consumer : consumers) { - consumer.close(); - } - })); + if (!isRunTimeLimited()) { + shutdownService.wrap( + closeStep( + "Closing consumers", + () -> { + for (Consumer consumer : consumers) { + consumer.close(); + } + })); + } ExecutorService executorService; if (this.producers > 0) { @@ -980,23 +997,25 @@ public Integer call() throws Exception { executorService = null; } - shutdownService.wrap( - closeStep( - "Closing producers", - () -> { - for (Producer p : producers) { - p.close(); - } - })); + if (!isRunTimeLimited()) { + shutdownService.wrap( + closeStep( + "Closing producers", + () -> { + for (Producer p : producers) { + p.close(); + } + })); - shutdownService.wrap( - closeStep( - "Closing producers executor service", - () -> { - if (executorService != null) { - executorService.shutdownNow(); - } - })); + shutdownService.wrap( + closeStep( + "Closing producers executor service", + () -> { + if (executorService != null) { + executorService.shutdownNow(); + } + })); + } String metricsHeader = "Arguments: " + String.join(" ", arguments); @@ -1008,7 +1027,7 @@ public Integer call() throws Exception { Thread shutdownHook = new Thread(() -> latch.countDown()); Runtime.getRuntime().addShutdownHook(shutdownHook); try { - if (this.time > 0) { + if (isRunTimeLimited()) { latch.await(this.time, TimeUnit.SECONDS); } else { latch.await(); @@ -1091,7 +1110,7 @@ private void maybeDisplayEnvironmentVariablesHelp() { private void maybeDisplayVersion() { if (this.version) { - versionInformation(System.out); + versionInformation(this.out); System.exit(0); } } @@ -1112,6 +1131,10 @@ public String toString() { }; } + private boolean isRunTimeLimited() { + return this.time > 0; + } + public void monitorings(List monitorings) { this.monitorings = monitorings; } diff --git a/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java b/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java index 779ccfbff0..da536f2a91 100644 --- a/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java +++ b/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java @@ -36,6 +36,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintStream; +import java.io.PrintWriter; import java.net.HttpURLConnection; import java.net.ServerSocket; import java.net.URL; @@ -165,7 +166,7 @@ void helpShouldReturnImmediately() throws Exception { @Test void versionShouldReturnAppropriateInformation() { - StreamPerfTest.versionInformation(new PrintStream(out, true)); + StreamPerfTest.versionInformation(new PrintWriter(out, true)); assertThat(consoleOutput()).contains("RabbitMQ Stream Perf Test"); }