Skip to content

Commit 90ff9a9

Browse files
Introduced trace post-processing
1 parent 39767bb commit 90ff9a9

File tree

7 files changed

+86
-0
lines changed

7 files changed

+86
-0
lines changed

dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/AgentTestRunner.groovy

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,12 @@ abstract class AgentTestRunner extends DDSpecification implements AgentBuilder.L
547547
check()
548548
return delegate.getDataCurrent(key)
549549
}
550+
551+
@Override
552+
void setRequiresPostProcessing(boolean postProcessing) {
553+
check()
554+
delegate.setRequiresPostProcessing(postProcessing)
555+
}
550556
}
551557

552558
/** Override to clean up things after the agent is removed */

dd-trace-api/src/main/java/datadog/trace/api/config/TracerConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,5 +133,9 @@ public final class TracerConfig {
133133

134134
public static final String TRACE_FLUSH_INTERVAL = "trace.flush.interval";
135135

136+
public static final String TRACE_POST_PROCESSING_ENABLED = "trace.post-processing.enabled";
137+
138+
public static final String TRACE_POST_PROCESSING_TIMEOUT = "trace.post-processing.timeout";
139+
136140
private TracerConfig() {}
137141
}

dd-trace-api/src/main/java/datadog/trace/api/internal/TraceSegment.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,13 @@ default void setTagCurrent(String key, Object value) {
7575
*/
7676
void setDataCurrent(String key, Object value);
7777

78+
/**
79+
* Set flag to indicate the need for post-processing
80+
*
81+
* @param postProcessing flag to indicate the need for post-processing
82+
*/
83+
void setRequiresPostProcessing(boolean postProcessing);
84+
7885
/**
7986
* Gets the value of the tag from the current span in this {@code TraceSegment}.
8087
*
@@ -112,5 +119,8 @@ public void setDataCurrent(String key, Object value) {}
112119
public Object getDataCurrent(String key) {
113120
return null;
114121
}
122+
123+
@Override
124+
public void setRequiresPostProcessing(boolean postProcessing) {}
115125
}
116126
}

dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
import datadog.trace.common.writer.ddagent.PrioritizationStrategy;
1414
import datadog.trace.core.CoreSpan;
1515
import datadog.trace.core.DDSpan;
16+
import datadog.trace.core.DDSpanContext;
1617
import datadog.trace.core.monitor.HealthMetrics;
18+
import datadog.trace.util.AgentThreadFactory;
1719
import java.util.List;
1820
import java.util.concurrent.CountDownLatch;
1921
import java.util.concurrent.TimeUnit;
@@ -235,6 +237,7 @@ public void onEvent(Object event) {
235237
try {
236238
if (event instanceof List) {
237239
List<DDSpan> trace = (List<DDSpan>) event;
240+
maybePostProcessing(trace);
238241
// TODO populate `_sample_rate` metric in a way that accounts for lost/dropped traces
239242
payloadDispatcher.addTrace(trace);
240243
} else if (event instanceof FlushEvent) {
@@ -295,5 +298,40 @@ private void consumeBatch(MessagePassingQueue<Object> queue) {
295298
protected boolean queuesAreEmpty() {
296299
return primaryQueue.isEmpty() && secondaryQueue.isEmpty();
297300
}
301+
302+
private void maybePostProcessing(List<DDSpan> trace) {
303+
if (!Config.get().isTracePostProcessingEnabled()) {
304+
return;
305+
}
306+
307+
if (trace == null || trace.isEmpty()) {
308+
return;
309+
}
310+
311+
DDSpanContext context = trace.get(0).context();
312+
if (!context.isRequiresPostProcessing()) {
313+
return;
314+
}
315+
316+
Thread thread =
317+
AgentThreadFactory.newAgentThread(
318+
AgentThreadFactory.AgentThread.TRACE_POST_PROCESSOR,
319+
() -> {
320+
// DO A POST PROCESSING
321+
});
322+
thread.start();
323+
324+
try {
325+
long timeout = Config.get().getTracePostProcessingTimeout();
326+
thread.join(timeout);
327+
if (thread.isAlive()) {
328+
thread.interrupt();
329+
}
330+
} catch (InterruptedException e) {
331+
if (log.isDebugEnabled()) {
332+
log.debug("Trace post-processing is interrupted.", e);
333+
}
334+
}
335+
}
298336
}
299337
}

dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ public class DDSpanContext
140140
private final boolean injectBaggageAsTags;
141141
private volatile int encodedOperationName;
142142
private volatile int encodedResourceName;
143+
private boolean requiresPostProcessing;
143144

144145
public DDSpanContext(
145146
final DDTraceId traceId,
@@ -945,4 +946,13 @@ private String getTagName(String key) {
945946
// TODO is this decided?
946947
return "_dd." + key + ".json";
947948
}
949+
950+
@Override
951+
public void setRequiresPostProcessing(boolean postProcessing) {
952+
this.requiresPostProcessing = postProcessing;
953+
}
954+
955+
public boolean isRequiresPostProcessing() {
956+
return requiresPostProcessing;
957+
}
948958
}

internal-api/src/main/java/datadog/trace/api/Config.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,8 @@
434434
import static datadog.trace.api.config.TracerConfig.TRACE_PEER_SERVICE_COMPONENT_OVERRIDES;
435435
import static datadog.trace.api.config.TracerConfig.TRACE_PEER_SERVICE_DEFAULTS_ENABLED;
436436
import static datadog.trace.api.config.TracerConfig.TRACE_PEER_SERVICE_MAPPING;
437+
import static datadog.trace.api.config.TracerConfig.TRACE_POST_PROCESSING_ENABLED;
438+
import static datadog.trace.api.config.TracerConfig.TRACE_POST_PROCESSING_TIMEOUT;
437439
import static datadog.trace.api.config.TracerConfig.TRACE_PROPAGATION_EXTRACT_FIRST;
438440
import static datadog.trace.api.config.TracerConfig.TRACE_PROPAGATION_STYLE;
439441
import static datadog.trace.api.config.TracerConfig.TRACE_PROPAGATION_STYLE_EXTRACT;
@@ -924,6 +926,8 @@ static class HostNameHolder {
924926

925927
private final boolean axisPromoteResourceName;
926928
private final float traceFlushIntervalSeconds;
929+
private final boolean tracePostProcessingEnabled;
930+
private final long tracePostProcessingTimeout;
927931

928932
private final boolean telemetryDebugRequestsEnabled;
929933

@@ -2036,6 +2040,11 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment())
20362040
"Agentless profiling activated but no api key provided. Profile uploading will likely fail");
20372041
}
20382042

2043+
this.tracePostProcessingEnabled =
2044+
configProvider.getBoolean(TRACE_POST_PROCESSING_ENABLED, true);
2045+
2046+
this.tracePostProcessingTimeout = configProvider.getLong(TRACE_POST_PROCESSING_TIMEOUT, 1000);
2047+
20392048
if (isCiVisibilityEnabled()
20402049
&& ciVisibilityAgentlessEnabled
20412050
&& (apiKey == null || apiKey.isEmpty())) {
@@ -2136,6 +2145,14 @@ public float getTraceFlushIntervalSeconds() {
21362145
return traceFlushIntervalSeconds;
21372146
}
21382147

2148+
public boolean isTracePostProcessingEnabled() {
2149+
return tracePostProcessingEnabled;
2150+
}
2151+
2152+
public long getTracePostProcessingTimeout() {
2153+
return tracePostProcessingTimeout;
2154+
}
2155+
21392156
public boolean isIntegrationSynapseLegacyOperationName() {
21402157
return integrationSynapseLegacyOperationName;
21412158
}

internal-api/src/main/java/datadog/trace/util/AgentThreadFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public enum AgentThread {
1616
TRACE_STARTUP("dd-agent-startup-datadog-tracer"),
1717
TRACE_MONITOR("dd-trace-monitor"),
1818
TRACE_PROCESSOR("dd-trace-processor"),
19+
TRACE_POST_PROCESSOR("dd-trace-post-processor"),
1920
SPAN_SAMPLING_PROCESSOR("dd-span-sampling-processor"),
2021
TRACE_CASSANDRA_ASYNC_SESSION("dd-cassandra-session-executor"),
2122

0 commit comments

Comments
 (0)