diff --git a/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java b/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java index 02f41c3a9ab..2988e995164 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java @@ -215,6 +215,8 @@ public final class ConfigDefaults { static final float DEFAULT_TRACE_FLUSH_INTERVAL = 1; + static final long DEFAULT_TRACE_POST_PROCESSING_TIMEOUT = 1000; // 1 second + static final boolean DEFAULT_COUCHBASE_INTERNAL_SPANS_ENABLED = true; static final boolean DEFAULT_ELASTICSEARCH_BODY_ENABLED = false; static final boolean DEFAULT_ELASTICSEARCH_PARAMS_ENABLED = true; diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/TracerConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/TracerConfig.java index 3cf94286aa5..cb1e1c1baa1 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/TracerConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/TracerConfig.java @@ -133,5 +133,7 @@ public final class TracerConfig { public static final String TRACE_FLUSH_INTERVAL = "trace.flush.interval"; + public static final String TRACE_POST_PROCESSING_TIMEOUT = "trace.post-processing.timeout"; + private TracerConfig() {} } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/DDAgentWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/DDAgentWriter.java index a29b07a4af9..0c85cb49abc 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/DDAgentWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/DDAgentWriter.java @@ -154,7 +154,8 @@ public DDAgentWriter build() { null == prioritization ? FAST_LANE : prioritization, flushIntervalMilliseconds, TimeUnit.MILLISECONDS, - singleSpanSampler); + singleSpanSampler, + null); return new DDAgentWriter(traceProcessingWorker, dispatcher, healthMetrics, alwaysFlush); } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/DDIntakeWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/DDIntakeWriter.java index d5e70dd6b47..7075266352c 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/DDIntakeWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/DDIntakeWriter.java @@ -122,7 +122,8 @@ public DDIntakeWriter build() { prioritization, flushIntervalMilliseconds, TimeUnit.MILLISECONDS, - singleSpanSampler); + singleSpanSampler, + null); return new DDIntakeWriter( traceProcessingWorker, diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java index 99d55fb2732..c17c00932bb 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java @@ -13,10 +13,14 @@ import datadog.trace.common.writer.ddagent.PrioritizationStrategy; import datadog.trace.core.CoreSpan; import datadog.trace.core.DDSpan; +import datadog.trace.core.DDSpanContext; import datadog.trace.core.monitor.HealthMetrics; +import datadog.trace.core.postprocessor.SpanPostProcessor; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.BooleanSupplier; import org.jctools.queues.MessagePassingQueue; import org.jctools.queues.MpscBlockingConsumerArrayQueue; 
import org.slf4j.Logger; @@ -50,7 +54,8 @@ public TraceProcessingWorker( final Prioritization prioritization, final long flushInterval, final TimeUnit timeUnit, - final SingleSpanSampler singleSpanSampler) { + final SingleSpanSampler singleSpanSampler, + final SpanPostProcessor spanPostProcessor) { this.capacity = capacity; this.primaryQueue = createQueue(capacity); this.secondaryQueue = createQueue(capacity); @@ -73,9 +78,21 @@ public TraceProcessingWorker( this.serializingHandler = runAsDaemon ? new DaemonTraceSerializingHandler( - primaryQueue, secondaryQueue, healthMetrics, dispatcher, flushInterval, timeUnit) + primaryQueue, + secondaryQueue, + healthMetrics, + dispatcher, + flushInterval, + timeUnit, + spanPostProcessor) : new NonDaemonTraceSerializingHandler( - primaryQueue, secondaryQueue, healthMetrics, dispatcher, flushInterval, timeUnit); + primaryQueue, + secondaryQueue, + healthMetrics, + dispatcher, + flushInterval, + timeUnit, + spanPostProcessor); this.serializerThread = newAgentThread(TRACE_PROCESSOR, serializingHandler, runAsDaemon); } @@ -134,9 +151,16 @@ public DaemonTraceSerializingHandler( HealthMetrics healthMetrics, PayloadDispatcher payloadDispatcher, long flushInterval, - TimeUnit timeUnit) { + TimeUnit timeUnit, + SpanPostProcessor spanPostProcessor) { super( - primaryQueue, secondaryQueue, healthMetrics, payloadDispatcher, flushInterval, timeUnit); + primaryQueue, + secondaryQueue, + healthMetrics, + payloadDispatcher, + flushInterval, + timeUnit, + spanPostProcessor); } @Override @@ -169,9 +193,16 @@ public NonDaemonTraceSerializingHandler( HealthMetrics healthMetrics, PayloadDispatcher payloadDispatcher, long flushInterval, - TimeUnit timeUnit) { + TimeUnit timeUnit, + SpanPostProcessor spanPostProcessor) { super( - primaryQueue, secondaryQueue, healthMetrics, payloadDispatcher, flushInterval, timeUnit); + primaryQueue, + secondaryQueue, + healthMetrics, + payloadDispatcher, + flushInterval, + timeUnit, + spanPostProcessor); } @Override @@ -206,6 +237,7 @@ public abstract static class TraceSerializingHandler implements Runnable { private final boolean doTimeFlush; private final PayloadDispatcher payloadDispatcher; private long lastTicks; + private final SpanPostProcessor spanPostProcessor; public TraceSerializingHandler( final MpscBlockingConsumerArrayQueue primaryQueue, @@ -213,7 +245,8 @@ public TraceSerializingHandler( final HealthMetrics healthMetrics, final PayloadDispatcher payloadDispatcher, final long flushInterval, - final TimeUnit timeUnit) { + final TimeUnit timeUnit, + final SpanPostProcessor spanPostProcessor) { this.primaryQueue = primaryQueue; this.secondaryQueue = secondaryQueue; this.healthMetrics = healthMetrics; @@ -225,6 +258,7 @@ public TraceSerializingHandler( } else { this.ticksRequiredToFlush = Long.MAX_VALUE; } + this.spanPostProcessor = spanPostProcessor; } @SuppressWarnings("unchecked") @@ -235,6 +269,7 @@ public void onEvent(Object event) { try { if (event instanceof List) { List<DDSpan> trace = (List<DDSpan>) event; + maybeTracePostProcessing(trace); // TODO populate `_sample_rate` metric in a way that accounts for lost/dropped traces payloadDispatcher.addTrace(trace); } else if (event instanceof FlushEvent) { @@ -295,5 +330,44 @@ private void consumeBatch(MessagePassingQueue queue) { protected boolean queuesAreEmpty() { return primaryQueue.isEmpty() && secondaryQueue.isEmpty(); } + + private void maybeTracePostProcessing(List<DDSpan> trace) { + if (trace == null || trace.isEmpty()) { + return; + } + + // Filter spans that need post-processing + List<DDSpan>
spansToPostProcess = null; + for (DDSpan span : trace) { + DDSpanContext context = span.context(); + if (context != null && context.isRequiresPostProcessing()) { + if (spansToPostProcess == null) { + spansToPostProcess = new ArrayList<>(); + } + spansToPostProcess.add(span); + } + } + + if (spansToPostProcess == null) { + return; + } + + try { + long timeout = Config.get().getTracePostProcessingTimeout(); + long deadline = System.currentTimeMillis() + timeout; + BooleanSupplier timeoutCheck = () -> System.currentTimeMillis() > deadline; + + for (DDSpan span : spansToPostProcess) { + if (!spanPostProcessor.process(span, timeoutCheck)) { + log.debug("Span post-processing interrupted due to timeout."); + break; + } + } + } catch (Throwable e) { + if (log.isDebugEnabled()) { + log.debug("Error during trace post-processing", e); + } + } + } } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java b/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java index 0811a9807aa..eb11bb93b40 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java @@ -140,6 +140,7 @@ public class DDSpanContext private final boolean injectBaggageAsTags; private volatile int encodedOperationName; private volatile int encodedResourceName; + private volatile boolean requiresPostProcessing; public DDSpanContext( final DDTraceId traceId, @@ -944,4 +945,12 @@ private String getTagName(String key) { // TODO is this decided? return "_dd." + key + ".json"; } + + public void setRequiresPostProcessing(boolean postProcessing) { + this.requiresPostProcessing = postProcessing; + } + + public boolean isRequiresPostProcessing() { + return requiresPostProcessing; + } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/SpanPostProcessor.java b/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/SpanPostProcessor.java new file mode 100644 index 00000000000..8751959456f --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/SpanPostProcessor.java @@ -0,0 +1,26 @@ +package datadog.trace.core.postprocessor; + +import datadog.trace.core.DDSpan; +import java.util.function.BooleanSupplier; + +/** + * Span post-processing with a timeout check capability. + * + *

<p>Implementations of this interface should carry out post-processing of spans while supporting + * interruption when a specified time limit is exceeded. The method {@code process} must check the + * state of {@code timeoutCheck} while processing the span. If {@code timeoutCheck.getAsBoolean()} + * returns {@code true}, processing should be immediately halted, and the method should return + * {@code false}. If post-processing completes successfully before the timeout, the method should + * return {@code true}. + */ +public interface SpanPostProcessor { + /** + * Post-processes a span. + * + * @param span The span to be post-processed. + * @param timeoutCheck A timeout check returning {@code true} if the allotted time has elapsed. + * @return {@code true} if the span was successfully processed; {@code false} in case of a + * timeout. + */ + boolean process(DDSpan span, BooleanSupplier timeoutCheck); +} diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/TraceProcessingWorkerTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/TraceProcessingWorkerTest.groovy index dd823a587be..b976dcdbe1e 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/TraceProcessingWorkerTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/TraceProcessingWorkerTest.groovy @@ -4,7 +4,10 @@ import datadog.trace.common.sampling.SingleSpanSampler import datadog.trace.common.writer.ddagent.PrioritizationStrategy.PublishResult import datadog.trace.core.CoreSpan import datadog.trace.core.DDSpan +import datadog.trace.core.DDSpanContext +import datadog.trace.core.PendingTrace import datadog.trace.core.monitor.HealthMetrics +import datadog.trace.core.postprocessor.SpanPostProcessor import datadog.trace.test.util.DDSpecification import spock.util.concurrent.PollingConditions @@ -41,6 +44,7 @@ class TraceProcessingWorkerTest extends DDSpecification { FAST_LANE, 1, TimeUnit.NANOSECONDS, + null, null ) // stop heartbeats from being throttled @@ -67,6 +71,7 @@ class TraceProcessingWorkerTest extends DDSpecification { FAST_LANE, 1, TimeUnit.NANOSECONDS, + null, null ) // stop heartbeats from being throttled def timeConditions = new PollingConditions(timeout: 1, initialDelay: 1, factor: 1.25) @@ -92,7 +97,7 @@ class TraceProcessingWorkerTest extends DDSpecification { false }, FAST_LANE, - 100, TimeUnit.SECONDS, null) // prevent heartbeats from helping the flush happen + 100, TimeUnit.SECONDS, null, null) // prevent heartbeats from helping the flush happen when: "there is pending work it is completed before a flush" // processing this span will throw an exception, but it should be caught @@ -131,7 +136,7 @@ class TraceProcessingWorkerTest extends DDSpecification { throwingDispatcher, { false }, FAST_LANE, - 100, TimeUnit.SECONDS, null) // prevent heartbeats from helping the flush happen + 100, TimeUnit.SECONDS, null, null) // prevent heartbeats from helping the flush happen worker.start() when: "a trace is processed but can't be passed on" @@ -149,6 +154,58 @@ class TraceProcessingWorkerTest extends DDSpecification { priority << [SAMPLER_DROP, USER_DROP, SAMPLER_KEEP, USER_KEEP, UNSET] } + def "trace should be post-processed"() { + setup: + AtomicInteger acceptedCount = new AtomicInteger() + PayloadDispatcherImpl countingDispatcher = Mock(PayloadDispatcherImpl) + countingDispatcher.addTrace(_) >> { + acceptedCount.getAndIncrement() + } + HealthMetrics healthMetrics = Mock(HealthMetrics) + + // Span 1 - should be post-processed + def span1 = DDSpan.create("test",
0, Mock(DDSpanContext) { + isRequiresPostProcessing() >> true + getTrace() >> Mock(PendingTrace) { + getCurrentTimeNano() >> 0 + } + }, []) + def processedSpan1 = false + + // Span 2 - should NOT be post-processed + def span2 = DDSpan.create("test", 0, Mock(DDSpanContext) { + isRequiresPostProcessing() >> false + getTrace() >> Mock(PendingTrace) { + getCurrentTimeNano() >> 0 + } + }, []) + def processedSpan2 = false + + SpanPostProcessor spanPostProcessor = Mock(SpanPostProcessor) { + process(span1, _) >> { processedSpan1 = true } + process(span2, _) >> { processedSpan2 = true } + } + + TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, + countingDispatcher, { + false + }, FAST_LANE, 100, TimeUnit.SECONDS, null, spanPostProcessor) + worker.start() + + when: "traces are submitted" + worker.publish(span1, SAMPLER_KEEP, [span1, span2]) + worker.publish(span2, SAMPLER_KEEP, [span1, span2]) + + then: "only the span flagged for post-processing is post-processed" + conditions.eventually { + assert processedSpan1 == true + assert processedSpan2 == false + } + + cleanup: + worker.close() + } + def "traces should be processed"() { setup: AtomicInteger acceptedCount = new AtomicInteger() @@ -160,7 +217,7 @@ class TraceProcessingWorkerTest extends DDSpecification { TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { false - }, FAST_LANE, 100, TimeUnit.SECONDS, null) + }, FAST_LANE, 100, TimeUnit.SECONDS, null, null) // prevent heartbeats from helping the flush happen worker.start() @@ -211,7 +268,7 @@ class TraceProcessingWorkerTest extends DDSpecification { TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { false - }, FAST_LANE, 100, TimeUnit.SECONDS, null) + }, FAST_LANE, 100, TimeUnit.SECONDS, null, null) worker.start() worker.close() int queueSize = 0 @@ -248,7 +305,7 @@ class TraceProcessingWorkerTest extends DDSpecification { return false } } - TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { true }, FAST_LANE, 100, TimeUnit.SECONDS, singleSpanSampler) + TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { true }, FAST_LANE, 100, TimeUnit.SECONDS, singleSpanSampler, null) worker.start() when: "traces are submitted" @@ -324,7 +381,7 @@ class TraceProcessingWorkerTest extends DDSpecification { return false } } - TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { false }, FAST_LANE, 100, TimeUnit.SECONDS, singleSpanSampler) + TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { false }, FAST_LANE, 100, TimeUnit.SECONDS, singleSpanSampler, null) worker.start() when: "traces are submitted" diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index 519552a4718..ee0f152782d 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -436,6 +436,7 @@ import static datadog.trace.api.config.TracerConfig.TRACE_PEER_SERVICE_COMPONENT_OVERRIDES; import static datadog.trace.api.config.TracerConfig.TRACE_PEER_SERVICE_DEFAULTS_ENABLED; import static datadog.trace.api.config.TracerConfig.TRACE_PEER_SERVICE_MAPPING; +import static datadog.trace.api.config.TracerConfig.TRACE_POST_PROCESSING_TIMEOUT; import static
datadog.trace.api.config.TracerConfig.TRACE_PROPAGATION_EXTRACT_FIRST; import static datadog.trace.api.config.TracerConfig.TRACE_PROPAGATION_STYLE; import static datadog.trace.api.config.TracerConfig.TRACE_PROPAGATION_STYLE_EXTRACT; @@ -928,6 +929,7 @@ static class HostNameHolder { private final boolean axisPromoteResourceName; private final float traceFlushIntervalSeconds; + private final long tracePostProcessingTimeout; private final boolean telemetryDebugRequestsEnabled; @@ -2042,6 +2044,10 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment()) "Agentless profiling activated but no api key provided. Profile uploading will likely fail"); } + this.tracePostProcessingTimeout = + configProvider.getLong( + TRACE_POST_PROCESSING_TIMEOUT, ConfigDefaults.DEFAULT_TRACE_POST_PROCESSING_TIMEOUT); + if (isCiVisibilityEnabled() && ciVisibilityAgentlessEnabled && (apiKey == null || apiKey.isEmpty())) { @@ -2142,6 +2148,10 @@ public float getTraceFlushIntervalSeconds() { return traceFlushIntervalSeconds; } + public long getTracePostProcessingTimeout() { + return tracePostProcessingTimeout; + } + public boolean isIntegrationSynapseLegacyOperationName() { return integrationSynapseLegacyOperationName; }
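Not part of the changeset above: a minimal sketch of what an implementation of the new SpanPostProcessor interface could look like, honoring the cooperative timeout contract described in its javadoc. The class name and the tag list are hypothetical placeholders, not anything defined in this change.

```java
package datadog.trace.core.postprocessor;

import datadog.trace.core.DDSpan;
import java.util.function.BooleanSupplier;

/** Hypothetical example implementation; not part of this changeset. */
public class TagObfuscatingPostProcessor implements SpanPostProcessor {

  private static final String[] SENSITIVE_TAGS = {"http.url", "db.statement"};

  @Override
  public boolean process(DDSpan span, BooleanSupplier timeoutCheck) {
    for (String tag : SENSITIVE_TAGS) {
      // Check the shared deadline between units of work and stop as soon as it trips,
      // signalling the worker to abandon post-processing for the rest of the trace.
      if (timeoutCheck.getAsBoolean()) {
        return false;
      }
      Object value = span.getTag(tag);
      if (value != null) {
        span.setTag(tag, "obfuscated");
      }
    }
    // Finished within the allotted time.
    return true;
  }
}
```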
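Also not part of the changeset: a brief sketch of how the pieces fit together. A span opts in via the new DDSpanContext flag, and the serializing worker derives one deadline per trace from trace.post-processing.timeout (default 1000 ms) and passes the same deadline check into every process() call. The helper class and method names below are illustrative only.

```java
import datadog.trace.api.Config;
import datadog.trace.core.DDSpan;
import java.util.function.BooleanSupplier;

// Illustrative helper, not part of this changeset.
class PostProcessingWiring {

  // An instrumentation (or other span producer) flags a span so the worker
  // hands it to the configured SpanPostProcessor before dispatch.
  static void flagForPostProcessing(DDSpan span) {
    span.context().setRequiresPostProcessing(true);
  }

  // Mirrors maybeTracePostProcessing: one deadline per trace, shared by all spans.
  static BooleanSupplier newDeadlineCheck() {
    long deadline = System.currentTimeMillis() + Config.get().getTracePostProcessingTimeout();
    return () -> System.currentTimeMillis() > deadline;
  }
}
```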