Merged
@@ -215,6 +215,8 @@ public final class ConfigDefaults {

static final float DEFAULT_TRACE_FLUSH_INTERVAL = 1;

static final long DEFAULT_TRACE_POST_PROCESSING_TIMEOUT = 1000; // 1 second

static final boolean DEFAULT_COUCHBASE_INTERNAL_SPANS_ENABLED = true;
static final boolean DEFAULT_ELASTICSEARCH_BODY_ENABLED = false;
static final boolean DEFAULT_ELASTICSEARCH_PARAMS_ENABLED = true;
@@ -133,5 +133,7 @@ public final class TracerConfig {

public static final String TRACE_FLUSH_INTERVAL = "trace.flush.interval";

public static final String TRACE_POST_PROCESSING_TIMEOUT = "trace.post-processing.timeout";

private TracerConfig() {}
}
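
For context, a minimal sketch of how this key is consumed once the PR is in place. The getter below appears later in this diff; the dd.-prefixed system property mentioned in the comment follows the tracer's usual naming convention and is an assumption, not something this diff confirms.

import datadog.trace.api.Config;

class PostProcessingTimeoutExample {
  static long timeoutMillis() {
    // Assumption: the key maps to -Ddd.trace.post-processing.timeout
    // (or the DD_TRACE_POST_PROCESSING_TIMEOUT environment variable).
    return Config.get().getTracePostProcessingTimeout(); // defaults to 1000 ms
  }
}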
@@ -154,7 +154,8 @@ public DDAgentWriter build() {
null == prioritization ? FAST_LANE : prioritization,
flushIntervalMilliseconds,
TimeUnit.MILLISECONDS,
singleSpanSampler);
singleSpanSampler,
null);

return new DDAgentWriter(traceProcessingWorker, dispatcher, healthMetrics, alwaysFlush);
}
@@ -122,7 +122,8 @@ public DDIntakeWriter build() {
prioritization,
flushIntervalMilliseconds,
TimeUnit.MILLISECONDS,
singleSpanSampler);
singleSpanSampler,
null);

return new DDIntakeWriter(
traceProcessingWorker,
@@ -13,10 +13,14 @@
import datadog.trace.common.writer.ddagent.PrioritizationStrategy;
import datadog.trace.core.CoreSpan;
import datadog.trace.core.DDSpan;
import datadog.trace.core.DDSpanContext;
import datadog.trace.core.monitor.HealthMetrics;
import datadog.trace.core.postprocessor.SpanPostProcessor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
import org.slf4j.Logger;
@@ -50,7 +54,8 @@ public TraceProcessingWorker(
final Prioritization prioritization,
final long flushInterval,
final TimeUnit timeUnit,
final SingleSpanSampler singleSpanSampler) {
final SingleSpanSampler singleSpanSampler,
final SpanPostProcessor spanPostProcessor) {
this.capacity = capacity;
this.primaryQueue = createQueue(capacity);
this.secondaryQueue = createQueue(capacity);
@@ -73,9 +78,21 @@
this.serializingHandler =
runAsDaemon
? new DaemonTraceSerializingHandler(
primaryQueue, secondaryQueue, healthMetrics, dispatcher, flushInterval, timeUnit)
primaryQueue,
secondaryQueue,
healthMetrics,
dispatcher,
flushInterval,
timeUnit,
spanPostProcessor)
: new NonDaemonTraceSerializingHandler(
primaryQueue, secondaryQueue, healthMetrics, dispatcher, flushInterval, timeUnit);
primaryQueue,
secondaryQueue,
healthMetrics,
dispatcher,
flushInterval,
timeUnit,
spanPostProcessor);
this.serializerThread = newAgentThread(TRACE_PROCESSOR, serializingHandler, runAsDaemon);
}

@@ -134,9 +151,16 @@ public DaemonTraceSerializingHandler(
HealthMetrics healthMetrics,
PayloadDispatcher payloadDispatcher,
long flushInterval,
TimeUnit timeUnit) {
TimeUnit timeUnit,
SpanPostProcessor spanPostProcessor) {
super(
primaryQueue, secondaryQueue, healthMetrics, payloadDispatcher, flushInterval, timeUnit);
primaryQueue,
secondaryQueue,
healthMetrics,
payloadDispatcher,
flushInterval,
timeUnit,
spanPostProcessor);
}

@Override
@@ -169,9 +193,16 @@ public NonDaemonTraceSerializingHandler(
HealthMetrics healthMetrics,
PayloadDispatcher payloadDispatcher,
long flushInterval,
TimeUnit timeUnit) {
TimeUnit timeUnit,
SpanPostProcessor spanPostProcessor) {
super(
primaryQueue, secondaryQueue, healthMetrics, payloadDispatcher, flushInterval, timeUnit);
primaryQueue,
secondaryQueue,
healthMetrics,
payloadDispatcher,
flushInterval,
timeUnit,
spanPostProcessor);
}

@Override
@@ -206,14 +237,16 @@ public abstract static class TraceSerializingHandler implements Runnable {
private final boolean doTimeFlush;
private final PayloadDispatcher payloadDispatcher;
private long lastTicks;
private final SpanPostProcessor spanPostProcessor;

public TraceSerializingHandler(
final MpscBlockingConsumerArrayQueue<Object> primaryQueue,
final MpscBlockingConsumerArrayQueue<Object> secondaryQueue,
final HealthMetrics healthMetrics,
final PayloadDispatcher payloadDispatcher,
final long flushInterval,
final TimeUnit timeUnit) {
final TimeUnit timeUnit,
final SpanPostProcessor spanPostProcessor) {
this.primaryQueue = primaryQueue;
this.secondaryQueue = secondaryQueue;
this.healthMetrics = healthMetrics;
@@ -225,6 +258,7 @@ public TraceSerializingHandler(
} else {
this.ticksRequiredToFlush = Long.MAX_VALUE;
}
this.spanPostProcessor = spanPostProcessor;
}

@SuppressWarnings("unchecked")
@@ -235,6 +269,7 @@ public void onEvent(Object event) {
try {
if (event instanceof List) {
List<DDSpan> trace = (List<DDSpan>) event;
maybeTracePostProcessing(trace);
// TODO populate `_sample_rate` metric in a way that accounts for lost/dropped traces
payloadDispatcher.addTrace(trace);
} else if (event instanceof FlushEvent) {
@@ -295,5 +330,44 @@ private void consumeBatch(MessagePassingQueue<Object> queue) {
protected boolean queuesAreEmpty() {
return primaryQueue.isEmpty() && secondaryQueue.isEmpty();
}

private void maybeTracePostProcessing(List<DDSpan> trace) {
if (trace == null || trace.isEmpty()) {
return;
}

// Filter spans that need post-processing
List<DDSpan> spansToPostProcess = null;
for (DDSpan span : trace) {
DDSpanContext context = span.context();
if (context != null && context.isRequiresPostProcessing()) {
if (spansToPostProcess == null) {
spansToPostProcess = new ArrayList<>();
}
spansToPostProcess.add(span);
}
}

if (spansToPostProcess == null) {
return;
}

try {
long timeout = Config.get().getTracePostProcessingTimeout();
long deadline = System.currentTimeMillis() + timeout;
BooleanSupplier timeoutCheck = () -> System.currentTimeMillis() > deadline;

for (DDSpan span : spansToPostProcess) {
if (!spanPostProcessor.process(span, timeoutCheck)) {
log.debug("Span post-processing interrupted due to timeout.");
break;
}
}
} catch (Throwable e) {
if (log.isDebugEnabled()) {
log.debug("Error while trace post-processing", e);
}
}
}
}
}
@@ -140,6 +140,7 @@ public class DDSpanContext
private final boolean injectBaggageAsTags;
private volatile int encodedOperationName;
private volatile int encodedResourceName;
private volatile boolean requiresPostProcessing;

public DDSpanContext(
final DDTraceId traceId,
@@ -944,4 +945,12 @@ private String getTagName(String key) {
// TODO is this decided?
return "_dd." + key + ".json";
}

public void setRequiresPostProcessing(boolean postProcessing) {
  this.requiresPostProcessing = postProcessing;
}

public boolean isRequiresPostProcessing() {
  return requiresPostProcessing;
}
}

Contributor: I don't particularly like that these methods are public.

Contributor Author: In my case, setRequiresPostProcessing should be called from the appsec module, so we need to expose it in some way, either on TraceSegment or on DDSpanContext. Do you have any other options in mind?
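
To make the reviewer exchange above concrete, here is a minimal sketch of the kind of caller the author describes: a product module (e.g. appsec) flagging a span so the serializer will post-process it later. The AppSecEventHandler class and its onSecurityEvent hook are hypothetical names used only for illustration; only DDSpanContext.setRequiresPostProcessing comes from this PR.

import datadog.trace.core.DDSpan;
import datadog.trace.core.DDSpanContext;

// Hypothetical example: mark a span for post-processing when a security event is recorded.
final class AppSecEventHandler {
  void onSecurityEvent(DDSpan span) {
    DDSpanContext context = span.context();
    if (context != null) {
      context.setRequiresPostProcessing(true); // picked up later by TraceProcessingWorker
    }
  }
}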
@@ -0,0 +1,26 @@
package datadog.trace.core.postprocessor;

import datadog.trace.core.DDSpan;
import java.util.function.BooleanSupplier;

/**
* Span Post-processing with a timeout check capability.
*
* <p>Implementations of this interface should carry out post-processing of spans while supporting
* interruption when a specified time limit is exceeded. The method {@code process} must check the
state of {@code timeoutCheck} while processing the span. If {@code timeoutCheck.getAsBoolean()}
* returns {@code true}, processing should be immediately halted, and the method should return
* {@code false}. If post-processing completes successfully before the timeout, the method should
* return {@code true}.
*/
public interface SpanPostProcessor {
/**
* Post-processes a span.
*
* @param span The span to be post-processed.
* @param timeoutCheck A timeout check returning {@code true} if the allotted time has elapsed.
* @return {@code true} if the span was successfully processed; {@code false} in case of a
* timeout.
*/
  boolean process(DDSpan span, BooleanSupplier timeoutCheck);
}

Contributor: Why the BooleanSupplier? Would it not be better to just have two different methods to cover the two different cases?

Contributor: The idea of the supplier was for situations where the processor is doing something non-trivial. Assuming processing can be broken down into smaller chunks, the supplier lets it check mid-processing whether the timeout has been exceeded (otherwise there is no easy way for a processor to know it should abort). In other words, the value returned by the supplier can change during the process call, and that cannot be represented as two methods. It is only really needed when the process call might take a while and needs to be cancelled mid-call.

Contributor: Ah, that makes sense. Thanks for the clarification.
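
Following the reviewer's rationale, a minimal sketch of an implementation that honors this contract by checking the supplier between small units of work. The class name, chunk count, and per-chunk work are illustrative assumptions; only the SpanPostProcessor interface itself comes from this PR.

import datadog.trace.core.DDSpan;
import datadog.trace.core.postprocessor.SpanPostProcessor;
import java.util.function.BooleanSupplier;

// Hypothetical post-processor that works in small chunks and aborts on timeout.
final class ChunkedSpanPostProcessor implements SpanPostProcessor {
  private static final int CHUNKS = 10;

  @Override
  public boolean process(DDSpan span, BooleanSupplier timeoutCheck) {
    for (int chunk = 0; chunk < CHUNKS; chunk++) {
      if (timeoutCheck.getAsBoolean()) {
        return false; // allotted time elapsed; stop and report the timeout
      }
      processChunk(span, chunk); // one bounded unit of work, e.g. redacting a tag
    }
    return true; // finished before the deadline
  }

  private void processChunk(DDSpan span, int chunk) {
    // illustrative placeholder for a bounded piece of post-processing work
  }
}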
@@ -4,7 +4,10 @@ import datadog.trace.common.sampling.SingleSpanSampler
import datadog.trace.common.writer.ddagent.PrioritizationStrategy.PublishResult
import datadog.trace.core.CoreSpan
import datadog.trace.core.DDSpan
import datadog.trace.core.DDSpanContext
import datadog.trace.core.PendingTrace
import datadog.trace.core.monitor.HealthMetrics
import datadog.trace.core.postprocessor.SpanPostProcessor
import datadog.trace.test.util.DDSpecification
import spock.util.concurrent.PollingConditions

@@ -41,6 +44,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
FAST_LANE,
1,
TimeUnit.NANOSECONDS,
null,
null
) // stop heartbeats from being throttled

@@ -67,6 +71,7 @@
FAST_LANE,
1,
TimeUnit.NANOSECONDS,
null,
null
) // stop heartbeats from being throttled
def timeConditions = new PollingConditions(timeout: 1, initialDelay: 1, factor: 1.25)
@@ -92,7 +97,7 @@
false
},
FAST_LANE,
100, TimeUnit.SECONDS, null) // prevent heartbeats from helping the flush happen
100, TimeUnit.SECONDS, null, null) // prevent heartbeats from helping the flush happen

when: "there is pending work it is completed before a flush"
// processing this span will throw an exception, but it should be caught
@@ -131,7 +136,7 @@
throwingDispatcher, {
false
}, FAST_LANE,
100, TimeUnit.SECONDS, null) // prevent heartbeats from helping the flush happen
100, TimeUnit.SECONDS, null, null) // prevent heartbeats from helping the flush happen
worker.start()

when: "a trace is processed but can't be passed on"
@@ -149,6 +154,58 @@
priority << [SAMPLER_DROP, USER_DROP, SAMPLER_KEEP, USER_KEEP, UNSET]
}

def "trace should be post-processed"() {
setup:
AtomicInteger acceptedCount = new AtomicInteger()
PayloadDispatcherImpl countingDispatcher = Mock(PayloadDispatcherImpl)
countingDispatcher.addTrace(_) >> {
acceptedCount.getAndIncrement()
}
HealthMetrics healthMetrics = Mock(HealthMetrics)

// Span 1 - should be post-processed
def span1 = DDSpan.create("test", 0, Mock(DDSpanContext) {
isRequiresPostProcessing() >> true
getTrace() >> Mock(PendingTrace) {
getCurrentTimeNano() >> 0
}
}, [])
def processedSpan1 = false

// Span 2 - should NOT be post-processed
def span2 = DDSpan.create("test", 0, Mock(DDSpanContext) {
isRequiresPostProcessing() >> false
getTrace() >> Mock(PendingTrace) {
getCurrentTimeNano() >> 0
}
}, [])
def processedSpan2 = false

SpanPostProcessor spanPostProcessor = Mock(SpanPostProcessor) {
process(span1, _) >> { processedSpan1 = true }
process(span2, _) >> { processedSpan2 = true }
}

TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics,
countingDispatcher, {
false
}, FAST_LANE, 100, TimeUnit.SECONDS, null, spanPostProcessor)
worker.start()

when: "traces are submitted"
worker.publish(span1, SAMPLER_KEEP, [span1, span2])
worker.publish(span2, SAMPLER_KEEP, [span1, span2])

then: "traces are passed through unless rejected on submission"
conditions.eventually {
assert processedSpan1 == true
assert processedSpan2 == false
}

cleanup:
worker.close()
}

def "traces should be processed"() {
setup:
AtomicInteger acceptedCount = new AtomicInteger()
@@ -160,7 +217,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics,
countingDispatcher, {
false
}, FAST_LANE, 100, TimeUnit.SECONDS, null)
}, FAST_LANE, 100, TimeUnit.SECONDS, null, null)
// prevent heartbeats from helping the flush happen
worker.start()

@@ -211,7 +268,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics,
countingDispatcher, {
false
}, FAST_LANE, 100, TimeUnit.SECONDS, null)
}, FAST_LANE, 100, TimeUnit.SECONDS, null, null)
worker.start()
worker.close()
int queueSize = 0
@@ -248,7 +305,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
return false
}
}
TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { true }, FAST_LANE, 100, TimeUnit.SECONDS, singleSpanSampler)
TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { true }, FAST_LANE, 100, TimeUnit.SECONDS, singleSpanSampler, null)
worker.start()

when: "traces are submitted"
@@ -324,7 +381,7 @@ class TraceProcessingWorkerTest extends DDSpecification {
return false
}
}
TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { false }, FAST_LANE, 100, TimeUnit.SECONDS, singleSpanSampler)
TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { false }, FAST_LANE, 100, TimeUnit.SECONDS, singleSpanSampler, null)
worker.start()

when: "traces are submitted"