Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package datadog.communication.ddagent;

import static datadog.communication.serialization.msgpack.MsgPackWriter.FIXARRAY;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptySet;
import static java.util.Collections.singletonList;
import static java.util.Collections.unmodifiableList;
import static java.util.Collections.unmodifiableSet;

import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi;
Expand Down Expand Up @@ -88,8 +88,8 @@ public class DDAgentFeaturesDiscovery implements DroppingPolicy {
private volatile String evpProxyEndpoint;
private volatile String version;
private volatile String telemetryProxyEndpoint;
private volatile List<String> peerTags = emptyList();
private volatile List<String> spanKindsToComputedStats = emptyList();
private volatile Set<String> peerTags = emptySet();
private volatile Set<String> spanKindsToComputedStats = emptySet();

private long lastTimeDiscovered;

Expand Down Expand Up @@ -123,8 +123,8 @@ private void reset() {
version = null;
lastTimeDiscovered = 0;
telemetryProxyEndpoint = null;
peerTags = emptyList();
spanKindsToComputedStats = emptyList();
peerTags = emptySet();
spanKindsToComputedStats = emptySet();
}

/** Run feature discovery, unconditionally. */
Expand Down Expand Up @@ -295,11 +295,16 @@ private boolean processInfoResponse(String response) {
|| Boolean.TRUE.equals(canDrop));

Object peer_tags = map.get("peer_tags");
peerTags = peer_tags == null ? emptyList() : unmodifiableList((List<String>) peer_tags);
peerTags =
peer_tags instanceof List
? unmodifiableSet(new HashSet<>((List<String>) peer_tags))
: emptySet();

Object span_kinds = map.get("span_kinds_stats_computed");
spanKindsToComputedStats =
span_kinds == null ? emptyList() : unmodifiableList((List<String>) span_kinds);
span_kinds instanceof List
? unmodifiableSet(new HashSet<>((List<String>) span_kinds))
: emptySet();
Comment on lines +305 to +307
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

praise: Indeed, it's better to use an instance check in case the payload ever changes 👍

}
try {
state = Strings.sha256(response);
Expand Down Expand Up @@ -357,11 +362,11 @@ public boolean supportsLongRunning() {
return supportsLongRunning;
}

public List<String> peerTags() {
public Set<String> peerTags() {
return peerTags;
}

public List<String> spanKindsToComputedStats() {
public Set<String> spanKindsToComputedStats() {
return spanKindsToComputedStats;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V6_METRICS_ENDPOINT;
import static datadog.trace.api.Functions.UTF8_ENCODE;
import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG;
import static datadog.trace.common.metrics.AggregateMetric.TOP_LEVEL_TAG;
import static datadog.trace.common.metrics.SignalItem.ReportSignal.REPORT;
Expand Down Expand Up @@ -230,7 +231,14 @@ public boolean publish(List<? extends CoreSpan<?>> trace) {
}

private boolean shouldComputeMetric(CoreSpan<?> span) {
return (span.isMeasured() || span.isTopLevel()) && span.getDurationNano() > 0;
return (span.isMeasured() || span.isTopLevel() || spanKindEligible(span))
&& span.getDurationNano() > 0;
}

/**
 * Tells whether the span carries a {@code SPAN_KIND} tag whose value is contained in the
 * collection returned by {@code features.spanKindsToComputedStats()}.
 *
 * @param span the span whose {@code SPAN_KIND} tag is inspected
 * @return {@code true} if the tag is present and its string form is one of the listed kinds;
 *     {@code false} if the tag is absent or not listed
 */
private boolean spanKindEligible(CoreSpan<?> span) {
  final Object kindTag = span.getTag(SPAN_KIND);
  if (kindTag == null) {
    // no span kind set on this span, so it cannot match
    return false;
  }
  // the tag value may be a CharSequence rather than a String, so compare on its toString() form
  final String kindName = kindTag.toString();
  return features.spanKindsToComputedStats().contains(kindName);
}

private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.function.Supplier

import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND
import static java.util.concurrent.TimeUnit.MILLISECONDS
import static java.util.concurrent.TimeUnit.SECONDS

Expand All @@ -32,7 +33,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
Sink sink = Mock(Sink)
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version","language")
WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(
wellKnownTags,
empty,
Expand Down Expand Up @@ -61,7 +62,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
Sink sink = Mock(Sink)
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version","language")
WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language")
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(
wellKnownTags,
[ignoredResourceName].toSet(),
Expand Down Expand Up @@ -120,6 +121,44 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
aggregator.close()
}

// Checks that publishing a single span leads to a written stats bucket only when the span's
// SPAN_KIND tag is one of the kinds advertised by the (mocked) feature discovery.
def "should compute stats for span kind #kind"() {
  setup:
  MetricWriter writer = Mock(MetricWriter)
  Sink sink = Stub(Sink)
  DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
  features.supportsMetrics() >> true
  // spanKindsToComputedStats() returns a Set<String>; stub it with an explicit set instead of
  // relying on the mock framework coercing a List literal
  features.spanKindsToComputedStats() >> ["client", "server", "producer", "consumer"].toSet()
  ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
    features, sink, writer, 10, queueSize, reportingInterval, SECONDS)
  aggregator.start()

  when:
  // the latch is only released from the finishBucket() stub below, so it tells us whether a
  // bucket was actually written; the negative cases wait for the full 2 second timeout
  CountDownLatch latch = new CountDownLatch(1)
  // all three boolean flags are false — presumably measured/top-level/error, confirm against
  // SimpleSpan — so eligibility is expected to come from the span kind tag alone
  def span = Spy(new SimpleSpan("service", "operation", "resource", "type", false, false, false, 0, 100, HTTP_OK))
  // kind may be a String, another CharSequence (UTF8BytesString) or null, see the where: table
  span.getTag(SPAN_KIND) >> kind
  aggregator.publish([span])
  aggregator.report()
  def latchTriggered = latch.await(2, SECONDS)

  then:
  latchTriggered == statsComputed
  (statsComputed ? 1 : 0) * writer.startBucket(1, _, _)
  // NOTE(review): the closure after >> is a stubbed response, so its boolean result is
  // presumably not asserted by Spock — confirm whether an argument constraint was intended
  (statsComputed ? 1 : 0) * writer.add(new MetricKey("resource", "service", "operation", "type", HTTP_OK, false), _) >> { MetricKey key, AggregateMetric value ->
    value.getHitCount() == 1 && value.getTopLevelCount() == 0 && value.getDuration() == 100
  }
  (statsComputed ? 1 : 0) * writer.finishBucket() >> { latch.countDown() }

  cleanup:
  aggregator.close()

  where:
  kind                             | statsComputed
  "client"                         | true
  UTF8BytesString.create("server") | true
  "internal"                       | false
  null                             | false
}

def "measured spans do not contribute to top level count"() {
setup:
MetricWriter writer = Mock(MetricWriter)
Expand Down Expand Up @@ -472,7 +511,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
features, sink, writer, 10, queueSize, 200, MILLISECONDS)
final spans = [
new SimpleSpan("service" , "operation", "resource", "type", false, true, false, 0, 10, HTTP_OK)
new SimpleSpan("service", "operation", "resource", "type", false, true, false, 0, 10, HTTP_OK)
]
aggregator.start()

Expand Down