diff --git a/dd-trace-core/src/main/java/datadog/trace/core/baggage/BaggagePropagator.java b/dd-trace-core/src/main/java/datadog/trace/core/baggage/BaggagePropagator.java index f3957dbc003..0e345296b32 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/baggage/BaggagePropagator.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/baggage/BaggagePropagator.java @@ -7,6 +7,7 @@ import datadog.context.propagation.CarrierVisitor; import datadog.context.propagation.Propagator; import datadog.trace.api.Config; +import datadog.trace.api.metrics.BaggageMetrics; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; import datadog.trace.bootstrap.instrumentation.api.Baggage; @@ -26,6 +27,7 @@ public class BaggagePropagator implements Propagator { private static final Logger LOG = LoggerFactory.getLogger(BaggagePropagator.class); private static final PercentEscaper UTF_ESCAPER = PercentEscaper.create(); + private static final BaggageMetrics BAGGAGE_METRICS = BaggageMetrics.getInstance(); static final String BAGGAGE_KEY = "baggage"; private final boolean injectBaggage; private final boolean extractBaggage; @@ -89,11 +91,13 @@ public void inject(Context context, C carrier, CarrierSetter setter) { processedItems++; // reached the max number of baggage items allowed if (processedItems == this.maxItems) { + BAGGAGE_METRICS.onBaggageTruncatedByItemLimit(); break; } // Drop newest k/v pair if adding it leads to exceeding the limit if (currentBytes + escapedKey.size + escapedVal.size + extraBytes > this.maxBytes) { baggageText.setLength(currentBytes); + BAGGAGE_METRICS.onBaggageTruncatedByByteLimit(); break; } currentBytes += escapedKey.size + escapedVal.size + extraBytes; @@ -103,6 +107,9 @@ public void inject(Context context, C carrier, CarrierSetter setter) { // Save header as cache to re-inject it later if baggage did not change baggage.setW3cHeader(headerValue); setter.set(carrier, BAGGAGE_KEY, headerValue); + + // Record successful baggage injection for telemetry + BAGGAGE_METRICS.onBaggageInjected(); } @Override @@ -117,6 +124,9 @@ public Context extract(Context context, C carrier, CarrierVisitor visitor return context; } + // Record successful baggage extraction for telemetry + BAGGAGE_METRICS.onBaggageExtracted(); + // TODO: consider a better way to link baggage with the extracted (legacy) TagContext AgentSpan extractedSpan = AgentSpan.fromContext(context); if (extractedSpan != null) { @@ -158,12 +168,14 @@ private Map parseBaggageHeaders(String input) { if (kvSeparatorInd > end) { LOG.debug( "Dropping baggage headers due to key with no value {}", input.substring(start, end)); + BAGGAGE_METRICS.onBaggageMalformed(); return emptyMap(); } String key = decode(input.substring(start, kvSeparatorInd).trim()); String value = decode(input.substring(kvSeparatorInd + 1, end).trim()); if (key.isEmpty() || value.isEmpty()) { LOG.debug("Dropping baggage headers due to empty k/v {}:{}", key, value); + BAGGAGE_METRICS.onBaggageMalformed(); return emptyMap(); } baggage.put(key, value); diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/baggage/BaggagePropagatorTelemetryTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/baggage/BaggagePropagatorTelemetryTest.groovy new file mode 100644 index 00000000000..7907452c1d7 --- /dev/null +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/baggage/BaggagePropagatorTelemetryTest.groovy @@ -0,0 +1,120 @@ +package datadog.trace.core.baggage + +import datadog.context.Context +import datadog.trace.api.Config +import datadog.trace.api.metrics.BaggageMetrics +import datadog.trace.api.telemetry.CoreMetricCollector +import spock.lang.Specification + +class BaggagePropagatorTelemetryTest extends Specification { + + def "should directly increment baggage metrics"() { + given: + def baggageMetrics = BaggageMetrics.getInstance() + def collector = CoreMetricCollector.getInstance() + + when: + baggageMetrics.onBaggageInjected() + collector.prepareMetrics() + def metrics = collector.drain() + + then: + def baggageMetric = metrics.find { it.metricName == "context_header_style.injected" } + baggageMetric != null + baggageMetric.value >= 1 + baggageMetric.tags.contains("header_style:baggage") + } + + def "should increment telemetry counter when baggage is successfully extracted"() { + given: + def config = Mock(Config) { + isBaggageExtract() >> true + isBaggageInject() >> true + getBaggageMaxItems() >> 64 + getBaggageMaxBytes() >> 8192 + } + def propagator = new BaggagePropagator(config) + def context = Context.root() + def carrier = ["baggage": "key1=value1,key2=value2"] + def visitor = { map, consumer -> + map.each { k, v -> consumer.accept(k, v) } + } + def collector = CoreMetricCollector.getInstance() + + when: + propagator.extract(context, carrier, visitor) + collector.prepareMetrics() + def metrics = collector.drain() + + then: + def baggageMetric = metrics.find { it.metricName == "context_header_style.extracted" } + baggageMetric != null + baggageMetric.value >= 1 + baggageMetric.tags.contains("header_style:baggage") + } + + def "should directly increment all baggage metrics"() { + given: + def baggageMetrics = BaggageMetrics.getInstance() + def collector = CoreMetricCollector.getInstance() + + when: + baggageMetrics.onBaggageInjected() + baggageMetrics.onBaggageMalformed() + baggageMetrics.onBaggageTruncatedByByteLimit() + baggageMetrics.onBaggageTruncatedByItemLimit() + collector.prepareMetrics() + def metrics = collector.drain() + + then: + def injectedMetric = metrics.find { it.metricName == "context_header_style.injected" } + injectedMetric != null + injectedMetric.value == 1 + injectedMetric.tags.contains("header_style:baggage") + + def malformedMetric = metrics.find { it.metricName == "context_header_style.malformed" } + malformedMetric != null + malformedMetric.value == 1 + malformedMetric.tags.contains("header_style:baggage") + + def bytesTruncatedMetric = metrics.find { + it.metricName == "context_header_style.truncated" && + it.tags.contains("truncation_reason:baggage_byte_count_exceeded") + } + bytesTruncatedMetric != null + bytesTruncatedMetric.value == 1 + + def itemsTruncatedMetric = metrics.find { + it.metricName == "context_header_style.truncated" && + it.tags.contains("truncation_reason:baggage_item_count_exceeded") + } + itemsTruncatedMetric != null + itemsTruncatedMetric.value == 1 + } + + def "should not increment telemetry counter when baggage extraction fails"() { + given: + def config = Mock(Config) { + isBaggageExtract() >> true + isBaggageInject() >> true + getBaggageMaxItems() >> 64 + getBaggageMaxBytes() >> 8192 + } + def propagator = new BaggagePropagator(config) + def context = Context.root() + def carrier = [:] // No baggage header + def visitor = { map, consumer -> + map.each { k, v -> consumer.accept(k, v) } + } + def collector = CoreMetricCollector.getInstance() + + when: + propagator.extract(context, carrier, visitor) + collector.prepareMetrics() + def metrics = collector.drain() + + then: + def foundMetrics = metrics.findAll { it.metricName.startsWith("context_header_style.") } + foundMetrics.isEmpty() // No extraction occurred, so no metrics should be created + } +} diff --git a/internal-api/src/main/java/datadog/trace/api/metrics/BaggageMetrics.java b/internal-api/src/main/java/datadog/trace/api/metrics/BaggageMetrics.java new file mode 100644 index 00000000000..c1aa498ccb6 --- /dev/null +++ b/internal-api/src/main/java/datadog/trace/api/metrics/BaggageMetrics.java @@ -0,0 +1,105 @@ +package datadog.trace.api.metrics; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +/** Metrics for baggage propagation operations. */ +public class BaggageMetrics { + private static final BaggageMetrics INSTANCE = new BaggageMetrics(); + private final AtomicLong extractedCounter = new AtomicLong(0); + private final AtomicLong injectedCounter = new AtomicLong(0); + private final AtomicLong malformedCounter = new AtomicLong(0); + private final AtomicLong truncatedByteCounter = new AtomicLong(0); + private final AtomicLong truncatedItemCounter = new AtomicLong(0); + private final Collection taggedCounters; + + public static BaggageMetrics getInstance() { + return INSTANCE; + } + + private BaggageMetrics() { + List counters = new ArrayList<>(5); + counters.add( + new TaggedCounter( + "context_header_style.extracted", this.extractedCounter, "header_style:baggage")); + counters.add( + new TaggedCounter( + "context_header_style.injected", this.injectedCounter, "header_style:baggage")); + counters.add( + new TaggedCounter( + "context_header_style.malformed", this.malformedCounter, "header_style:baggage")); + counters.add( + new TaggedCounter( + "context_header_style.truncated", + this.truncatedByteCounter, + "truncation_reason:baggage_byte_count_exceeded")); + counters.add( + new TaggedCounter( + "context_header_style.truncated", + this.truncatedItemCounter, + "truncation_reason:baggage_item_count_exceeded")); + this.taggedCounters = Collections.unmodifiableList(counters); + } + + public void onBaggageExtracted() { + this.extractedCounter.incrementAndGet(); + } + + public void onBaggageInjected() { + this.injectedCounter.incrementAndGet(); + } + + public void onBaggageMalformed() { + this.malformedCounter.incrementAndGet(); + } + + public void onBaggageTruncatedByByteLimit() { + this.truncatedByteCounter.incrementAndGet(); + } + + public void onBaggageTruncatedByItemLimit() { + this.truncatedItemCounter.incrementAndGet(); + } + + public Collection getTaggedCounters() { + return this.taggedCounters; + } + + public static class TaggedCounter implements CoreCounter { + private final String name; + private final AtomicLong counter; + private final String tag; + private long previousCount; + + public TaggedCounter(String name, AtomicLong counter, String tag) { + this.name = name; + this.counter = counter; + this.tag = tag; + } + + @Override + public String getName() { + return this.name; + } + + public String getTag() { + return this.tag; + } + + @Override + public long getValue() { + return counter.get(); + } + + @Override + public long getValueAndReset() { + long count = counter.get(); + long delta = count - previousCount; + previousCount = count; + return delta; + } + } +} diff --git a/internal-api/src/main/java/datadog/trace/api/telemetry/CoreMetricCollector.java b/internal-api/src/main/java/datadog/trace/api/telemetry/CoreMetricCollector.java index 12aac7f74ed..d33fcf1529d 100644 --- a/internal-api/src/main/java/datadog/trace/api/telemetry/CoreMetricCollector.java +++ b/internal-api/src/main/java/datadog/trace/api/telemetry/CoreMetricCollector.java @@ -1,5 +1,6 @@ package datadog.trace.api.telemetry; +import datadog.trace.api.metrics.BaggageMetrics; import datadog.trace.api.metrics.CoreCounter; import datadog.trace.api.metrics.SpanMetricRegistryImpl; import datadog.trace.api.metrics.SpanMetricsImpl; @@ -16,6 +17,7 @@ public class CoreMetricCollector implements MetricCollector metricsQueue; @@ -29,6 +31,7 @@ private CoreMetricCollector() { @Override public void prepareMetrics() { + // Collect span metrics for (SpanMetricsImpl spanMetrics : this.spanMetricRegistry.getSpanMetrics()) { String tag = INTEGRATION_NAME_TAG + spanMetrics.getInstrumentationName(); for (CoreCounter counter : spanMetrics.getCounters()) { @@ -45,6 +48,23 @@ public void prepareMetrics() { } } } + + // Collect baggage metrics + for (BaggageMetrics.TaggedCounter counter : this.baggageMetrics.getTaggedCounters()) { + long value = counter.getValueAndReset(); + if (value == 0) { + // Skip not updated counters + continue; + } + // Use the specific tag for each baggage metric + String tag = counter.getTag(); + CoreMetric metric = + new CoreMetric(METRIC_NAMESPACE, true, counter.getName(), "count", value, tag); + if (!this.metricsQueue.offer(metric)) { + // Stop adding metrics if the queue is full + break; + } + } } @Override diff --git a/internal-api/src/test/groovy/datadog/trace/api/telemetry/CoreMetricCollectorBaggageTest.groovy b/internal-api/src/test/groovy/datadog/trace/api/telemetry/CoreMetricCollectorBaggageTest.groovy new file mode 100644 index 00000000000..14b63e428bb --- /dev/null +++ b/internal-api/src/test/groovy/datadog/trace/api/telemetry/CoreMetricCollectorBaggageTest.groovy @@ -0,0 +1,131 @@ +package datadog.trace.api.telemetry + +import datadog.trace.api.metrics.BaggageMetrics +import spock.lang.Specification + +class CoreMetricCollectorBaggageTest extends Specification { + + def "should collect baggage extraction metrics with header_style tag"() { + given: + def collector = CoreMetricCollector.getInstance() + def baggageMetrics = BaggageMetrics.getInstance() + + when: + baggageMetrics.onBaggageExtracted() + baggageMetrics.onBaggageExtracted() + collector.prepareMetrics() + def metrics = collector.drain() + + then: + def baggageMetric = metrics.find { it.metricName == "context_header_style.extracted" } + baggageMetric != null + baggageMetric.namespace == "tracers" + baggageMetric.type == "count" + baggageMetric.value == 2 + baggageMetric.tags.contains("header_style:baggage") + baggageMetric.common == true + } + + def "should collect baggage injection metrics with header_style tag"() { + given: + def collector = CoreMetricCollector.getInstance() + def baggageMetrics = BaggageMetrics.getInstance() + + when: + baggageMetrics.onBaggageInjected() + collector.prepareMetrics() + def metrics = collector.drain() + + then: + def baggageMetric = metrics.find { it.metricName == "context_header_style.injected" } + baggageMetric != null + baggageMetric.tags.contains("header_style:baggage") + baggageMetric.value == 1 + } + + def "should collect baggage malformed metrics with header_style tag"() { + given: + def collector = CoreMetricCollector.getInstance() + def baggageMetrics = BaggageMetrics.getInstance() + + when: + baggageMetrics.onBaggageMalformed() + collector.prepareMetrics() + def metrics = collector.drain() + + then: + def baggageMetric = metrics.find { it.metricName == "context_header_style.malformed" } + baggageMetric != null + baggageMetric.tags.contains("header_style:baggage") + baggageMetric.value == 1 + } + + def "should collect baggage truncated metrics with byte count truncation_reason tag"() { + given: + def collector = CoreMetricCollector.getInstance() + def baggageMetrics = BaggageMetrics.getInstance() + + when: + baggageMetrics.onBaggageTruncatedByByteLimit() + collector.prepareMetrics() + def metrics = collector.drain() + + then: + def baggageMetric = metrics.find { + it.metricName == "context_header_style.truncated" && + it.tags.contains("truncation_reason:baggage_byte_count_exceeded") + } + baggageMetric != null + baggageMetric.value == 1 + } + + def "should collect baggage truncated metrics with item count truncation_reason tag"() { + given: + def collector = CoreMetricCollector.getInstance() + def baggageMetrics = BaggageMetrics.getInstance() + + when: + baggageMetrics.onBaggageTruncatedByItemLimit() + collector.prepareMetrics() + def metrics = collector.drain() + + then: + def baggageMetric = metrics.find { + it.metricName == "context_header_style.truncated" && + it.tags.contains("truncation_reason:baggage_item_count_exceeded") + } + baggageMetric != null + baggageMetric.value == 1 + } + + def "should not create baggage metrics when no events occurred"() { + given: + def collector = CoreMetricCollector.getInstance() + + when: + collector.prepareMetrics() + def metrics = collector.drain() + + then: + def foundMetrics = metrics.findAll { it.metricName.startsWith("context_header_style.") } + foundMetrics.isEmpty() + } + + def "should reset baggage counters after prepareMetrics"() { + given: + def collector = CoreMetricCollector.getInstance() + def baggageMetrics = BaggageMetrics.getInstance() + + when: + baggageMetrics.onBaggageExtracted() + baggageMetrics.onBaggageInjected() + collector.prepareMetrics() + collector.drain() + collector.prepareMetrics() + def metrics = collector.drain() + + then: + def foundMetrics = metrics.findAll { it.metricName.startsWith("context_header_style.") } + foundMetrics.isEmpty() + } +}