diff --git a/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java b/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java index c81a113f0f7..0057eb2ce7d 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/PendingTraceBuffer.java @@ -1,5 +1,6 @@ package datadog.trace.core; +import static datadog.trace.api.telemetry.LogCollector.SEND_TELEMETRY; import static datadog.trace.util.AgentThreadFactory.AgentThread.TRACE_MONITOR; import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS; import static datadog.trace.util.AgentThreadFactory.newAgentThread; @@ -58,6 +59,7 @@ private static class DelayingPendingTraceBuffer extends PendingTraceBuffer { private static final long SLEEP_TIME_MS = 100; private static final CommandElement FLUSH_ELEMENT = new CommandElement(); private static final CommandElement DUMP_ELEMENT = new CommandElement(); + private static final CommandElement STAND_IN_ELEMENT = new CommandElement(); private final MpscBlockingConsumerArrayQueue queue; private final Thread worker; @@ -145,6 +147,7 @@ public void accept(Element pendingTrace) { private static final class DumpDrain implements MessagePassingQueue.Consumer, MessagePassingQueue.Supplier { + private static final Logger LOGGER = LoggerFactory.getLogger(DumpDrain.class); private static final DumpDrain DUMP_DRAIN = new DumpDrain(); private static final int MAX_DUMPED_TRACES = 50; @@ -154,7 +157,7 @@ private static final class DumpDrain element -> !(element instanceof PendingTrace); private volatile List data = new ArrayList<>(); - private int index = 0; + private volatile int index = 0; @Override public void accept(Element pendingTrace) { @@ -166,13 +169,21 @@ public Element get() { if (index < data.size()) { return data.get(index++); } - return null; // Should never reach here or else queue may break according to - // MessagePassingQueue docs + // Should never reach here or else queue may break according to + // MessagePassingQueue docs if we return a null. Return a stand-in + // Element instead. + LOGGER.warn( + SEND_TELEMETRY, + "Index {} is out of bounds for data size {} in DumpDrain.get so returning filler CommandElement to prevent pending trace queue from breaking.", + index, + data.size()); + return STAND_IN_ELEMENT; } public List collectTraces() { List traces = data; data = new ArrayList<>(); + index = 0; traces.removeIf(NOT_PENDING_TRACE); // Storing oldest traces first traces.sort(TRACE_BY_START_TIME); diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy index c0a97cfd3ed..c9bf72beee7 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/PendingTraceBufferTest.groovy @@ -460,36 +460,65 @@ class PendingTraceBufferTest extends DDSpecification { def parent2 = newSpanOf(trace2, UNSET, System.currentTimeMillis() * 2000) def child2 = newSpanOf(parent2) - when: + when: "first flare dump with two traces" parent1.finish() parent2.finish() buffer.start() - def entries = buildAndExtractZip() + def entries1 = buildAndExtractZip() then: 1 * dumpReporter.prepareForFlare() 1 * dumpReporter.addReportToFlare(_) 1 * dumpReporter.cleanupAfterFlare() - entries.size() == 1 - def pendingTraceText = entries["pending_traces.txt"] as String - (entries["pending_traces.txt"] as String).startsWith('[{"service":"fakeService","name":"fakeOperation","resource":"fakeResource","trace_id":1,"span_id":1,"parent_id":0') // Rest of dump is timestamp specific - - def parsedTraces = pendingTraceText.split('\n').collect { new JsonSlurper().parseText(it) }.flatten() - parsedTraces.size() == 2 - parsedTraces[0]["trace_id"] == 1 //Asserting both traces exist - parsedTraces[1]["trace_id"] == 2 - parsedTraces[0]["start"] < parsedTraces[1]["start"] //Asserting the dump has the oldest trace first + entries1.size() == 1 + def pendingTraceText1 = entries1["pending_traces.txt"] as String + pendingTraceText1.startsWith('[{"service":"fakeService","name":"fakeOperation","resource":"fakeResource","trace_id":1,"span_id":1,"parent_id":0') // Rest of dump is timestamp specific + + def parsedTraces1 = pendingTraceText1.split('\n').collect { new JsonSlurper().parseText(it) }.flatten() + parsedTraces1.size() == 2 + parsedTraces1[0]["trace_id"] == 1 //Asserting both traces exist + parsedTraces1[1]["trace_id"] == 2 + parsedTraces1[0]["start"] < parsedTraces1[1]["start"] //Asserting the dump has the oldest trace first + + // New pending traces are needed here because generating the first flare takes long enough that the + // earlier pending traces are flushed (within 500ms). + when: "second flare dump with new pending traces" + // Finish the first set of traces + child1.finish() + child2.finish() + // Create new pending traces + def trace3 = factory.create(DDTraceId.from(3)) + def parent3 = newSpanOf(trace3, UNSET, System.currentTimeMillis() * 3000) + def child3 = newSpanOf(parent3) + def trace4 = factory.create(DDTraceId.from(4)) + def parent4 = newSpanOf(trace4, UNSET, System.currentTimeMillis() * 4000) + def child4 = newSpanOf(parent4) + parent3.finish() + parent4.finish() + def entries2 = buildAndExtractZip() + then: + 1 * dumpReporter.prepareForFlare() + 1 * dumpReporter.addReportToFlare(_) + 1 * dumpReporter.cleanupAfterFlare() + entries2.size() == 1 + def pendingTraceText2 = entries2["pending_traces.txt"] as String + def parsedTraces2 = pendingTraceText2.split('\n').collect { new JsonSlurper().parseText(it) }.flatten() + parsedTraces2.size() == 2 then: - child1.finish() - child2.finish() + child3.finish() + child4.finish() then: trace1.size() == 0 trace1.pendingReferenceCount == 0 trace2.size() == 0 trace2.pendingReferenceCount == 0 + trace3.size() == 0 + trace3.pendingReferenceCount == 0 + trace4.size() == 0 + trace4.pendingReferenceCount == 0 }