Skip to content
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
muzzle {
pass {
group = "software.amazon.awssdk"
module = "eventbridge"
versions = "[2.7,3)"
assertInverse = true
}
}

apply from: "$rootDir/gradle/java.gradle"

addTestSuiteForDir('latestDepTest', 'test')
addTestSuiteExtendingForDir('latestDepForkedTest', 'latestDepTest', 'test')

dependencies {
compileOnly group: 'software.amazon.awssdk', name: 'eventbridge', version: '2.27.19'

// Include httpclient instrumentation for testing because it is a dependency for aws-sdk.
testImplementation project(':dd-java-agent:instrumentation:apache-httpclient-4')
testImplementation project(':dd-java-agent:instrumentation:aws-java-sdk-2.2')
testImplementation 'software.amazon.awssdk:eventbridge:2.27.23'
// SQS and SNS are used to act as the "targets" of the EB bus.
testImplementation 'software.amazon.awssdk:sqs:2.27.23'
testImplementation 'software.amazon.awssdk:sns:2.27.23'
testImplementation 'org.testcontainers:localstack:1.20.1'

latestDepTestImplementation group: 'software.amazon.awssdk', name: 'eventbridge', version: '+'
}

tasks.withType(Test).configureEach {
usesService(testcontainersLimit)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package datadog.trace.instrumentation.aws.v2.eventbridge;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import java.util.List;
import net.bytebuddy.asm.Advice;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;

@AutoService(InstrumenterModule.class)
public final class EventBridgeClientInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForSingleType {
public EventBridgeClientInstrumentation() {
super("eventbridge");
}

@Override
public String instrumentedType() {
return "software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder";
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
isMethod().and(named("resolveExecutionInterceptors")),
EventBridgeClientInstrumentation.class.getName() + "$AwsEventBridgeBuilderAdvice");
}

@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".EventBridgeInterceptor", packageName + ".TextMapInjectAdapter"
};
}

public static class AwsEventBridgeBuilderAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void addHandler(@Advice.Return final List<ExecutionInterceptor> interceptors) {
for (ExecutionInterceptor interceptor : interceptors) {
if (interceptor instanceof EventBridgeInterceptor) {
return; // list already has our interceptor, return to builder
}
}
interceptors.add(new EventBridgeInterceptor());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package datadog.trace.instrumentation.aws.v2.eventbridge;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
import static datadog.trace.core.datastreams.TagsProcessor.BUS_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;
import static datadog.trace.instrumentation.aws.v2.eventbridge.TextMapInjectAdapter.SETTER;

import datadog.trace.bootstrap.InstanceStore;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.PathwayContext;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttribute;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
import software.amazon.awssdk.services.eventbridge.model.PutEventsRequest;
import software.amazon.awssdk.services.eventbridge.model.PutEventsRequestEntry;

public class EventBridgeInterceptor implements ExecutionInterceptor {
private static final Logger log = LoggerFactory.getLogger(EventBridgeInterceptor.class);

public static final ExecutionAttribute<AgentSpan> SPAN_ATTRIBUTE =
InstanceStore.of(ExecutionAttribute.class)
.putIfAbsent("DatadogSpan", () -> new ExecutionAttribute<>("DatadogSpan"));

private static final String START_TIME_KEY = "x-datadog-start-time";
private static final String RESOURCE_NAME_KEY = "x-datadog-resource-name";

@Override
public SdkRequest modifyRequest(
Context.ModifyRequest context, ExecutionAttributes executionAttributes) {
if (!(context.request() instanceof PutEventsRequest)) {
return context.request();
}

PutEventsRequest request = (PutEventsRequest) context.request();
List<PutEventsRequestEntry> modifiedEntries = new ArrayList<>(request.entries().size());
long startTime = System.currentTimeMillis();

for (PutEventsRequestEntry entry : request.entries()) {
StringBuilder detailBuilder = new StringBuilder(entry.detail().trim());
if (detailBuilder.length() == 0) {
detailBuilder.append("{}");
}
if (detailBuilder.charAt(detailBuilder.length() - 1) != '}') {
log.debug(
"Unable to parse detail JSON. Not injecting trace context into EventBridge payload.");
modifiedEntries.add(entry); // Add the original entry without modification
continue;
}

String traceContext =
getTraceContextToInject(executionAttributes, entry.eventBusName(), startTime);
detailBuilder.setLength(detailBuilder.length() - 1); // Remove the last bracket
if (detailBuilder.length() > 1) {
detailBuilder.append(", "); // Only add a comma if detail is not empty.
}

detailBuilder
.append("\"")
.append(PathwayContext.DATADOG_KEY)
.append("\": ")
.append(traceContext)
.append('}');

String modifiedDetail = detailBuilder.toString();
PutEventsRequestEntry modifiedEntry = entry.toBuilder().detail(modifiedDetail).build();
modifiedEntries.add(modifiedEntry);
}

return request.toBuilder().entries(modifiedEntries).build();
}

private String getTraceContextToInject(
ExecutionAttributes executionAttributes, String eventBusName, long startTime) {
final AgentSpan span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
StringBuilder jsonBuilder = new StringBuilder();
jsonBuilder.append('{');

// Inject trace context
propagate().inject(span, jsonBuilder, SETTER);

if (traceConfig().isDataStreamsEnabled()) {
propagate().injectPathwayContext(span, jsonBuilder, SETTER, getTags(eventBusName));
}

// Add bus name and start time
jsonBuilder
.append(" \"")
.append(START_TIME_KEY)
.append("\": \"")
.append(startTime)
.append("\", ");
jsonBuilder
.append(" \"")
.append(RESOURCE_NAME_KEY)
.append("\": \"")
.append(eventBusName)
.append("\"");

jsonBuilder.append('}');
return jsonBuilder.toString();
}

private LinkedHashMap<String, String> getTags(String eventBusName) {
LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
sortedTags.put(DIRECTION_TAG, DIRECTION_OUT);
sortedTags.put(BUS_TAG, eventBusName);
sortedTags.put(TYPE_TAG, "bus");

return sortedTags;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package datadog.trace.instrumentation.aws.v2.eventbridge;

import datadog.trace.bootstrap.instrumentation.api.AgentPropagation;

public class TextMapInjectAdapter implements AgentPropagation.Setter<StringBuilder> {

public static final TextMapInjectAdapter SETTER = new TextMapInjectAdapter();

@Override
public void set(final StringBuilder builder, final String key, final String value) {
builder.append('"').append(key).append("\":\"").append(value).append("\",");
}
}
Loading