From 4b4bbc51bf67c6fa3b6e5fac3fcef60e6b2515ff Mon Sep 17 00:00:00 2001
From: DJ Gregor
Date: Tue, 14 Oct 2025 10:12:44 -0700
Subject: [PATCH 1/2] Avoid pending queue wedge if tracer flare is generated
 multiple times

In DumpDrain, the collectTraces method replaces the 'data' field with an
empty ArrayList, but it does not also reset the 'index' field. If another
dump is performed later, this leads the get method to reach the
'return null' statement, and as the comment states, this can (and does)
break the queue.

This change does a few things:

- Resets the index in collectTraces when the data field is replaced (and
  marks the index field as volatile). This should prevent the above
  situation from happening.
- In case the situation still happens, a stand-in CommandElement is
  returned to avoid returning null. A warning message is also logged.
- The existing "testing tracer flare dump with multiple traces" test case
  is expanded to exercise the problem.

Here is an example stack trace when the hang happens:

"dd-trace-monitor" #38 daemon prio=5 os_prio=31 tid=0x0000000110e6e000 nid=0x7617 runnable [0x0000000171032000]
   java.lang.Thread.State: RUNNABLE
	at org.jctools.queues.MpscBlockingConsumerArrayQueue.spinWaitForElement(MpscBlockingConsumerArrayQueue.java:634)
	at org.jctools.queues.MpscBlockingConsumerArrayQueue.parkUntilNext(MpscBlockingConsumerArrayQueue.java:566)
	at org.jctools.queues.MpscBlockingConsumerArrayQueue.take(MpscBlockingConsumerArrayQueue.java:482)
	at datadog.trace.core.PendingTraceBuffer$DelayingPendingTraceBuffer$Worker.run(PendingTraceBuffer.java:317)
	at java.lang.Thread.run(Thread.java:750)
---
 .../trace/core/PendingTraceBuffer.java       | 15 ++++-
 .../trace/core/PendingTraceBufferTest.groovy | 55 ++++++++++++++-----
 2 files changed, 54 insertions(+), 16 deletions(-)

diff --git a/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java b/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java
index c81a113f0f7..89b68882b5d 100644
--- a/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java
+++ b/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java
@@ -25,6 +25,7 @@
 import org.slf4j.LoggerFactory;
 
 public abstract class PendingTraceBuffer implements AutoCloseable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PendingTraceBuffer.class);
   private static final int BUFFER_SIZE = 1 << 12; // 4096
 
   public boolean longRunningSpansEnabled() {
@@ -58,6 +59,7 @@ private static class DelayingPendingTraceBuffer extends PendingTraceBuffer {
     private static final long SLEEP_TIME_MS = 100;
     private static final CommandElement FLUSH_ELEMENT = new CommandElement();
     private static final CommandElement DUMP_ELEMENT = new CommandElement();
+    private static final CommandElement STAND_IN_ELEMENT = new CommandElement();
 
     private final MpscBlockingConsumerArrayQueue<Element> queue;
     private final Thread worker;
@@ -154,7 +156,7 @@ private static final class DumpDrain
           element -> !(element instanceof PendingTrace);
 
       private volatile List<Element> data = new ArrayList<>();
-      private int index = 0;
+      private volatile int index = 0;
 
       @Override
       public void accept(Element pendingTrace) {
@@ -166,13 +168,20 @@ public Element get() {
         if (index < data.size()) {
           return data.get(index++);
         }
-        return null; // Should never reach here or else queue may break according to
-        // MessagePassingQueue docs
+        // Should never reach here or else queue may break according to
+        // MessagePassingQueue docs if we return a null. Return a stand-in
+        // Element instead.
+        LOGGER.warn(
+            "Index {} is out of bounds for data size {} in DumpDrain.get so returning filler CommandElement to prevent pending trace queue from breaking.",
+            index,
+            data.size());
+        return STAND_IN_ELEMENT;
       }
 
       public List<Element> collectTraces() {
         List<Element> traces = data;
         data = new ArrayList<>();
+        index = 0;
         traces.removeIf(NOT_PENDING_TRACE);
         // Storing oldest traces first
         traces.sort(TRACE_BY_START_TIME);
diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy
index c0a97cfd3ed..c9bf72beee7 100644
--- a/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy
+++ b/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy
@@ -460,36 +460,65 @@ class PendingTraceBufferTest extends DDSpecification {
     def parent2 = newSpanOf(trace2, UNSET, System.currentTimeMillis() * 2000)
     def child2 = newSpanOf(parent2)
 
-    when:
+    when: "first flare dump with two traces"
     parent1.finish()
     parent2.finish()
     buffer.start()
-    def entries = buildAndExtractZip()
+    def entries1 = buildAndExtractZip()
 
     then:
     1 * dumpReporter.prepareForFlare()
     1 * dumpReporter.addReportToFlare(_)
     1 * dumpReporter.cleanupAfterFlare()
-    entries.size() == 1
-    def pendingTraceText = entries["pending_traces.txt"] as String
-    (entries["pending_traces.txt"] as String).startsWith('[{"service":"fakeService","name":"fakeOperation","resource":"fakeResource","trace_id":1,"span_id":1,"parent_id":0') // Rest of dump is timestamp specific
-
-    def parsedTraces = pendingTraceText.split('\n').collect { new JsonSlurper().parseText(it) }.flatten()
-    parsedTraces.size() == 2
-    parsedTraces[0]["trace_id"] == 1 //Asserting both traces exist
-    parsedTraces[1]["trace_id"] == 2
-    parsedTraces[0]["start"] < parsedTraces[1]["start"] //Asserting the dump has the oldest trace first
+    entries1.size() == 1
+    def pendingTraceText1 = entries1["pending_traces.txt"] as String
+    pendingTraceText1.startsWith('[{"service":"fakeService","name":"fakeOperation","resource":"fakeResource","trace_id":1,"span_id":1,"parent_id":0') // Rest of dump is timestamp specific
+
+    def parsedTraces1 = pendingTraceText1.split('\n').collect { new JsonSlurper().parseText(it) }.flatten()
+    parsedTraces1.size() == 2
+    parsedTraces1[0]["trace_id"] == 1 //Asserting both traces exist
+    parsedTraces1[1]["trace_id"] == 2
+    parsedTraces1[0]["start"] < parsedTraces1[1]["start"] //Asserting the dump has the oldest trace first
+
+    // New pending traces are needed here because generating the first flare takes long enough that the
+    // earlier pending traces are flushed (within 500ms).
+    when: "second flare dump with new pending traces"
+    // Finish the first set of traces
+    child1.finish()
+    child2.finish()
+    // Create new pending traces
+    def trace3 = factory.create(DDTraceId.from(3))
+    def parent3 = newSpanOf(trace3, UNSET, System.currentTimeMillis() * 3000)
+    def child3 = newSpanOf(parent3)
+    def trace4 = factory.create(DDTraceId.from(4))
+    def parent4 = newSpanOf(trace4, UNSET, System.currentTimeMillis() * 4000)
+    def child4 = newSpanOf(parent4)
+    parent3.finish()
+    parent4.finish()
+    def entries2 = buildAndExtractZip()
+
+    then:
+    1 * dumpReporter.prepareForFlare()
+    1 * dumpReporter.addReportToFlare(_)
+    1 * dumpReporter.cleanupAfterFlare()
+    entries2.size() == 1
+    def pendingTraceText2 = entries2["pending_traces.txt"] as String
+    def parsedTraces2 = pendingTraceText2.split('\n').collect { new JsonSlurper().parseText(it) }.flatten()
+    parsedTraces2.size() == 2
 
     then:
-    child1.finish()
-    child2.finish()
+    child3.finish()
+    child4.finish()
 
     then:
     trace1.size() == 0
     trace1.pendingReferenceCount == 0
     trace2.size() == 0
     trace2.pendingReferenceCount == 0
+    trace3.size() == 0
+    trace3.pendingReferenceCount == 0
+    trace4.size() == 0
+    trace4.pendingReferenceCount == 0
   }

From 3f35463cf8d3c5aebcce63a609e28a1c26b19b55 Mon Sep 17 00:00:00 2001
From: DJ Gregor
Date: Wed, 22 Oct 2025 13:08:33 -0700
Subject: [PATCH 2/2] Use SEND_TELEMETRY for DumpDrain's index out of bounds
 warning log

---
 .../src/main/java/datadog/trace/core/PendingTraceBuffer.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java b/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java
index 89b68882b5d..0057eb2ce7d 100644
--- a/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java
+++ b/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java
@@ -1,5 +1,6 @@
 package datadog.trace.core;
 
+import static datadog.trace.api.telemetry.LogCollector.SEND_TELEMETRY;
 import static datadog.trace.util.AgentThreadFactory.AgentThread.TRACE_MONITOR;
 import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS;
 import static datadog.trace.util.AgentThreadFactory.newAgentThread;
@@ -25,7 +26,6 @@
 import org.slf4j.LoggerFactory;
 
 public abstract class PendingTraceBuffer implements AutoCloseable {
-  private static final Logger LOGGER = LoggerFactory.getLogger(PendingTraceBuffer.class);
   private static final int BUFFER_SIZE = 1 << 12; // 4096
 
   public boolean longRunningSpansEnabled() {
@@ -147,6 +147,7 @@ public void accept(Element pendingTrace) {
 
     private static final class DumpDrain
         implements MessagePassingQueue.Consumer<Element>, MessagePassingQueue.Supplier<Element> {
+      private static final Logger LOGGER = LoggerFactory.getLogger(DumpDrain.class);
       private static final DumpDrain DUMP_DRAIN = new DumpDrain();
       private static final int MAX_DUMPED_TRACES = 50;
 
@@ -172,6 +173,7 @@ public Element get() {
       // MessagePassingQueue docs if we return a null. Return a stand-in
       // Element instead.
       LOGGER.warn(
+          SEND_TELEMETRY,
          "Index {} is out of bounds for data size {} in DumpDrain.get so returning filler CommandElement to prevent pending trace queue from breaking.",
          index,
          data.size());
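
Note for reviewers: below is a minimal, self-contained sketch of the drain pattern these two patches fix. The class and field names are hypothetical, not the actual datadog.trace.core types. Per the comment carried over from the original code, returning null from the supplier side can break the queue, so the supplier hands back a stand-in element instead, and swapping out the backing list now resets the read index as well.

import java.util.ArrayList;
import java.util.List;

// Hypothetical names; this is not the actual datadog.trace.core.PendingTraceBuffer code.
final class DrainSketch<E> {
  private final E standIn; // placeholder handed back instead of null
  private volatile List<E> data = new ArrayList<>();
  private volatile int index = 0;

  DrainSketch(E standIn) {
    this.standIn = standIn;
  }

  // Consumer side: stores elements as they are drained out of the queue.
  void accept(E element) {
    data.add(element);
  }

  // Supplier side: hands the drained elements back when the queue is refilled.
  E get() {
    if (index < data.size()) {
      return data.get(index++);
    }
    // Returning null here is the case the patch guards against; use a stand-in instead.
    return standIn;
  }

  // Called once per dump: hand over what was collected and reset both fields.
  List<E> collect() {
    List<E> collected = data;
    data = new ArrayList<>();
    index = 0; // without this reset, the next dump would start past the end of the new list
    return collected;
  }

  public static void main(String[] args) {
    DrainSketch<String> drain = new DrainSketch<>("stand-in");
    drain.accept("trace-1");
    drain.accept("trace-2");
    System.out.println(drain.collect()); // first dump: [trace-1, trace-2]
    drain.accept("trace-3");
    System.out.println(drain.get()); // second dump reads from index 0 again: trace-3
  }
}

Resetting the index in collect() means a second dump starts reading the fresh list from position 0; the stand-in element is only a last-resort guard if get() is ever called past the end.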