Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import datadog.context.propagation.CarrierVisitor;
import datadog.context.propagation.Propagator;
import datadog.trace.api.Config;
import datadog.trace.api.metrics.BaggageMetrics;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
import datadog.trace.bootstrap.instrumentation.api.Baggage;
Expand All @@ -26,6 +27,7 @@
public class BaggagePropagator implements Propagator {
private static final Logger LOG = LoggerFactory.getLogger(BaggagePropagator.class);
private static final PercentEscaper UTF_ESCAPER = PercentEscaper.create();
private static final BaggageMetrics BAGGAGE_METRICS = BaggageMetrics.getInstance();
static final String BAGGAGE_KEY = "baggage";
private final boolean injectBaggage;
private final boolean extractBaggage;
Expand Down Expand Up @@ -89,11 +91,13 @@ public <C> void inject(Context context, C carrier, CarrierSetter<C> setter) {
processedItems++;
// reached the max number of baggage items allowed
if (processedItems == this.maxItems) {
BAGGAGE_METRICS.onBaggageTruncatedByItemLimit();
break;
}
// Drop newest k/v pair if adding it leads to exceeding the limit
if (currentBytes + escapedKey.size + escapedVal.size + extraBytes > this.maxBytes) {
baggageText.setLength(currentBytes);
BAGGAGE_METRICS.onBaggageTruncatedByByteLimit();
break;
}
currentBytes += escapedKey.size + escapedVal.size + extraBytes;
Expand All @@ -103,6 +107,9 @@ public <C> void inject(Context context, C carrier, CarrierSetter<C> setter) {
// Save header as cache to re-inject it later if baggage did not change
baggage.setW3cHeader(headerValue);
setter.set(carrier, BAGGAGE_KEY, headerValue);

// Record successful baggage injection for telemetry
BAGGAGE_METRICS.onBaggageInjected();
}

@Override
Expand All @@ -117,6 +124,9 @@ public <C> Context extract(Context context, C carrier, CarrierVisitor<C> visitor
return context;
}

// Record successful baggage extraction for telemetry
BAGGAGE_METRICS.onBaggageExtracted();

// TODO: consider a better way to link baggage with the extracted (legacy) TagContext
AgentSpan extractedSpan = AgentSpan.fromContext(context);
if (extractedSpan != null) {
Expand Down Expand Up @@ -158,12 +168,14 @@ private Map<String, String> parseBaggageHeaders(String input) {
if (kvSeparatorInd > end) {
LOG.debug(
"Dropping baggage headers due to key with no value {}", input.substring(start, end));
BAGGAGE_METRICS.onBaggageMalformed();
return emptyMap();
}
String key = decode(input.substring(start, kvSeparatorInd).trim());
String value = decode(input.substring(kvSeparatorInd + 1, end).trim());
if (key.isEmpty() || value.isEmpty()) {
LOG.debug("Dropping baggage headers due to empty k/v {}:{}", key, value);
BAGGAGE_METRICS.onBaggageMalformed();
return emptyMap();
}
baggage.put(key, value);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package datadog.trace.core.baggage

import datadog.context.Context
import datadog.trace.api.Config
import datadog.trace.api.metrics.BaggageMetrics
import datadog.trace.api.telemetry.CoreMetricCollector
import spock.lang.Specification

class BaggagePropagatorTelemetryTest extends Specification {

def "should directly increment baggage metrics"() {
given:
def baggageMetrics = BaggageMetrics.getInstance()
def collector = CoreMetricCollector.getInstance()

when:
baggageMetrics.onBaggageInjected()
collector.prepareMetrics()
def metrics = collector.drain()

then:
def baggageMetric = metrics.find { it.metricName == "context_header_style.injected" }
baggageMetric != null
baggageMetric.value >= 1
baggageMetric.tags.contains("header_style:baggage")
}

def "should increment telemetry counter when baggage is successfully extracted"() {
given:
def config = Mock(Config) {
isBaggageExtract() >> true
isBaggageInject() >> true
getBaggageMaxItems() >> 64
getBaggageMaxBytes() >> 8192
}
def propagator = new BaggagePropagator(config)
def context = Context.root()
def carrier = ["baggage": "key1=value1,key2=value2"]
def visitor = { map, consumer ->
map.each { k, v -> consumer.accept(k, v) }
}
def collector = CoreMetricCollector.getInstance()

when:
propagator.extract(context, carrier, visitor)
collector.prepareMetrics()
def metrics = collector.drain()

then:
def baggageMetric = metrics.find { it.metricName == "context_header_style.extracted" }
baggageMetric != null
baggageMetric.value >= 1
baggageMetric.tags.contains("header_style:baggage")
}

def "should directly increment all baggage metrics"() {
given:
def baggageMetrics = BaggageMetrics.getInstance()
def collector = CoreMetricCollector.getInstance()

when:
baggageMetrics.onBaggageInjected()
baggageMetrics.onBaggageMalformed()
baggageMetrics.onBaggageTruncatedByByteLimit()
baggageMetrics.onBaggageTruncatedByItemLimit()
collector.prepareMetrics()
def metrics = collector.drain()

then:
def injectedMetric = metrics.find { it.metricName == "context_header_style.injected" }
injectedMetric != null
injectedMetric.value == 1
injectedMetric.tags.contains("header_style:baggage")

def malformedMetric = metrics.find { it.metricName == "context_header_style.malformed" }
malformedMetric != null
malformedMetric.value == 1
malformedMetric.tags.contains("header_style:baggage")

def bytesTruncatedMetric = metrics.find {
it.metricName == "context_header_style.truncated" &&
it.tags.contains("truncation_reason:baggage_byte_count_exceeded")
}
bytesTruncatedMetric != null
bytesTruncatedMetric.value == 1

def itemsTruncatedMetric = metrics.find {
it.metricName == "context_header_style.truncated" &&
it.tags.contains("truncation_reason:baggage_item_count_exceeded")
}
itemsTruncatedMetric != null
itemsTruncatedMetric.value == 1
}

def "should not increment telemetry counter when baggage extraction fails"() {
given:
def config = Mock(Config) {
isBaggageExtract() >> true
isBaggageInject() >> true
getBaggageMaxItems() >> 64
getBaggageMaxBytes() >> 8192
}
def propagator = new BaggagePropagator(config)
def context = Context.root()
def carrier = [:] // No baggage header
def visitor = { map, consumer ->
map.each { k, v -> consumer.accept(k, v) }
}
def collector = CoreMetricCollector.getInstance()

when:
propagator.extract(context, carrier, visitor)
collector.prepareMetrics()
def metrics = collector.drain()

then:
def foundMetrics = metrics.findAll { it.metricName.startsWith("context_header_style.") }
foundMetrics.isEmpty() // No extraction occurred, so no metrics should be created
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package datadog.trace.api.metrics;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Metrics for baggage propagation operations. */
public class BaggageMetrics {
private static final BaggageMetrics INSTANCE = new BaggageMetrics();
private final AtomicLong extractedCounter = new AtomicLong(0);
private final AtomicLong injectedCounter = new AtomicLong(0);
private final AtomicLong malformedCounter = new AtomicLong(0);
private final AtomicLong truncatedByteCounter = new AtomicLong(0);
private final AtomicLong truncatedItemCounter = new AtomicLong(0);
private final Collection<TaggedCounter> taggedCounters;

public static BaggageMetrics getInstance() {
return INSTANCE;
}

private BaggageMetrics() {
List<TaggedCounter> counters = new ArrayList<>(5);
counters.add(
new TaggedCounter(
"context_header_style.extracted", this.extractedCounter, "header_style:baggage"));
counters.add(
new TaggedCounter(
"context_header_style.injected", this.injectedCounter, "header_style:baggage"));
counters.add(
new TaggedCounter(
"context_header_style.malformed", this.malformedCounter, "header_style:baggage"));
counters.add(
new TaggedCounter(
"context_header_style.truncated",
this.truncatedByteCounter,
"truncation_reason:baggage_byte_count_exceeded"));
counters.add(
new TaggedCounter(
"context_header_style.truncated",
this.truncatedItemCounter,
"truncation_reason:baggage_item_count_exceeded"));
this.taggedCounters = Collections.unmodifiableList(counters);
}

public void onBaggageExtracted() {
this.extractedCounter.incrementAndGet();
}

public void onBaggageInjected() {
this.injectedCounter.incrementAndGet();
}

public void onBaggageMalformed() {
this.malformedCounter.incrementAndGet();
}

public void onBaggageTruncatedByByteLimit() {
this.truncatedByteCounter.incrementAndGet();
}

public void onBaggageTruncatedByItemLimit() {
this.truncatedItemCounter.incrementAndGet();
}

public Collection<TaggedCounter> getTaggedCounters() {
return this.taggedCounters;
}

public static class TaggedCounter implements CoreCounter {
private final String name;
private final AtomicLong counter;
private final String tag;
private long previousCount;

public TaggedCounter(String name, AtomicLong counter, String tag) {
this.name = name;
this.counter = counter;
this.tag = tag;
}

@Override
public String getName() {
return this.name;
}

public String getTag() {
return this.tag;
}

@Override
public long getValue() {
return counter.get();
}

@Override
public long getValueAndReset() {
long count = counter.get();
long delta = count - previousCount;
previousCount = count;
return delta;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datadog.trace.api.telemetry;

import datadog.trace.api.metrics.BaggageMetrics;
import datadog.trace.api.metrics.CoreCounter;
import datadog.trace.api.metrics.SpanMetricRegistryImpl;
import datadog.trace.api.metrics.SpanMetricsImpl;
Expand All @@ -16,6 +17,7 @@ public class CoreMetricCollector implements MetricCollector<CoreMetricCollector.
private static final String INTEGRATION_NAME_TAG = "integration_name:";
private static final CoreMetricCollector INSTANCE = new CoreMetricCollector();
private final SpanMetricRegistryImpl spanMetricRegistry = SpanMetricRegistryImpl.getInstance();
private final BaggageMetrics baggageMetrics = BaggageMetrics.getInstance();

private final BlockingQueue<CoreMetric> metricsQueue;

Expand All @@ -29,6 +31,7 @@ private CoreMetricCollector() {

@Override
public void prepareMetrics() {
// Collect span metrics
for (SpanMetricsImpl spanMetrics : this.spanMetricRegistry.getSpanMetrics()) {
String tag = INTEGRATION_NAME_TAG + spanMetrics.getInstrumentationName();
for (CoreCounter counter : spanMetrics.getCounters()) {
Expand All @@ -45,6 +48,23 @@ public void prepareMetrics() {
}
}
}

// Collect baggage metrics
for (BaggageMetrics.TaggedCounter counter : this.baggageMetrics.getTaggedCounters()) {
long value = counter.getValueAndReset();
if (value == 0) {
// Skip not updated counters
continue;
}
// Use the specific tag for each baggage metric
String tag = counter.getTag();
CoreMetric metric =
new CoreMetric(METRIC_NAMESPACE, true, counter.getName(), "count", value, tag);
if (!this.metricsQueue.offer(metric)) {
// Stop adding metrics if the queue is full
break;
}
}
}

@Override
Expand Down
Loading