From bfe73e90d32b5bfceb2682fab644cb8e240a1641 Mon Sep 17 00:00:00 2001 From: Valentin Zakharov Date: Tue, 12 Mar 2024 00:55:38 +0100 Subject: [PATCH 01/11] Introduced trace post-processing --- .../trace/agent/test/AgentTestRunner.groovy | 6 +++ .../trace/api/config/TracerConfig.java | 4 ++ .../trace/api/internal/TraceSegment.java | 10 +++++ .../common/writer/TraceProcessingWorker.java | 43 +++++++++++++++++++ .../datadog/trace/core/DDSpanContext.java | 10 +++++ .../postprocessor/AppSecPostProcessor.java | 12 ++++++ .../postprocessor/TracePostProcessor.java | 9 ++++ .../main/java/datadog/trace/api/Config.java | 17 ++++++++ .../trace/util/AgentThreadFactory.java | 1 + 9 files changed, 112 insertions(+) create mode 100644 dd-trace-core/src/main/java/datadog/trace/core/postprocessor/AppSecPostProcessor.java create mode 100644 dd-trace-core/src/main/java/datadog/trace/core/postprocessor/TracePostProcessor.java diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.groovy index b323128d638..f2b5963303f 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.groovy @@ -551,6 +551,12 @@ abstract class AgentTestRunner extends DDSpecification implements AgentBuilder.L check() return delegate.getDataCurrent(key) } + + @Override + void setRequiresPostProcessing(boolean postProcessing) { + check() + delegate.setRequiresPostProcessing(postProcessing) + } } /** Override to clean up things after the agent is removed */ diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/TracerConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/TracerConfig.java index 3cf94286aa5..f82697d40cb 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/TracerConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/TracerConfig.java @@ -133,5 +133,9 @@ public final class TracerConfig { public static final String TRACE_FLUSH_INTERVAL = "trace.flush.interval"; + public static final String TRACE_POST_PROCESSING_ENABLED = "trace.post-processing.enabled"; + + public static final String TRACE_POST_PROCESSING_TIMEOUT = "trace.post-processing.timeout"; + private TracerConfig() {} } diff --git a/dd-trace-api/src/main/java/datadog/trace/api/internal/TraceSegment.java b/dd-trace-api/src/main/java/datadog/trace/api/internal/TraceSegment.java index cf1ce90fa15..6d3f6e6ef03 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/internal/TraceSegment.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/internal/TraceSegment.java @@ -75,6 +75,13 @@ default void setTagCurrent(String key, Object value) { */ void setDataCurrent(String key, Object value); + /** + * Set flag to indicate the need for post-processing + * + * @param postProcessing flag to indicate the need for post-processing + */ + void setRequiresPostProcessing(boolean postProcessing); + /** * Gets the value of the tag from the current span in this {@code TraceSegment}. * @@ -112,5 +119,8 @@ public void setDataCurrent(String key, Object value) {} public Object getDataCurrent(String key) { return null; } + + @Override + public void setRequiresPostProcessing(boolean postProcessing) {} } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java index 99d55fb2732..77c3c818246 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java @@ -13,7 +13,11 @@ import datadog.trace.common.writer.ddagent.PrioritizationStrategy; import datadog.trace.core.CoreSpan; import datadog.trace.core.DDSpan; +import datadog.trace.core.DDSpanContext; import datadog.trace.core.monitor.HealthMetrics; +import datadog.trace.core.postprocessor.AppSecPostProcessor; +import datadog.trace.core.postprocessor.TracePostProcessor; +import datadog.trace.util.AgentThreadFactory; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -206,6 +210,7 @@ public abstract static class TraceSerializingHandler implements Runnable { private final boolean doTimeFlush; private final PayloadDispatcher payloadDispatcher; private long lastTicks; + private final TracePostProcessor tracePostProcessor; public TraceSerializingHandler( final MpscBlockingConsumerArrayQueue primaryQueue, @@ -225,6 +230,7 @@ public TraceSerializingHandler( } else { this.ticksRequiredToFlush = Long.MAX_VALUE; } + this.tracePostProcessor = new AppSecPostProcessor(); } @SuppressWarnings("unchecked") @@ -235,6 +241,7 @@ public void onEvent(Object event) { try { if (event instanceof List) { List trace = (List) event; + maybeTracePostProcessing(trace); // TODO populate `_sample_rate` metric in a way that accounts for lost/dropped traces payloadDispatcher.addTrace(trace); } else if (event instanceof FlushEvent) { @@ -295,5 +302,41 @@ private void consumeBatch(MessagePassingQueue queue) { protected boolean queuesAreEmpty() { return primaryQueue.isEmpty() && secondaryQueue.isEmpty(); } + + private void maybeTracePostProcessing(List trace) { + if (!Config.get().isTracePostProcessingEnabled()) { + return; + } + + if (trace == null || trace.isEmpty()) { + return; + } + + DDSpanContext context = trace.get(0).context(); + if (context == null || !context.isRequiresPostProcessing()) { + return; + } + + Thread thread = + AgentThreadFactory.newAgentThread( + AgentThreadFactory.AgentThread.TRACE_POST_PROCESSOR, + () -> { + // do a trace post-processing work + tracePostProcessor.process(trace, context); + }); + thread.start(); + + try { + long timeout = Config.get().getTracePostProcessingTimeout(); + thread.join(timeout); + if (thread.isAlive()) { + thread.interrupt(); + } + } catch (InterruptedException e) { + if (log.isDebugEnabled()) { + log.debug("Trace post-processing is interrupted.", e); + } + } + } } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java b/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java index 0811a9807aa..a2cc0bc6d2e 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java @@ -140,6 +140,7 @@ public class DDSpanContext private final boolean injectBaggageAsTags; private volatile int encodedOperationName; private volatile int encodedResourceName; + private boolean requiresPostProcessing; public DDSpanContext( final DDTraceId traceId, @@ -944,4 +945,13 @@ private String getTagName(String key) { // TODO is this decided? return "_dd." + key + ".json"; } + + @Override + public void setRequiresPostProcessing(boolean postProcessing) { + this.requiresPostProcessing = postProcessing; + } + + public boolean isRequiresPostProcessing() { + return requiresPostProcessing; + } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/AppSecPostProcessor.java b/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/AppSecPostProcessor.java new file mode 100644 index 00000000000..1c8bd10dc35 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/AppSecPostProcessor.java @@ -0,0 +1,12 @@ +package datadog.trace.core.postprocessor; + +import datadog.trace.core.DDSpan; +import datadog.trace.core.DDSpanContext; +import java.util.List; + +public class AppSecPostProcessor implements TracePostProcessor { + @Override + public void process(List trace, DDSpanContext context) { + // Do AppSec post-processing + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/TracePostProcessor.java b/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/TracePostProcessor.java new file mode 100644 index 00000000000..c8b678bdfa7 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/TracePostProcessor.java @@ -0,0 +1,9 @@ +package datadog.trace.core.postprocessor; + +import datadog.trace.core.DDSpan; +import datadog.trace.core.DDSpanContext; +import java.util.List; + +public interface TracePostProcessor { + void process(List trace, DDSpanContext context); +} diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index 519552a4718..21391e7618f 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -436,6 +436,8 @@ import static datadog.trace.api.config.TracerConfig.TRACE_PEER_SERVICE_COMPONENT_OVERRIDES; import static datadog.trace.api.config.TracerConfig.TRACE_PEER_SERVICE_DEFAULTS_ENABLED; import static datadog.trace.api.config.TracerConfig.TRACE_PEER_SERVICE_MAPPING; +import static datadog.trace.api.config.TracerConfig.TRACE_POST_PROCESSING_ENABLED; +import static datadog.trace.api.config.TracerConfig.TRACE_POST_PROCESSING_TIMEOUT; import static datadog.trace.api.config.TracerConfig.TRACE_PROPAGATION_EXTRACT_FIRST; import static datadog.trace.api.config.TracerConfig.TRACE_PROPAGATION_STYLE; import static datadog.trace.api.config.TracerConfig.TRACE_PROPAGATION_STYLE_EXTRACT; @@ -928,6 +930,8 @@ static class HostNameHolder { private final boolean axisPromoteResourceName; private final float traceFlushIntervalSeconds; + private final boolean tracePostProcessingEnabled; + private final long tracePostProcessingTimeout; private final boolean telemetryDebugRequestsEnabled; @@ -2042,6 +2046,11 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment()) "Agentless profiling activated but no api key provided. Profile uploading will likely fail"); } + this.tracePostProcessingEnabled = + configProvider.getBoolean(TRACE_POST_PROCESSING_ENABLED, true); + + this.tracePostProcessingTimeout = configProvider.getLong(TRACE_POST_PROCESSING_TIMEOUT, 1000); + if (isCiVisibilityEnabled() && ciVisibilityAgentlessEnabled && (apiKey == null || apiKey.isEmpty())) { @@ -2142,6 +2151,14 @@ public float getTraceFlushIntervalSeconds() { return traceFlushIntervalSeconds; } + public boolean isTracePostProcessingEnabled() { + return tracePostProcessingEnabled; + } + + public long getTracePostProcessingTimeout() { + return tracePostProcessingTimeout; + } + public boolean isIntegrationSynapseLegacyOperationName() { return integrationSynapseLegacyOperationName; } diff --git a/internal-api/src/main/java/datadog/trace/util/AgentThreadFactory.java b/internal-api/src/main/java/datadog/trace/util/AgentThreadFactory.java index a860b35b833..46c1fc0f95d 100644 --- a/internal-api/src/main/java/datadog/trace/util/AgentThreadFactory.java +++ b/internal-api/src/main/java/datadog/trace/util/AgentThreadFactory.java @@ -16,6 +16,7 @@ public enum AgentThread { TRACE_STARTUP("dd-agent-startup-datadog-tracer"), TRACE_MONITOR("dd-trace-monitor"), TRACE_PROCESSOR("dd-trace-processor"), + TRACE_POST_PROCESSOR("dd-trace-post-processor"), SPAN_SAMPLING_PROCESSOR("dd-span-sampling-processor"), TRACE_CASSANDRA_ASYNC_SESSION("dd-cassandra-session-executor"), From da1e19b224929c0bb263572a8329d020887188af Mon Sep 17 00:00:00 2001 From: Valentin Zakharov Date: Mon, 18 Mar 2024 17:42:54 +0100 Subject: [PATCH 02/11] setRequiresPostProcessing fully moved to DDSpanContext --- .../datadog/trace/agent/test/AgentTestRunner.groovy | 6 ------ .../java/datadog/trace/api/internal/TraceSegment.java | 10 ---------- .../main/java/datadog/trace/core/DDSpanContext.java | 1 - 3 files changed, 17 deletions(-) diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.groovy index f2b5963303f..b323128d638 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.groovy @@ -551,12 +551,6 @@ abstract class AgentTestRunner extends DDSpecification implements AgentBuilder.L check() return delegate.getDataCurrent(key) } - - @Override - void setRequiresPostProcessing(boolean postProcessing) { - check() - delegate.setRequiresPostProcessing(postProcessing) - } } /** Override to clean up things after the agent is removed */ diff --git a/dd-trace-api/src/main/java/datadog/trace/api/internal/TraceSegment.java b/dd-trace-api/src/main/java/datadog/trace/api/internal/TraceSegment.java index 6d3f6e6ef03..cf1ce90fa15 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/internal/TraceSegment.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/internal/TraceSegment.java @@ -75,13 +75,6 @@ default void setTagCurrent(String key, Object value) { */ void setDataCurrent(String key, Object value); - /** - * Set flag to indicate the need for post-processing - * - * @param postProcessing flag to indicate the need for post-processing - */ - void setRequiresPostProcessing(boolean postProcessing); - /** * Gets the value of the tag from the current span in this {@code TraceSegment}. * @@ -119,8 +112,5 @@ public void setDataCurrent(String key, Object value) {} public Object getDataCurrent(String key) { return null; } - - @Override - public void setRequiresPostProcessing(boolean postProcessing) {} } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java b/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java index a2cc0bc6d2e..d2efca11e36 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java @@ -946,7 +946,6 @@ private String getTagName(String key) { return "_dd." + key + ".json"; } - @Override public void setRequiresPostProcessing(boolean postProcessing) { this.requiresPostProcessing = postProcessing; } From 8ac8c0e17ba0152c6f98bc7fe5ec10eb6662fbaa Mon Sep 17 00:00:00 2001 From: Valentin Zakharov Date: Mon, 18 Mar 2024 18:11:41 +0100 Subject: [PATCH 03/11] Removed Thread creation and added timeout boolean supplier --- .../common/writer/TraceProcessingWorker.java | 27 ++++++------------- .../postprocessor/AppSecPostProcessor.java | 4 ++- .../postprocessor/TracePostProcessor.java | 3 ++- 3 files changed, 13 insertions(+), 21 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java index 77c3c818246..fca7b6ca81d 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java @@ -17,10 +17,11 @@ import datadog.trace.core.monitor.HealthMetrics; import datadog.trace.core.postprocessor.AppSecPostProcessor; import datadog.trace.core.postprocessor.TracePostProcessor; -import datadog.trace.util.AgentThreadFactory; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.BooleanSupplier; + import org.jctools.queues.MessagePassingQueue; import org.jctools.queues.MpscBlockingConsumerArrayQueue; import org.slf4j.Logger; @@ -317,25 +318,13 @@ private void maybeTracePostProcessing(List trace) { return; } - Thread thread = - AgentThreadFactory.newAgentThread( - AgentThreadFactory.AgentThread.TRACE_POST_PROCESSOR, - () -> { - // do a trace post-processing work - tracePostProcessor.process(trace, context); - }); - thread.start(); + long timeout = Config.get().getTracePostProcessingTimeout(); + long deadline = System.currentTimeMillis() + timeout; + BooleanSupplier timeoutCheck = + () -> System.currentTimeMillis() > deadline; - try { - long timeout = Config.get().getTracePostProcessingTimeout(); - thread.join(timeout); - if (thread.isAlive()) { - thread.interrupt(); - } - } catch (InterruptedException e) { - if (log.isDebugEnabled()) { - log.debug("Trace post-processing is interrupted.", e); - } + if (!tracePostProcessor.process(trace, context, timeoutCheck)) { + log.debug("Trace post-processing is interrupted due to timeout."); } } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/AppSecPostProcessor.java b/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/AppSecPostProcessor.java index 1c8bd10dc35..92ada522823 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/AppSecPostProcessor.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/AppSecPostProcessor.java @@ -3,10 +3,12 @@ import datadog.trace.core.DDSpan; import datadog.trace.core.DDSpanContext; import java.util.List; +import java.util.function.BooleanSupplier; public class AppSecPostProcessor implements TracePostProcessor { @Override - public void process(List trace, DDSpanContext context) { + public boolean process(List trace, DDSpanContext context, BooleanSupplier timeoutCheck) { // Do AppSec post-processing + return true; } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/TracePostProcessor.java b/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/TracePostProcessor.java index c8b678bdfa7..191ef2f5071 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/TracePostProcessor.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/TracePostProcessor.java @@ -3,7 +3,8 @@ import datadog.trace.core.DDSpan; import datadog.trace.core.DDSpanContext; import java.util.List; +import java.util.function.BooleanSupplier; public interface TracePostProcessor { - void process(List trace, DDSpanContext context); + boolean process(List trace, DDSpanContext context, BooleanSupplier timeoutCheck); } From d70d1cfef58f0cc9a143d92e702181c6051baaf9 Mon Sep 17 00:00:00 2001 From: Valentin Zakharov Date: Mon, 18 Mar 2024 18:19:53 +0100 Subject: [PATCH 04/11] Removed option to disable post-processing --- .../main/java/datadog/trace/api/config/TracerConfig.java | 2 -- .../trace/common/writer/TraceProcessingWorker.java | 8 +------- internal-api/src/main/java/datadog/trace/api/Config.java | 9 --------- 3 files changed, 1 insertion(+), 18 deletions(-) diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/TracerConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/TracerConfig.java index f82697d40cb..cb1e1c1baa1 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/TracerConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/TracerConfig.java @@ -133,8 +133,6 @@ public final class TracerConfig { public static final String TRACE_FLUSH_INTERVAL = "trace.flush.interval"; - public static final String TRACE_POST_PROCESSING_ENABLED = "trace.post-processing.enabled"; - public static final String TRACE_POST_PROCESSING_TIMEOUT = "trace.post-processing.timeout"; private TracerConfig() {} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java index fca7b6ca81d..7827b89bb18 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java @@ -21,7 +21,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.BooleanSupplier; - import org.jctools.queues.MessagePassingQueue; import org.jctools.queues.MpscBlockingConsumerArrayQueue; import org.slf4j.Logger; @@ -305,10 +304,6 @@ protected boolean queuesAreEmpty() { } private void maybeTracePostProcessing(List trace) { - if (!Config.get().isTracePostProcessingEnabled()) { - return; - } - if (trace == null || trace.isEmpty()) { return; } @@ -320,8 +315,7 @@ private void maybeTracePostProcessing(List trace) { long timeout = Config.get().getTracePostProcessingTimeout(); long deadline = System.currentTimeMillis() + timeout; - BooleanSupplier timeoutCheck = - () -> System.currentTimeMillis() > deadline; + BooleanSupplier timeoutCheck = () -> System.currentTimeMillis() > deadline; if (!tracePostProcessor.process(trace, context, timeoutCheck)) { log.debug("Trace post-processing is interrupted due to timeout."); diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index 21391e7618f..e95a11d9ca5 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -436,7 +436,6 @@ import static datadog.trace.api.config.TracerConfig.TRACE_PEER_SERVICE_COMPONENT_OVERRIDES; import static datadog.trace.api.config.TracerConfig.TRACE_PEER_SERVICE_DEFAULTS_ENABLED; import static datadog.trace.api.config.TracerConfig.TRACE_PEER_SERVICE_MAPPING; -import static datadog.trace.api.config.TracerConfig.TRACE_POST_PROCESSING_ENABLED; import static datadog.trace.api.config.TracerConfig.TRACE_POST_PROCESSING_TIMEOUT; import static datadog.trace.api.config.TracerConfig.TRACE_PROPAGATION_EXTRACT_FIRST; import static datadog.trace.api.config.TracerConfig.TRACE_PROPAGATION_STYLE; @@ -930,7 +929,6 @@ static class HostNameHolder { private final boolean axisPromoteResourceName; private final float traceFlushIntervalSeconds; - private final boolean tracePostProcessingEnabled; private final long tracePostProcessingTimeout; private final boolean telemetryDebugRequestsEnabled; @@ -2046,9 +2044,6 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment()) "Agentless profiling activated but no api key provided. Profile uploading will likely fail"); } - this.tracePostProcessingEnabled = - configProvider.getBoolean(TRACE_POST_PROCESSING_ENABLED, true); - this.tracePostProcessingTimeout = configProvider.getLong(TRACE_POST_PROCESSING_TIMEOUT, 1000); if (isCiVisibilityEnabled() @@ -2151,10 +2146,6 @@ public float getTraceFlushIntervalSeconds() { return traceFlushIntervalSeconds; } - public boolean isTracePostProcessingEnabled() { - return tracePostProcessingEnabled; - } - public long getTracePostProcessingTimeout() { return tracePostProcessingTimeout; } From 2dc3605c87c305cdca6ef78e25a672fafa355c4a Mon Sep 17 00:00:00 2001 From: Valentin Zakharov Date: Tue, 26 Mar 2024 10:03:47 +0100 Subject: [PATCH 05/11] TracePostProcessor replaced with SpanPostProcessor --- .../common/writer/TraceProcessingWorker.java | 27 ++++++++++++------- .../postprocessor/AppSecPostProcessor.java | 6 ++--- ...tProcessor.java => SpanPostProcessor.java} | 5 ++-- 3 files changed, 22 insertions(+), 16 deletions(-) rename dd-trace-core/src/main/java/datadog/trace/core/postprocessor/{TracePostProcessor.java => SpanPostProcessor.java} (51%) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java index 7827b89bb18..b32788f7bbe 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java @@ -16,7 +16,7 @@ import datadog.trace.core.DDSpanContext; import datadog.trace.core.monitor.HealthMetrics; import datadog.trace.core.postprocessor.AppSecPostProcessor; -import datadog.trace.core.postprocessor.TracePostProcessor; +import datadog.trace.core.postprocessor.SpanPostProcessor; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -210,7 +210,7 @@ public abstract static class TraceSerializingHandler implements Runnable { private final boolean doTimeFlush; private final PayloadDispatcher payloadDispatcher; private long lastTicks; - private final TracePostProcessor tracePostProcessor; + private final SpanPostProcessor spanPostProcessor; public TraceSerializingHandler( final MpscBlockingConsumerArrayQueue primaryQueue, @@ -230,7 +230,7 @@ public TraceSerializingHandler( } else { this.ticksRequiredToFlush = Long.MAX_VALUE; } - this.tracePostProcessor = new AppSecPostProcessor(); + this.spanPostProcessor = new AppSecPostProcessor(); } @SuppressWarnings("unchecked") @@ -308,17 +308,24 @@ private void maybeTracePostProcessing(List trace) { return; } - DDSpanContext context = trace.get(0).context(); - if (context == null || !context.isRequiresPostProcessing()) { - return; - } - long timeout = Config.get().getTracePostProcessingTimeout(); long deadline = System.currentTimeMillis() + timeout; BooleanSupplier timeoutCheck = () -> System.currentTimeMillis() > deadline; - if (!tracePostProcessor.process(trace, context, timeoutCheck)) { - log.debug("Trace post-processing is interrupted due to timeout."); + for (DDSpan span : trace) { + if (timeoutCheck.getAsBoolean()) { + log.debug("Span post-processing is interrupted due to timeout."); + break; + } + + DDSpanContext context = span.context(); + if (context == null) { + continue; + } + + if (context.isRequiresPostProcessing()) { + spanPostProcessor.process(span, context, timeoutCheck); + } } } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/AppSecPostProcessor.java b/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/AppSecPostProcessor.java index 92ada522823..0af37f49931 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/AppSecPostProcessor.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/AppSecPostProcessor.java @@ -2,12 +2,12 @@ import datadog.trace.core.DDSpan; import datadog.trace.core.DDSpanContext; -import java.util.List; import java.util.function.BooleanSupplier; -public class AppSecPostProcessor implements TracePostProcessor { +public class AppSecPostProcessor implements SpanPostProcessor { + @Override - public boolean process(List trace, DDSpanContext context, BooleanSupplier timeoutCheck) { + public boolean process(DDSpan span, DDSpanContext context, BooleanSupplier timeoutCheck) { // Do AppSec post-processing return true; } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/TracePostProcessor.java b/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/SpanPostProcessor.java similarity index 51% rename from dd-trace-core/src/main/java/datadog/trace/core/postprocessor/TracePostProcessor.java rename to dd-trace-core/src/main/java/datadog/trace/core/postprocessor/SpanPostProcessor.java index 191ef2f5071..3e7a21bfbc5 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/TracePostProcessor.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/SpanPostProcessor.java @@ -2,9 +2,8 @@ import datadog.trace.core.DDSpan; import datadog.trace.core.DDSpanContext; -import java.util.List; import java.util.function.BooleanSupplier; -public interface TracePostProcessor { - boolean process(List trace, DDSpanContext context, BooleanSupplier timeoutCheck); +public interface SpanPostProcessor { + boolean process(DDSpan trace, DDSpanContext context, BooleanSupplier timeoutCheck); } From 81bd615351261c0ffb3cf058485b744b7038d764 Mon Sep 17 00:00:00 2001 From: ValentinZakharov Date: Wed, 27 Mar 2024 15:02:43 +0200 Subject: [PATCH 06/11] Update dd-trace-core/src/main/java/datadog/trace/core/postprocessor/SpanPostProcessor.java Co-authored-by: Stuart McCulloch --- .../datadog/trace/core/postprocessor/SpanPostProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/SpanPostProcessor.java b/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/SpanPostProcessor.java index 3e7a21bfbc5..466ef3ab870 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/SpanPostProcessor.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/SpanPostProcessor.java @@ -5,5 +5,5 @@ import java.util.function.BooleanSupplier; public interface SpanPostProcessor { - boolean process(DDSpan trace, DDSpanContext context, BooleanSupplier timeoutCheck); + boolean process(DDSpan span, DDSpanContext context, BooleanSupplier timeoutCheck); } From a7ccdef27b5a9bf8d842d5185f3b789ade0aa947 Mon Sep 17 00:00:00 2001 From: Valentin Zakharov Date: Wed, 27 Mar 2024 16:06:07 +0100 Subject: [PATCH 07/11] Minor improvements for pr-review --- .../common/writer/TraceProcessingWorker.java | 31 ++++++++++++------- .../datadog/trace/core/DDSpanContext.java | 2 +- .../postprocessor/AppSecPostProcessor.java | 3 +- .../core/postprocessor/SpanPostProcessor.java | 19 ++++++++++-- .../trace/util/AgentThreadFactory.java | 1 - 5 files changed, 39 insertions(+), 17 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java index b32788f7bbe..e67a49b8f9e 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java @@ -17,6 +17,8 @@ import datadog.trace.core.monitor.HealthMetrics; import datadog.trace.core.postprocessor.AppSecPostProcessor; import datadog.trace.core.postprocessor.SpanPostProcessor; + +import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -308,24 +310,31 @@ private void maybeTracePostProcessing(List trace) { return; } + // Filter spans that need post-processing + List spansToPostProcess = null; + for (DDSpan span : trace) { + DDSpanContext context = span.context(); + if (context != null && context.isRequiresPostProcessing()) { + if (spansToPostProcess == null) { + spansToPostProcess = new ArrayList<>(); + } + spansToPostProcess.add(span); + } + } + + if (spansToPostProcess == null) { + return; + } + long timeout = Config.get().getTracePostProcessingTimeout(); long deadline = System.currentTimeMillis() + timeout; BooleanSupplier timeoutCheck = () -> System.currentTimeMillis() > deadline; - for (DDSpan span : trace) { - if (timeoutCheck.getAsBoolean()) { + for (DDSpan span : spansToPostProcess) { + if (timeoutCheck.getAsBoolean() || !spanPostProcessor.process(span, timeoutCheck)) { log.debug("Span post-processing is interrupted due to timeout."); break; } - - DDSpanContext context = span.context(); - if (context == null) { - continue; - } - - if (context.isRequiresPostProcessing()) { - spanPostProcessor.process(span, context, timeoutCheck); - } } } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java b/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java index d2efca11e36..eb11bb93b40 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java @@ -140,7 +140,7 @@ public class DDSpanContext private final boolean injectBaggageAsTags; private volatile int encodedOperationName; private volatile int encodedResourceName; - private boolean requiresPostProcessing; + private volatile boolean requiresPostProcessing; public DDSpanContext( final DDTraceId traceId, diff --git a/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/AppSecPostProcessor.java b/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/AppSecPostProcessor.java index 0af37f49931..4897d183a19 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/AppSecPostProcessor.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/AppSecPostProcessor.java @@ -1,13 +1,12 @@ package datadog.trace.core.postprocessor; import datadog.trace.core.DDSpan; -import datadog.trace.core.DDSpanContext; import java.util.function.BooleanSupplier; public class AppSecPostProcessor implements SpanPostProcessor { @Override - public boolean process(DDSpan span, DDSpanContext context, BooleanSupplier timeoutCheck) { + public boolean process(DDSpan span, BooleanSupplier timeoutCheck) { // Do AppSec post-processing return true; } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/SpanPostProcessor.java b/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/SpanPostProcessor.java index 466ef3ab870..14708410534 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/SpanPostProcessor.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/SpanPostProcessor.java @@ -1,9 +1,24 @@ package datadog.trace.core.postprocessor; import datadog.trace.core.DDSpan; -import datadog.trace.core.DDSpanContext; import java.util.function.BooleanSupplier; +/** + * Span Post-processing with a timeout check capability. + *

+ * Implementations of this interface should carry out post-processing of spans while supporting + * interruption when a specified time limit is exceeded. The method {@code process} + * must check the state of {@code timeoutCheck} while processing span. If {@code timeoutCheck.getAsBoolean()} + * returns {@code true}, processing should be immediately halted, and the method should return {@code false}. + * If post-processing completes successfully before the timeout, the method should return {@code true}. + */ public interface SpanPostProcessor { - boolean process(DDSpan span, DDSpanContext context, BooleanSupplier timeoutCheck); + /** + * Post-processes a span. + *

+ * @param span The span to be post-processed. + * @param timeoutCheck A timeout check returning {@code true} if the allotted time has elapsed. + * @return {@code true} if the span was successfully processed; {@code false} in case of a timeout. + */ + boolean process(DDSpan span, BooleanSupplier timeoutCheck); } diff --git a/internal-api/src/main/java/datadog/trace/util/AgentThreadFactory.java b/internal-api/src/main/java/datadog/trace/util/AgentThreadFactory.java index 46c1fc0f95d..a860b35b833 100644 --- a/internal-api/src/main/java/datadog/trace/util/AgentThreadFactory.java +++ b/internal-api/src/main/java/datadog/trace/util/AgentThreadFactory.java @@ -16,7 +16,6 @@ public enum AgentThread { TRACE_STARTUP("dd-agent-startup-datadog-tracer"), TRACE_MONITOR("dd-trace-monitor"), TRACE_PROCESSOR("dd-trace-processor"), - TRACE_POST_PROCESSOR("dd-trace-post-processor"), SPAN_SAMPLING_PROCESSOR("dd-span-sampling-processor"), TRACE_CASSANDRA_ASYNC_SESSION("dd-cassandra-session-executor"), From ed04cb27a51224b33269eef1a6a1d09b9e895655 Mon Sep 17 00:00:00 2001 From: Valentin Zakharov Date: Wed, 27 Mar 2024 16:18:05 +0100 Subject: [PATCH 08/11] Spotless --- .../common/writer/TraceProcessingWorker.java | 1 - .../core/postprocessor/SpanPostProcessor.java | 18 ++++++++++-------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java index e67a49b8f9e..dfc5e5c8d45 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java @@ -17,7 +17,6 @@ import datadog.trace.core.monitor.HealthMetrics; import datadog.trace.core.postprocessor.AppSecPostProcessor; import datadog.trace.core.postprocessor.SpanPostProcessor; - import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; diff --git a/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/SpanPostProcessor.java b/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/SpanPostProcessor.java index 14708410534..8751959456f 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/SpanPostProcessor.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/SpanPostProcessor.java @@ -5,20 +5,22 @@ /** * Span Post-processing with a timeout check capability. - *

- * Implementations of this interface should carry out post-processing of spans while supporting - * interruption when a specified time limit is exceeded. The method {@code process} - * must check the state of {@code timeoutCheck} while processing span. If {@code timeoutCheck.getAsBoolean()} - * returns {@code true}, processing should be immediately halted, and the method should return {@code false}. - * If post-processing completes successfully before the timeout, the method should return {@code true}. + * + *

Implementations of this interface should carry out post-processing of spans while supporting + * interruption when a specified time limit is exceeded. The method {@code process} must check the + * state of {@code timeoutCheck} while processing span. If {@code timeoutCheck.getAsBoolean()} + * returns {@code true}, processing should be immediately halted, and the method should return + * {@code false}. If post-processing completes successfully before the timeout, the method should + * return {@code true}. */ public interface SpanPostProcessor { /** * Post-processes a span. - *

+ * * @param span The span to be post-processed. * @param timeoutCheck A timeout check returning {@code true} if the allotted time has elapsed. - * @return {@code true} if the span was successfully processed; {@code false} in case of a timeout. + * @return {@code true} if the span was successfully processed; {@code false} in case of a + * timeout. */ boolean process(DDSpan span, BooleanSupplier timeoutCheck); } From 96342fffa911d31e4168260cc8c7997f8ee35416 Mon Sep 17 00:00:00 2001 From: Valentin Zakharov Date: Wed, 27 Mar 2024 22:47:04 +0100 Subject: [PATCH 09/11] Default trace post-processing timeout value --- .../src/main/java/datadog/trace/api/ConfigDefaults.java | 2 ++ internal-api/src/main/java/datadog/trace/api/Config.java | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java b/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java index 02f41c3a9ab..2988e995164 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java @@ -215,6 +215,8 @@ public final class ConfigDefaults { static final float DEFAULT_TRACE_FLUSH_INTERVAL = 1; + static final long DEFAULT_TRACE_POST_PROCESSING_TIMEOUT = 1000; // 1 second + static final boolean DEFAULT_COUCHBASE_INTERNAL_SPANS_ENABLED = true; static final boolean DEFAULT_ELASTICSEARCH_BODY_ENABLED = false; static final boolean DEFAULT_ELASTICSEARCH_PARAMS_ENABLED = true; diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index e95a11d9ca5..ee0f152782d 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -2044,7 +2044,9 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment()) "Agentless profiling activated but no api key provided. Profile uploading will likely fail"); } - this.tracePostProcessingTimeout = configProvider.getLong(TRACE_POST_PROCESSING_TIMEOUT, 1000); + this.tracePostProcessingTimeout = + configProvider.getLong( + TRACE_POST_PROCESSING_TIMEOUT, ConfigDefaults.DEFAULT_TRACE_POST_PROCESSING_TIMEOUT); if (isCiVisibilityEnabled() && ciVisibilityAgentlessEnabled From 749f577418e9f8d3a75ab186230a4b56e3a3675e Mon Sep 17 00:00:00 2001 From: Valentin Zakharov Date: Fri, 12 Apr 2024 12:55:26 +0200 Subject: [PATCH 10/11] Added test --- .../trace/common/writer/DDAgentWriter.java | 3 +- .../trace/common/writer/DDIntakeWriter.java | 3 +- .../common/writer/TraceProcessingWorker.java | 51 ++++++++++---- .../postprocessor/AppSecPostProcessor.java | 13 ---- .../writer/TraceProcessingWorkerTest.groovy | 69 +++++++++++++++++-- 5 files changed, 106 insertions(+), 33 deletions(-) delete mode 100644 dd-trace-core/src/main/java/datadog/trace/core/postprocessor/AppSecPostProcessor.java diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/DDAgentWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/DDAgentWriter.java index a29b07a4af9..0c85cb49abc 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/DDAgentWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/DDAgentWriter.java @@ -154,7 +154,8 @@ public DDAgentWriter build() { null == prioritization ? FAST_LANE : prioritization, flushIntervalMilliseconds, TimeUnit.MILLISECONDS, - singleSpanSampler); + singleSpanSampler, + null); return new DDAgentWriter(traceProcessingWorker, dispatcher, healthMetrics, alwaysFlush); } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/DDIntakeWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/DDIntakeWriter.java index d5e70dd6b47..7075266352c 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/DDIntakeWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/DDIntakeWriter.java @@ -122,7 +122,8 @@ public DDIntakeWriter build() { prioritization, flushIntervalMilliseconds, TimeUnit.MILLISECONDS, - singleSpanSampler); + singleSpanSampler, + null); return new DDIntakeWriter( traceProcessingWorker, diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java index dfc5e5c8d45..a5ec8961caa 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java @@ -15,7 +15,6 @@ import datadog.trace.core.DDSpan; import datadog.trace.core.DDSpanContext; import datadog.trace.core.monitor.HealthMetrics; -import datadog.trace.core.postprocessor.AppSecPostProcessor; import datadog.trace.core.postprocessor.SpanPostProcessor; import java.util.ArrayList; import java.util.List; @@ -55,7 +54,8 @@ public TraceProcessingWorker( final Prioritization prioritization, final long flushInterval, final TimeUnit timeUnit, - final SingleSpanSampler singleSpanSampler) { + final SingleSpanSampler singleSpanSampler, + final SpanPostProcessor spanPostProcessor) { this.capacity = capacity; this.primaryQueue = createQueue(capacity); this.secondaryQueue = createQueue(capacity); @@ -78,9 +78,21 @@ public TraceProcessingWorker( this.serializingHandler = runAsDaemon ? new DaemonTraceSerializingHandler( - primaryQueue, secondaryQueue, healthMetrics, dispatcher, flushInterval, timeUnit) + primaryQueue, + secondaryQueue, + healthMetrics, + dispatcher, + flushInterval, + timeUnit, + spanPostProcessor) : new NonDaemonTraceSerializingHandler( - primaryQueue, secondaryQueue, healthMetrics, dispatcher, flushInterval, timeUnit); + primaryQueue, + secondaryQueue, + healthMetrics, + dispatcher, + flushInterval, + timeUnit, + spanPostProcessor); this.serializerThread = newAgentThread(TRACE_PROCESSOR, serializingHandler, runAsDaemon); } @@ -139,9 +151,16 @@ public DaemonTraceSerializingHandler( HealthMetrics healthMetrics, PayloadDispatcher payloadDispatcher, long flushInterval, - TimeUnit timeUnit) { + TimeUnit timeUnit, + SpanPostProcessor spanPostProcessor) { super( - primaryQueue, secondaryQueue, healthMetrics, payloadDispatcher, flushInterval, timeUnit); + primaryQueue, + secondaryQueue, + healthMetrics, + payloadDispatcher, + flushInterval, + timeUnit, + spanPostProcessor); } @Override @@ -174,9 +193,16 @@ public NonDaemonTraceSerializingHandler( HealthMetrics healthMetrics, PayloadDispatcher payloadDispatcher, long flushInterval, - TimeUnit timeUnit) { + TimeUnit timeUnit, + SpanPostProcessor spanPostProcessor) { super( - primaryQueue, secondaryQueue, healthMetrics, payloadDispatcher, flushInterval, timeUnit); + primaryQueue, + secondaryQueue, + healthMetrics, + payloadDispatcher, + flushInterval, + timeUnit, + spanPostProcessor); } @Override @@ -219,7 +245,8 @@ public TraceSerializingHandler( final HealthMetrics healthMetrics, final PayloadDispatcher payloadDispatcher, final long flushInterval, - final TimeUnit timeUnit) { + final TimeUnit timeUnit, + final SpanPostProcessor spanPostProcessor) { this.primaryQueue = primaryQueue; this.secondaryQueue = secondaryQueue; this.healthMetrics = healthMetrics; @@ -231,7 +258,7 @@ public TraceSerializingHandler( } else { this.ticksRequiredToFlush = Long.MAX_VALUE; } - this.spanPostProcessor = new AppSecPostProcessor(); + this.spanPostProcessor = spanPostProcessor; } @SuppressWarnings("unchecked") @@ -330,8 +357,8 @@ private void maybeTracePostProcessing(List trace) { BooleanSupplier timeoutCheck = () -> System.currentTimeMillis() > deadline; for (DDSpan span : spansToPostProcess) { - if (timeoutCheck.getAsBoolean() || !spanPostProcessor.process(span, timeoutCheck)) { - log.debug("Span post-processing is interrupted due to timeout."); + if (!spanPostProcessor.process(span, timeoutCheck)) { + log.debug("Span post-processing interrupted due to timeout."); break; } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/AppSecPostProcessor.java b/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/AppSecPostProcessor.java deleted file mode 100644 index 4897d183a19..00000000000 --- a/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/AppSecPostProcessor.java +++ /dev/null @@ -1,13 +0,0 @@ -package datadog.trace.core.postprocessor; - -import datadog.trace.core.DDSpan; -import java.util.function.BooleanSupplier; - -public class AppSecPostProcessor implements SpanPostProcessor { - - @Override - public boolean process(DDSpan span, BooleanSupplier timeoutCheck) { - // Do AppSec post-processing - return true; - } -} diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/TraceProcessingWorkerTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/TraceProcessingWorkerTest.groovy index dd823a587be..b976dcdbe1e 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/TraceProcessingWorkerTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/TraceProcessingWorkerTest.groovy @@ -4,7 +4,10 @@ import datadog.trace.common.sampling.SingleSpanSampler import datadog.trace.common.writer.ddagent.PrioritizationStrategy.PublishResult import datadog.trace.core.CoreSpan import datadog.trace.core.DDSpan +import datadog.trace.core.DDSpanContext +import datadog.trace.core.PendingTrace import datadog.trace.core.monitor.HealthMetrics +import datadog.trace.core.postprocessor.SpanPostProcessor import datadog.trace.test.util.DDSpecification import spock.util.concurrent.PollingConditions @@ -41,6 +44,7 @@ class TraceProcessingWorkerTest extends DDSpecification { FAST_LANE, 1, TimeUnit.NANOSECONDS, + null, null ) // stop heartbeats from being throttled @@ -67,6 +71,7 @@ class TraceProcessingWorkerTest extends DDSpecification { FAST_LANE, 1, TimeUnit.NANOSECONDS, + null, null ) // stop heartbeats from being throttled def timeConditions = new PollingConditions(timeout: 1, initialDelay: 1, factor: 1.25) @@ -92,7 +97,7 @@ class TraceProcessingWorkerTest extends DDSpecification { false }, FAST_LANE, - 100, TimeUnit.SECONDS, null) // prevent heartbeats from helping the flush happen + 100, TimeUnit.SECONDS, null, null) // prevent heartbeats from helping the flush happen when: "there is pending work it is completed before a flush" // processing this span will throw an exception, but it should be caught @@ -131,7 +136,7 @@ class TraceProcessingWorkerTest extends DDSpecification { throwingDispatcher, { false }, FAST_LANE, - 100, TimeUnit.SECONDS, null) // prevent heartbeats from helping the flush happen + 100, TimeUnit.SECONDS, null, null) // prevent heartbeats from helping the flush happen worker.start() when: "a trace is processed but can't be passed on" @@ -149,6 +154,58 @@ class TraceProcessingWorkerTest extends DDSpecification { priority << [SAMPLER_DROP, USER_DROP, SAMPLER_KEEP, USER_KEEP, UNSET] } + def "trace should be post-processed"() { + setup: + AtomicInteger acceptedCount = new AtomicInteger() + PayloadDispatcherImpl countingDispatcher = Mock(PayloadDispatcherImpl) + countingDispatcher.addTrace(_) >> { + acceptedCount.getAndIncrement() + } + HealthMetrics healthMetrics = Mock(HealthMetrics) + + // Span 1 - should be post-processed + def span1 = DDSpan.create("test", 0, Mock(DDSpanContext) { + isRequiresPostProcessing() >> true + getTrace() >> Mock(PendingTrace) { + getCurrentTimeNano() >> 0 + } + }, []) + def processedSpan1 = false + + // Span 2 - should NOT be post-processed + def span2 = DDSpan.create("test", 0, Mock(DDSpanContext) { + isRequiresPostProcessing() >> false + getTrace() >> Mock(PendingTrace) { + getCurrentTimeNano() >> 0 + } + }, []) + def processedSpan2 = false + + SpanPostProcessor spanPostProcessor = Mock(SpanPostProcessor) { + process(span1, _) >> { processedSpan1 = true } + process(span2, _) >> { processedSpan2 = true } + } + + TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, + countingDispatcher, { + false + }, FAST_LANE, 100, TimeUnit.SECONDS, null, spanPostProcessor) + worker.start() + + when: "traces are submitted" + worker.publish(span1, SAMPLER_KEEP, [span1, span2]) + worker.publish(span2, SAMPLER_KEEP, [span1, span2]) + + then: "traces are passed through unless rejected on submission" + conditions.eventually { + assert processedSpan1 == true + assert processedSpan2 == false + } + + cleanup: + worker.close() + } + def "traces should be processed"() { setup: AtomicInteger acceptedCount = new AtomicInteger() @@ -160,7 +217,7 @@ class TraceProcessingWorkerTest extends DDSpecification { TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { false - }, FAST_LANE, 100, TimeUnit.SECONDS, null) + }, FAST_LANE, 100, TimeUnit.SECONDS, null, null) // prevent heartbeats from helping the flush happen worker.start() @@ -211,7 +268,7 @@ class TraceProcessingWorkerTest extends DDSpecification { TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { false - }, FAST_LANE, 100, TimeUnit.SECONDS, null) + }, FAST_LANE, 100, TimeUnit.SECONDS, null, null) worker.start() worker.close() int queueSize = 0 @@ -248,7 +305,7 @@ class TraceProcessingWorkerTest extends DDSpecification { return false } } - TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { true }, FAST_LANE, 100, TimeUnit.SECONDS, singleSpanSampler) + TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { true }, FAST_LANE, 100, TimeUnit.SECONDS, singleSpanSampler, null) worker.start() when: "traces are submitted" @@ -324,7 +381,7 @@ class TraceProcessingWorkerTest extends DDSpecification { return false } } - TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { false }, FAST_LANE, 100, TimeUnit.SECONDS, singleSpanSampler) + TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { false }, FAST_LANE, 100, TimeUnit.SECONDS, singleSpanSampler, null) worker.start() when: "traces are submitted" From ae1ecea607456595a57b8ead0c88d0a410fcf725 Mon Sep 17 00:00:00 2001 From: Valentin Zakharov Date: Tue, 16 Apr 2024 21:07:45 +0200 Subject: [PATCH 11/11] Catch any exception, if happens during span post-processing --- .../common/writer/TraceProcessingWorker.java | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java index a5ec8961caa..c17c00932bb 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java @@ -352,14 +352,20 @@ private void maybeTracePostProcessing(List trace) { return; } - long timeout = Config.get().getTracePostProcessingTimeout(); - long deadline = System.currentTimeMillis() + timeout; - BooleanSupplier timeoutCheck = () -> System.currentTimeMillis() > deadline; - - for (DDSpan span : spansToPostProcess) { - if (!spanPostProcessor.process(span, timeoutCheck)) { - log.debug("Span post-processing interrupted due to timeout."); - break; + try { + long timeout = Config.get().getTracePostProcessingTimeout(); + long deadline = System.currentTimeMillis() + timeout; + BooleanSupplier timeoutCheck = () -> System.currentTimeMillis() > deadline; + + for (DDSpan span : spansToPostProcess) { + if (!spanPostProcessor.process(span, timeoutCheck)) { + log.debug("Span post-processing interrupted due to timeout."); + break; + } + } + } catch (Throwable e) { + if (log.isDebugEnabled()) { + log.debug("Error while trace post-processing", e); } } }