Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datadog.trace.core;

import static datadog.trace.api.telemetry.LogCollector.SEND_TELEMETRY;
import static datadog.trace.util.AgentThreadFactory.AgentThread.TRACE_MONITOR;
import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS;
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
Expand Down Expand Up @@ -58,6 +59,7 @@ private static class DelayingPendingTraceBuffer extends PendingTraceBuffer {
private static final long SLEEP_TIME_MS = 100;
private static final CommandElement FLUSH_ELEMENT = new CommandElement();
private static final CommandElement DUMP_ELEMENT = new CommandElement();
private static final CommandElement STAND_IN_ELEMENT = new CommandElement();

private final MpscBlockingConsumerArrayQueue<Element> queue;
private final Thread worker;
Expand Down Expand Up @@ -145,6 +147,7 @@ public void accept(Element pendingTrace) {

private static final class DumpDrain
implements MessagePassingQueue.Consumer<Element>, MessagePassingQueue.Supplier<Element> {
private static final Logger LOGGER = LoggerFactory.getLogger(DumpDrain.class);
private static final DumpDrain DUMP_DRAIN = new DumpDrain();
private static final int MAX_DUMPED_TRACES = 50;

Expand All @@ -154,7 +157,7 @@ private static final class DumpDrain
element -> !(element instanceof PendingTrace);

private volatile List<Element> data = new ArrayList<>();
private int index = 0;
private volatile int index = 0;

@Override
public void accept(Element pendingTrace) {
Expand All @@ -166,13 +169,21 @@ public Element get() {
if (index < data.size()) {
return data.get(index++);
}
return null; // Should never reach here or else queue may break according to
// MessagePassingQueue docs
// Should never reach here or else queue may break according to
// MessagePassingQueue docs if we return a null. Return a stand-in
// Element instead.
LOGGER.warn(
SEND_TELEMETRY,
"Index {} is out of bounds for data size {} in DumpDrain.get so returning filler CommandElement to prevent pending trace queue from breaking.",
index,
data.size());
return STAND_IN_ELEMENT;
}

public List<Element> collectTraces() {
List<Element> traces = data;
data = new ArrayList<>();
index = 0;
traces.removeIf(NOT_PENDING_TRACE);
// Storing oldest traces first
traces.sort(TRACE_BY_START_TIME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,36 +460,65 @@ class PendingTraceBufferTest extends DDSpecification {
def parent2 = newSpanOf(trace2, UNSET, System.currentTimeMillis() * 2000)
def child2 = newSpanOf(parent2)

when:
when: "first flare dump with two traces"
parent1.finish()
parent2.finish()
buffer.start()
def entries = buildAndExtractZip()
def entries1 = buildAndExtractZip()

then:
1 * dumpReporter.prepareForFlare()
1 * dumpReporter.addReportToFlare(_)
1 * dumpReporter.cleanupAfterFlare()
entries.size() == 1
def pendingTraceText = entries["pending_traces.txt"] as String
(entries["pending_traces.txt"] as String).startsWith('[{"service":"fakeService","name":"fakeOperation","resource":"fakeResource","trace_id":1,"span_id":1,"parent_id":0') // Rest of dump is timestamp specific

def parsedTraces = pendingTraceText.split('\n').collect { new JsonSlurper().parseText(it) }.flatten()
parsedTraces.size() == 2
parsedTraces[0]["trace_id"] == 1 //Asserting both traces exist
parsedTraces[1]["trace_id"] == 2
parsedTraces[0]["start"] < parsedTraces[1]["start"] //Asserting the dump has the oldest trace first
entries1.size() == 1
def pendingTraceText1 = entries1["pending_traces.txt"] as String
pendingTraceText1.startsWith('[{"service":"fakeService","name":"fakeOperation","resource":"fakeResource","trace_id":1,"span_id":1,"parent_id":0') // Rest of dump is timestamp specific

def parsedTraces1 = pendingTraceText1.split('\n').collect { new JsonSlurper().parseText(it) }.flatten()
parsedTraces1.size() == 2
parsedTraces1[0]["trace_id"] == 1 //Asserting both traces exist
parsedTraces1[1]["trace_id"] == 2
parsedTraces1[0]["start"] < parsedTraces1[1]["start"] //Asserting the dump has the oldest trace first

// New pending traces are needed here because generating the first flare takes long enough that the
// earlier pending traces are flushed (within 500ms).
when: "second flare dump with new pending traces"
// Finish the first set of traces
child1.finish()
child2.finish()
// Create new pending traces
def trace3 = factory.create(DDTraceId.from(3))
def parent3 = newSpanOf(trace3, UNSET, System.currentTimeMillis() * 3000)
def child3 = newSpanOf(parent3)
def trace4 = factory.create(DDTraceId.from(4))
def parent4 = newSpanOf(trace4, UNSET, System.currentTimeMillis() * 4000)
def child4 = newSpanOf(parent4)
parent3.finish()
parent4.finish()
def entries2 = buildAndExtractZip()

then:
1 * dumpReporter.prepareForFlare()
1 * dumpReporter.addReportToFlare(_)
1 * dumpReporter.cleanupAfterFlare()
entries2.size() == 1
def pendingTraceText2 = entries2["pending_traces.txt"] as String
def parsedTraces2 = pendingTraceText2.split('\n').collect { new JsonSlurper().parseText(it) }.flatten()
parsedTraces2.size() == 2

then:
child1.finish()
child2.finish()
child3.finish()
child4.finish()

then:
trace1.size() == 0
trace1.pendingReferenceCount == 0
trace2.size() == 0
trace2.pendingReferenceCount == 0
trace3.size() == 0
trace3.pendingReferenceCount == 0
trace4.size() == 0
trace4.pendingReferenceCount == 0
}


Expand Down