From 08f7ce492ecfee213ae06094bf846723a37a5473 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 15 Nov 2022 11:37:44 +0100 Subject: [PATCH] Add percentiles to chunk size metrics in performance tool --- .../metrics/MicrometerMetricsCollector.java | 15 ++++++-- .../rabbitmq/stream/perf/StreamPerfTest.java | 4 +-- .../java/com/rabbitmq/stream/perf/Utils.java | 34 +++++++++++++++++++ .../stream/perf/StreamPerfTestTest.java | 4 ++- 4 files changed, 51 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/metrics/MicrometerMetricsCollector.java b/src/main/java/com/rabbitmq/stream/metrics/MicrometerMetricsCollector.java index 8feacaf233..4e952169b9 100644 --- a/src/main/java/com/rabbitmq/stream/metrics/MicrometerMetricsCollector.java +++ b/src/main/java/com/rabbitmq/stream/metrics/MicrometerMetricsCollector.java @@ -29,7 +29,7 @@ public class MicrometerMetricsCollector implements MetricsCollector { private final Counter readBytes; private final AtomicLong outstandingPublishConfirm; - private final DistributionSummary chunkSize; + protected final DistributionSummary chunkSize; public MicrometerMetricsCollector(MeterRegistry registry) { this(registry, "rabbitmq.stream"); @@ -50,8 +50,8 @@ public MicrometerMetricsCollector( this.publish = registry.counter(prefix + ".published", tags); this.publishConfirm = registry.counter(prefix + ".confirmed", tags); this.publishError = registry.counter(prefix + ".errored", tags); - this.chunk = registry.counter(prefix + ".chunk", tags); - this.chunkSize = registry.summary(prefix + ".chunk_size", tags); + this.chunk = this.createChunkCounter(registry, prefix, tags); + this.chunkSize = this.createChunkSizeDistributionSummary(registry, prefix, tags); this.consume = registry.counter(prefix + ".consumed", tags); this.writtenBytes = registry.counter(prefix + ".written_bytes", tags); this.readBytes = registry.counter(prefix + ".read_bytes", tags); @@ -59,6 +59,15 @@ public MicrometerMetricsCollector( registry.gauge(prefix + ".outstanding_publish_confirm", tags, new AtomicLong(0)); } + protected Counter createChunkCounter(MeterRegistry registry, String prefix, Iterable tags) { + return Counter.builder(prefix + ".chunk").tags(tags).register(registry); + } + + protected DistributionSummary createChunkSizeDistributionSummary( + MeterRegistry registry, String prefix, Iterable tags) { + return DistributionSummary.builder(prefix + ".chunk_size").tags(tags).register(registry); + } + @Override public void openConnection() { this.connections.incrementAndGet(); diff --git a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java index 27244c4764..bddead454c 100644 --- a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java +++ b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java @@ -45,9 +45,9 @@ import com.rabbitmq.stream.compression.Compression; import com.rabbitmq.stream.impl.Client; import com.rabbitmq.stream.metrics.MetricsCollector; -import com.rabbitmq.stream.metrics.MicrometerMetricsCollector; import com.rabbitmq.stream.perf.ShutdownService.CloseCallback; import com.rabbitmq.stream.perf.Utils.NamedThreadFactory; +import com.rabbitmq.stream.perf.Utils.PerformanceMicrometerMetricsCollector; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.composite.CompositeMeterRegistry; @@ -513,7 +513,7 @@ public Integer call() throws Exception { CompositeMeterRegistry meterRegistry = new CompositeMeterRegistry(); meterRegistry.config().commonTags(this.metricsTags); String metricsPrefix = "rabbitmq.stream"; - this.metricsCollector = new MicrometerMetricsCollector(meterRegistry, metricsPrefix); + this.metricsCollector = new PerformanceMicrometerMetricsCollector(meterRegistry, metricsPrefix); Counter producerConfirm = meterRegistry.counter(metricsPrefix + ".producer_confirmed"); diff --git a/src/main/java/com/rabbitmq/stream/perf/Utils.java b/src/main/java/com/rabbitmq/stream/perf/Utils.java index 549558690b..38cd61d5d2 100644 --- a/src/main/java/com/rabbitmq/stream/perf/Utils.java +++ b/src/main/java/com/rabbitmq/stream/perf/Utils.java @@ -23,7 +23,11 @@ import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.StreamCreator.LeaderLocator; import com.rabbitmq.stream.compression.Compression; +import com.rabbitmq.stream.metrics.MicrometerMetricsCollector; import com.sun.management.OperatingSystemMXBean; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; import java.lang.reflect.Constructor; import java.lang.reflect.Field; @@ -730,4 +734,34 @@ static Connection amqpConnection( } return connectionFactory.newConnection("stream-perf-test-amqp-connection"); } + + static class PerformanceMicrometerMetricsCollector extends MicrometerMetricsCollector { + + public PerformanceMicrometerMetricsCollector(MeterRegistry registry, String prefix) { + super(registry, prefix); + } + + @Override + protected Counter createChunkCounter( + MeterRegistry registry, String prefix, Iterable tags) { + return null; + } + + @Override + protected DistributionSummary createChunkSizeDistributionSummary( + MeterRegistry registry, String prefix, Iterable tags) { + return DistributionSummary.builder(prefix + ".chunk_size") + .tags(tags) + .description("chunk size") + .publishPercentiles(0.5, 0.75, 0.95, 0.99) + .distributionStatisticExpiry(Duration.ofSeconds(1)) + .serviceLevelObjectives() + .register(registry); + } + + @Override + public void chunk(int entriesCount) { + this.chunkSize.record(entriesCount); + } + } } diff --git a/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java b/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java index 5c67082d50..5bc85f4c63 100644 --- a/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java +++ b/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java @@ -381,7 +381,9 @@ void monitoringShouldReturnValidEndpoint() throws Exception { () -> { HttpResponse response = httpRequest("http://localhost:" + monitoringPort + "/metrics"); return response.responseCode == 200 - && response.body.contains("# HELP rabbitmq_stream_published_total"); + && response.body.contains("# HELP rabbitmq_stream_published_total") + && response.body.contains("rabbitmq_stream_chunk_size{quantile=\"0.5\",}") + && !response.body.contains("chunk_total"); }); run.cancel(true); waitRunEnds();