Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@
import co.elastic.apm.agent.impl.metadata.MetaDataFuture;
import co.elastic.apm.agent.impl.metadata.NameAndIdField;
import co.elastic.apm.agent.impl.metadata.ServiceFactory;
import co.elastic.apm.agent.impl.transaction.Id;
import co.elastic.apm.agent.sdk.internal.util.LoggerUtils;
import co.elastic.apm.agent.tracer.metrics.DoubleSupplier;
import co.elastic.apm.agent.tracer.metrics.Labels;
import co.elastic.apm.agent.tracer.pooling.Allocator;
import co.elastic.apm.agent.tracer.service.Service;
import co.elastic.apm.agent.tracer.service.ServiceInfo;
import co.elastic.apm.agent.configuration.SpanConfiguration;
Expand Down Expand Up @@ -127,6 +129,7 @@ public class ElasticApmTracer implements Tracer {
private final ObjectPool<Span> spanPool;
private final ObjectPool<ErrorCapture> errorPool;
private final ObjectPool<TraceContext> spanLinkPool;
private final ObjectPool<Id> profilingCorrelationStackTraceIdPool;
private final Reporter reporter;
private final ObjectPoolFactory objectPoolFactory;

Expand Down Expand Up @@ -245,6 +248,13 @@ public void onChange(ConfigurationOption<?> configurationOption, Boolean oldValu
// span links pool allows for 10X the maximum allowed span links per span
spanLinkPool = poolFactory.createSpanLinkPool(AbstractSpan.MAX_ALLOWED_SPAN_LINKS * 10, this);

profilingCorrelationStackTraceIdPool = poolFactory.createRecyclableObjectPool(maxPooledElements, new Allocator<Id>() {
@Override
public Id createInstance() {
return Id.new128BitId();
}
});

sampler = ProbabilitySampler.of(coreConfiguration.getSampleRate().get());
coreConfiguration.getSampleRate().addChangeListener(new ConfigurationOption.ChangeListener<Double>() {
@Override
Expand Down Expand Up @@ -604,6 +614,10 @@ public TraceContext createSpanLink() {
return spanLinkPool.createInstance();
}

public Id createProfilingCorrelationStackTraceId() {
return profilingCorrelationStackTraceIdPool.createInstance();
}

public void recycle(Transaction transaction) {
transactionPool.recycle(transaction);
}
Expand All @@ -620,6 +634,10 @@ public void recycle(TraceContext traceContext) {
spanLinkPool.recycle(traceContext);
}

public void recycleProfilingCorrelationStackTraceId(Id id) {
profilingCorrelationStackTraceIdPool.recycle(id);
}

public synchronized void stop() {
if (tracerState == TracerState.STOPPED) {
// may happen if explicitly stopped in a unit test and executed again within a shutdown hook
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package co.elastic.apm.agent.impl.transaction;

import co.elastic.apm.agent.report.serialize.Base64SerializationUtils;
import co.elastic.apm.agent.report.serialize.HexSerializationUtils;
import co.elastic.apm.agent.tracer.pooling.Recyclable;
import co.elastic.apm.agent.tracer.util.HexUtils;
Expand Down Expand Up @@ -175,6 +176,10 @@ public void writeAsHex(JsonWriter jw) {
HexSerializationUtils.writeBytesAsHex(data, jw);
}

public void writeAsBase64UrlSafe(JsonWriter jw) {
Base64SerializationUtils.writeBytesAsBase64UrlSafe(data, jw);
}

public void writeAsHex(StringBuilder sb) {
HexUtils.writeBytesAsHex(data, sb);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.HdrHistogram.WriterReaderPhaser;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -107,6 +108,8 @@ public class Transaction extends AbstractSpan<Transaction> implements co.elastic
@Nullable
private Throwable pendingException;

private final ArrayList<Id> profilingCorrelationStackTraceIds = new ArrayList<>();

/**
* Faas
* <p>
Expand Down Expand Up @@ -341,9 +344,23 @@ public void resetState() {
faas.resetState();
wasActivated.set(false);
pendingException = null;
recycleProfilingCorrelationStackTraceIds();
// don't clear timerBySpanTypeAndSubtype map (see field-level javadoc)
}

private void recycleProfilingCorrelationStackTraceIds() {
for (Id toRecycle : profilingCorrelationStackTraceIds) {
tracer.recycleProfilingCorrelationStackTraceId(toRecycle);
}
if (profilingCorrelationStackTraceIds.size() > 100) {
profilingCorrelationStackTraceIds.clear();
//trim overly big lists
profilingCorrelationStackTraceIds.trimToSize();
} else {
profilingCorrelationStackTraceIds.clear();
}
}

@Override
public boolean isNoop() {
return noop;
Expand Down Expand Up @@ -552,4 +569,19 @@ public Throwable getPendingTransactionException() {
return this.pendingException;
}

public void addProfilerCorrelationStackTrace(Id idToCopy) {
Id id = tracer.createProfilingCorrelationStackTraceId();
id.copyFrom(idToCopy);
synchronized (profilingCorrelationStackTraceIds) {
this.profilingCorrelationStackTraceIds.add(id);
}
}

/**
* Returns the list of stacktrace-IDs from the profiler associated with this transaction
* To protect agains concurrent modifications, consumers must synchronize on the returned list.
*/
public List<Id> getProfilingCorrelationStackTraceIds() {
return profilingCorrelationStackTraceIds;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package co.elastic.apm.agent.report.serialize;

import com.dslplatform.json.JsonWriter;

public class Base64SerializationUtils {

private static final byte[] BASE64_URL_CHARS = new byte[]{
'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H',
'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X',
'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f',
'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n',
'o', 'p', 'q', 'r', 's', 't', 'u', 'v',
'w', 'x', 'y', 'z', '0', '1', '2', '3',
'4', '5', '6', '7', '8', '9', '-', '_',
};


public static void writeBytesAsBase64UrlSafe(byte[] data, JsonWriter jw) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The java Base64 class used in the otel-distro is only available in Java 8+, that's why we need to add a custom implementation here. Also this implementation is allocation free, keeping it in line with the rest of the agent design.

int i = 0;
for (; i + 2 < data.length; i += 3) {
int b0 = ((int) data[i]) & 0xFF;
int b1 = ((int) data[i + 1]) & 0xFF;
int b2 = ((int) data[i + 2]) & 0xFF;
jw.writeByte(BASE64_URL_CHARS[b0 >> 2]);
jw.writeByte(BASE64_URL_CHARS[((b0 << 4) & 63) | (b1 >> 4)]);
jw.writeByte(BASE64_URL_CHARS[((b1 << 2) & 63) | (b2 >> 6)]);
jw.writeByte(BASE64_URL_CHARS[b2 & 63]);
}
int leftOver = data.length - i;
if (leftOver == 1) {
int b0 = ((int) data[i]) & 0xFF;
jw.writeByte(BASE64_URL_CHARS[b0 >> 2]);
jw.writeByte(BASE64_URL_CHARS[(b0 << 4) & 63]);
} else if (leftOver == 2) {
int b0 = ((int) data[i]) & 0xFF;
int b1 = ((int) data[i + 1]) & 0xFF;
jw.writeByte(BASE64_URL_CHARS[b0 >> 2]);
jw.writeByte(BASE64_URL_CHARS[((b0 << 4) & 63) | (b1 >> 4)]);
jw.writeByte(BASE64_URL_CHARS[(b1 << 2) & 63]);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -1050,10 +1051,22 @@ private void serializeSpanLinks(List<TraceContext> spanLinks) {
}
}

private void serializeOTel(AbstractSpan<?> span) {
private void serializeOTel(Span span) {
serializeOtel(span, Collections.<Id>emptyList());
}

private void serializeOTel(Transaction transaction) {
List<Id> profilingCorrelationStackTraceIds = transaction.getProfilingCorrelationStackTraceIds();
synchronized (profilingCorrelationStackTraceIds) {
serializeOtel(transaction, profilingCorrelationStackTraceIds);
}
}

private void serializeOtel(AbstractSpan<?> span, List<Id> profilingStackTraceIds) {
OTelSpanKind kind = span.getOtelKind();
Map<String, Object> attributes = span.getOtelAttributes();
boolean hasAttributes = !attributes.isEmpty();

boolean hasAttributes = !attributes.isEmpty() || !profilingStackTraceIds.isEmpty();
boolean hasKind = kind != null;
if (hasKind || hasAttributes) {
writeFieldName("otel");
Expand All @@ -1070,11 +1083,13 @@ private void serializeOTel(AbstractSpan<?> span) {
}
writeFieldName("attributes");
jw.writeByte(OBJECT_START);
int index = 0;
boolean isFirstAttrib = true;
for (Map.Entry<String, Object> entry : attributes.entrySet()) {
if (index++ > 0) {
if (!isFirstAttrib) {
jw.writeByte(COMMA);
}
isFirstAttrib = false;

writeFieldName(entry.getKey());
Object o = entry.getValue();
if (o instanceof Number) {
Expand All @@ -1085,6 +1100,22 @@ private void serializeOTel(AbstractSpan<?> span) {
BoolConverter.serialize((Boolean) o, jw);
}
}
if (!profilingStackTraceIds.isEmpty()) {
if (!isFirstAttrib) {
jw.writeByte(COMMA);
}
writeFieldName("elastic.profiler_stack_trace_ids");
jw.writeByte(ARRAY_START);
for (int i = 0; i < profilingStackTraceIds.size(); i++) {
if (i != 0) {
jw.writeByte(COMMA);
}
jw.writeByte(QUOTE);
profilingStackTraceIds.get(i).writeAsBase64UrlSafe(jw);
jw.writeByte(QUOTE);
}
jw.writeByte(ARRAY_END);
}
jw.writeByte(OBJECT_END);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package co.elastic.apm.agent.report.serialize;

import com.dslplatform.json.DslJson;
import com.dslplatform.json.JsonWriter;
import org.junit.jupiter.api.Test;

import java.util.Base64;
import java.util.Random;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

public class Base64SerializationUtilTest {

@Test
public void empty() {
JsonWriter jw = new DslJson<>(new DslJson.Settings<>()).newWriter();
Base64SerializationUtils.writeBytesAsBase64UrlSafe(new byte[0], jw);
assertThat(jw.size()).isEqualTo(0);
}

@Test
public void randomInputs() {
DslJson<Object> dslJson = new DslJson<>(new DslJson.Settings<>());

Base64.Encoder reference = Base64.getUrlEncoder().withoutPadding();

Random rnd = new Random(42);
for (int i = 0; i < 100_000; i++) {
int len = rnd.nextInt(31) + 1;
byte[] data = new byte[len];
rnd.nextBytes(data);

String expectedResult = reference.encodeToString(data);

JsonWriter jw = dslJson.newWriter();
Base64SerializationUtils.writeBytesAsBase64UrlSafe(data, jw);

assertThat(jw.toString()).isEqualTo(expectedResult);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1584,6 +1584,31 @@ void testSpanLinksSerialization() {
assertThat(parent2link.get("span_id").textValue()).isEqualTo(parent2.getTraceContext().getId().toString());
}

private static Id create128BitId(String id) {
Id idObj = Id.new128BitId();
idObj.fromHexString(id, 0);
return idObj;
}

@Test
void testProfilingStackTraceIdSerialization() {
Transaction transaction = tracer.startRootTransaction(null);

transaction.addProfilerCorrelationStackTrace(create128BitId("a1a2a3a4a5a6a7a8b1b2b3b4b5b6b7b8"));
transaction.addProfilerCorrelationStackTrace(create128BitId("c1c2c3c4c5c6c7c8d1d2d3d4d5d6d7d8"));

JsonNode transactionJson = readJsonString(writer.toJsonString(transaction));
JsonNode otel = transactionJson.get("otel");
assertThat(otel).isNotNull();
JsonNode attributes = otel.get("attributes");
assertThat(attributes).isNotNull();
JsonNode ids = attributes.get("elastic.profiler_stack_trace_ids");
assertThat(ids.isArray()).isTrue();
assertThat(ids.size()).isEqualTo(2);
assertThat(ids.get(0).asText()).isEqualTo("oaKjpKWmp6ixsrO0tba3uA");
assertThat(ids.get(1).asText()).isEqualTo("wcLDxMXGx8jR0tPU1dbX2A");
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testSerializeLog(boolean asString) {
Expand Down