From 316c552bf761feca5a83cb1796b69b4b86ef8045 Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Fri, 11 Jul 2025 10:57:12 +0200 Subject: [PATCH] Calculate client stats also if the span kind is eligible --- .../ddagent/DDAgentFeaturesDiscovery.java | 25 ++++++----- .../metrics/ConflatingMetricsAggregator.java | 10 ++++- .../ConflatingMetricAggregatorTest.groovy | 45 +++++++++++++++++-- 3 files changed, 66 insertions(+), 14 deletions(-) diff --git a/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java b/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java index 22f5f4603ac..f51caeb7da2 100644 --- a/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java +++ b/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java @@ -1,9 +1,9 @@ package datadog.communication.ddagent; import static datadog.communication.serialization.msgpack.MsgPackWriter.FIXARRAY; -import static java.util.Collections.emptyList; +import static java.util.Collections.emptySet; import static java.util.Collections.singletonList; -import static java.util.Collections.unmodifiableList; +import static java.util.Collections.unmodifiableSet; import com.squareup.moshi.JsonAdapter; import com.squareup.moshi.Moshi; @@ -88,8 +88,8 @@ public class DDAgentFeaturesDiscovery implements DroppingPolicy { private volatile String evpProxyEndpoint; private volatile String version; private volatile String telemetryProxyEndpoint; - private volatile List peerTags = emptyList(); - private volatile List spanKindsToComputedStats = emptyList(); + private volatile Set peerTags = emptySet(); + private volatile Set spanKindsToComputedStats = emptySet(); private long lastTimeDiscovered; @@ -123,8 +123,8 @@ private void reset() { version = null; lastTimeDiscovered = 0; telemetryProxyEndpoint = null; - peerTags = emptyList(); - spanKindsToComputedStats = emptyList(); + peerTags = emptySet(); + spanKindsToComputedStats = emptySet(); } /** Run feature discovery, unconditionally. */ @@ -295,11 +295,16 @@ private boolean processInfoResponse(String response) { || Boolean.TRUE.equals(canDrop)); Object peer_tags = map.get("peer_tags"); - peerTags = peer_tags == null ? emptyList() : unmodifiableList((List) peer_tags); + peerTags = + peer_tags instanceof List + ? unmodifiableSet(new HashSet<>((List) peer_tags)) + : emptySet(); Object span_kinds = map.get("span_kinds_stats_computed"); spanKindsToComputedStats = - span_kinds == null ? emptyList() : unmodifiableList((List) span_kinds); + span_kinds instanceof List + ? unmodifiableSet(new HashSet<>((List) span_kinds)) + : emptySet(); } try { state = Strings.sha256(response); @@ -357,11 +362,11 @@ public boolean supportsLongRunning() { return supportsLongRunning; } - public List peerTags() { + public Set peerTags() { return peerTags; } - public List spanKindsToComputedStats() { + public Set spanKindsToComputedStats() { return spanKindsToComputedStats; } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index 03b9801be99..1280c0acf0d 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -2,6 +2,7 @@ import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V6_METRICS_ENDPOINT; import static datadog.trace.api.Functions.UTF8_ENCODE; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG; import static datadog.trace.common.metrics.AggregateMetric.TOP_LEVEL_TAG; import static datadog.trace.common.metrics.SignalItem.ReportSignal.REPORT; @@ -230,7 +231,14 @@ public boolean publish(List> trace) { } private boolean shouldComputeMetric(CoreSpan span) { - return (span.isMeasured() || span.isTopLevel()) && span.getDurationNano() > 0; + return (span.isMeasured() || span.isTopLevel() || spanKindEligible(span)) + && span.getDurationNano() > 0; + } + + private boolean spanKindEligible(CoreSpan span) { + final Object spanKind = span.getTag(SPAN_KIND); + // use toString since it could be a CharSequence... + return spanKind != null && features.spanKindsToComputedStats().contains(spanKind.toString()); } private boolean publish(CoreSpan span, boolean isTopLevel) { diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy index bad7eb8f587..3e2875c2993 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy @@ -13,6 +13,7 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException import java.util.function.Supplier +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND import static java.util.concurrent.TimeUnit.MILLISECONDS import static java.util.concurrent.TimeUnit.SECONDS @@ -32,7 +33,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Mock(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true - WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version","language") + WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language") ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator( wellKnownTags, empty, @@ -61,7 +62,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Mock(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true - WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version","language") + WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language") ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator( wellKnownTags, [ignoredResourceName].toSet(), @@ -120,6 +121,44 @@ class ConflatingMetricAggregatorTest extends DDSpecification { aggregator.close() } + def "should compute stats for span kind #kind"() { + setup: + MetricWriter writer = Mock(MetricWriter) + Sink sink = Stub(Sink) + DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) + features.supportsMetrics() >> true + features.spanKindsToComputedStats() >> ["client", "server", "producer", "consumer"] + ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + features, sink, writer, 10, queueSize, reportingInterval, SECONDS) + aggregator.start() + + when: + CountDownLatch latch = new CountDownLatch(1) + def span = Spy(new SimpleSpan("service", "operation", "resource", "type", false, false, false, 0, 100, HTTP_OK)) + span.getTag(SPAN_KIND) >> kind + aggregator.publish([span]) + aggregator.report() + def latchTriggered = latch.await(2, SECONDS) + + then: + latchTriggered == statsComputed + (statsComputed ? 1 : 0) * writer.startBucket(1, _, _) + (statsComputed ? 1 : 0) * writer.add(new MetricKey("resource", "service", "operation", "type", HTTP_OK, false), _) >> { MetricKey key, AggregateMetric value -> + value.getHitCount() == 1 && value.getTopLevelCount() == 0 && value.getDuration() == 100 + } + (statsComputed ? 1 : 0) * writer.finishBucket() >> { latch.countDown() } + + cleanup: + aggregator.close() + + where: + kind | statsComputed + "client" | true + UTF8BytesString.create("server") | true + "internal" | false + null | false + } + def "measured spans do not contribute to top level count"() { setup: MetricWriter writer = Mock(MetricWriter) @@ -472,7 +511,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, sink, writer, 10, queueSize, 200, MILLISECONDS) final spans = [ - new SimpleSpan("service" , "operation", "resource", "type", false, true, false, 0, 10, HTTP_OK) + new SimpleSpan("service", "operation", "resource", "type", false, true, false, 0, 10, HTTP_OK) ] aggregator.start()