Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ dependencies {
implementation 'org.jetbrains:annotations:15.0'
implementation 'io.opentracing:opentracing-api:0.33.0'
implementation 'io.opentracing:opentracing-util:0.33.0'
implementation 'com.datadoghq:dd-trace-api:0.72.0'
implementation 'com.datadoghq:dd-trace-api:0.89.0'
implementation 'com.datadoghq:java-dogstatsd-client:2.13.0'

// Use JUnit test framework
testImplementation 'junit:junit:4.12'
Expand All @@ -43,6 +44,8 @@ dependencies {
testImplementation 'com.github.stefanbirkner:system-rules:1.19.0'
testImplementation 'com.amazonaws:aws-java-sdk-kinesis:1.11.980'

// Use wiremock for stubbing http calls
testCompile 'com.github.tomakehurst:wiremock-jre8:2.31.0'
}

sourceCompatibility = 1.8
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ public void write(){
MetricWriter writer = MetricWriter.getMetricWriterImpl();
writer.write(this);
}

public String getName() {
return this.name;
}

public double getValue() {
return this.value;
}

public Map<String, Object> getTags() {
return this.tags;
}
}

class PersistedCustomMetric{
Expand Down
41 changes: 31 additions & 10 deletions src/main/java/com/datadoghq/datadog_lambda_java/DDLambda.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,28 @@ public class DDLambda {
private Tracing tracing;
private boolean enhanced = true;
private Scope tracingScope;
private boolean shouldUseExtension = false;


/**
* Private constructor, called from existing constructors to detect whether the extension is present
*/
private DDLambda() {
this.shouldUseExtension = Extension.setup();
if(this.shouldUseExtension) {
DDLogger.getLoggerImpl().debug("Setting the writer to extension");
ExtensionMetricWriter emw = new ExtensionMetricWriter();
MetricWriter.setMetricWriter(emw);
}
}

/**
* Create a new DDLambda instrumenter given some Lambda context
*
* @param cxt Enhanced Metrics pulls information from the Lambda context.
*/
public DDLambda(Context cxt) {
this();
this.tracing = new Tracing();
this.enhanced = checkEnhanced();
recordEnhanced(INVOCATION, cxt);
Expand All @@ -63,6 +78,7 @@ public DDLambda(Context cxt) {
* @param xrayTraceInfo This would normally be the contents of the "_X_AMZN_TRACE_ID" env var
*/
protected DDLambda(Context cxt, String xrayTraceInfo) {
this();
this.tracing = new Tracing(xrayTraceInfo);
this.enhanced = checkEnhanced();
recordEnhanced(INVOCATION, cxt);
Expand All @@ -78,6 +94,7 @@ protected DDLambda(Context cxt, String xrayTraceInfo) {
* @param cxt Enhanced Metrics pulls information from the Lambda context.
*/
public DDLambda(APIGatewayProxyRequestEvent req, Context cxt) {
this();
this.enhanced = checkEnhanced();
recordEnhanced(INVOCATION, cxt);
this.tracing = new Tracing(req);
Expand All @@ -94,6 +111,7 @@ public DDLambda(APIGatewayProxyRequestEvent req, Context cxt) {
* @param cxt Enhanced Metrics pulls information from the Lambda context.
*/
public DDLambda(APIGatewayV2ProxyRequestEvent req, Context cxt) {
this();
this.enhanced = checkEnhanced();
recordEnhanced(INVOCATION, cxt);
this.tracing = new Tracing(req);
Expand All @@ -110,6 +128,7 @@ public DDLambda(APIGatewayV2ProxyRequestEvent req, Context cxt) {
* @param cxt Enhanced Metrics pulls information from the Lambda context.
*/
public DDLambda(SQSEvent event, Context cxt) {
this();
this.enhanced = checkEnhanced();
recordEnhanced(INVOCATION, cxt);
SQSHeaderable headerable = new SQSHeaderable(event);
Expand All @@ -127,6 +146,7 @@ public DDLambda(SQSEvent event, Context cxt) {
* @param cxt Enhanced Metrics pulls information from the Lambda context.
*/
public DDLambda(KinesisEvent event, Context cxt) {
this();
this.enhanced = checkEnhanced();
recordEnhanced(INVOCATION, cxt);
KinesisHeaderable headerable = new KinesisHeaderable(event);
Expand All @@ -144,6 +164,7 @@ public DDLambda(KinesisEvent event, Context cxt) {
* @param cxt Enhanced Metrics pulls information from the Lambda context.
*/
public DDLambda(Headerable req, Context cxt) {
this();
this.enhanced = checkEnhanced();
recordEnhanced(INVOCATION, cxt);
this.tracing = new Tracing(req);
Expand Down Expand Up @@ -197,18 +218,18 @@ private void startSpan(Map<String, String> headers, Context cxt) {
*/
public void finish() {
Span span = GlobalTracer.get().activeSpan();

if (this.tracingScope == null) {
DDLogger.getLoggerImpl().debug("Unable to close tracing scope because it is null.");
return;
}
this.tracingScope.close();

if (span != null) {
span.finish();
} else {
DDLogger.getLoggerImpl().debug("Unable to finish span because it is null.");
return;
this.tracingScope.close();
if (span != null) {
span.finish();
} else {
DDLogger.getLoggerImpl().debug("Unable to finish span because it is null.");
}
}
if(this.shouldUseExtension) {
Extension.flush();
}
}

Expand Down Expand Up @@ -343,8 +364,8 @@ private void recordEnhanced(String basename, Context cxt) {
if (this.enhanced) {
metricName = ENHANCED_PREFIX + basename;
tags = EnhancedMetric.makeTagsFromContext(cxt);
new CustomMetric(metricName, 1, tags).write();
}
new CustomMetric(metricName, 1, tags).write();
}

/**
Expand Down
65 changes: 65 additions & 0 deletions src/main/java/com/datadoghq/datadog_lambda_java/Extension.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.datadoghq.datadog_lambda_java;

import java.io.File;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;

class Extension {

private final static String AGENT_URL = "http://127.0.0.1:8124";
private final static String HELLO_PATH = "/lambda/hello";
private final static String FLUSH_PATH = "/lambda/flush";
private final static String EXTENSION_PATH = "/opt/extensions/datadog-agent";

protected static boolean setup() {
boolean shouldUseExtension = false;
if(isExtensionRunning(EXTENSION_PATH)) {
DDLogger.getLoggerImpl().debug("Extension has been detected");
if(hitHelloRoute(AGENT_URL, HELLO_PATH)) {
shouldUseExtension = true;
} else {
DDLogger.getLoggerImpl().debug("Could not call the hello route");
}
}
return shouldUseExtension;
}

protected static void flush() {
if(!hitFlushRoute(AGENT_URL, FLUSH_PATH)) {
DDLogger.getLoggerImpl().debug("Could not call the flush routeg");
}
}

protected static boolean isExtensionRunning(final String extensionPath) {
File f = new File(extensionPath);
return (f.exists() && !f.isDirectory());
}

protected static boolean hitHelloRoute(final String agentUrl, final String helloPath) {
try {
URL url = new URL(agentUrl + helloPath);
HttpURLConnection http = (HttpURLConnection) url.openConnection();
return http.getResponseCode() == HttpURLConnection.HTTP_OK;
} catch (MalformedURLException e) {
return false;
} catch (IOException e) {
return false;
}
}

protected static boolean hitFlushRoute(final String agentUrl, final String flushPath) {
try {
URL url = new URL(agentUrl + flushPath);
HttpURLConnection http = (HttpURLConnection) url.openConnection();
http.setRequestMethod("POST");
return http.getResponseCode() == HttpURLConnection.HTTP_OK;
} catch (MalformedURLException e) {
return false;
} catch (IOException e) {
return false;
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.datadoghq.datadog_lambda_java;

import com.timgroup.statsd.NonBlockingStatsDClientBuilder;
import com.timgroup.statsd.StatsDClient;

abstract class MetricWriter {
private static MetricWriter IMPL;
Expand Down Expand Up @@ -33,3 +35,31 @@ public void write(CustomMetric cm){
public void flush(){}
}

class ExtensionMetricWriter extends MetricWriter{

private StatsDClient client;

public ExtensionMetricWriter() {
this.client = new NonBlockingStatsDClientBuilder()
.prefix("")
.hostname("127.0.0.1")
.port(8125)
.build();
}

@Override
public void write(CustomMetric cm){
StringBuilder tagsSb = new StringBuilder();
if (cm.getTags() != null) {
cm.getTags().forEach((k, val) ->
tagsSb.append(k.toLowerCase())
.append(":")
.append(val.toString().toLowerCase()));
}
client.distribution(cm.getName(), cm.getValue(), tagsSb.toString());
}

@Override
public void flush(){}
}

Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,17 @@

import org.junit.Test;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

import static org.junit.Assert.*;

Expand Down Expand Up @@ -43,6 +51,57 @@ public class CustomMetricTest {
assertEquals("{\"m\":\"foo\",\"v\":24.3,\"t\":[],\"e\":1559152800}",smw.getMetricsWritten());
}

@Test public void testExtensionMetricWriter() {
Map<String, Object> map = new LinkedHashMap<>(); // to save the order to avoid flaky test
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

map.put("firstTag", "firstTagValue");
map.put("secondTag", 100.34);
CustomMetric ddm = new CustomMetric("foo", 24.3, map);
ExtensionMetricWriter emw = new ExtensionMetricWriter();
MetricWriter.setMetricWriter(emw);
final String[] text = new String[1];

new Thread(new Runnable() {
public void run() {
byte[] msg = new byte[256];
DatagramPacket dp = new DatagramPacket(msg, msg.length);
DatagramSocket ds = null;
try {
text[0] = "notYetReceived";
ds = new DatagramSocket(8125);
ds.receive(dp);
text[0] = new String(dp.getData());
} catch (SocketException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (ds != null) {
ds.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

awesome

}
}
}
}).start();

ddm.write();

int i = 0;
for(; i < 10; ++i) {
try {
if (null== text[0] || text[0].equals("notYetReceived")) {
Thread.sleep(1000);
} else {
assertTrue(text[0].startsWith("foo:24.3|d|#firsttag:firsttagvaluesecondtag:100.34"));
break;
}
} catch (InterruptedException e) {
fail();
}
}
if( i == 10) {
fail();
}
}

/**
* For tests!
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.datadoghq.datadog_lambda_java;

import com.github.tomakehurst.wiremock.junit.WireMockRule;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

import java.nio.file.Path;
import java.nio.file.Paths;

import static com.github.tomakehurst.wiremock.client.WireMock.*;


public class ExtensionTest {

@Rule
public WireMockRule wireMockRule = new WireMockRule(9999);

@Test public void testDetectExtensionSuccess() {
Path resourceDirectory = Paths.get("src","test","resources");
String fakeExtensionPath = resourceDirectory.toFile().getAbsolutePath() + "/fakeExtension";
Assert.assertTrue(Extension.isExtensionRunning(fakeExtensionPath));
}
@Test public void testDetectExtensionFailure() {
String invalidPath = "/fakeExtension";
Assert.assertFalse(Extension.isExtensionRunning(invalidPath));
}

@Test public void testHitHelloRouteIncorrectUrl() {
Assert.assertFalse(Extension.hitHelloRoute("toto", "titi"));
}

@Test public void testHitHelloRouteNotRespondingEndpoint() {
Assert.assertFalse(Extension.hitHelloRoute("http://localhost:1111", "/invalid"));
}

@Test public void testHitHelloRouteValidEndpoint() {
stubFor(get(urlEqualTo("/valid"))
.willReturn(aResponse()));
Assert.assertTrue(Extension.hitHelloRoute("http://localhost:9999", "/valid"));
}

@Test public void testHitFlushRouteIncorrectUrl() {
Assert.assertFalse(Extension.hitFlushRoute("toto", "titi"));
}

@Test public void testHitFlushRouteNotRespondingEndpoint() {
Assert.assertFalse(Extension.hitFlushRoute("http://localhost:1111", "/invalid"));
}

@Test public void testHitFlushRouteValidEndpoint() {
stubFor(post(urlEqualTo("/valid"))
.willReturn(aResponse()));
Assert.assertTrue(Extension.hitFlushRoute("http://localhost:9999", "/valid"));
}

}
Empty file.