Skip to content

Commit 7f1c919

Browse files
Otanikotaniagocs
andauthored
Add KinesisHeaderable, Add helper method to build JSON from the tracing context (#38)
* Add support for SQSEvents * Add DDLambda.getTraceContextJSONString to be able to pass context to message queues such as SQS * Add KinesisHeaderable Make SQSHeaderable.DATADOG_ATTRIBUTE_NAME public to be able to serialize DD tracing info when sending SQS messages Co-authored-by: Christopher Agocs <[email protected]>
1 parent 81cea6c commit 7f1c919

File tree

7 files changed

+289
-2
lines changed

7 files changed

+289
-2
lines changed

build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ repositories {
2525
dependencies {
2626
implementation 'com.amazonaws:aws-lambda-java-log4j:1.0.0'
2727
implementation 'com.amazonaws:aws-lambda-java-events:2.2.7'
28+
compileOnly 'com.amazonaws:aws-java-sdk-kinesis:1.11.980'
2829
implementation 'com.google.code.gson:gson:2.8.6'
2930
implementation 'org.apache.httpcomponents:httpclient:4.5.11'
3031
implementation 'com.squareup.okhttp3:okhttp:4.4.0'
@@ -40,6 +41,7 @@ dependencies {
4041
testImplementation 'org.apache.logging.log4j:log4j-api:2.13.3'
4142
testImplementation 'org.apache.logging.log4j:log4j-core:2.13.3'
4243
testImplementation 'com.github.stefanbirkner:system-rules:1.19.0'
44+
testImplementation 'com.amazonaws:aws-java-sdk-kinesis:1.11.980'
4345

4446
}
4547

src/main/java/com/datadoghq/datadog_lambda_java/DDLambda.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,34 @@ public String getTraceContextString() {
481481
return formatTraceContext(this.tracing.TRACE_ID_KEY, traceID, this.tracing.SPAN_ID_KEY, spanID);
482482
}
483483

484+
/**
485+
* Get the trace context as a JSON string. This can be used to inject tracing context
486+
* into message queues. SQS Example:
487+
* <pre> {@code
488+
* class X {
489+
* // ...
490+
*
491+
* public void sendMessageToSQS(DDLambda ddl) {
492+
* SendMessageRequest sendMessageRequest = new SendMessageRequest()
493+
* .withQueueUrl("somewhere")
494+
* .withMessageBody("foobar")
495+
* .withMessageAttributes(Map.of(
496+
* SQSHeaderable.DATADOG_ATTRIBUTE_NAME, new MessageAttributeValue().withStringValue(ddLambda.getTraceContextJSONString())
497+
* ));
498+
* sqs.sendMessage(sendMessageRequest);
499+
* }
500+
* }}</pre>
501+
* @return a JSON string representation of the current trace context. Empty string if tracing IDs are not found.
502+
*/
503+
public String getTraceContextJSONString() {
504+
String json = this.tracing.makeOutboundJson();
505+
if (json == null || json.isEmpty()) {
506+
DDLogger.getLoggerImpl().debug("No Trace/Log correlation IDs returned");
507+
return "";
508+
}
509+
return json;
510+
}
511+
484512
private String formatTraceContext(String traceKey, String trace, String spanKey, String span) {
485513
return String.format("[%s=%s %s=%s]", traceKey, trace, spanKey, span);
486514
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package com.datadoghq.datadog_lambda_java;
2+
3+
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
4+
import com.google.gson.Gson;
5+
import com.google.gson.JsonSyntaxException;
6+
import com.google.gson.annotations.SerializedName;
7+
import java.nio.ByteBuffer;
8+
import java.nio.charset.StandardCharsets;
9+
import java.util.Collections;
10+
import java.util.HashMap;
11+
import java.util.Map;
12+
13+
/**
14+
* KinesisHeaderable extracts DD's attributes from Kinesis Events. The conventions are mirrored to those
15+
* defined in datadog-lambda-js library.
16+
*/
17+
public class KinesisHeaderable implements Headerable {
18+
19+
public static final String DATADOG_ATTRIBUTE_NAME = "_datadog";
20+
21+
private Map<String, String> headers;
22+
23+
public KinesisHeaderable(KinesisEvent event) {
24+
if (event != null &&
25+
(event.getRecords() != null) &&
26+
!event.getRecords().isEmpty() &&
27+
(event.getRecords().get(0).getKinesis() != null) &&
28+
(event.getRecords().get(0).getKinesis().getData() != null)) {
29+
ByteBuffer firstRecordPayload = event.getRecords().get(0).getKinesis().getData();
30+
Gson g = new Gson();
31+
try {
32+
String payloadStr = new String(firstRecordPayload.array(), StandardCharsets.UTF_8);
33+
KinesisPayload payload = g.fromJson(payloadStr, KinesisPayload.class);
34+
this.headers = payload.datadogTracingInfo.asMap();
35+
} catch (JsonSyntaxException jse) {
36+
this.headers = Collections.emptyMap();
37+
}
38+
} else {
39+
this.headers = Collections.emptyMap();
40+
}
41+
}
42+
43+
@Override
44+
public Map<String, String> getHeaders() {
45+
return this.headers;
46+
}
47+
48+
@Override
49+
public void setHeaders(Map<String, String> headers) {
50+
this.headers = headers;
51+
}
52+
53+
private static class KinesisPayload {
54+
55+
@SerializedName(value = "_datadog")
56+
private DatadogTracingInfo datadogTracingInfo;
57+
}
58+
59+
private static class DatadogTracingInfo {
60+
61+
@SerializedName(value = "x-datadog-trace-id")
62+
private String traceID;
63+
@SerializedName(value = "x-datadog-parent-id")
64+
private String parentID;
65+
@SerializedName(value = "x-datadog-sampling-priority")
66+
private String samplingPriority;
67+
68+
Map<String, String> asMap() {
69+
if (traceID == null || parentID == null) {
70+
return Collections.emptyMap();
71+
}
72+
Map<String, String> tracingInfo = new HashMap<>();
73+
tracingInfo.put("x-datadog-trace-id", traceID);
74+
tracingInfo.put("x-datadog-parent-id", parentID);
75+
tracingInfo.put("x-datadog-sampling-priority", samplingPriority == null ? "2" : samplingPriority);
76+
77+
return Collections.unmodifiableMap(tracingInfo);
78+
}
79+
}
80+
}

src/main/java/com/datadoghq/datadog_lambda_java/SQSHeaderable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
*/
1414
public class SQSHeaderable implements Headerable {
1515

16-
static final String DATADOG_ATTRIBUTE_NAME = "_datadog";
16+
public static final String DATADOG_ATTRIBUTE_NAME = "_datadog";
1717
private static final Type HEADERS_GSON_TYPE = new TypeToken<Map<String, String>>(){}.getType();
1818

1919
private Map<String, String> headers;

src/main/java/com/datadoghq/datadog_lambda_java/Tracing.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,12 @@ protected Map<String,String> makeOutboundHttpTraceHeaders(){
139139

140140
return traceHeaders;
141141
}
142+
143+
protected String makeOutboundJson(){
144+
Map<String, String> headers = makeOutboundHttpTraceHeaders();
145+
Gson g = new Gson();
146+
return g.toJson(headers);
147+
}
142148
}
143149

144150

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package com.datadoghq.datadog_lambda_java;
2+
3+
import static java.util.Collections.singletonList;
4+
import static org.junit.Assert.assertTrue;
5+
6+
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
7+
import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord;
8+
import java.nio.ByteBuffer;
9+
import java.nio.charset.StandardCharsets;
10+
import java.util.HashMap;
11+
import java.util.Map;
12+
import org.junit.Assert;
13+
import org.junit.Test;
14+
15+
public class KinesisHeaderableTest {
16+
17+
@Test
18+
public void testParsingNullKinesisEvent_returnsEmptyHeaders() {
19+
Map<String, String> headers = new KinesisHeaderable(null).getHeaders();
20+
21+
assertTrue(headers.isEmpty());
22+
}
23+
24+
@Test
25+
public void testParsingKinesisEventNoRecords_returnsEmptyHeaders() {
26+
KinesisEvent event = new KinesisEvent();
27+
28+
Map<String, String> headers = new KinesisHeaderable(event).getHeaders();
29+
30+
assertTrue(headers.isEmpty());
31+
}
32+
33+
@Test
34+
public void testParsingKinesisEventFirstRecordHasNoDatadogAttributes_returnsEmptyHeaders() {
35+
KinesisEvent event = new KinesisEvent();
36+
event.setRecords(singletonList(new KinesisEventRecord()));
37+
38+
Map<String, String> headers = new KinesisHeaderable(event).getHeaders();
39+
40+
assertTrue(headers.isEmpty());
41+
}
42+
43+
@Test
44+
public void testParsingKinesisEventWithRecordWithDatadogAttributes_returnsHeaders() {
45+
KinesisEvent event = new KinesisEvent();
46+
KinesisEventRecord kinesisEventRecord = new KinesisEventRecord();
47+
KinesisEvent.Record record = new KinesisEvent.Record();
48+
String payload = "{"
49+
+ " \"_datadog\": {"
50+
+ " \"x-datadog-trace-id\": \"123\","
51+
+ " \"x-datadog-parent-id\": \"456\","
52+
+ " \"x-datadog-sampling-priority\": \"4\""
53+
+ " },"
54+
+ " \"payload\": {"
55+
+ " \"foo\": \"bar\""
56+
+ " }"
57+
+ "}";
58+
record.setData(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)));
59+
kinesisEventRecord.setKinesis(record);
60+
event.setRecords(singletonList(kinesisEventRecord));
61+
62+
Map<String, String> headers = new KinesisHeaderable(event).getHeaders();
63+
64+
Map<String, String> expectedHeaders = new HashMap<>();
65+
expectedHeaders.put("x-datadog-trace-id", "123");
66+
expectedHeaders.put("x-datadog-parent-id", "456");
67+
expectedHeaders.put("x-datadog-sampling-priority", "4");
68+
69+
Assert.assertEquals(expectedHeaders, headers);
70+
}
71+
72+
@Test
73+
public void testParsingKinesisEventWithRecordWithoutTraceID_returnsEmptyHeaders() {
74+
KinesisEvent event = new KinesisEvent();
75+
KinesisEventRecord kinesisEventRecord = new KinesisEventRecord();
76+
KinesisEvent.Record record = new KinesisEvent.Record();
77+
String payload = "{"
78+
+ " \"_datadog\": {"
79+
+ " \"x-datadog-parent-id\": \"456\","
80+
+ " \"x-datadog-sampling-priority\": \"2\""
81+
+ " },"
82+
+ " \"payload\": {"
83+
+ " \"foo\": \"bar\""
84+
+ " }"
85+
+ "}";
86+
record.setData(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)));
87+
kinesisEventRecord.setKinesis(record);
88+
event.setRecords(singletonList(kinesisEventRecord));
89+
90+
Map<String, String> headers = new KinesisHeaderable(event).getHeaders();
91+
92+
assertTrue(headers.isEmpty());
93+
}
94+
95+
@Test
96+
public void testParsingKinesisEventWithRecordWithoutSamplingPriority_returnsHeadersWithDefaultSamplingPriority() {
97+
KinesisEvent event = new KinesisEvent();
98+
KinesisEventRecord kinesisEventRecord = new KinesisEventRecord();
99+
KinesisEvent.Record record = new KinesisEvent.Record();
100+
String payload = "{"
101+
+ " \"_datadog\": {"
102+
+ " \"x-datadog-trace-id\": \"123\","
103+
+ " \"x-datadog-parent-id\": \"456\""
104+
+ " },"
105+
+ " \"payload\": {"
106+
+ " \"foo\": \"bar\""
107+
+ " }"
108+
+ "}";
109+
record.setData(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)));
110+
kinesisEventRecord.setKinesis(record);
111+
event.setRecords(singletonList(kinesisEventRecord));
112+
113+
Map<String, String> headers = new KinesisHeaderable(event).getHeaders();
114+
115+
Map<String, String> expectedHeaders = new HashMap<>();
116+
expectedHeaders.put("x-datadog-trace-id", "123");
117+
expectedHeaders.put("x-datadog-parent-id", "456");
118+
expectedHeaders.put("x-datadog-sampling-priority", "2");
119+
120+
Assert.assertEquals(expectedHeaders, headers);
121+
}
122+
123+
@Test
124+
public void testParsingKinesisEventWithRecordWithMalformedJson_ReturnsEmptyHeadsers() {
125+
KinesisEvent event = new KinesisEvent();
126+
KinesisEventRecord kinesisEventRecord = new KinesisEventRecord();
127+
KinesisEvent.Record record = new KinesisEvent.Record();
128+
String payload = "NOT A JSON";
129+
record.setData(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)));
130+
kinesisEventRecord.setKinesis(record);
131+
event.setRecords(singletonList(kinesisEventRecord));
132+
133+
Map<String, String> headers = new KinesisHeaderable(event).getHeaders();
134+
135+
assertTrue(headers.isEmpty());
136+
}
137+
}

src/test/java/com/datadoghq/datadog_lambda_java/TracingTest.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package com.datadoghq.datadog_lambda_java;
22

3+
import com.google.gson.Gson;
4+
import com.google.gson.reflect.TypeToken;
5+
import java.lang.reflect.Type;
6+
import java.util.HashMap;
37
import org.junit.Assert;
48
import org.junit.Test;
59

@@ -51,4 +55,34 @@ public void makeOutboundHttpTraceHeaders_no_parent() {
5155

5256
Assert.assertEquals(0, headers.size());
5357
}
54-
}
58+
59+
@Test
60+
public void makeOutboundJson() {
61+
DDTraceContext cxt = new DDTraceContext();
62+
XRayTraceContext xrt = new XRayTraceContext();
63+
64+
cxt.samplingPriority = "2";
65+
cxt.parentID = "32342354323445";
66+
cxt.traceID = "12344567890";
67+
68+
xrt.parentId = "53995c3f42cd8ad8";
69+
xrt.traceId = "1-5e41a79d-e6a0db584029dba86a594b7e";
70+
71+
Tracing t = new Tracing();
72+
t.cxt = cxt;
73+
t.xrt = xrt;
74+
75+
String json = t.makeOutboundJson();
76+
77+
Type type = new TypeToken<Map<String, String>>(){}.getType();
78+
Gson g = new Gson();
79+
Map<String, String> tracingInfo = g.fromJson(json, type);
80+
81+
Map<String, String> expectedTracingInfo = new HashMap<>();
82+
expectedTracingInfo.put(cxt.ddTraceKey, cxt.traceID);
83+
expectedTracingInfo.put(cxt.ddParentKey, "6023947403358210776");
84+
expectedTracingInfo.put(cxt.ddSamplingKey, cxt.samplingPriority);
85+
86+
Assert.assertEquals(expectedTracingInfo, tracingInfo);
87+
}
88+
}

0 commit comments

Comments
 (0)