Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ Use subheadings with the "=====" level for adding notes for unreleased changes:
[float]
===== Features
* Added option to make routing-key part of RabbitMQ transaction/span names - {pull}3636[#3636]
* Added internal option for capturing request bodies for apache httpclient v4 - {pull}3692[#3692]

[[release-notes-1.x]]
=== Java Agent version 1.x
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package co.elastic.apm.agent.impl.context;

import co.elastic.apm.agent.objectpool.Resetter;
import co.elastic.apm.agent.objectpool.impl.QueueBasedObjectPool;
import co.elastic.apm.agent.tracer.configuration.WebConfiguration;
import co.elastic.apm.agent.tracer.metadata.BodyCapture;
import co.elastic.apm.agent.tracer.pooling.Allocator;
import co.elastic.apm.agent.tracer.pooling.ObjectPool;
import co.elastic.apm.agent.tracer.pooling.Recyclable;
import org.jctools.queues.atomic.MpmcAtomicArrayQueue;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;

public class BodyCaptureImpl implements BodyCapture, Recyclable {
private static final ObjectPool<ByteBuffer> BYTE_BUFFER_POOL = QueueBasedObjectPool.of(new MpmcAtomicArrayQueue<ByteBuffer>(128), false,
new Allocator<ByteBuffer>() {
@Override
public ByteBuffer createInstance() {
return ByteBuffer.allocate(WebConfiguration.MAX_BODY_CAPTURE_BYTES);
}
},
new Resetter<ByteBuffer>() {
@Override
public void recycle(ByteBuffer object) {
object.clear();
}
});

private enum CaptureState {
NOT_ELIGIBLE,
ELIGIBLE,
STARTED
}

private volatile CaptureState state;

private final StringBuilder charset;

/**
* The maximum number of bytes to capture, if the body is longer remaining bytes will be dropped.
*/
private int numBytesToCapture;

@Nullable
private ByteBuffer bodyBuffer;

BodyCaptureImpl() {
charset = new StringBuilder();
resetState();
}

@Override
public void resetState() {
state = CaptureState.NOT_ELIGIBLE;
charset.setLength(0);
if (bodyBuffer != null) {
BYTE_BUFFER_POOL.recycle(bodyBuffer);
}
}

@Override
public void markEligibleForCapturing() {
if (state == CaptureState.NOT_ELIGIBLE) {
synchronized (this) {
if (state == CaptureState.NOT_ELIGIBLE) {
state = CaptureState.ELIGIBLE;
}
}
}
}

@Override
public boolean isEligibleForCapturing() {
return state != CaptureState.NOT_ELIGIBLE;
}

@Override
public boolean startCapture(@Nullable String requestCharset, int numBytesToCapture) {
if (numBytesToCapture > WebConfiguration.MAX_BODY_CAPTURE_BYTES) {
throw new IllegalArgumentException("Capturing " + numBytesToCapture + " bytes is not supported, maximum is " + WebConfiguration.MAX_BODY_CAPTURE_BYTES + " bytes");
}
if (state == CaptureState.ELIGIBLE) {
synchronized (this) {
if (state == CaptureState.ELIGIBLE) {
if (requestCharset != null) {
this.charset.append(requestCharset);
}
this.numBytesToCapture = numBytesToCapture;
state = CaptureState.STARTED;
return true;
}
}
}
return false;
}

private void acquireBodyBufferIfRequired() {
if (state != CaptureState.STARTED) {
throw new IllegalStateException("Capturing has not been started!");
}
if (bodyBuffer == null) {
bodyBuffer = BYTE_BUFFER_POOL.createInstance();
}
}

@Override
public void append(byte b) {
acquireBodyBufferIfRequired();
if (!isFull()) {
bodyBuffer.put(b);
}
}

@Override
public void append(byte[] b, int offset, int len) {
acquireBodyBufferIfRequired();
int remaining = numBytesToCapture - bodyBuffer.position();
if (remaining > 0) {
bodyBuffer.put(b, offset, Math.min(len, remaining));
}
}

@Override
public boolean isFull() {
if (bodyBuffer == null) {
return false;
}
return bodyBuffer.position() >= numBytesToCapture;
}

@Nullable
public CharSequence getCharset() {
if (charset.length() == 0) {
return null;
}
return charset;
}

@Nullable
public ByteBuffer getBody() {
return bodyBuffer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class HttpImpl implements Recyclable, Http {
*/
private final UrlImpl url = new UrlImpl();

private final BodyCaptureImpl requestBody = new BodyCaptureImpl();

/**
* HTTP method used by this HTTP outgoing span
*/
Expand Down Expand Up @@ -65,6 +67,11 @@ public int getStatusCode() {
return statusCode;
}

@Override
public BodyCaptureImpl getRequestBody() {
return requestBody;
}

@Override
public HttpImpl withUrl(@Nullable String url) {
if (url != null) {
Expand All @@ -88,6 +95,7 @@ public HttpImpl withStatusCode(int statusCode) {
@Override
public void resetState() {
url.resetState();
requestBody.resetState();
method = null;
statusCode = 0;
}
Expand All @@ -98,9 +106,4 @@ public boolean hasContent() {
statusCode > 0;
}

public void copyFrom(HttpImpl other) {
url.copyFrom(other.url);
method = other.method;
statusCode = other.statusCode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,23 @@
*/
package co.elastic.apm.agent.report.serialize;

import co.elastic.apm.agent.impl.context.*;
import co.elastic.apm.agent.impl.context.AbstractContextImpl;
import co.elastic.apm.agent.impl.context.BodyCaptureImpl;
import co.elastic.apm.agent.impl.context.CloudOriginImpl;
import co.elastic.apm.agent.impl.context.DbImpl;
import co.elastic.apm.agent.impl.context.DestinationImpl;
import co.elastic.apm.agent.impl.context.Headers;
import co.elastic.apm.agent.impl.context.HttpImpl;
import co.elastic.apm.agent.impl.context.MessageImpl;
import co.elastic.apm.agent.impl.context.RequestImpl;
import co.elastic.apm.agent.impl.context.ResponseImpl;
import co.elastic.apm.agent.impl.context.ServiceOriginImpl;
import co.elastic.apm.agent.impl.context.ServiceTargetImpl;
import co.elastic.apm.agent.impl.context.SocketImpl;
import co.elastic.apm.agent.impl.context.SpanContextImpl;
import co.elastic.apm.agent.impl.context.TransactionContextImpl;
import co.elastic.apm.agent.impl.context.UrlImpl;
import co.elastic.apm.agent.impl.context.UserImpl;
import co.elastic.apm.agent.impl.error.ErrorCaptureImpl;
import co.elastic.apm.agent.impl.metadata.Agent;
import co.elastic.apm.agent.impl.metadata.CloudProviderInfo;
Expand All @@ -33,16 +48,26 @@
import co.elastic.apm.agent.impl.metadata.ServiceImpl;
import co.elastic.apm.agent.impl.metadata.SystemInfo;
import co.elastic.apm.agent.impl.stacktrace.StacktraceConfigurationImpl;
import co.elastic.apm.agent.impl.transaction.*;
import co.elastic.apm.agent.impl.transaction.AbstractSpanImpl;
import co.elastic.apm.agent.impl.transaction.Composite;
import co.elastic.apm.agent.impl.transaction.DroppedSpanStats;
import co.elastic.apm.agent.impl.transaction.FaasImpl;
import co.elastic.apm.agent.impl.transaction.FaasTriggerImpl;
import co.elastic.apm.agent.impl.transaction.IdImpl;
import co.elastic.apm.agent.impl.transaction.OTelSpanKind;
import co.elastic.apm.agent.impl.transaction.SpanCount;
import co.elastic.apm.agent.impl.transaction.SpanImpl;
import co.elastic.apm.agent.tracer.metrics.Labels;
import co.elastic.apm.agent.impl.transaction.StackFrame;
import co.elastic.apm.agent.impl.transaction.TraceContextImpl;
import co.elastic.apm.agent.impl.transaction.TransactionImpl;
import co.elastic.apm.agent.report.ApmServerClient;
import co.elastic.apm.agent.sdk.internal.collections.LongList;
import co.elastic.apm.agent.sdk.logging.Logger;
import co.elastic.apm.agent.sdk.logging.LoggerFactory;
import co.elastic.apm.agent.tracer.metadata.PotentiallyMultiValuedMap;
import co.elastic.apm.agent.tracer.pooling.Recyclable;
import co.elastic.apm.agent.tracer.metrics.DslJsonUtil;
import co.elastic.apm.agent.tracer.metrics.Labels;
import co.elastic.apm.agent.tracer.pooling.Recyclable;
import com.dslplatform.json.BoolConverter;
import com.dslplatform.json.DslJson;
import com.dslplatform.json.JsonWriter;
Expand All @@ -54,6 +79,7 @@
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand Down Expand Up @@ -1028,21 +1054,36 @@ private void serializeSpanLinks(List<TraceContextImpl> spanLinks) {
}

private void serializeOTel(SpanImpl span) {
serializeOtel(span, Collections.<IdImpl>emptyList());
serializeOtel(span, Collections.<IdImpl>emptyList(), requestBodyToString(span.getContext().getHttp().getRequestBody()));
}

@Nullable
private CharSequence requestBodyToString(BodyCaptureImpl requestBody) {
//TODO: perform proper, charset aware conversion to string
ByteBuffer buffer = requestBody.getBody();
if (buffer == null || buffer.position() == 0) {
return null;
}
buffer.flip();
StringBuilder result = new StringBuilder();
while (buffer.hasRemaining()) {
result.append((char) buffer.get());
}
return result;
}

private void serializeOTel(TransactionImpl transaction) {
List<IdImpl> profilingCorrelationStackTraceIds = transaction.getProfilingCorrelationStackTraceIds();
synchronized (profilingCorrelationStackTraceIds) {
serializeOtel(transaction, profilingCorrelationStackTraceIds);
serializeOtel(transaction, profilingCorrelationStackTraceIds, null);
}
}

private void serializeOtel(AbstractSpanImpl<?> span, List<IdImpl> profilingStackTraceIds) {
private void serializeOtel(AbstractSpanImpl<?> span, List<IdImpl> profilingStackTraceIds, @Nullable CharSequence httpRequestBody) {
OTelSpanKind kind = span.getOtelKind();
Map<String, Object> attributes = span.getOtelAttributes();

boolean hasAttributes = !attributes.isEmpty() || !profilingStackTraceIds.isEmpty();
boolean hasAttributes = !attributes.isEmpty() || !profilingStackTraceIds.isEmpty() || httpRequestBody != null;
boolean hasKind = kind != null;
if (hasKind || hasAttributes) {
writeFieldName("otel");
Expand Down Expand Up @@ -1092,6 +1133,13 @@ private void serializeOtel(AbstractSpanImpl<?> span, List<IdImpl> profilingStack
}
jw.writeByte(ARRAY_END);
}
if (httpRequestBody != null) {
if (!isFirstAttrib) {
jw.writeByte(COMMA);
}
writeFieldName("http.request.body.content");
jw.writeString(httpRequestBody);
}
jw.writeByte(OBJECT_END);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package co.elastic.apm.agent.impl.context;

import org.junit.jupiter.api.Test;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import static co.elastic.apm.agent.testutils.assertions.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class BodyCaptureTest {

@Test
public void testAppendTruncation() {
BodyCaptureImpl capture = new BodyCaptureImpl();
capture.markEligibleForCapturing();
capture.startCapture("foobar", 10);
assertThat(capture.isFull()).isFalse();

capture.append("123Hello World!".getBytes(StandardCharsets.UTF_8), 3, 5);
assertThat(capture.isFull()).isFalse();

capture.append(" from the other side".getBytes(StandardCharsets.UTF_8), 0, 20);
assertThat(capture.isFull()).isTrue();

ByteBuffer content = capture.getBody();
int size = content.position();
byte[] contentBytes = new byte[size];
content.position(0);
content.get(contentBytes);

assertThat(contentBytes).isEqualTo("Hello from".getBytes(StandardCharsets.UTF_8));
}

@Test
public void testLifecycle() {
BodyCaptureImpl capture = new BodyCaptureImpl();

assertThat(capture.isEligibleForCapturing()).isFalse();
assertThat(capture.startCapture("foobar", 42))
.isFalse();
assertThatThrownBy(() -> capture.append((byte) 42)).isInstanceOf(IllegalStateException.class);

capture.markEligibleForCapturing();
assertThat(capture.isEligibleForCapturing()).isTrue();
assertThatThrownBy(() -> capture.append((byte) 42)).isInstanceOf(IllegalStateException.class);

assertThat(capture.startCapture("foobar", 42))
.isTrue();
capture.append((byte) 42); //ensure no exception thrown

// startCapture should return true only once
assertThat(capture.startCapture("foobar", 42))
.isFalse();
}
}
Loading