From 953c909b685f6a53e967a8a6c70f6d19b9e09456 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Tue, 16 Apr 2019 09:45:01 +0200 Subject: [PATCH 01/21] Recycle via reference counting --- .../apm/agent/impl/ElasticApmTracer.java | 36 ++-- .../async/SpanInScopeCallableWrapper.java | 2 + .../async/SpanInScopeRunnableWrapper.java | 2 + .../apm/agent/impl/error/ErrorPayload.java | 7 - .../apm/agent/impl/payload/Payload.java | 1 - .../impl/payload/TransactionPayload.java | 9 - .../agent/impl/transaction/AbstractSpan.java | 163 ++++++++++++++---- .../apm/agent/impl/transaction/Span.java | 43 ++++- .../agent/impl/transaction/TraceContext.java | 5 + .../impl/transaction/TraceContextHolder.java | 2 + .../agent/impl/transaction/Transaction.java | 25 ++- .../apm/agent/report/ApmServerReporter.java | 4 +- .../report/IntakeV2ReportingEventHandler.java | 4 +- .../co/elastic/apm/agent/MockReporter.java | 22 +++ .../apm/agent/impl/ElasticApmTracerTest.java | 9 +- .../apm/agent/impl/ScopeManagementTest.java | 43 +++-- .../api/ElasticApmApiInstrumentationTest.java | 4 +- ...asticsearchClientAsyncInstrumentation.java | 2 - ...asticsearchClientAsyncInstrumentation.java | 2 - .../concurrent/ExecutorInstrumentation.java | 3 +- .../FailingExecutorInstrumentationTest.java | 8 +- .../OkHttp3ClientAsyncInstrumentation.java | 2 +- .../OkHttpClientAsyncInstrumentation.java | 2 +- .../agent/servlet/AsyncInstrumentation.java | 1 - .../opentracing/OpenTracingBridgeTest.java | 8 +- .../pom.xml | 5 + ...stractServletContainerIntegrationTest.java | 4 +- .../java/co/elastic/apm/servlet/JettyIT.java | 3 +- 28 files changed, 297 insertions(+), 124 deletions(-) diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracer.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracer.java index c6e9eb2554..2c4c044019 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracer.java +++ 
b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracer.java @@ -210,7 +210,7 @@ public Transaction startTransaction(TraceContext.ChildContextCreator chil if (!coreConfiguration.isActive()) { transaction = noopTransaction(); } else { - transaction = transactionPool.createInstance().start(childContextCreator, parent, epochMicros, sampler); + transaction = createTransaction().start(childContextCreator, parent, epochMicros, sampler); } if (logger.isDebugEnabled()) { logger.debug("startTransaction {} {", transaction); @@ -227,7 +227,16 @@ public Transaction startTransaction(TraceContext.ChildContextCreator chil } public Transaction noopTransaction() { - return transactionPool.createInstance().startNoop(); + return createTransaction().startNoop(); + } + + private Transaction createTransaction() { + Transaction transaction = transactionPool.createInstance(); + while (transaction.getReferenceCount() != 0) { + logger.warn("Tried to start a transaction with a non-zero reference count {} {}", transaction.getReferenceCount(), transaction); + transaction = transactionPool.createInstance(); + } + return transaction; } @Nullable @@ -270,7 +279,7 @@ public Span startSpan(AbstractSpan parent, long epochMicros) { * @see #startSpan(TraceContext.ChildContextCreator, Object) */ public Span startSpan(TraceContext.ChildContextCreator childContextCreator, T parentContext, long epochMicros) { - Span span = spanPool.createInstance(); + Span span = createSpan(); final boolean dropped; Transaction transaction = currentTransaction(); if (transaction != null) { @@ -288,6 +297,15 @@ public Span startSpan(TraceContext.ChildContextCreator childContextCreato return span; } + private Span createSpan() { + Span span = spanPool.createInstance(); + while (span.getReferenceCount() != 0) { + logger.warn("Tried to start a span with a non-zero reference count {} {}", span.getReferenceCount(), span); + span = spanPool.createInstance(); + } + return span; + } + private boolean 
isTransactionSpanLimitReached(Transaction transaction) { return coreConfiguration.getTransactionMaxSpans() <= transaction.getSpanCount().getStarted().get(); } @@ -347,7 +365,7 @@ public void endTransaction(Transaction transaction) { // we do report non-sampled transactions (without the context) reporter.report(transaction); } else { - transaction.recycle(); + transaction.decrementReferences(); } } @@ -356,13 +374,13 @@ public void endSpan(Span span) { if (span.isSampled()) { long spanFramesMinDurationMs = stacktraceConfiguration.getSpanFramesMinDurationMs(); if (spanFramesMinDurationMs != 0 && span.isSampled()) { - if (span.getDuration() >= spanFramesMinDurationMs) { + if (span.getDurationMs() >= spanFramesMinDurationMs) { span.withStacktrace(new Throwable()); } } reporter.report(span); } else { - span.recycle(); + span.decrementReferences(); } } @@ -475,12 +493,6 @@ public void deactivate(TraceContextHolder holder) { } final Deque> stack = activeStack.get(); assertIsActive(holder, stack.poll()); - if (holder == stack.peekLast()) { - // if this is the bottom of the stack - // clear to avoid potential leaks in case some spans didn't deactivate properly - // makes all leaked spans eligible for GC - stack.clear(); - } } private void assertIsActive(Object span, @Nullable Object currentlyActive) { diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeCallableWrapper.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeCallableWrapper.java index 4bfe6e7ec5..0f1008e631 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeCallableWrapper.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeCallableWrapper.java @@ -46,6 +46,7 @@ public SpanInScopeCallableWrapper(ElasticApmTracer tracer) { public SpanInScopeCallableWrapper wrap(Callable delegate, AbstractSpan span) { this.delegate = delegate; this.span = span; + span.incrementReferences(); return this; } @@ 
-74,6 +75,7 @@ public V call() throws Exception { try { if (localSpan != null) { localSpan.deactivate(); + localSpan.decrementReferences(); } tracer.recycle(this); } catch (Throwable t) { diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeRunnableWrapper.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeRunnableWrapper.java index 4eff4f6658..3844e10227 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeRunnableWrapper.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeRunnableWrapper.java @@ -45,6 +45,7 @@ public SpanInScopeRunnableWrapper(ElasticApmTracer tracer) { public SpanInScopeRunnableWrapper wrap(Runnable delegate, AbstractSpan span) { this.delegate = delegate; this.span = span; + span.incrementReferences(); return this; } @@ -73,6 +74,7 @@ public void run() { try { if (localSpan != null) { localSpan.deactivate(); + localSpan.decrementReferences(); } tracer.recycle(this); } catch (Throwable t) { diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/error/ErrorPayload.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/error/ErrorPayload.java index ee4341f444..0f94656ff3 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/error/ErrorPayload.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/error/ErrorPayload.java @@ -63,13 +63,6 @@ public int getPayloadSize() { return errors.size(); } - @Override - public void recycle() { - for (ErrorCapture error : errors) { - error.recycle(); - } - } - @Override public void resetState() { errors.clear(); diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/payload/Payload.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/payload/Payload.java index 5685bd5d33..3f39e7154c 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/payload/Payload.java +++ 
b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/payload/Payload.java @@ -76,5 +76,4 @@ public SystemInfo getSystem() { public abstract int getPayloadSize(); - public abstract void recycle(); } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/payload/TransactionPayload.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/payload/TransactionPayload.java index d26e8ae44c..90c25aa0f2 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/payload/TransactionPayload.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/payload/TransactionPayload.java @@ -73,13 +73,4 @@ public int getPayloadSize() { return transactions.size() + spans.size(); } - @Override - public void recycle() { - for (int i = 0; i < transactions.size(); i++) { - transactions.get(i).recycle(); - } - for (int i = 0; i < spans.size(); i++) { - spans.get(i).recycle(); - } - } } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/AbstractSpan.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/AbstractSpan.java index 7f3a03c0cf..19f6791fde 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/AbstractSpan.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/AbstractSpan.java @@ -20,47 +20,115 @@ package co.elastic.apm.agent.impl.transaction; import co.elastic.apm.agent.impl.ElasticApmTracer; +import co.elastic.apm.agent.objectpool.Recyclable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; public abstract class AbstractSpan extends TraceContextHolder { private static final Logger logger = LoggerFactory.getLogger(AbstractSpan.class); protected static final double MS_IN_MICROS = TimeUnit.MILLISECONDS.toMicros(1); protected final 
TraceContext traceContext; - // used to mark this span as expected to switch lifecycle-managing-thread, eg span created by one thread and ended by another - private volatile boolean isLifecycleManagingThreadSwitch; - /** * Generic designation of a transaction in the scope of a single service (eg: 'GET /users/:id') */ protected final StringBuilder name = new StringBuilder(); private long timestamp; - /** - * How long the transaction took to complete, in ms with 3 decimal points - * (Required) - */ - protected double duration; - private volatile boolean finished = true; + // in microseconds + protected long duration; + protected ReentrantTimer childDurations = new ReentrantTimer(); + protected AtomicInteger references = new AtomicInteger(); + protected volatile boolean finished = true; + + public int getReferenceCount() { + return references.get(); + } + + public static class ReentrantTimer implements Recyclable { + + private AtomicInteger nestingLevel = new AtomicInteger(); + private AtomicLong start = new AtomicLong(); + private AtomicLong duration = new AtomicLong(); + + /** + * Starts the timer if it has not been started already. 
+ * + * @param startTimestamp + */ + public void start(long startTimestamp) { + if (nestingLevel.incrementAndGet() == 1) { + start.set(startTimestamp); + } + } + + /** + * Stops the timer and increments the duration if no other direct children are still running + * @param endTimestamp + */ + public void stop(long endTimestamp) { + if (nestingLevel.decrementAndGet() == 0) { + incrementDuration(endTimestamp); + } + } + + /** + * Stops the timer and increments the duration even if there are direct children which are still running + * + * @param endTimestamp + */ + public void forceStop(long endTimestamp) { + if (nestingLevel.getAndSet(0) != 0) { + incrementDuration(endTimestamp); + } + } + + private void incrementDuration(long epochMicros) { + duration.addAndGet(epochMicros - start.get()); + } + + @Override + public void resetState() { + nestingLevel.set(0); + start.set(0); + duration.set(0); + } + + public long getDuration() { + return duration.get(); + } + } public AbstractSpan(ElasticApmTracer tracer) { super(tracer); traceContext = TraceContext.with64BitId(this.tracer); } + public boolean isReferenced() { + return references.get() > 0; + } + /** - * How long the transaction took to complete, in ms with 3 decimal points - * (Required) + * How long the transaction took to complete, in µs */ - public double getDuration() { + public long getDuration() { return duration; } + public long getSelfDuration() { + return duration - childDurations.getDuration(); + } + + public double getDurationMs() { + return duration / AbstractSpan.MS_IN_MICROS; + } + /** * Generic designation of a transaction in the scope of a single service (eg: 'GET /users/:id') */ @@ -113,8 +181,9 @@ public void resetState() { name.setLength(0); timestamp = 0; duration = 0; - isLifecycleManagingThreadSwitch = false; traceContext.resetState(); + childDurations.resetState(); + references.set(0); } public boolean isChildOf(AbstractSpan parent) { @@ -126,8 +195,11 @@ public Span createSpan() { return 
createSpan(traceContext.getClock().getEpochMicros()); } + @Override public Span createSpan(long epochMicros) { - return tracer.startSpan(this, epochMicros); + final Span span = tracer.startSpan(this, epochMicros); + onChildStart(span, epochMicros); + return span; } public abstract void addLabel(String key, String value); @@ -136,8 +208,15 @@ public Span createSpan(long epochMicros) { public abstract void addLabel(String key, Boolean value); - protected void onStart() { + /** + * Called after the span has been started and its parent references are set + */ + protected void onAfterStart() { this.finished = false; + // this final reference is decremented when the span is reported + // or even after it's reported and the last child span is ended + references.set(0); + incrementReferences(); } public void end() { @@ -146,12 +225,14 @@ public void end() { public final void end(long epochMicros) { if (!finished) { - this.finished = true; - this.duration = (epochMicros - timestamp) / AbstractSpan.MS_IN_MICROS; + this.duration = (epochMicros - timestamp); if (name.length() == 0) { name.append("unnamed"); } + childDurations.forceStop(epochMicros); doEnd(epochMicros); + // has to be set last so doEnd callbacks don't think it has already been finished + this.finished = true; } else { logger.warn("End has already been called: {}", this); assert false; @@ -165,20 +246,19 @@ public boolean isChildOf(TraceContextHolder other) { return getTraceContext().isChildOf(other); } - public void markLifecycleManagingThreadSwitchExpected() { - isLifecycleManagingThreadSwitch = true; + @Override + public T activate() { + incrementReferences(); + return super.activate(); } @Override - public T activate() { - if (isLifecycleManagingThreadSwitch) { - // This serves two goals: - // 1. resets the lifecycle management flag, so that the executing thread will remain in charge until set otherwise - // by setting this flag once more - // 2. 
reading this volatile field when span is activated on a new thread ensures proper visibility of other span data - isLifecycleManagingThreadSwitch = false; + public T deactivate() { + try { + return super.deactivate(); + } finally { + decrementReferences(); } - return super.activate(); } /** @@ -191,11 +271,7 @@ public T activate() { */ @Override public Runnable withActive(Runnable runnable) { - if (isLifecycleManagingThreadSwitch) { - return tracer.wrapRunnable(runnable, this); - } else { - return tracer.wrapRunnable(runnable, traceContext); - } + return tracer.wrapRunnable(runnable, this); } /** @@ -208,15 +284,28 @@ public Runnable withActive(Runnable runnable) { */ @Override public Callable withActive(Callable callable) { - if (isLifecycleManagingThreadSwitch) { - return tracer.wrapCallable(callable, this); - } else { - return tracer.wrapCallable(callable, traceContext); - } + return tracer.wrapCallable(callable, this); } public void setStartTimestamp(long epochMicros) { timestamp = epochMicros; } + private void onChildStart(Span span, long epochMicros) { + incrementReferences(); + childDurations.start(epochMicros); + } + + void onChildEnd(Span span, long epochMicros) { + childDurations.stop(epochMicros); + decrementReferences(); + } + + public void incrementReferences() { + references.incrementAndGet(); + logger.trace("increment references to {} ({})", this, references); + } + + public abstract void decrementReferences(); + } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Span.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Span.java index 0456ca8a46..afa17f9cc3 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Span.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Span.java @@ -58,14 +58,25 @@ public class Span extends AbstractSpan implements Recyclable { private final SpanContext context = new SpanContext(); @Nullable private Throwable 
stacktrace; + @Nullable + private AbstractSpan parent; + @Nullable + private Transaction transaction; public Span(ElasticApmTracer tracer) { super(tracer); } public Span start(TraceContext.ChildContextCreator childContextCreator, T parentContext, long epochMicros, boolean dropped) { - onStart(); childContextCreator.asChildOf(traceContext, parentContext); + if (parentContext instanceof Transaction) { + this.transaction = (Transaction) parentContext; + this.parent = this.transaction; + } else if (parentContext instanceof Span) { + final Span parentSpan = (Span) parentContext; + this.parent = parentSpan; + this.transaction = parentSpan.transaction; + } if (dropped) { traceContext.setRecorded(false); } @@ -81,6 +92,7 @@ public Span start(TraceContext.ChildContextCreator childContextCreator, T new RuntimeException("this exception is just used to record where the span has been started from")); } } + onAfterStart(); return this; } @@ -178,6 +190,9 @@ public void doEnd(long epochMicros) { if (type == null) { type = "custom"; } + if (parent != null) { + parent.onChildEnd(this, epochMicros); + } this.tracer.endSpan(this); } @@ -189,6 +204,8 @@ public void resetState() { type = null; subtype = null; action = null; + parent = null; + transaction = null; } @Override @@ -206,10 +223,6 @@ public void addLabel(String key, Boolean value) { context.addLabel(key, value); } - public void recycle() { - tracer.recycle(this); - } - @Override public String toString() { return String.format("'%s' %s", name, traceContext); @@ -219,4 +232,24 @@ public Span withStacktrace(Throwable stacktrace) { this.stacktrace = stacktrace; return this; } + + @Override + public void incrementReferences() { + if (transaction != null) { + transaction.incrementReferences(); + } + super.incrementReferences(); + } + + @Override + public void decrementReferences() { + if (transaction != null) { + transaction.decrementReferences(); + } + final int referenceCount = references.decrementAndGet(); + logger.trace("decrement references to {} ({})", this, referenceCount); + if (referenceCount 
== 0) { + tracer.recycle(this); + } + } } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContext.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContext.java index 5b7c88128a..0d97b1f786 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContext.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContext.java @@ -382,6 +382,11 @@ public Span createSpan() { return tracer.startSpan(fromParent(), this); } + @Override + public Span createSpan(long epochMicros) { + return tracer.startSpan(fromParent(), this, epochMicros); + } + public interface ChildContextCreator { boolean asChildOf(TraceContext child, T parent); } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContextHolder.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContextHolder.java index f0f229e90f..96ad68788e 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContextHolder.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContextHolder.java @@ -70,6 +70,8 @@ public TraceContextHolder asExit() { public abstract Span createSpan(); + public abstract Span createSpan(long epochMicros); + /** * Creates a child Span representing a remote call event, unless this TraceContextHolder already represents an exit event. 
* If current TraceContextHolder is representing an Exit- returns null diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java index fb3ad2d32e..9e170d95c4 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java @@ -22,6 +22,8 @@ import co.elastic.apm.agent.impl.ElasticApmTracer; import co.elastic.apm.agent.impl.context.TransactionContext; import co.elastic.apm.agent.impl.sampling.Sampler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; @@ -30,6 +32,8 @@ */ public class Transaction extends AbstractSpan { + private static final Logger logger = LoggerFactory.getLogger(Transaction.class); + public static final String TYPE_REQUEST = "request"; /** @@ -63,7 +67,6 @@ public Transaction(ElasticApmTracer tracer) { } public Transaction start(TraceContext.ChildContextCreator childContextCreator, @Nullable T parent, long epochMicros, Sampler sampler) { - onStart(); if (parent == null || !childContextCreator.asChildOf(traceContext, parent)) { traceContext.asRootSpan(sampler); } @@ -72,13 +75,14 @@ public Transaction start(TraceContext.ChildContextCreator childContextCre } else { setStartTimestamp(traceContext.getClock().getEpochMicros()); } + onAfterStart(); return this; } public Transaction startNoop() { - onStart(); this.name.append("noop"); this.noop = true; + onAfterStart(); return this; } @@ -189,10 +193,6 @@ public void resetState() { type = null; } - public void recycle() { - tracer.recycle(this); - } - public boolean isNoop() { return noop; } @@ -213,4 +213,17 @@ public String getType() { public String toString() { return String.format("'%s' %s", name, traceContext); } + + @Override + public void incrementReferences() { + super.incrementReferences(); + } + + public void 
decrementReferences() { + final int referenceCount = this.references.decrementAndGet(); + logger.trace("decrement references to {} ({})", this, referenceCount); + if (referenceCount == 0) { + tracer.recycle(this); + } + } } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerReporter.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerReporter.java index f85f6771f5..a649782705 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerReporter.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerReporter.java @@ -115,7 +115,7 @@ public Thread newThread(Runnable r) { @Override public void report(Transaction transaction) { if (!tryAddEventToRingBuffer(transaction, TRANSACTION_EVENT_TRANSLATOR)) { - transaction.recycle(); + transaction.decrementReferences(); } if (syncReport) { waitForFlush(); @@ -125,7 +125,7 @@ public void report(Transaction transaction) { @Override public void report(Span span) { if (!tryAddEventToRingBuffer(span, SPAN_EVENT_TRANSLATOR)) { - span.recycle(); + span.decrementReferences(); } if (syncReport) { waitForFlush(); diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandler.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandler.java index c7c7758dfc..a14e545888 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandler.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandler.java @@ -187,11 +187,11 @@ private void writeEvent(ReportingEvent event) { if (event.getTransaction() != null) { currentlyTransmitting++; payloadSerializer.serializeTransactionNdJson(event.getTransaction()); - event.getTransaction().recycle(); + event.getTransaction().decrementReferences(); } else if (event.getSpan() != null) { currentlyTransmitting++; payloadSerializer.serializeSpanNdJson(event.getSpan()); - 
event.getSpan().recycle(); + event.getSpan().decrementReferences(); } else if (event.getError() != null) { currentlyTransmitting++; payloadSerializer.serializeErrorNdJson(event.getError()); diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/MockReporter.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/MockReporter.java index 6446cad677..e6ef61b720 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/MockReporter.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/MockReporter.java @@ -24,10 +24,13 @@ import co.elastic.apm.agent.impl.payload.PayloadUtils; import co.elastic.apm.agent.impl.payload.TransactionPayload; import co.elastic.apm.agent.impl.stacktrace.StacktraceConfiguration; +import co.elastic.apm.agent.impl.transaction.AbstractSpan; import co.elastic.apm.agent.impl.transaction.Span; import co.elastic.apm.agent.impl.transaction.Transaction; import co.elastic.apm.agent.metrics.MetricRegistry; +import co.elastic.apm.agent.report.IntakeV2ReportingEventHandler; import co.elastic.apm.agent.report.Reporter; +import co.elastic.apm.agent.report.ReportingEvent; import co.elastic.apm.agent.report.serialize.DslJsonSerializer; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -243,4 +246,23 @@ public void reset() { errors.clear(); spans.clear(); } + + /** + * Calls {@link AbstractSpan#decrementReferences()} for all reported transactions and spans to emulate the references being decremented + * after reporting to the APM Server. 
+ * See {@link IntakeV2ReportingEventHandler#writeEvent(ReportingEvent)} + */ + public void decrementReferences() { + transactions.forEach(Transaction::decrementReferences); + spans.forEach(Span::decrementReferences); + } + + public void assertRecycledAfterDecrementingReferences() { + transactions.forEach(t -> assertThat(t.getTraceContext().getId().isEmpty()).isFalse()); + spans.forEach(s -> assertThat(s.getTraceContext().getId().isEmpty()).isFalse()); + transactions.forEach(Transaction::decrementReferences); + spans.forEach(Span::decrementReferences); + transactions.forEach(t -> assertThat(t.getTraceContext().getId().isEmpty()).isTrue()); + spans.forEach(s -> assertThat(s.getTraceContext().getId().isEmpty()).isTrue()); + } } diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/ElasticApmTracerTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/ElasticApmTracerTest.java index da5bb3067f..31eaa935c9 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/ElasticApmTracerTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/ElasticApmTracerTest.java @@ -62,7 +62,7 @@ void testThreadLocalStorage() { Transaction transaction = tracerImpl.startTransaction(TraceContext.asRoot(), null, getClass().getClassLoader()); try (Scope scope = transaction.activateInScope()) { assertThat(tracerImpl.currentTransaction()).isSameAs(transaction); - Span span = tracerImpl.getActive().createSpan(); + Span span = tracerImpl.getActive().createSpan().withType("app"); try (Scope spanScope = span.activateInScope()) { assertThat(tracerImpl.currentTransaction()).isSameAs(transaction); assertThat(tracerImpl.getActive()).isSameAs(span); @@ -346,9 +346,9 @@ void testTimestamps() { transaction.end(30); assertThat(transaction.getTimestamp()).isEqualTo(0); - assertThat(transaction.getDuration()).isEqualTo(0.03); + assertThat(transaction.getDuration()).isEqualTo(30); assertThat(span.getTimestamp()).isEqualTo(10); - 
assertThat(span.getDuration()).isEqualTo(0.01); + assertThat(span.getDuration()).isEqualTo(10); } @Test @@ -356,7 +356,8 @@ void testStartSpanAfterTransactionHasEnded() { final Transaction transaction = tracerImpl.startTransaction(TraceContext.asRoot(), null, getClass().getClassLoader()); final TraceContext transactionTraceContext = transaction.getTraceContext().copy(); transaction.end(); - transaction.resetState(); + + reporter.assertRecycledAfterDecrementingReferences(); tracerImpl.activate(transactionTraceContext); try { diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/ScopeManagementTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/ScopeManagementTest.java index fb69b34efd..372fa6a250 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/ScopeManagementTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/ScopeManagementTest.java @@ -93,21 +93,18 @@ void testActivateTwice() { } @Test - void testMissingDeactivation() { - runTestWithAssertionsDisabled(() -> { - final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); - transaction.createSpan().activate(); - transaction.deactivate(); + void testRedundantActivation() { + tracer.startTransaction(TraceContext.asRoot(), null, null) + .activate().activate() + .deactivate().deactivate(); - assertThat(tracer.getActive()).isNull(); - }); + assertThat(tracer.getActive()).isNull(); } @Test void testContextAndSpanRunnableActivation() { runTestWithAssertionsDisabled(() -> { final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); - transaction.markLifecycleManagingThreadSwitchExpected(); transaction.withActive(transaction.withActive((Runnable) () -> assertThat(tracer.getActive()).isSameAs(transaction))).run(); transaction.deactivate(); @@ -120,7 +117,6 @@ void testContextAndSpanRunnableActivation() { void testContextAndSpanCallableActivation() { 
runTestWithAssertionsDisabled(() -> { final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); - transaction.markLifecycleManagingThreadSwitchExpected(); try { assertThat(transaction.withActive(transaction.withActive(() -> tracer.currentTransaction())).call()).isSameAs(transaction); } catch (Exception e) { @@ -138,7 +134,6 @@ void testSpanAndContextRunnableActivation() { final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); Runnable runnable = transaction.withActive((Runnable) () -> assertThat(tracer.currentTransaction()).isSameAs(transaction)); - transaction.markLifecycleManagingThreadSwitchExpected(); transaction.withActive(runnable).run(); transaction.deactivate(); @@ -151,7 +146,6 @@ void testSpanAndContextCallableActivation() { runTestWithAssertionsDisabled(() -> { final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); Callable callable = transaction.withActive(() -> tracer.currentTransaction()); - transaction.markLifecycleManagingThreadSwitchExpected(); try { assertThat(transaction.withActive(callable).call()).isSameAs(transaction); } catch (Exception e) { @@ -166,7 +160,6 @@ void testSpanAndContextCallableActivation() { @Test void testContextAndSpanRunnableActivationInDifferentThread() throws Exception { final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); - transaction.markLifecycleManagingThreadSwitchExpected(); Executors.newSingleThreadExecutor().submit(transaction.withActive(transaction.withActive(() -> { assertThat(tracer.getActive()).isSameAs(transaction); assertThat(tracer.currentTransaction()).isSameAs(transaction); @@ -179,7 +172,6 @@ void testContextAndSpanRunnableActivationInDifferentThread() throws Exception { @Test void testContextAndSpanCallableActivationInDifferentThread() throws Exception { final Transaction transaction = 
tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); - transaction.markLifecycleManagingThreadSwitchExpected(); Future transactionFuture = Executors.newSingleThreadExecutor().submit(transaction.withActive(transaction.withActive(() -> { assertThat(tracer.getActive()).isSameAs(transaction); return tracer.currentTransaction(); @@ -195,9 +187,8 @@ void testSpanAndContextRunnableActivationInDifferentThread() throws Exception { final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); Runnable runnable = transaction.withActive(() -> { assertThat(tracer.currentTransaction()).isSameAs(transaction); - assertThat(tracer.getActive()).isInstanceOf(TraceContext.class); + assertThat(tracer.getActive()).isSameAs(transaction); }); - transaction.markLifecycleManagingThreadSwitchExpected(); Executors.newSingleThreadExecutor().submit(transaction.withActive(runnable)).get(); transaction.deactivate(); @@ -208,13 +199,31 @@ void testSpanAndContextRunnableActivationInDifferentThread() throws Exception { void testSpanAndContextCallableActivationInDifferentThread() throws Exception { final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); Callable callable = transaction.withActive(() -> { - assertThat(tracer.getActive()).isInstanceOf(TraceContext.class); + assertThat(tracer.currentTransaction()).isSameAs(transaction); return tracer.currentTransaction(); }); - transaction.markLifecycleManagingThreadSwitchExpected(); assertThat(Executors.newSingleThreadExecutor().submit(transaction.withActive(callable)).get()).isSameAs(transaction); transaction.deactivate(); assertThat(tracer.getActive()).isNull(); } + + @Test + void testAsyncActivationAfterEnd() throws Exception { + final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); + Callable callable = transaction.withActive(() -> { + assertThat(tracer.getActive()).isSameAs(transaction); + return 
tracer.currentTransaction(); + }); + transaction.deactivate().end(); + reporter.decrementReferences(); + assertThat(transaction.isReferenced()).isTrue(); + + assertThat(Executors.newSingleThreadExecutor().submit(callable).get()).isSameAs(transaction); + assertThat(transaction.isReferenced()).isFalse(); + // recycled because the transaction is finished, reported and the reference counter is 0 + assertThat(transaction.getTraceContext().getTraceId().isEmpty()).isTrue(); + + assertThat(tracer.getActive()).isNull(); + } } diff --git a/apm-agent-plugins/apm-api-plugin/src/test/java/co/elastic/apm/api/ElasticApmApiInstrumentationTest.java b/apm-agent-plugins/apm-api-plugin/src/test/java/co/elastic/apm/api/ElasticApmApiInstrumentationTest.java index 9f99c728d6..58beb65101 100644 --- a/apm-agent-plugins/apm-api-plugin/src/test/java/co/elastic/apm/api/ElasticApmApiInstrumentationTest.java +++ b/apm-agent-plugins/apm-api-plugin/src/test/java/co/elastic/apm/api/ElasticApmApiInstrumentationTest.java @@ -277,8 +277,8 @@ void testManualTimestamps() { transaction.startSpan().setStartTimestamp(1000).end(2000); transaction.end(3000); - assertThat(reporter.getFirstTransaction().getDuration()).isEqualTo(3); - assertThat(reporter.getFirstSpan().getDuration()).isEqualTo(1); + assertThat(reporter.getFirstTransaction().getDuration()).isEqualTo(3000); + assertThat(reporter.getFirstSpan().getDuration()).isEqualTo(1000); } @Test diff --git a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/main/java/co/elastic/apm/agent/es/restclient/v5_6/ElasticsearchClientAsyncInstrumentation.java b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/main/java/co/elastic/apm/agent/es/restclient/v5_6/ElasticsearchClientAsyncInstrumentation.java index f9ab768a7a..183d72af78 100644 --- a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/main/java/co/elastic/apm/agent/es/restclient/v5_6/ElasticsearchClientAsyncInstrumentation.java +++ 
b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/main/java/co/elastic/apm/agent/es/restclient/v5_6/ElasticsearchClientAsyncInstrumentation.java @@ -83,8 +83,6 @@ private static void onBeforeExecute(@Advice.Argument(0) String method, span = helper.createClientSpan(method, endpoint, entity); if (span != null) { responseListener = helper.wrapResponseListener(responseListener, span); - // write to the span's volatile field to ensure proper visibility on the executing thread - span.markLifecycleManagingThreadSwitchExpected(); wrapped = true; } } diff --git a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/main/java/co/elastic/apm/agent/es/restclient/v6_4/ElasticsearchClientAsyncInstrumentation.java b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/main/java/co/elastic/apm/agent/es/restclient/v6_4/ElasticsearchClientAsyncInstrumentation.java index b327d13d26..c1bfd55211 100644 --- a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/main/java/co/elastic/apm/agent/es/restclient/v6_4/ElasticsearchClientAsyncInstrumentation.java +++ b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/main/java/co/elastic/apm/agent/es/restclient/v6_4/ElasticsearchClientAsyncInstrumentation.java @@ -75,8 +75,6 @@ private static void onBeforeExecute(@Advice.Argument(0) Request request, span = helper.createClientSpan(request.getMethod(), request.getEndpoint(), request.getEntity()); if (span != null) { responseListener = helper.wrapResponseListener(responseListener, span); - // write to the span's volatile field to ensure proper visibility on the executing thread - span.markLifecycleManagingThreadSwitchExpected(); wrapped = true; } } diff --git a/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/ExecutorInstrumentation.java 
b/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/ExecutorInstrumentation.java index b77e670e5b..4fdc03ff24 100644 --- a/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/ExecutorInstrumentation.java +++ b/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/ExecutorInstrumentation.java @@ -61,7 +61,8 @@ public ElementMatcher getTypeMatcherPreFilter() { public ElementMatcher getTypeMatcher() { return hasSuperType(named("java.util.concurrent.Executor")) // hazelcast tries to serialize the Runnables/Callables to execute them on remote JVMs - .and(not(nameStartsWith("com.hazelcast"))); + .and(not(nameStartsWith("com.hazelcast"))) + .and(not(nameStartsWith("org.apache.felix.resolver"))); } @Override diff --git a/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/FailingExecutorInstrumentationTest.java b/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/FailingExecutorInstrumentationTest.java index e7ca8c2504..d2ef892006 100644 --- a/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/FailingExecutorInstrumentationTest.java +++ b/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/FailingExecutorInstrumentationTest.java @@ -20,8 +20,8 @@ package co.elastic.apm.agent.concurrent; import co.elastic.apm.agent.AbstractInstrumentationTest; -import co.elastic.apm.agent.impl.async.ContextInScopeCallableWrapper; -import co.elastic.apm.agent.impl.async.ContextInScopeRunnableWrapper; +import co.elastic.apm.agent.impl.async.SpanInScopeCallableWrapper; +import co.elastic.apm.agent.impl.async.SpanInScopeRunnableWrapper; import co.elastic.apm.agent.impl.transaction.TraceContext; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -47,7 +47,7 @@ void setUp() { executor = 
ExecutorServiceWrapper.wrap(new ForkJoinPool() { @Override public ForkJoinTask submit(Runnable task) { - if (task instanceof ContextInScopeRunnableWrapper) { + if (task instanceof SpanInScopeRunnableWrapper) { submitWithWrapperCounter.incrementAndGet(); throw new ClassCastException(); } @@ -56,7 +56,7 @@ public ForkJoinTask submit(Runnable task) { @Override public ForkJoinTask submit(Callable task) { - if (task instanceof ContextInScopeCallableWrapper) { + if (task instanceof SpanInScopeCallableWrapper) { submitWithWrapperCounter.incrementAndGet(); throw new IllegalArgumentException(); } diff --git a/apm-agent-plugins/apm-okhttp-plugin/src/main/java/co/elastic/apm/agent/okhttp/OkHttp3ClientAsyncInstrumentation.java b/apm-agent-plugins/apm-okhttp-plugin/src/main/java/co/elastic/apm/agent/okhttp/OkHttp3ClientAsyncInstrumentation.java index dfe852207e..274c534d8e 100644 --- a/apm-agent-plugins/apm-okhttp-plugin/src/main/java/co/elastic/apm/agent/okhttp/OkHttp3ClientAsyncInstrumentation.java +++ b/apm-agent-plugins/apm-okhttp-plugin/src/main/java/co/elastic/apm/agent/okhttp/OkHttp3ClientAsyncInstrumentation.java @@ -90,7 +90,7 @@ private static void onBeforeEnqueue(@Advice.Origin Class clazz, okhttp3.Request request = originalRequest; span = HttpClientHelper.startHttpClientSpan(parent, request.method(), request.url().toString(), request.url().host()); if (span != null) { - span.activate().markLifecycleManagingThreadSwitchExpected(); + span.activate(); originalRequest = originalRequest.newBuilder().addHeader(TraceContext.TRACE_PARENT_HEADER, span.getTraceContext().getOutgoingTraceParentHeader().toString()).build(); callback = wrapperCreator.wrap(callback, span); } diff --git a/apm-agent-plugins/apm-okhttp-plugin/src/main/java/co/elastic/apm/agent/okhttp/OkHttpClientAsyncInstrumentation.java b/apm-agent-plugins/apm-okhttp-plugin/src/main/java/co/elastic/apm/agent/okhttp/OkHttpClientAsyncInstrumentation.java index 2a62707d5f..5e1ac6fda3 100644 --- 
a/apm-agent-plugins/apm-okhttp-plugin/src/main/java/co/elastic/apm/agent/okhttp/OkHttpClientAsyncInstrumentation.java +++ b/apm-agent-plugins/apm-okhttp-plugin/src/main/java/co/elastic/apm/agent/okhttp/OkHttpClientAsyncInstrumentation.java @@ -89,7 +89,7 @@ private static void onBeforeEnqueue(@Advice.Origin Class clazz, Request request = originalRequest; span = HttpClientHelper.startHttpClientSpan(parent, request.method(), request.url().toString(), request.url().getHost()); if (span != null) { - span.activate().markLifecycleManagingThreadSwitchExpected(); + span.activate(); originalRequest = originalRequest.newBuilder().addHeader(TraceContext.TRACE_PARENT_HEADER, span.getTraceContext().getOutgoingTraceParentHeader().toString()).build(); callback = wrapperCreator.wrap(callback, span); } diff --git a/apm-agent-plugins/apm-servlet-plugin/src/main/java/co/elastic/apm/agent/servlet/AsyncInstrumentation.java b/apm-agent-plugins/apm-servlet-plugin/src/main/java/co/elastic/apm/agent/servlet/AsyncInstrumentation.java index 92a1ad31d0..0ac53fe4cf 100644 --- a/apm-agent-plugins/apm-servlet-plugin/src/main/java/co/elastic/apm/agent/servlet/AsyncInstrumentation.java +++ b/apm-agent-plugins/apm-servlet-plugin/src/main/java/co/elastic/apm/agent/servlet/AsyncInstrumentation.java @@ -175,7 +175,6 @@ private static void onEnterAsyncContextStart(@Advice.Argument(value = 0, readOnl if (tracer != null && runnable != null) { final Transaction transaction = tracer.currentTransaction(); if (transaction != null) { - transaction.markLifecycleManagingThreadSwitchExpected(); runnable = transaction.withActive(runnable); } } diff --git a/apm-opentracing/src/test/java/co/elastic/apm/opentracing/OpenTracingBridgeTest.java b/apm-opentracing/src/test/java/co/elastic/apm/opentracing/OpenTracingBridgeTest.java index 600ed1d85f..2edb07714d 100644 --- a/apm-opentracing/src/test/java/co/elastic/apm/opentracing/OpenTracingBridgeTest.java +++ 
b/apm-opentracing/src/test/java/co/elastic/apm/opentracing/OpenTracingBridgeTest.java @@ -63,7 +63,7 @@ void testCreateNonActiveTransaction() { span.finish(TimeUnit.MILLISECONDS.toMicros(1)); assertThat(reporter.getTransactions()).hasSize(1); - assertThat(reporter.getFirstTransaction().getDuration()).isEqualTo(1); + assertThat(reporter.getFirstTransaction().getDuration()).isEqualTo(1000); assertThat(reporter.getFirstTransaction().getName().toString()).isEqualTo("test"); } @@ -75,11 +75,11 @@ void sanityCheckRealTimestamps() { final long epochMicros = System.currentTimeMillis() * 1000; assertThat(reporter.getTransactions()).hasSize(1); - assertThat(reporter.getFirstTransaction().getDuration()).isLessThan(MINUTES.toMillis(1)); + assertThat(reporter.getFirstTransaction().getDuration()).isLessThan(MINUTES.toMicros(1)); assertThat(reporter.getFirstTransaction().getTimestamp()).isCloseTo(epochMicros, offset(MINUTES.toMicros(1))); assertThat(reporter.getSpans()).hasSize(1); - assertThat(reporter.getFirstSpan().getDuration()).isLessThan(MINUTES.toMillis(1)); + assertThat(reporter.getFirstSpan().getDuration()).isLessThan(MINUTES.toMicros(1)); assertThat(reporter.getFirstSpan().getTimestamp()).isCloseTo(epochMicros, offset(MINUTES.toMicros(1))); } @@ -184,7 +184,7 @@ void testCreateActiveTransaction() { assertThat(reporter.getTransactions()).hasSize(0); // manually finish span - scope.span().finish(TimeUnit.MILLISECONDS.toMicros(1)); + scope.span().finish(1); assertThat(reporter.getTransactions()).hasSize(1); assertThat(reporter.getFirstTransaction().getDuration()).isEqualTo(1); assertThat(reporter.getFirstTransaction().getName().toString()).isEqualTo("test"); diff --git a/integration-tests/application-server-integration-tests/pom.xml b/integration-tests/application-server-integration-tests/pom.xml index a10e100493..5e195ede47 100644 --- a/integration-tests/application-server-integration-tests/pom.xml +++ b/integration-tests/application-server-integration-tests/pom.xml @@ 
-85,6 +85,11 @@ commons-io 1.3.2 + + com.github.terma + javaniotcpproxy + 1.5 + + + org/HdrHistogram/WriterReaderPhaser.class + + @@ -97,6 +108,10 @@ com.blogspot.mydailyjava.weaklockfree co.elastic.apm.agent.shaded.weaklockfree + + org.HdrHistogram + co.elastic.apm.agent.shaded.HdrHistogram + From a0d167db3b66386e606eeefd519f12a84047ad74 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Wed, 5 Jun 2019 16:22:31 +0200 Subject: [PATCH 15/21] Limit number of metricsets --- .../apm/agent/metrics/MetricRegistry.java | 52 +++-- .../report/serialize/DslJsonSerializer.java | 2 +- .../serialize/MetricRegistrySerializer.java | 3 +- .../apm/agent/impl/SpanTypeBreakdownTest.java | 188 +++++++----------- .../apm/agent/metrics/MetricRegistryTest.java | 15 +- 5 files changed, 131 insertions(+), 129 deletions(-) diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/MetricRegistry.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/MetricRegistry.java index a3a0edf1f0..e6e4619582 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/MetricRegistry.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/MetricRegistry.java @@ -30,6 +30,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -45,6 +47,7 @@ public class MetricRegistry { private static final Logger logger = LoggerFactory.getLogger(MetricRegistry.class); + private static final int METRIC_SET_LIMIT = 1000; private final WriterReaderPhaser phaser = new WriterReaderPhaser(); private final ReporterConfiguration config; /** @@ -116,7 +119,10 @@ public void add(String name, Labels labels, DoubleSupplier metric) { long criticalValueAtEnter = phaser.writerCriticalSectionEnter(); try { - getOrCreateMetricSet(labels).addGauge(name, metric); + final MetricSet metricSet = 
getOrCreateMetricSet(labels); + if (metricSet != null) { + metricSet.addGauge(name, metric); + } } finally { phaser.writerCriticalSectionExit(criticalValueAtEnter); } @@ -158,7 +164,10 @@ public void updateTimer(String timerName, Labels labels, long durationUs) { public void updateTimer(String timerName, Labels labels, long durationUs, long count) { long criticalValueAtEnter = phaser.writerCriticalSectionEnter(); try { - getOrCreateMetricSet(labels).timer(timerName).update(durationUs, count); + final MetricSet metricSet = getOrCreateMetricSet(labels); + if (metricSet != null) { + metricSet.timer(timerName).update(durationUs, count); + } } finally { phaser.writerCriticalSectionExit(criticalValueAtEnter); } @@ -168,16 +177,32 @@ public void updateTimer(String timerName, Labels labels, long durationUs, long c * Must always be executed in context of a critical section so that the * activeMetricSets and inactiveMetricSets reference can't swap while this method runs */ + @Nullable private MetricSet getOrCreateMetricSet(Labels labels) { MetricSet metricSet = activeMetricSets.get(labels); - if (metricSet == null) { - final Labels.Immutable labelsCopy = labels.immutableCopy(); - // Gauges are the only metric types which are not reset after each report (as opposed to counters and timers) - // that's why both the activeMetricSet and inactiveMetricSet have to contain the exact same gauges. 
- activeMetricSets.putIfAbsent(labelsCopy, new MetricSet(labelsCopy)); - metricSet = activeMetricSets.get(labelsCopy); - // even if the map already contains this metric set, the gauges reference will be the same - inactiveMetricSets.putIfAbsent(labelsCopy, new MetricSet(labelsCopy, metricSet.getGauges())); + if (metricSet != null) { + return metricSet; + } + if (activeMetricSets.size() < METRIC_SET_LIMIT) { + return createMetricSet(labels.immutableCopy()); + } + return null; + } + + @Nonnull + private MetricSet createMetricSet(Labels.Immutable labelsCopy) { + // Gauges are the only metric types which are not reset after each report (as opposed to counters and timers) + // that's why both the activeMetricSet and inactiveMetricSet have to contain the exact same gauges. + MetricSet metricSet = new MetricSet(labelsCopy); + final MetricSet racyMetricSet = activeMetricSets.putIfAbsent(labelsCopy, metricSet); + if (racyMetricSet != null) { + metricSet = racyMetricSet; + } + // even if the map already contains this metric set, the gauges reference will be the same + inactiveMetricSets.putIfAbsent(labelsCopy, new MetricSet(labelsCopy, metricSet.getGauges())); + if (activeMetricSets.size() >= METRIC_SET_LIMIT) { + logger.warn("The limit of 1000 timers has been reached, no new timers will be created. 
" + + "Try to name your transactions so that there are less distinct transaction names."); } return metricSet; } @@ -185,7 +210,10 @@ private MetricSet getOrCreateMetricSet(Labels labels) { public void incrementCounter(String name, Labels labels) { long criticalValueAtEnter = phaser.writerCriticalSectionEnter(); try { - getOrCreateMetricSet(labels).incrementCounter(name); + final MetricSet metricSet = getOrCreateMetricSet(labels); + if (metricSet != null) { + metricSet.incrementCounter(name); + } } finally { phaser.writerCriticalSectionExit(criticalValueAtEnter); } @@ -201,7 +229,7 @@ public void doAtomically(Runnable r) { } public interface MetricsReporter { - void report(Map metricSets) throws IOException; + void report(Map metricSets) throws IOException; } } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/DslJsonSerializer.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/DslJsonSerializer.java index 745afed122..07085ffa9e 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/DslJsonSerializer.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/DslJsonSerializer.java @@ -226,7 +226,7 @@ public int getBufferSize() { } @Override - public void report(Map metricSets) { + public void report(Map metricSets) { MetricRegistrySerializer.serialize(metricSets, replaceBuilder, jw); } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/MetricRegistrySerializer.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/MetricRegistrySerializer.java index 80165e43a1..9837f2aa03 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/MetricRegistrySerializer.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/MetricRegistrySerializer.java @@ -26,7 +26,6 @@ import co.elastic.apm.agent.metrics.DoubleSupplier; import co.elastic.apm.agent.metrics.Labels; -import 
co.elastic.apm.agent.metrics.MetricRegistry; import co.elastic.apm.agent.metrics.MetricSet; import co.elastic.apm.agent.metrics.Timer; import com.dslplatform.json.JsonWriter; @@ -40,7 +39,7 @@ public class MetricRegistrySerializer { private static final byte NEW_LINE = '\n'; - public static void serialize(Map metricSets, StringBuilder replaceBuilder, JsonWriter jw) { + public static void serialize(Map metricSets, StringBuilder replaceBuilder, JsonWriter jw) { final long timestamp = System.currentTimeMillis() * 1000; for (MetricSet metricSet : metricSets.values()) { if (metricSet.hasContent()) { diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/SpanTypeBreakdownTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/SpanTypeBreakdownTest.java index 9b24356d45..f22b6ad1c8 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/SpanTypeBreakdownTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/SpanTypeBreakdownTest.java @@ -71,14 +71,11 @@ void testBreakdown_noSpans() { .withName("test") .withType("request") .end(30); - tracer.getMetricRegistry().report(new MetricRegistry.MetricsReporter() { - @Override - public void report(Map metricSets) { - assertThat(getTimer(metricSets, "span.self_time", "app", null).getCount()).isEqualTo(1); - assertThat(getTimer(metricSets, "span.self_time", "app", null).getTotalTimeUs()).isEqualTo(30); - assertThat(getTimer(metricSets, "transaction.duration", null, null).getTotalTimeUs()).isEqualTo(30); - assertThatTransactionBreakdownCounterCreated(metricSets); - } + tracer.getMetricRegistry().report(metricSets -> { + assertThat(getTimer(metricSets, "span.self_time", "app", null).getCount()).isEqualTo(1); + assertThat(getTimer(metricSets, "span.self_time", "app", null).getTotalTimeUs()).isEqualTo(30); + assertThat(getTimer(metricSets, "transaction.duration", null, null).getTotalTimeUs()).isEqualTo(30); + assertThatTransactionBreakdownCounterCreated(metricSets); }); } @@ -95,16 
+92,13 @@ void testBreakdown_singleDbSpan() { transaction.createSpan(10).withType("db").withSubtype("mysql").end(20); transaction.end(30); - tracer.getMetricRegistry().report(new MetricRegistry.MetricsReporter() { - @Override - public void report(Map metricSets) { - assertThat(getTimer(metricSets, "span.self_time", "app", null).getCount()).isEqualTo(1); - assertThat(getTimer(metricSets, "span.self_time", "app", null).getTotalTimeUs()).isEqualTo(20); - assertThat(getTimer(metricSets, "transaction.duration", null, null).getTotalTimeUs()).isEqualTo(30); - assertThatTransactionBreakdownCounterCreated(metricSets); - assertThat(getTimer(metricSets, "span.self_time", "db", "mysql").getCount()).isEqualTo(1); - assertThat(getTimer(metricSets, "span.self_time", "db", "mysql").getTotalTimeUs()).isEqualTo(10); - } + tracer.getMetricRegistry().report(metricSets -> { + assertThat(getTimer(metricSets, "span.self_time", "app", null).getCount()).isEqualTo(1); + assertThat(getTimer(metricSets, "span.self_time", "app", null).getTotalTimeUs()).isEqualTo(20); + assertThat(getTimer(metricSets, "transaction.duration", null, null).getTotalTimeUs()).isEqualTo(30); + assertThatTransactionBreakdownCounterCreated(metricSets); + assertThat(getTimer(metricSets, "span.self_time", "db", "mysql").getCount()).isEqualTo(1); + assertThat(getTimer(metricSets, "span.self_time", "db", "mysql").getTotalTimeUs()).isEqualTo(10); }); } @@ -123,13 +117,10 @@ void testBreakdown_singleDbSpan_breakdownMetricsDisabled() { transaction.end(30); assertThat(transaction.getSpanTimings()).isEmpty(); - tracer.getMetricRegistry().report(new MetricRegistry.MetricsReporter() { - @Override - public void report(Map metricSets) { - assertThat(getTimer(metricSets, "span.self_time", "app", null)).isNull(); - assertThat(getTimer(metricSets, "span.self_time", "db", "mysql")).isNull(); - assertThat(getTimer(metricSets, "transaction.duration", null, null).getTotalTimeUs()).isEqualTo(30); - } + 
tracer.getMetricRegistry().report(metricSets -> { + assertThat(getTimer(metricSets, "span.self_time", "app", null)).isNull(); + assertThat(getTimer(metricSets, "span.self_time", "db", "mysql")).isNull(); + assertThat(getTimer(metricSets, "transaction.duration", null, null).getTotalTimeUs()).isEqualTo(30); }); } @@ -146,14 +137,11 @@ void testBreakdown_singleAppSpan() { transaction.createSpan(10).withType("app").end(20); transaction.end(30); - tracer.getMetricRegistry().report(new MetricRegistry.MetricsReporter() { - @Override - public void report(Map metricSets) { - assertThat(getTimer(metricSets, "span.self_time", "app", null).getCount()).isEqualTo(2); - assertThat(getTimer(metricSets, "span.self_time", "app", null).getTotalTimeUs()).isEqualTo(30); - assertThat(getTimer(metricSets, "transaction.duration", null, null).getTotalTimeUs()).isEqualTo(30); - assertThatTransactionBreakdownCounterCreated(metricSets); - } + tracer.getMetricRegistry().report(metricSets -> { + assertThat(getTimer(metricSets, "span.self_time", "app", null).getCount()).isEqualTo(2); + assertThat(getTimer(metricSets, "span.self_time", "app", null).getTotalTimeUs()).isEqualTo(30); + assertThat(getTimer(metricSets, "transaction.duration", null, null).getTotalTimeUs()).isEqualTo(30); + assertThatTransactionBreakdownCounterCreated(metricSets); }); } @@ -174,16 +162,13 @@ void testBreakdown_concurrentDbSpans_fullyOverlapping() { span2.end(20); transaction.end(30); - tracer.getMetricRegistry().report(new MetricRegistry.MetricsReporter() { - @Override - public void report(Map metricSets) { - assertThat(getTimer(metricSets, "span.self_time", "app", null).getCount()).isEqualTo(1); - assertThat(getTimer(metricSets, "span.self_time", "app", null).getTotalTimeUs()).isEqualTo(20); - assertThat(getTimer(metricSets, "transaction.duration", null, null).getTotalTimeUs()).isEqualTo(30); - assertThatTransactionBreakdownCounterCreated(metricSets); - assertThat(getTimer(metricSets, "span.self_time", "db", 
"mysql").getCount()).isEqualTo(2); - assertThat(getTimer(metricSets, "span.self_time", "db", "mysql").getTotalTimeUs()).isEqualTo(20); - } + tracer.getMetricRegistry().report(metricSets -> { + assertThat(getTimer(metricSets, "span.self_time", "app", null).getCount()).isEqualTo(1); + assertThat(getTimer(metricSets, "span.self_time", "app", null).getTotalTimeUs()).isEqualTo(20); + assertThat(getTimer(metricSets, "transaction.duration", null, null).getTotalTimeUs()).isEqualTo(30); + assertThatTransactionBreakdownCounterCreated(metricSets); + assertThat(getTimer(metricSets, "span.self_time", "db", "mysql").getCount()).isEqualTo(2); + assertThat(getTimer(metricSets, "span.self_time", "db", "mysql").getTotalTimeUs()).isEqualTo(20); }); } @@ -204,16 +189,13 @@ void testBreakdown_concurrentDbSpans_partiallyOverlapping() { span2.end(25); transaction.end(30); - tracer.getMetricRegistry().report(new MetricRegistry.MetricsReporter() { - @Override - public void report(Map metricSets) { - assertThat(getTimer(metricSets, "span.self_time", "app", null).getCount()).isEqualTo(1); - assertThat(getTimer(metricSets, "span.self_time", "app", null).getTotalTimeUs()).isEqualTo(15); - assertThat(getTimer(metricSets, "transaction.duration", null, null).getTotalTimeUs()).isEqualTo(30); - assertThatTransactionBreakdownCounterCreated(metricSets); - assertThat(getTimer(metricSets, "span.self_time", "db", "mysql").getCount()).isEqualTo(2); - assertThat(getTimer(metricSets, "span.self_time", "db", "mysql").getTotalTimeUs()).isEqualTo(20); - } + tracer.getMetricRegistry().report(metricSets -> { + assertThat(getTimer(metricSets, "span.self_time", "app", null).getCount()).isEqualTo(1); + assertThat(getTimer(metricSets, "span.self_time", "app", null).getTotalTimeUs()).isEqualTo(15); + assertThat(getTimer(metricSets, "transaction.duration", null, null).getTotalTimeUs()).isEqualTo(30); + assertThatTransactionBreakdownCounterCreated(metricSets); + assertThat(getTimer(metricSets, "span.self_time", "db", 
"mysql").getCount()).isEqualTo(2); + assertThat(getTimer(metricSets, "span.self_time", "db", "mysql").getTotalTimeUs()).isEqualTo(20); }); } @@ -232,16 +214,13 @@ void testBreakdown_serialDbSpans_notOverlapping_withoutGap() { transaction.createSpan(15).withType("db").withSubtype("mysql").end(25); transaction.end(30); - tracer.getMetricRegistry().report(new MetricRegistry.MetricsReporter() { - @Override - public void report(Map metricSets) { - assertThat(getTimer(metricSets, "span.self_time", "app", null).getCount()).isEqualTo(1); - assertThat(getTimer(metricSets, "span.self_time", "app", null).getTotalTimeUs()).isEqualTo(10); - assertThat(getTimer(metricSets, "transaction.duration", null, null).getTotalTimeUs()).isEqualTo(30); - assertThatTransactionBreakdownCounterCreated(metricSets); - assertThat(getTimer(metricSets, "span.self_time", "db", "mysql").getCount()).isEqualTo(2); - assertThat(getTimer(metricSets, "span.self_time", "db", "mysql").getTotalTimeUs()).isEqualTo(20); - } + tracer.getMetricRegistry().report(metricSets -> { + assertThat(getTimer(metricSets, "span.self_time", "app", null).getCount()).isEqualTo(1); + assertThat(getTimer(metricSets, "span.self_time", "app", null).getTotalTimeUs()).isEqualTo(10); + assertThat(getTimer(metricSets, "transaction.duration", null, null).getTotalTimeUs()).isEqualTo(30); + assertThatTransactionBreakdownCounterCreated(metricSets); + assertThat(getTimer(metricSets, "span.self_time", "db", "mysql").getCount()).isEqualTo(2); + assertThat(getTimer(metricSets, "span.self_time", "db", "mysql").getTotalTimeUs()).isEqualTo(20); }); } @@ -260,16 +239,13 @@ void testBreakdown_serialDbSpans_notOverlapping_withGap() { transaction.createSpan(20).withType("db").withSubtype("mysql").end(25); transaction.end(30); - tracer.getMetricRegistry().report(new MetricRegistry.MetricsReporter() { - @Override - public void report(Map metricSets) { - assertThat(getTimer(metricSets, "span.self_time", "app", null).getCount()).isEqualTo(1); - 
assertThat(getTimer(metricSets, "span.self_time", "app", null).getTotalTimeUs()).isEqualTo(20); - assertThat(getTimer(metricSets, "transaction.duration", null, null).getTotalTimeUs()).isEqualTo(30); - assertThatTransactionBreakdownCounterCreated(metricSets); - assertThat(getTimer(metricSets, "span.self_time", "db", "mysql").getCount()).isEqualTo(2); - assertThat(getTimer(metricSets, "span.self_time", "db", "mysql").getTotalTimeUs()).isEqualTo(10); - } + tracer.getMetricRegistry().report(metricSets -> { + assertThat(getTimer(metricSets, "span.self_time", "app", null).getCount()).isEqualTo(1); + assertThat(getTimer(metricSets, "span.self_time", "app", null).getTotalTimeUs()).isEqualTo(20); + assertThat(getTimer(metricSets, "transaction.duration", null, null).getTotalTimeUs()).isEqualTo(30); + assertThatTransactionBreakdownCounterCreated(metricSets); + assertThat(getTimer(metricSets, "span.self_time", "db", "mysql").getCount()).isEqualTo(2); + assertThat(getTimer(metricSets, "span.self_time", "db", "mysql").getTotalTimeUs()).isEqualTo(10); }); } @@ -290,16 +266,13 @@ void testBreakdown_asyncGrandchildExceedsChild() { db.end(25); transaction.end(30); - tracer.getMetricRegistry().report(new MetricRegistry.MetricsReporter() { - @Override - public void report(Map metricSets) { - assertThat(getTimer(metricSets, "span.self_time", "app", null).getCount()).isEqualTo(2); - assertThat(getTimer(metricSets, "span.self_time", "app", null).getTotalTimeUs()).isEqualTo(25); - assertThat(getTimer(metricSets, "transaction.duration", null, null).getTotalTimeUs()).isEqualTo(30); - assertThatTransactionBreakdownCounterCreated(metricSets); - assertThat(getTimer(metricSets, "span.self_time", "db", "mysql").getCount()).isEqualTo(1); - assertThat(getTimer(metricSets, "span.self_time", "db", "mysql").getTotalTimeUs()).isEqualTo(10); - } + tracer.getMetricRegistry().report(metricSets -> { + assertThat(getTimer(metricSets, "span.self_time", "app", null).getCount()).isEqualTo(2); + 
assertThat(getTimer(metricSets, "span.self_time", "app", null).getTotalTimeUs()).isEqualTo(25); + assertThat(getTimer(metricSets, "transaction.duration", null, null).getTotalTimeUs()).isEqualTo(30); + assertThatTransactionBreakdownCounterCreated(metricSets); + assertThat(getTimer(metricSets, "span.self_time", "db", "mysql").getCount()).isEqualTo(1); + assertThat(getTimer(metricSets, "span.self_time", "db", "mysql").getTotalTimeUs()).isEqualTo(10); }); } @@ -333,15 +306,12 @@ void testBreakdown_asyncGrandchildExceedsChildAndTransaction() { assertThat(app.isReferenced()).isFalse(); assertThat(db.isReferenced()).isFalse(); - tracer.getMetricRegistry().report(new MetricRegistry.MetricsReporter() { - @Override - public void report(Map metricSets) { - assertThat(getTimer(metricSets, "span.self_time", "app", null).getCount()).isEqualTo(1); - assertThat(getTimer(metricSets, "span.self_time", "app", null).getTotalTimeUs()).isEqualTo(10); - assertThat(getTimer(metricSets, "transaction.duration", null, null).getTotalTimeUs()).isEqualTo(20); - assertThatTransactionBreakdownCounterCreated(metricSets); - assertThat(getTimer(metricSets, "span.self_time", "db", "mysql")).isNull(); - } + tracer.getMetricRegistry().report(metricSets -> { + assertThat(getTimer(metricSets, "span.self_time", "app", null).getCount()).isEqualTo(1); + assertThat(getTimer(metricSets, "span.self_time", "app", null).getTotalTimeUs()).isEqualTo(10); + assertThat(getTimer(metricSets, "transaction.duration", null, null).getTotalTimeUs()).isEqualTo(20); + assertThatTransactionBreakdownCounterCreated(metricSets); + assertThat(getTimer(metricSets, "span.self_time", "db", "mysql")).isNull(); }); } @@ -366,15 +336,12 @@ void testBreakdown_singleDbSpan_exceedingParent() { reporter.assertRecycledAfterDecrementingReferences(); assertThat(reporter.getFirstTransaction().getSpanTimings().get(Labels.Mutable.of().spanType("db").spanSubType("mysql"))).isNull(); - tracer.getMetricRegistry().report(new 
MetricRegistry.MetricsReporter() { - @Override - public void report(Map metricSets) { - assertThat(getTimer(metricSets, "span.self_time", "app", null).getCount()).isEqualTo(1); - assertThat(getTimer(metricSets, "span.self_time", "app", null).getTotalTimeUs()).isEqualTo(10); - assertThat(getTimer(metricSets, "transaction.duration", null, null).getTotalTimeUs()).isEqualTo(20); - assertThatTransactionBreakdownCounterCreated(metricSets); - assertThat(getTimer(metricSets, "span.self_time", "db", "mysql")).isNull(); - } + tracer.getMetricRegistry().report(metricSets -> { + assertThat(getTimer(metricSets, "span.self_time", "app", null).getCount()).isEqualTo(1); + assertThat(getTimer(metricSets, "span.self_time", "app", null).getTotalTimeUs()).isEqualTo(10); + assertThat(getTimer(metricSets, "transaction.duration", null, null).getTotalTimeUs()).isEqualTo(20); + assertThatTransactionBreakdownCounterCreated(metricSets); + assertThat(getTimer(metricSets, "span.self_time", "db", "mysql")).isNull(); }); } @@ -402,24 +369,21 @@ void testBreakdown_spanStartedAfterParentEnded() { reporter.assertRecycledAfterDecrementingReferences(); - tracer.getMetricRegistry().report(new MetricRegistry.MetricsReporter() { - @Override - public void report(Map metricSets) { - assertThat(getTimer(metricSets, "span.self_time", "app", null).getCount()).isEqualTo(1); - assertThat(getTimer(metricSets, "span.self_time", "app", null).getTotalTimeUs()).isEqualTo(10); - assertThat(getTimer(metricSets, "transaction.duration", null, null).getTotalTimeUs()).isEqualTo(10); - assertThatTransactionBreakdownCounterCreated(metricSets); - assertThat(getTimer(metricSets, "span.self_time", "db", "mysql")).isNull(); - } + tracer.getMetricRegistry().report(metricSets -> { + assertThat(getTimer(metricSets, "span.self_time", "app", null).getCount()).isEqualTo(1); + assertThat(getTimer(metricSets, "span.self_time", "app", null).getTotalTimeUs()).isEqualTo(10); + assertThat(getTimer(metricSets, "transaction.duration", null, 
null).getTotalTimeUs()).isEqualTo(10); + assertThatTransactionBreakdownCounterCreated(metricSets); + assertThat(getTimer(metricSets, "span.self_time", "db", "mysql")).isNull(); }); } - private void assertThatTransactionBreakdownCounterCreated(Map metricSets) { + private void assertThatTransactionBreakdownCounterCreated(Map metricSets) { assertThat(metricSets.get(Labels.Mutable.of().transactionName("test").transactionType("request")).getCounters().get("transaction.breakdown.count").get()).isEqualTo(1); } @Nullable - private Timer getTimer(Map metricSets, String timerName, @Nullable String spanType, @Nullable String spanSubType) { + private Timer getTimer(Map metricSets, String timerName, @Nullable String spanType, @Nullable String spanSubType) { final MetricSet metricSet = metricSets.get(Labels.Mutable.of().transactionName("test").transactionType("request").spanType(spanType).spanSubType(spanSubType)); if (metricSet == null) { return null; diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/MetricRegistryTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/MetricRegistryTest.java index 374d4dbca7..79451256ad 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/MetricRegistryTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/MetricRegistryTest.java @@ -11,9 +11,9 @@ * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. 
* You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -30,6 +30,7 @@ import org.junit.jupiter.api.Test; import java.util.List; +import java.util.stream.IntStream; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; @@ -65,4 +66,14 @@ void testReportGaugeTwice() { // the active and inactive metricSets are now switched, verify that the previous inactive metricSets also contain the same gauges metricRegistry.report(metricSets -> assertThat(metricSets.get(Labels.EMPTY).getGauge("foo").get()).isEqualTo(42)); } + + @Test + void testLimitTimers() { + IntStream.range(1, 505).forEach(i -> metricRegistry.updateTimer("timer" + i, Labels.Mutable.of("foo", Integer.toString(i)), 1)); + IntStream.range(1, 505).forEach(i -> metricRegistry.updateTimer("timer" + i, Labels.Mutable.of("bar", Integer.toString(i)), 1)); + + metricRegistry.report(metricSets -> assertThat(metricSets).hasSize(1000)); + // the active and inactive metricSets are now switched, also check the size of the previously inactive metricSets + metricRegistry.report(metricSets -> assertThat(metricSets).hasSize(1000)); + } } From 9aca16f63cc6a0b428b7d97cbd7aa36f24fcd298 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Wed, 5 Jun 2019 17:22:17 +0200 Subject: [PATCH 16/21] Add some Javadocs --- .../java/co/elastic/apm/agent/metrics/Labels.java | 14 ++++++++++++++ .../elastic/apm/agent/metrics/MetricRegistry.java | 14 +++++++------- .../java/co/elastic/apm/agent/metrics/Timer.java | 7 +++---- .../apm/agent/metrics/builtin/JvmGcMetrics.java | 2 +- .../agent/metrics/builtin/JvmMemoryMetrics.java | 12 ++++++------ .../apm/agent/metrics/builtin/SystemMetrics.java | 12 ++++++------ .../apm/agent/metrics/builtin/ThreadMetrics.java | 2 +- 
.../co/elastic/apm/agent/metrics/LabelsTest.java | 2 +- .../apm/agent/metrics/MetricRegistryTest.java | 8 ++++---- .../metrics/builtin/JvmMemoryMetricsTest.java | 12 ++++++------ .../agent/metrics/builtin/SystemMetricsTest.java | 10 +++++----- .../agent/metrics/builtin/ThreadMetricsTest.java | 4 ++-- 12 files changed, 56 insertions(+), 43 deletions(-) diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/Labels.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/Labels.java index cd876b0609..0dc90a1578 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/Labels.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/Labels.java @@ -33,6 +33,20 @@ import java.util.Map; import java.util.Objects; +/** + * Labels are key/value pairs and relate to ECS labels. + * However, there are also top-level labels which are not nested under the {@code labels} object, + * for example {@link #getTransactionName()}, {@link #getTransactionType()}, {@link #getSpanType()} and {@link #getSpanSubType()}. + * <p>

+ * Metrics are structured into multiple {@link MetricSet}s. + * For each distinct combination of {@link Labels}, there is one {@link MetricSet}. + * </p>

+ * <p>

+ * Labels allow for {@link CharSequence}s as a value, + * thus avoiding allocations for {@code transaction.name.toString()} when tracking breakdown metrics for a transaction. + * Iterations over the labels also don't allocate an Iterator, in contrast to {@code Map.entrySet().iterator()}. + * </p>

+ */ public interface Labels { Labels EMPTY = Labels.Immutable.empty(); diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/MetricRegistry.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/MetricRegistry.java index e6e4619582..20b3821800 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/MetricRegistry.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/MetricRegistry.java @@ -32,7 +32,6 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; -import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -147,11 +146,7 @@ public void report(MetricsReporter metricsReporter) { inactiveMetricSets = activeMetricSets; activeMetricSets = temp; phaser.flipPhase(); - try { - metricsReporter.report(inactiveMetricSets); - } catch (IOException e) { - logger.error("Error while reporting metrics", e); - } + metricsReporter.report(inactiveMetricSets); } finally { phaser.readerUnlock(); } @@ -229,7 +224,12 @@ public void doAtomically(Runnable r) { } public interface MetricsReporter { - void report(Map metricSets) throws IOException; + /** + * Don't hold a reference to metricSets after this method ends as it will be reused. + * + * @param metricSets the metrics to report + */ + void report(Map metricSets); } } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/Timer.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/Timer.java index 77bfde0c51..bbac5a3553 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/Timer.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/Timer.java @@ -29,6 +29,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +/** + * This timer tracks the total time and the count of invocations so that it allows for calculating weighted averages.
+ */ public class Timer implements Recyclable { private static final double MS_IN_MICROS = TimeUnit.MILLISECONDS.toMicros(1); @@ -52,10 +55,6 @@ public double getTotalTimeMs() { return totalTime.get() / MS_IN_MICROS; } - public double getAverageMs() { - return totalTime.get() / MS_IN_MICROS / count.get(); - } - public long getCount() { return count.get(); } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/builtin/JvmGcMetrics.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/builtin/JvmGcMetrics.java index e29e3dfa6d..b4432d609d 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/builtin/JvmGcMetrics.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/builtin/JvmGcMetrics.java @@ -69,7 +69,7 @@ public double get() { // but the actual MBean it uses (com.ibm.lang.management.internal.ExtendedThreadMXBeanImpl) does not implement it if (sunBeanClass.isInstance(ManagementFactory.getThreadMXBean())) { // in reference to JMH's GC profiler (gc.alloc.rate) - registry.add("jvm.gc.alloc", Labels.Immutable.empty(), + registry.add("jvm.gc.alloc", Labels.EMPTY, (DoubleSupplier) Class.forName(getClass().getName() + "$HotspotAllocationSupplier").getEnumConstants()[0]); } } catch (ClassNotFoundException ignore) { diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/builtin/JvmMemoryMetrics.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/builtin/JvmMemoryMetrics.java index 1459dfcb9b..2556fe82b3 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/builtin/JvmMemoryMetrics.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/builtin/JvmMemoryMetrics.java @@ -42,37 +42,37 @@ public void start(ElasticApmTracer tracer) { void bindTo(final MetricRegistry registry) { final MemoryMXBean platformMXBean = ManagementFactory.getPlatformMXBean(MemoryMXBean.class); - registry.add("jvm.memory.heap.used", Labels.Immutable.empty(), new DoubleSupplier() { + 
registry.add("jvm.memory.heap.used", Labels.EMPTY, new DoubleSupplier() { @Override public double get() { return platformMXBean.getHeapMemoryUsage().getUsed(); } }); - registry.add("jvm.memory.heap.committed", Labels.Immutable.empty(), new DoubleSupplier() { + registry.add("jvm.memory.heap.committed", Labels.EMPTY, new DoubleSupplier() { @Override public double get() { return platformMXBean.getHeapMemoryUsage().getCommitted(); } }); - registry.add("jvm.memory.heap.max", Labels.Immutable.empty(), new DoubleSupplier() { + registry.add("jvm.memory.heap.max", Labels.EMPTY, new DoubleSupplier() { @Override public double get() { return platformMXBean.getHeapMemoryUsage().getMax(); } }); - registry.add("jvm.memory.non_heap.used", Labels.Immutable.empty(), new DoubleSupplier() { + registry.add("jvm.memory.non_heap.used", Labels.EMPTY, new DoubleSupplier() { @Override public double get() { return platformMXBean.getNonHeapMemoryUsage().getUsed(); } }); - registry.add("jvm.memory.non_heap.committed", Labels.Immutable.empty(), new DoubleSupplier() { + registry.add("jvm.memory.non_heap.committed", Labels.EMPTY, new DoubleSupplier() { @Override public double get() { return platformMXBean.getNonHeapMemoryUsage().getCommitted(); } }); - registry.add("jvm.memory.non_heap.max", Labels.Immutable.empty(), new DoubleSupplier() { + registry.add("jvm.memory.non_heap.max", Labels.EMPTY, new DoubleSupplier() { @Override public double get() { return platformMXBean.getNonHeapMemoryUsage().getMax(); diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/builtin/SystemMetrics.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/builtin/SystemMetrics.java index f2dd5e7f95..ab595078a3 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/builtin/SystemMetrics.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/builtin/SystemMetrics.java @@ -110,21 +110,21 @@ public void start(ElasticApmTracer tracer) { } void bindTo(MetricRegistry 
metricRegistry) { - metricRegistry.addUnlessNegative("system.cpu.total.norm.pct", Labels.Immutable.empty(), new DoubleSupplier() { + metricRegistry.addUnlessNegative("system.cpu.total.norm.pct", Labels.EMPTY, new DoubleSupplier() { @Override public double get() { return invoke(systemCpuUsage); } }); - metricRegistry.addUnlessNegative("system.process.cpu.total.norm.pct", Labels.Immutable.empty(), new DoubleSupplier() { + metricRegistry.addUnlessNegative("system.process.cpu.total.norm.pct", Labels.EMPTY, new DoubleSupplier() { @Override public double get() { return invoke(processCpuUsage); } }); - metricRegistry.addUnlessNan("system.memory.total", Labels.Immutable.empty(), new DoubleSupplier() { + metricRegistry.addUnlessNan("system.memory.total", Labels.EMPTY, new DoubleSupplier() { @Override public double get() { return invoke(totalMemory); @@ -132,7 +132,7 @@ public double get() { }); if (memInfoFile.canRead()) { - metricRegistry.addUnlessNan("system.memory.actual.free", Labels.Immutable.empty(), new DoubleSupplier() { + metricRegistry.addUnlessNan("system.memory.actual.free", Labels.EMPTY, new DoubleSupplier() { final List relevantLines = Arrays.asList( caseSensitiveMatcher("MemAvailable:*kB"), caseSensitiveMatcher("MemFree:*kB"), @@ -162,7 +162,7 @@ public double get() { } }); } else { - metricRegistry.addUnlessNan("system.memory.actual.free", Labels.Immutable.empty(), new DoubleSupplier() { + metricRegistry.addUnlessNan("system.memory.actual.free", Labels.EMPTY, new DoubleSupplier() { @Override public double get() { return invoke(freeMemory); @@ -170,7 +170,7 @@ public double get() { }); } - metricRegistry.addUnlessNegative("system.process.memory.size", Labels.Immutable.empty(), new DoubleSupplier() { + metricRegistry.addUnlessNegative("system.process.memory.size", Labels.EMPTY, new DoubleSupplier() { @Override public double get() { return invoke(virtualProcessMemory); diff --git 
a/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/builtin/ThreadMetrics.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/builtin/ThreadMetrics.java index 0a2e815c35..c1bb17c02a 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/builtin/ThreadMetrics.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/builtin/ThreadMetrics.java @@ -42,7 +42,7 @@ public void start(ElasticApmTracer tracer) { void bindTo(final MetricRegistry registry) { final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); - registry.add("jvm.thread.count", Labels.Immutable.empty(), new DoubleSupplier() { + registry.add("jvm.thread.count", Labels.EMPTY, new DoubleSupplier() { @Override public double get() { return threadMXBean.getThreadCount(); diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/LabelsTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/LabelsTest.java index 623faed178..9005402f5d 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/LabelsTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/LabelsTest.java @@ -80,7 +80,7 @@ void testRecycle() { immutableLabels, Labels.Mutable.of("foo", "bar").transactionName("baz")); assertNotEqual(resetLabels, immutableLabels); - assertEqualsHashCode(resetLabels, Labels.Immutable.empty()); + assertEqualsHashCode(resetLabels, Labels.EMPTY); } private void assertNotEqual(Labels l1, Labels l2) { diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/MetricRegistryTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/MetricRegistryTest.java index 79451256ad..951af52b50 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/MetricRegistryTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/MetricRegistryTest.java @@ -53,15 +53,15 @@ void testDisabledMetrics() { final DoubleSupplier problematicMetric = () -> { throw new 
RuntimeException("Huston, we have a problem"); }; - metricRegistry.addUnlessNegative("jvm.gc.count", Labels.Immutable.empty(), problematicMetric); - metricRegistry.addUnlessNan("jvm.gc.count", Labels.Immutable.empty(), problematicMetric); - metricRegistry.add("jvm.gc.count", Labels.Immutable.empty(), problematicMetric); + metricRegistry.addUnlessNegative("jvm.gc.count", Labels.EMPTY, problematicMetric); + metricRegistry.addUnlessNan("jvm.gc.count", Labels.EMPTY, problematicMetric); + metricRegistry.add("jvm.gc.count", Labels.EMPTY, problematicMetric); metricRegistry.report(metricSets -> assertThat(metricSets).isEmpty()); } @Test void testReportGaugeTwice() { - metricRegistry.add("foo", Labels.Immutable.empty(), () -> 42); + metricRegistry.add("foo", Labels.EMPTY, () -> 42); metricRegistry.report(metricSets -> assertThat(metricSets.get(Labels.EMPTY).getGauge("foo").get()).isEqualTo(42)); // the active and inactive metricSets are now switched, verify that the previous inactive metricSets also contain the same gauges metricRegistry.report(metricSets -> assertThat(metricSets.get(Labels.EMPTY).getGauge("foo").get()).isEqualTo(42)); diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/builtin/JvmMemoryMetricsTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/builtin/JvmMemoryMetricsTest.java index 645ae6baab..af2c4f551e 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/builtin/JvmMemoryMetricsTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/builtin/JvmMemoryMetricsTest.java @@ -41,12 +41,12 @@ void testMetrics() { final MetricRegistry registry = new MetricRegistry(mock(ReporterConfiguration.class)); jvmMemoryMetrics.bindTo(registry); System.out.println(registry.toString()); - assertThat(registry.getGauge("jvm.memory.heap.used", Labels.Immutable.empty())).isNotZero(); - assertThat(registry.getGauge("jvm.memory.heap.committed", Labels.Immutable.empty())).isNotZero(); - 
assertThat(registry.getGauge("jvm.memory.heap.max", Labels.Immutable.empty())).isNotZero(); - assertThat(registry.getGauge("jvm.memory.non_heap.used", Labels.Immutable.empty())).isNotZero(); - assertThat(registry.getGauge("jvm.memory.non_heap.committed", Labels.Immutable.empty())).isNotZero(); - assertThat(registry.getGauge("jvm.memory.non_heap.max", Labels.Immutable.empty())).isNotZero(); + assertThat(registry.getGauge("jvm.memory.heap.used", Labels.EMPTY)).isNotZero(); + assertThat(registry.getGauge("jvm.memory.heap.committed", Labels.EMPTY)).isNotZero(); + assertThat(registry.getGauge("jvm.memory.heap.max", Labels.EMPTY)).isNotZero(); + assertThat(registry.getGauge("jvm.memory.non_heap.used", Labels.EMPTY)).isNotZero(); + assertThat(registry.getGauge("jvm.memory.non_heap.committed", Labels.EMPTY)).isNotZero(); + assertThat(registry.getGauge("jvm.memory.non_heap.max", Labels.EMPTY)).isNotZero(); final long[] longs = new long[1000000]; System.out.println(registry.toString()); } diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/builtin/SystemMetricsTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/builtin/SystemMetricsTest.java index 10c39a067c..bca6efe285 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/builtin/SystemMetricsTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/builtin/SystemMetricsTest.java @@ -47,10 +47,10 @@ void testSystemMetrics() throws InterruptedException { // makes sure system.cpu.total.norm.pct does not return NaN consumeCpu(); Thread.sleep(1000); - assertThat(metricRegistry.getGauge("system.process.cpu.total.norm.pct", Labels.Immutable.empty())).isBetween(0.0, 1.0); - assertThat(metricRegistry.getGauge("system.memory.total", Labels.Immutable.empty())).isGreaterThan(0.0); - assertThat(metricRegistry.getGauge("system.memory.actual.free", Labels.Immutable.empty())).isGreaterThan(0.0); - assertThat(metricRegistry.getGauge("system.process.memory.size", 
Labels.Immutable.empty())).isGreaterThan(0.0); + assertThat(metricRegistry.getGauge("system.process.cpu.total.norm.pct", Labels.EMPTY)).isBetween(0.0, 1.0); + assertThat(metricRegistry.getGauge("system.memory.total", Labels.EMPTY)).isGreaterThan(0.0); + assertThat(metricRegistry.getGauge("system.memory.actual.free", Labels.EMPTY)).isGreaterThan(0.0); + assertThat(metricRegistry.getGauge("system.process.memory.size", Labels.EMPTY)).isGreaterThan(0.0); } @ParameterizedTest @@ -61,7 +61,7 @@ void testSystemMetrics() throws InterruptedException { void testFreeMemoryMeminfo(String file, long value) throws Exception { SystemMetrics systemMetrics = new SystemMetrics(new File(getClass().getResource(file).toURI())); systemMetrics.bindTo(metricRegistry); - assertThat(metricRegistry.getGauge("system.memory.actual.free", Labels.Immutable.empty())).isEqualTo(value); + assertThat(metricRegistry.getGauge("system.memory.actual.free", Labels.EMPTY)).isEqualTo(value); } private void consumeCpu() { diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/builtin/ThreadMetricsTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/builtin/ThreadMetricsTest.java index 011a60e628..7a6fe52f18 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/builtin/ThreadMetricsTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/metrics/builtin/ThreadMetricsTest.java @@ -41,7 +41,7 @@ class ThreadMetricsTest { @Test void testThreadCount() { threadMetrics.bindTo(registry); - double numThreads = registry.getGauge("jvm.thread.count", Labels.Immutable.empty()); + double numThreads = registry.getGauge("jvm.thread.count", Labels.EMPTY); assertThat(numThreads).isNotZero(); for (int i = 0; i < NUM_ADDED_THREADS; i++) { Thread thread = new Thread(() -> { @@ -55,6 +55,6 @@ void testThreadCount() { thread.setDaemon(true); thread.start(); } - assertThat(registry.getGauge("jvm.thread.count", Labels.Immutable.empty())).isEqualTo(numThreads + 
NUM_ADDED_THREADS); + assertThat(registry.getGauge("jvm.thread.count", Labels.EMPTY)).isEqualTo(numThreads + NUM_ADDED_THREADS); } } From 253ad876e6d73cfe70a82c479bedbc2a786a280b Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Wed, 5 Jun 2019 20:15:42 +0200 Subject: [PATCH 17/21] Eliminate allocations --- .../agent/impl/transaction/Transaction.java | 33 +++++++++---------- .../apm/agent/metrics/MetricRegistry.java | 19 +++++++---- 2 files changed, 28 insertions(+), 24 deletions(-) diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java index a021056b3f..b3ef871a6c 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java @@ -278,25 +278,24 @@ private void trackMetrics() { labels.resetState(); labels.transactionName(name).transactionType(type); final MetricRegistry metricRegistry = tracer.getMetricRegistry(); - metricRegistry.doAtomically(new Runnable() { - @Override - public void run() { - metricRegistry.updateTimer("transaction.duration", labels, getDuration()); - if (collectBreakdownMetrics) { - metricRegistry.incrementCounter("transaction.breakdown.count", labels); - final KeyListConcurrentHashMap spanTimings = getSpanTimings(); - List keyList = spanTimings.keyList(); - for (int i = 0; i < keyList.size(); i++) { - Labels spanType = keyList.get(i); - final Timer timer = spanTimings.get(spanType); - if (timer.getCount() > 0) { - labels.spanType(spanType.getSpanType()).spanSubType(spanType.getSpanSubType()); - metricRegistry.updateTimer("span.self_time", labels, timer.getTotalTimeUs(), timer.getCount()); - timer.resetState(); - } + long criticalValueAtEnter = metricRegistry.writerCriticalSectionEnter(); + try { + metricRegistry.updateTimer("transaction.duration", labels, getDuration()); + if 
(collectBreakdownMetrics) { + metricRegistry.incrementCounter("transaction.breakdown.count", labels); + List keyList = spanTimings.keyList(); + for (int i = 0; i < keyList.size(); i++) { + Labels spanType = keyList.get(i); + final Timer timer = spanTimings.get(spanType); + if (timer.getCount() > 0) { + labels.spanType(spanType.getSpanType()).spanSubType(spanType.getSpanSubType()); + metricRegistry.updateTimer("span.self_time", labels, timer.getTotalTimeUs(), timer.getCount()); + timer.resetState(); } } } - }); + } finally { + metricRegistry.writerCriticalSectionExit(criticalValueAtEnter); + } } } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/MetricRegistry.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/MetricRegistry.java index 20b3821800..1cf288846b 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/MetricRegistry.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/MetricRegistry.java @@ -214,13 +214,18 @@ public void incrementCounter(String name, Labels labels) { } } - public void doAtomically(Runnable r) { - long criticalValueAtEnter = phaser.writerCriticalSectionEnter(); - try { - r.run(); - } finally { - phaser.writerCriticalSectionExit(criticalValueAtEnter); - } + /** + * @see WriterReaderPhaser#writerCriticalSectionEnter() + */ + public long writerCriticalSectionEnter() { + return phaser.writerCriticalSectionEnter(); + } + + /** + * @see WriterReaderPhaser#writerCriticalSectionExit(long) + */ + public void writerCriticalSectionExit(long criticalValueAtEnter) { + phaser.writerCriticalSectionExit(criticalValueAtEnter); } public interface MetricsReporter { From dbf5884e8515be53c7d3145234de6ed1f016c78d Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Fri, 21 Jun 2019 09:52:25 +0200 Subject: [PATCH 18/21] Addressed some comments --- .../agent/impl/transaction/AbstractSpan.java | 12 +++-- .../apm/agent/impl/transaction/Span.java | 10 ++-- 
.../agent/impl/transaction/Transaction.java | 10 +++- .../co/elastic/apm/agent/metrics/Labels.java | 6 --- .../apm/agent/metrics/MetricRegistry.java | 17 ++++-- .../serialize/MetricRegistrySerializer.java | 7 ++- .../agent/util/KeyListConcurrentHashMap.java | 52 ++++++++++++------- 7 files changed, 69 insertions(+), 45 deletions(-) diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/AbstractSpan.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/AbstractSpan.java index 3b29867c97..6952a240f2 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/AbstractSpan.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/AbstractSpan.java @@ -52,7 +52,7 @@ public abstract class AbstractSpan extends TraceContextH // in microseconds protected long duration; - protected ReentrantTimer childDurations = new ReentrantTimer(); + private ReentrantTimer childDurations = new ReentrantTimer(); protected AtomicInteger references = new AtomicInteger(); protected volatile boolean finished = true; @@ -60,7 +60,7 @@ public int getReferenceCount() { return references.get(); } - public static class ReentrantTimer implements Recyclable { + private static class ReentrantTimer implements Recyclable { private AtomicInteger nestingLevel = new AtomicInteger(); private AtomicLong start = new AtomicLong(); @@ -248,16 +248,18 @@ public final void end(long epochMicros) { name.append("unnamed"); } childDurations.forceStop(epochMicros); - doEnd(epochMicros); - // has to be set last so doEnd callbacks don't think it has already been finished + beforeEnd(epochMicros); this.finished = true; + afterEnd(); } else { logger.warn("End has already been called: {}", this); assert false; } } - protected abstract void doEnd(long epochMicros); + protected abstract void beforeEnd(long epochMicros); + + protected abstract void afterEnd(); @Override public boolean isChildOf(TraceContextHolder other) { diff --git 
a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Span.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Span.java index aa83d85f66..e1dba32709 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Span.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Span.java @@ -195,7 +195,7 @@ public String getAction() { } @Override - public void doEnd(long epochMicros) { + public void beforeEnd(long epochMicros) { if (logger.isDebugEnabled()) { logger.debug("} endSpan {}", this); if (logger.isTraceEnabled()) { @@ -206,12 +206,16 @@ public void doEnd(long epochMicros) { type = "custom"; } if (transaction != null) { - transaction.incrementTimer(getType(), subtype, getSelfDuration()); + transaction.incrementTimer(type, subtype, getSelfDuration()); } if (parent != null) { - parent.decrementReferences(); parent.onChildEnd(epochMicros); + parent.decrementReferences(); } + } + + @Override + protected void afterEnd() { this.tracer.endSpan(this); } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java index b3ef871a6c..bc3612635a 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java @@ -162,7 +162,7 @@ public void setUser(String id, String email, String username) { } @Override - public void doEnd(long epochMicros) { + public void beforeEnd(long epochMicros) { if (!isSampled()) { context.resetState(); } @@ -171,6 +171,11 @@ public void doEnd(long epochMicros) { } context.onTransactionEnd(); incrementTimer("app", null, getSelfDuration()); + } + + @Override + protected void afterEnd() { + // timers are guaranteed to be stable now - no concurrent updates possible as finished is true trackMetrics(); this.tracer.endTransaction(this); } @@ 
-253,7 +258,8 @@ void incrementTimer(@Nullable String type, @Nullable String subtype, long durati } final Labels.Mutable spanType = labelsThreadLocal.get(); spanType.resetState(); - Timer timer = spanTimings.get(spanType.spanType(type).spanSubType(subtype)); + spanType.spanType(type).spanSubType(subtype); + Timer timer = spanTimings.get(spanType); if (timer == null) { timer = new Timer(); Timer racyTimer = spanTimings.putIfAbsent(spanType.immutableCopy(), timer); diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/Labels.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/Labels.java index 0dc90a1578..163d978931 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/Labels.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/Labels.java @@ -211,13 +211,7 @@ class Mutable extends AbstractBase implements Recyclable { private String spanSubType; private Mutable() { - this(Collections.emptyList(), Collections.emptyList()); - } - - private Mutable(List keys, List values) { super(new ArrayList(), new ArrayList()); - this.keys.addAll(keys); - this.values.addAll(values); } public static Mutable of() { diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/MetricRegistry.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/MetricRegistry.java index 1cf288846b..07d3929945 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/MetricRegistry.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/metrics/MetricRegistry.java @@ -54,6 +54,11 @@ public class MetricRegistry { */ private volatile ConcurrentMap activeMetricSets = new ConcurrentHashMap<>(); private ConcurrentMap inactiveMetricSets = new ConcurrentHashMap<>(); + /** + * Final and thus stable references to the two different metric sets. 
+ * See {@link #getOrCreateMetricSet(Labels)} + */ + private final ConcurrentMap metricSets1 = activeMetricSets, metricSets2 = inactiveMetricSets; public MetricRegistry(ReporterConfiguration config) { this.config = config; @@ -187,19 +192,21 @@ private MetricSet getOrCreateMetricSet(Labels labels) { @Nonnull private MetricSet createMetricSet(Labels.Immutable labelsCopy) { // Gauges are the only metric types which are not reset after each report (as opposed to counters and timers) - // that's why both the activeMetricSet and inactiveMetricSet have to contain the exact same gauges. + // that's why both metric sets have to contain the exact same gauges. + // we can't access inactiveMetricSets as it might be swapped as this method is executed + // inactiveMetricSets is only stable after flipping the phase (phaser.flipPhase) MetricSet metricSet = new MetricSet(labelsCopy); - final MetricSet racyMetricSet = activeMetricSets.putIfAbsent(labelsCopy, metricSet); + final MetricSet racyMetricSet = metricSets1.putIfAbsent(labelsCopy, metricSet); if (racyMetricSet != null) { metricSet = racyMetricSet; } // even if the map already contains this metric set, the gauges reference will be the same - inactiveMetricSets.putIfAbsent(labelsCopy, new MetricSet(labelsCopy, metricSet.getGauges())); - if (activeMetricSets.size() >= METRIC_SET_LIMIT) { + metricSets2.putIfAbsent(labelsCopy, new MetricSet(labelsCopy, metricSet.getGauges())); + if (metricSets1.size() >= METRIC_SET_LIMIT) { logger.warn("The limit of 1000 timers has been reached, no new timers will be created. 
" + "Try to name your transactions so that there are less distinct transaction names."); } - return metricSet; + return activeMetricSets.get(labelsCopy); } public void incrementCounter(String name, Labels labels) { diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/MetricRegistrySerializer.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/MetricRegistrySerializer.java index 9837f2aa03..55133b18c3 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/MetricRegistrySerializer.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/MetricRegistrySerializer.java @@ -133,11 +133,10 @@ private static boolean serializeTimers(Map timers, boolean hasSam return hasSamples; } - private static void serializeCounters(Map timers, boolean hasSamples, JsonWriter jw) { - - final int size = timers.size(); + private static void serializeCounters(Map counters, boolean hasSamples, JsonWriter jw) { + final int size = counters.size(); if (size > 0) { - final Iterator> iterator = timers.entrySet().iterator(); + final Iterator> iterator = counters.entrySet().iterator(); // serialize first valid value AtomicLong value = null; diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/util/KeyListConcurrentHashMap.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/util/KeyListConcurrentHashMap.java index 40c7999a1c..31adb8d1ad 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/util/KeyListConcurrentHashMap.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/util/KeyListConcurrentHashMap.java @@ -11,9 +11,9 @@ * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. 
* You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -39,48 +39,60 @@ public class KeyListConcurrentHashMap extends ConcurrentHashMap { @Override public V put(K key, V value) { - final V previousValue = super.put(key, value); - if (previousValue == null) { - keyList.add(key); + synchronized (this) { + final V previousValue = super.put(key, value); + if (previousValue == null) { + keyList.add(key); + } + return previousValue; } - return previousValue; } @Override public void putAll(Map m) { - for (Entry entry : m.entrySet()) { - put(entry.getKey(), entry.getValue()); + synchronized (this) { + for (Entry entry : m.entrySet()) { + put(entry.getKey(), entry.getValue()); + } } } @Override public V remove(Object key) { - keyList.remove(key); - return super.remove(key); + synchronized (this) { + keyList.remove(key); + return super.remove(key); + } } @Override public void clear() { - keyList.clear(); - super.clear(); + synchronized (this) { + keyList.clear(); + super.clear(); + } } @Override public V putIfAbsent(K key, V value) { - final V previousValue = super.putIfAbsent(key, value); - if (previousValue == null) { - keyList.add(key); + synchronized (this) { + final V previousValue = super.putIfAbsent(key, value); + if (previousValue == null) { + keyList.add(key); + } + return previousValue; } - return previousValue; } @Override public boolean remove(Object key, Object value) { - final boolean remove = super.remove(key, value); - if (remove) { - keyList.remove(key); + synchronized (this) { + final boolean remove = super.remove(key, value); + if (remove) { + keyList.remove(key); + } + return remove; } - return remove; } /** From d8394c388f0d39f3cc4d925feb9575507bdbdc96 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Fri, 21 Jun 2019 
15:43:38 +0200 Subject: [PATCH 19/21] Report timers in microseconds and use .us suffix --- .../report/serialize/MetricRegistrySerializer.java | 2 +- .../apm/agent/impl/SpanTypeBreakdownTest.java | 2 +- .../report/serialize/MetricSetSerializationTest.java | 12 ++++++------ 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/MetricRegistrySerializer.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/MetricRegistrySerializer.java index 55133b18c3..9b4064e266 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/MetricRegistrySerializer.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/serialize/MetricRegistrySerializer.java @@ -177,7 +177,7 @@ private static boolean isValid(double value) { private static void serializeTimer(String key, Timer timer, JsonWriter jw) { serializeValue(key, ".count", timer.getCount(), jw); jw.writeByte(JsonWriter.COMMA); - serializeValue(key, ".sum", timer.getTotalTimeMs(), jw); + serializeValue(key, ".sum.us", timer.getTotalTimeUs(), jw); timer.resetState(); } diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/SpanTypeBreakdownTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/SpanTypeBreakdownTest.java index f22b6ad1c8..d2c6458127 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/SpanTypeBreakdownTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/SpanTypeBreakdownTest.java @@ -82,7 +82,7 @@ void testBreakdown_noSpans() { /* * ██████████░░░░░░░░░░██████████ * └─────────██████████ - * 10 20 30 + * 10 20 30g */ @Test void testBreakdown_singleDbSpan() { diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/report/serialize/MetricSetSerializationTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/report/serialize/MetricSetSerializationTest.java index 6139bde55a..8b52d94897 100644 --- 
a/apm-agent-core/src/test/java/co/elastic/apm/agent/report/serialize/MetricSetSerializationTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/report/serialize/MetricSetSerializationTest.java @@ -64,9 +64,9 @@ void testSerializeTimers() throws IOException { registry.updateTimer("bar.baz", labels, 42, 2); final JsonNode jsonNode = reportAsJson(labels); final JsonNode samples = jsonNode.get("metricset").get("samples"); - assertThat(samples.get("foo.bar.sum").get("value").doubleValue()).isEqualTo(42 / 1000.0); + assertThat(samples.get("foo.bar.sum.us").get("value").doubleValue()).isEqualTo(42); assertThat(samples.get("foo.bar.count").get("value").doubleValue()).isEqualTo(1); - assertThat(samples.get("bar.baz.sum").get("value").doubleValue()).isEqualTo(42 / 1000.0); + assertThat(samples.get("bar.baz.sum.us").get("value").doubleValue()).isEqualTo(42); assertThat(samples.get("bar.baz.count").get("value").doubleValue()).isEqualTo(2); } @@ -89,7 +89,7 @@ void testSerializeTimersWithTopLevelLabels() throws IOException { assertThat(metricset.get("transaction").get("type").textValue()).isEqualTo("bar"); assertThat(metricset.get("span").get("type").textValue()).isEqualTo("baz"); assertThat(metricset.get("span").get("subtype").textValue()).isEqualTo("qux"); - assertThat(metricset.get("samples").get("foo.bar.sum").get("value").doubleValue()).isEqualTo(42 / 1000.0); + assertThat(metricset.get("samples").get("foo.bar.sum.us").get("value").doubleValue()).isEqualTo(42); } @Test @@ -103,7 +103,7 @@ void testSerializeTimersReset() throws IOException { registry.updateTimer("foo.bar", labels, 42); final JsonNode samples = reportAsJson(labels).get("metricset").get("samples"); - assertThat(samples.get("foo.bar.sum").get("value").doubleValue()).isEqualTo(42 / 1000.0); + assertThat(samples.get("foo.bar.sum.us").get("value").doubleValue()).isEqualTo(42); assertThat(samples.get("foo.bar.count").get("value").doubleValue()).isEqualTo(1); } @@ -165,11 +165,11 @@ void 
testCounterReset() throws IOException { @Test void testTimerReset() throws IOException { - registry.updateTimer("foo", Labels.EMPTY, 1000); + registry.updateTimer("foo", Labels.EMPTY, 1); JsonNode samples = reportAsJson(Labels.EMPTY).get("metricset").get("samples"); assertThat(samples.size()).isEqualTo(2); - assertThat(samples.get("foo.sum").get("value").intValue()).isOne(); + assertThat(samples.get("foo.sum.us").get("value").intValue()).isOne(); assertThat(samples.get("foo.count").get("value").intValue()).isOne(); assertThat(reportAsJson(Labels.EMPTY).get("metricset").get("samples")).hasSize(0); From bf6c221ff4cb32a4fd7e21cb81c29292cb10ce2e Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Thu, 18 Jul 2019 11:17:13 +0200 Subject: [PATCH 20/21] Fix race condition between trackMetrics and incrementTimer --- .../agent/impl/transaction/Transaction.java | 117 +++++++++++------- .../apm/agent/impl/SpanTypeBreakdownTest.java | 2 +- 2 files changed, 75 insertions(+), 44 deletions(-) diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java index 8fe66dc296..5696d402c6 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java @@ -31,6 +31,7 @@ import co.elastic.apm.agent.metrics.MetricRegistry; import co.elastic.apm.agent.metrics.Timer; import co.elastic.apm.agent.util.KeyListConcurrentHashMap; +import org.HdrHistogram.WriterReaderPhaser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +60,9 @@ protected Labels.Mutable initialValue() { */ private final TransactionContext context = new TransactionContext(); private final SpanCount spanCount = new SpanCount(); - private final KeyListConcurrentHashMap spanTimings = new KeyListConcurrentHashMap<>(); + // type: subtype: timer + private final 
KeyListConcurrentHashMap> spanTimings = new KeyListConcurrentHashMap<>(); + private final WriterReaderPhaser phaser = new WriterReaderPhaser(); /** * The result of the transaction. HTTP status code for HTTP-related transactions. @@ -175,7 +178,6 @@ public void beforeEnd(long epochMicros) { @Override protected void afterEnd() { - // timers are guaranteed to be stable now - no concurrent updates possible as finished is true trackMetrics(); this.tracer.endTransaction(this); } @@ -184,7 +186,7 @@ public SpanCount getSpanCount() { return spanCount; } - public KeyListConcurrentHashMap getSpanTimings() { + public KeyListConcurrentHashMap> getSpanTimings() { return spanTimings; } @@ -248,55 +250,84 @@ protected void recycle() { } void incrementTimer(@Nullable String type, @Nullable String subtype, long duration) { - if (!collectBreakdownMetrics || type == null || finished) { - return; - } - final Labels.Mutable spanType = labelsThreadLocal.get(); - spanType.resetState(); - spanType.spanType(type).spanSubType(subtype); - Timer timer = spanTimings.get(spanType); - if (timer == null) { - timer = new Timer(); - Timer racyTimer = spanTimings.putIfAbsent(spanType.immutableCopy(), timer); - if (racyTimer != null) { - timer = racyTimer; + long criticalValueAtEnter = phaser.writerCriticalSectionEnter(); + try { + if (!collectBreakdownMetrics || type == null || finished) { + return; } - } - timer.update(duration); - if (finished) { - // in case end()->trackMetrics() has been called concurrently - // don't leak timers - timer.resetState(); + if (subtype == null) { + subtype = ""; + } + KeyListConcurrentHashMap timersBySubtype = spanTimings.get(type); + if (timersBySubtype == null) { + timersBySubtype = new KeyListConcurrentHashMap<>(); + KeyListConcurrentHashMap racyMap = spanTimings.putIfAbsent(type, timersBySubtype); + if (racyMap != null) { + timersBySubtype = racyMap; + } + } + Timer timer = timersBySubtype.get(subtype); + if (timer == null) { + timer = new Timer(); + Timer 
racyTimer = timersBySubtype.putIfAbsent(subtype, timer); + if (racyTimer != null) { + timer = racyTimer; + } + } + timer.update(duration); + if (finished) { + // in case end()->trackMetrics() has been called concurrently + // don't leak timers + timer.resetState(); + } + } finally { + phaser.writerCriticalSectionExit(criticalValueAtEnter); } } - private void trackMetrics() { - final String type = getType(); - if (type == null) { - return; - } - final Labels.Mutable labels = labelsThreadLocal.get(); - labels.resetState(); - labels.transactionName(name).transactionType(type); - final MetricRegistry metricRegistry = tracer.getMetricRegistry(); - long criticalValueAtEnter = metricRegistry.writerCriticalSectionEnter(); + public void trackMetrics() { try { - metricRegistry.updateTimer("transaction.duration", labels, getDuration()); - if (collectBreakdownMetrics) { - metricRegistry.incrementCounter("transaction.breakdown.count", labels); - List keyList = spanTimings.keyList(); - for (int i = 0; i < keyList.size(); i++) { - Labels spanType = keyList.get(i); - final Timer timer = spanTimings.get(spanType); - if (timer.getCount() > 0) { - labels.spanType(spanType.getSpanType()).spanSubType(spanType.getSpanSubType()); - metricRegistry.updateTimer("span.self_time", labels, timer.getTotalTimeUs(), timer.getCount()); - timer.resetState(); + phaser.readerLock(); + phaser.flipPhase(); + // timers are guaranteed to be stable now + // - no concurrent updates possible as finished is true + // - no other thread is running the incrementTimer method, + // as flipPhase only returns when all threads have exited that method + + final String type = getType(); + if (type == null) { + return; + } + final Labels.Mutable labels = labelsThreadLocal.get(); + labels.resetState(); + labels.transactionName(name).transactionType(type); + final MetricRegistry metricRegistry = tracer.getMetricRegistry(); + long criticalValueAtEnter = metricRegistry.writerCriticalSectionEnter(); + try { + 
metricRegistry.updateTimer("transaction.duration", labels, getDuration()); + if (collectBreakdownMetrics) { + metricRegistry.incrementCounter("transaction.breakdown.count", labels); + List types = spanTimings.keyList(); + for (int i = 0; i < types.size(); i++) { + String spanType = types.get(i); + KeyListConcurrentHashMap timerBySubtype = spanTimings.get(spanType); + List subtypes = timerBySubtype.keyList(); + for (int j = 0; j < subtypes.size(); j++) { + String subtype = subtypes.get(j); + final Timer timer = timerBySubtype.get(subtype); + if (timer.getCount() > 0) { + labels.spanType(spanType).spanSubType(!subtype.equals("") ? subtype : null); + metricRegistry.updateTimer("span.self_time", labels, timer.getTotalTimeUs(), timer.getCount()); + timer.resetState(); + } + } } } + } finally { + metricRegistry.writerCriticalSectionExit(criticalValueAtEnter); } } finally { - metricRegistry.writerCriticalSectionExit(criticalValueAtEnter); + phaser.readerUnlock(); } } } diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/SpanTypeBreakdownTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/SpanTypeBreakdownTest.java index d2c6458127..3dc36dc544 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/SpanTypeBreakdownTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/SpanTypeBreakdownTest.java @@ -334,7 +334,7 @@ void testBreakdown_singleDbSpan_exceedingParent() { // recycled transactions should not leak child timings reporter.assertRecycledAfterDecrementingReferences(); - assertThat(reporter.getFirstTransaction().getSpanTimings().get(Labels.Mutable.of().spanType("db").spanSubType("mysql"))).isNull(); + assertThat(reporter.getFirstTransaction().getSpanTimings().get("db")).isNull(); tracer.getMetricRegistry().report(metricSets -> { assertThat(getTimer(metricSets, "span.self_time", "app", null).getCount()).isEqualTo(1); From f84852add5aec095cdee344536e0f4a89c393bf8 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner 
Date: Fri, 19 Jul 2019 10:33:49 +0200 Subject: [PATCH 21/21] Renaming and add comments --- .../agent/impl/transaction/AbstractSpan.java | 26 ++++++++-------- .../agent/impl/transaction/Transaction.java | 31 ++++++++++++------- .../apm/agent/impl/SpanTypeBreakdownTest.java | 10 ++---- 3 files changed, 35 insertions(+), 32 deletions(-) diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/AbstractSpan.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/AbstractSpan.java index e94fd405c1..ee8e39f9df 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/AbstractSpan.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/AbstractSpan.java @@ -52,7 +52,7 @@ public abstract class AbstractSpan extends TraceContextH // in microseconds protected long duration; - private ReentrantTimer childDurations = new ReentrantTimer(); + private ChildDurationTimer childDurations = new ChildDurationTimer(); protected AtomicInteger references = new AtomicInteger(); protected volatile boolean finished = true; @@ -60,9 +60,9 @@ public int getReferenceCount() { return references.get(); } - private static class ReentrantTimer implements Recyclable { + private static class ChildDurationTimer implements Recyclable { - private AtomicInteger nestingLevel = new AtomicInteger(); + private AtomicInteger activeChildren = new AtomicInteger(); private AtomicLong start = new AtomicLong(); private AtomicLong duration = new AtomicLong(); @@ -71,8 +71,8 @@ private static class ReentrantTimer implements Recyclable { * * @param startTimestamp */ - public void start(long startTimestamp) { - if (nestingLevel.incrementAndGet() == 1) { + void onChildStart(long startTimestamp) { + if (activeChildren.incrementAndGet() == 1) { start.set(startTimestamp); } } @@ -81,8 +81,8 @@ public void start(long startTimestamp) { * Stops the timer and increments the duration if no other direct children are still running * @param 
endTimestamp */ - public void stop(long endTimestamp) { - if (nestingLevel.decrementAndGet() == 0) { + void onChildEnd(long endTimestamp) { + if (activeChildren.decrementAndGet() == 0) { incrementDuration(endTimestamp); } } @@ -92,8 +92,8 @@ public void stop(long endTimestamp) { * * @param endTimestamp */ - public void forceStop(long endTimestamp) { - if (nestingLevel.getAndSet(0) != 0) { + void onSpanEnd(long endTimestamp) { + if (activeChildren.getAndSet(0) != 0) { incrementDuration(endTimestamp); } } @@ -104,7 +104,7 @@ private void incrementDuration(long epochMicros) { @Override public void resetState() { - nestingLevel.set(0); + activeChildren.set(0); start.set(0); duration.set(0); } @@ -247,7 +247,7 @@ public final void end(long epochMicros) { if (name.length() == 0) { name.append("unnamed"); } - childDurations.forceStop(epochMicros); + childDurations.onSpanEnd(epochMicros); beforeEnd(epochMicros); this.finished = true; afterEnd(); @@ -313,13 +313,13 @@ public void setStartTimestamp(long epochMicros) { void onChildStart(long epochMicros) { if (collectBreakdownMetrics) { - childDurations.start(epochMicros); + childDurations.onChildStart(epochMicros); } } void onChildEnd(long epochMicros) { if (collectBreakdownMetrics) { - childDurations.stop(epochMicros); + childDurations.onChildEnd(epochMicros); } } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java index 5696d402c6..5ad7bae80e 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java @@ -11,9 +11,9 @@ * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. 
* You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -60,8 +60,16 @@ protected Labels.Mutable initialValue() { */ private final TransactionContext context = new TransactionContext(); private final SpanCount spanCount = new SpanCount(); - // type: subtype: timer - private final KeyListConcurrentHashMap> spanTimings = new KeyListConcurrentHashMap<>(); + /** + * type: subtype: timer + *

+ * This map is not cleared when the transaction is recycled. + * Instead, it accumulates span types and subtypes over time. + * When tracking the metrics, the timers are reset and only those with a count > 0 are examined. + * That is done in order to minimize {@link java.util.Map.Entry} garbage. + *

+ */ + private final KeyListConcurrentHashMap> timerBySpanTypeAndSubtype = new KeyListConcurrentHashMap<>(); private final WriterReaderPhaser phaser = new WriterReaderPhaser(); /** @@ -186,8 +194,8 @@ public SpanCount getSpanCount() { return spanCount; } - public KeyListConcurrentHashMap> getSpanTimings() { - return spanTimings; + public KeyListConcurrentHashMap> getTimerBySpanTypeAndSubtype() { + return timerBySpanTypeAndSubtype; } @Override @@ -198,6 +206,7 @@ public void resetState() { spanCount.resetState(); noop = false; type = null; + // don't clear timerBySpanTypeAndSubtype map (see field-level javadoc) } public boolean isNoop() { @@ -258,10 +267,10 @@ void incrementTimer(@Nullable String type, @Nullable String subtype, long durati if (subtype == null) { subtype = ""; } - KeyListConcurrentHashMap timersBySubtype = spanTimings.get(type); + KeyListConcurrentHashMap timersBySubtype = timerBySpanTypeAndSubtype.get(type); if (timersBySubtype == null) { timersBySubtype = new KeyListConcurrentHashMap<>(); - KeyListConcurrentHashMap racyMap = spanTimings.putIfAbsent(type, timersBySubtype); + KeyListConcurrentHashMap racyMap = timerBySpanTypeAndSubtype.putIfAbsent(type, timersBySubtype); if (racyMap != null) { timersBySubtype = racyMap; } @@ -285,7 +294,7 @@ void incrementTimer(@Nullable String type, @Nullable String subtype, long durati } } - public void trackMetrics() { + private void trackMetrics() { try { phaser.readerLock(); phaser.flipPhase(); @@ -307,10 +316,10 @@ public void trackMetrics() { metricRegistry.updateTimer("transaction.duration", labels, getDuration()); if (collectBreakdownMetrics) { metricRegistry.incrementCounter("transaction.breakdown.count", labels); - List types = spanTimings.keyList(); + List types = timerBySpanTypeAndSubtype.keyList(); for (int i = 0; i < types.size(); i++) { String spanType = types.get(i); - KeyListConcurrentHashMap timerBySubtype = spanTimings.get(spanType); + KeyListConcurrentHashMap timerBySubtype = 
timerBySpanTypeAndSubtype.get(spanType); List subtypes = timerBySubtype.keyList(); for (int j = 0; j < subtypes.size(); j++) { String subtype = subtypes.get(j); diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/SpanTypeBreakdownTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/SpanTypeBreakdownTest.java index 3dc36dc544..3593ca9cb8 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/SpanTypeBreakdownTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/SpanTypeBreakdownTest.java @@ -33,18 +33,12 @@ import co.elastic.apm.agent.impl.transaction.TraceContextHolder; import co.elastic.apm.agent.impl.transaction.Transaction; import co.elastic.apm.agent.metrics.Labels; -import co.elastic.apm.agent.metrics.MetricRegistry; import co.elastic.apm.agent.metrics.MetricSet; import co.elastic.apm.agent.metrics.Timer; -import co.elastic.apm.agent.report.serialize.MetricRegistrySerializer; -import com.dslplatform.json.DslJson; -import com.dslplatform.json.JsonWriter; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.stagemonitor.configuration.source.SimpleSource; -import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.Map; @@ -116,7 +110,7 @@ void testBreakdown_singleDbSpan_breakdownMetricsDisabled() { transaction.createSpan(10).withType("db").withSubtype("mysql").end(20); transaction.end(30); - assertThat(transaction.getSpanTimings()).isEmpty(); + assertThat(transaction.getTimerBySpanTypeAndSubtype()).isEmpty(); tracer.getMetricRegistry().report(metricSets -> { assertThat(getTimer(metricSets, "span.self_time", "app", null)).isNull(); assertThat(getTimer(metricSets, "span.self_time", "db", "mysql")).isNull(); @@ -334,7 +328,7 @@ void testBreakdown_singleDbSpan_exceedingParent() { // recycled transactions should not leak child timings reporter.assertRecycledAfterDecrementingReferences(); - 
assertThat(reporter.getFirstTransaction().getSpanTimings().get("db")).isNull(); + assertThat(reporter.getFirstTransaction().getTimerBySpanTypeAndSubtype().get("db")).isNull(); tracer.getMetricRegistry().report(metricSets -> { assertThat(getTimer(metricSets, "span.self_time", "app", null).getCount()).isEqualTo(1);