From adc5a9dc6eb83e7889b899f46e949458f407c1e4 Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Wed, 5 Oct 2022 14:12:32 -0400 Subject: [PATCH 01/12] init universal instrumentation --- internal/extension/extension.go | 106 +++++++++++++++++++++++++++---- internal/trace/listener.go | 21 +++++- internal/trace/listener_test.go | 8 +-- internal/wrapper/wrap_handler.go | 2 + 4 files changed, 117 insertions(+), 20 deletions(-) diff --git a/internal/extension/extension.go b/internal/extension/extension.go index 3347d47d..1a660c72 100644 --- a/internal/extension/extension.go +++ b/internal/extension/extension.go @@ -9,12 +9,29 @@ package extension import ( + "bytes" + "context" + "encoding/json" "fmt" "net/http" "os" "time" "github.com/DataDog/datadog-lambda-go/internal/logger" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" +) + +type ddTraceContext string + +const ( + DdTraceId ddTraceContext = "x-datadog-trace-id" + DdParentId ddTraceContext = "x-datadog-parent-id" + DdSpanId ddTraceContext = "x-datadog-span-id" + DdSamplingPriority ddTraceContext = "x-datadog-sampling-priority" + DdInvocationError ddTraceContext = "x-datadog-invocation-error" + + DdSeverlessSpan ddTraceContext = "dd-tracer-serverless-span" + DdLambdaResponse ddTraceContext = "dd-response" ) const ( @@ -23,8 +40,10 @@ const ( // want to let it having some time for its cold start so we should not set this too low. timeout = 3000 * time.Millisecond - helloUrl = "http://localhost:8124/lambda/hello" - flushUrl = "http://localhost:8124/lambda/flush" + helloUrl = "http://localhost:8124/lambda/hello" + flushUrl = "http://localhost:8124/lambda/flush" + startInvocationUrl = "http://localhost:8124/lambda/start-invocation" + endInvocationUrl = "http://localhost:8124/lambda/end-invocation" extensionPath = "/opt/extensions/datadog-agent" ) @@ -33,6 +52,8 @@ type ExtensionManager struct { helloRoute string flushRoute string extensionPath string + startInvocationUrl string + endInvocationUrl string httpClient HTTPClient isExtensionRunning bool } @@ -43,10 +64,12 @@ type HTTPClient interface { func BuildExtensionManager() *ExtensionManager { em := &ExtensionManager{ - helloRoute: helloUrl, - flushRoute: flushUrl, - extensionPath: extensionPath, - httpClient: &http.Client{Timeout: timeout}, + helloRoute: helloUrl, + flushRoute: flushUrl, + startInvocationUrl: startInvocationUrl, + endInvocationUrl: endInvocationUrl, + extensionPath: extensionPath, + httpClient: &http.Client{Timeout: timeout}, } em.checkAgentRunning() return em @@ -57,14 +80,71 @@ func (em *ExtensionManager) checkAgentRunning() { logger.Debug("Will use the API") em.isExtensionRunning = false } else { - req, _ := http.NewRequest(http.MethodGet, em.helloRoute, nil) - if response, err := em.httpClient.Do(req); err == nil && response.StatusCode == 200 { - logger.Debug("Will use the Serverless Agent") - em.isExtensionRunning = true - } else { - logger.Debug("Will use the API since the Serverless Agent was detected but the hello route was unreachable") - em.isExtensionRunning = false + logger.Debug("Will use the Serverless Agent") + em.isExtensionRunning = true + } +} + +func (em *ExtensionManager) SendStartInvocationRequest(ctx context.Context, eventPayload json.RawMessage) context.Context { + body := bytes.NewBuffer(eventPayload) + req, _ := http.NewRequest(http.MethodPost, em.startInvocationUrl, body) + + if response, err := em.httpClient.Do(req); err == nil && response.StatusCode == 200 { + // Propagate dd-trace context from the extension response if found in the response headers + traceId := response.Header.Values(string(DdTraceId)) + if len(traceId) > 0 { + ctx = context.WithValue(ctx, DdTraceId, traceId[0]) + } + parentId := response.Header.Values(string(DdParentId)) + if len(parentId) > 0 { + ctx = context.WithValue(ctx, DdParentId, parentId[0]) } + samplingPriority := response.Header.Values(string(DdSamplingPriority)) + if len(samplingPriority) > 0 { + ctx = context.WithValue(ctx, DdSamplingPriority, samplingPriority[0]) + } + } + return ctx +} + +func (em *ExtensionManager) SendEndInvocationRequest(ctx context.Context, functionExecutionSpan ddtrace.Span, err error) { + // Handle Lambda response + lambdaResponse, ok := ctx.Value(DdLambdaResponse).([]byte) + content, _ := json.Marshal(lambdaResponse) + if !ok { + content, _ = json.Marshal("{}") + } + body := bytes.NewBuffer(content) + + // Build the request + req, _ := http.NewRequest(http.MethodPost, em.endInvocationUrl, body) + + // Mark the invocation as an error if any + if err != nil { + req.Header[string(DdInvocationError)] = append(req.Header[string(DdInvocationError)], "true") + } + + // Extract the DD trace context and pass them to the extension via request headers + traceId, ok := ctx.Value(DdTraceId).(string) + if ok { + req.Header[string(DdTraceId)] = append(req.Header[string(DdTraceId)], traceId) + if parentId, ok := ctx.Value(DdParentId).(string); ok { + req.Header[string(DdParentId)] = append(req.Header[string(DdParentId)], parentId) + } + if spanId, ok := ctx.Value(DdSpanId).(string); ok { + req.Header[string(DdSpanId)] = append(req.Header[string(DdSpanId)], spanId) + } + if samplingPriority, ok := ctx.Value(DdSamplingPriority).(string); ok { + req.Header[string(DdSamplingPriority)] = append(req.Header[string(DdSamplingPriority)], samplingPriority) + } + } else { + req.Header[string(DdTraceId)] = append(req.Header[string(DdTraceId)], fmt.Sprint(functionExecutionSpan.Context().TraceID())) + req.Header[string(DdSpanId)] = append(req.Header[string(DdSpanId)], fmt.Sprint(functionExecutionSpan.Context().SpanID())) + } + + response, err := em.httpClient.Do(req) + if response.StatusCode != 200 || err != nil { + logger.Debug("Unable to make a request to the extension's end invocation endpoint") } } diff --git a/internal/trace/listener.go b/internal/trace/listener.go index ce6a049d..876a92d0 100644 --- a/internal/trace/listener.go +++ b/internal/trace/listener.go @@ -72,11 +72,15 @@ func (l *Listener) HandlerStarted(ctx context.Context, msg json.RawMessage) cont tracerInitialized = true } - functionExecutionSpan = startFunctionExecutionSpan(ctx, l.mergeXrayTraces) + functionExecutionSpan = startFunctionExecutionSpan(ctx, l.mergeXrayTraces, l.extensionManager.IsExtensionRunning()) // Add the span to the context so the user can create child spans ctx = tracer.ContextWithSpan(ctx, functionExecutionSpan) + if l.extensionManager.IsExtensionRunning() { + ctx = l.extensionManager.SendStartInvocationRequest(ctx, msg) + } + return ctx } @@ -84,13 +88,18 @@ func (l *Listener) HandlerStarted(ctx context.Context, msg json.RawMessage) cont func (l *Listener) HandlerFinished(ctx context.Context, err error) { if functionExecutionSpan != nil { functionExecutionSpan.Finish(tracer.WithError(err)) + + if l.extensionManager.IsExtensionRunning() { + l.extensionManager.SendEndInvocationRequest(ctx, functionExecutionSpan, err) + } } + tracer.Flush() } // startFunctionExecutionSpan starts a span that represents the current Lambda function execution // and returns the span so that it can be finished when the function execution is complete -func startFunctionExecutionSpan(ctx context.Context, mergeXrayTraces bool) tracer.Span { +func startFunctionExecutionSpan(ctx context.Context, mergeXrayTraces bool, isExtensionRunning bool) tracer.Span { // Extract information from context lambdaCtx, _ := lambdacontext.FromContext(ctx) rootTraceContext, ok := ctx.Value(traceContextKey).(TraceContext) @@ -109,11 +118,17 @@ func startFunctionExecutionSpan(ctx context.Context, mergeXrayTraces bool) trace parentSpanContext = convertedSpanContext } + resourceName := lambdacontext.FunctionName + if isExtensionRunning { + // The extension will drop this span, prioritizing the execution span the extension creates + resourceName = string(extension.DdSeverlessSpan) + } + span := tracer.StartSpan( "aws.lambda", // This operation name will be replaced with the value of the service tag by the Forwarder tracer.SpanType("serverless"), tracer.ChildOf(parentSpanContext), - tracer.ResourceName(lambdacontext.FunctionName), + tracer.ResourceName(resourceName), tracer.Tag("cold_start", ctx.Value("cold_start")), tracer.Tag("function_arn", functionArn), tracer.Tag("function_version", functionVersion), diff --git a/internal/trace/listener_test.go b/internal/trace/listener_test.go index 64cd3822..76bf6254 100644 --- a/internal/trace/listener_test.go +++ b/internal/trace/listener_test.go @@ -74,7 +74,7 @@ func TestStartFunctionExecutionSpanFromXrayWithMergeEnabled(t *testing.T) { mt := mocktracer.Start() defer mt.Stop() - span := startFunctionExecutionSpan(ctx, true) + span := startFunctionExecutionSpan(ctx, true, false) span.Finish() finishedSpan := mt.FinishedSpans()[0] @@ -104,7 +104,7 @@ func TestStartFunctionExecutionSpanFromXrayWithMergeDisabled(t *testing.T) { mt := mocktracer.Start() defer mt.Stop() - span := startFunctionExecutionSpan(ctx, false) + span := startFunctionExecutionSpan(ctx, false, false) span.Finish() finishedSpan := mt.FinishedSpans()[0] @@ -123,7 +123,7 @@ func TestStartFunctionExecutionSpanFromEventWithMergeEnabled(t *testing.T) { mt := mocktracer.Start() defer mt.Stop() - span := startFunctionExecutionSpan(ctx, true) + span := startFunctionExecutionSpan(ctx, true, false) span.Finish() finishedSpan := mt.FinishedSpans()[0] @@ -142,7 +142,7 @@ func TestStartFunctionExecutionSpanFromEventWithMergeDisabled(t *testing.T) { mt := mocktracer.Start() defer mt.Stop() - span := startFunctionExecutionSpan(ctx, false) + span := startFunctionExecutionSpan(ctx, false, false) span.Finish() finishedSpan := mt.FinishedSpans()[0] diff --git a/internal/wrapper/wrap_handler.go b/internal/wrapper/wrap_handler.go index a226f76c..b8fc3d7e 100644 --- a/internal/wrapper/wrap_handler.go +++ b/internal/wrapper/wrap_handler.go @@ -14,6 +14,7 @@ import ( "errors" "fmt" + "github.com/DataDog/datadog-lambda-go/internal/extension" "github.com/DataDog/datadog-lambda-go/internal/logger" "github.com/aws/aws-lambda-go/lambda" @@ -83,6 +84,7 @@ func (h *DatadogHandler) Invoke(ctx context.Context, payload []byte) ([]byte, er CurrentContext = ctx result, err := h.handler.Invoke(ctx, payload) for _, listener := range h.listeners { + ctx = context.WithValue(ctx, extension.DdLambdaResponse, result) listener.HandlerFinished(ctx, err) } h.coldStart = false From 664915eddc0d63830ae65f27fdbf4526f807b4cc Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Wed, 5 Oct 2022 14:54:55 -0400 Subject: [PATCH 02/12] fix response handling --- internal/extension/extension.go | 2 +- internal/wrapper/wrap_handler.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/extension/extension.go b/internal/extension/extension.go index 1a660c72..45269db7 100644 --- a/internal/extension/extension.go +++ b/internal/extension/extension.go @@ -109,7 +109,7 @@ func (em *ExtensionManager) SendStartInvocationRequest(ctx context.Context, even func (em *ExtensionManager) SendEndInvocationRequest(ctx context.Context, functionExecutionSpan ddtrace.Span, err error) { // Handle Lambda response - lambdaResponse, ok := ctx.Value(DdLambdaResponse).([]byte) + lambdaResponse, ok := ctx.Value(DdLambdaResponse).(interface{}) content, _ := json.Marshal(lambdaResponse) if !ok { content, _ = json.Marshal("{}") diff --git a/internal/wrapper/wrap_handler.go b/internal/wrapper/wrap_handler.go index b8fc3d7e..8cd994cd 100644 --- a/internal/wrapper/wrap_handler.go +++ b/internal/wrapper/wrap_handler.go @@ -60,6 +60,7 @@ func WrapHandlerWithListeners(handler interface{}, listeners ...HandlerListener) CurrentContext = ctx result, err := callHandler(ctx, msg, handler) for _, listener := range listeners { + ctx = context.WithValue(ctx, extension.DdLambdaResponse, result) listener.HandlerFinished(ctx, err) } coldStart = false @@ -84,7 +85,6 @@ func (h *DatadogHandler) Invoke(ctx context.Context, payload []byte) ([]byte, er CurrentContext = ctx result, err := h.handler.Invoke(ctx, payload) for _, listener := range h.listeners { - ctx = context.WithValue(ctx, extension.DdLambdaResponse, result) listener.HandlerFinished(ctx, err) } h.coldStart = false From 9a255d19136cae578334a80e226e5cb9fbad36bd Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Wed, 5 Oct 2022 15:14:22 -0400 Subject: [PATCH 03/12] fix lint --- internal/extension/extension.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/extension/extension.go b/internal/extension/extension.go index 45269db7..58ea4204 100644 --- a/internal/extension/extension.go +++ b/internal/extension/extension.go @@ -109,9 +109,9 @@ func (em *ExtensionManager) SendStartInvocationRequest(ctx context.Context, even func (em *ExtensionManager) SendEndInvocationRequest(ctx context.Context, functionExecutionSpan ddtrace.Span, err error) { // Handle Lambda response - lambdaResponse, ok := ctx.Value(DdLambdaResponse).(interface{}) - content, _ := json.Marshal(lambdaResponse) - if !ok { + lambdaResponse := ctx.Value(DdLambdaResponse) + content, responseErr := json.Marshal(lambdaResponse) + if responseErr != nil { content, _ = json.Marshal("{}") } body := bytes.NewBuffer(content) From 2b2351b8b33db5c23eafd4a4a1c93481b59419d5 Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Fri, 7 Oct 2022 16:47:53 -0400 Subject: [PATCH 04/12] gate behind env var --- ddlambda.go | 11 +++++++++-- internal/trace/listener.go | 36 ++++++++++++++++++++---------------- 2 files changed, 29 insertions(+), 18 deletions(-) diff --git a/ddlambda.go b/ddlambda.go index d07b9d18..7ad61936 100644 --- a/ddlambda.go +++ b/ddlambda.go @@ -89,6 +89,8 @@ const ( DatadogTraceEnabledEnvVar = "DD_TRACE_ENABLED" // MergeXrayTracesEnvVar is the environment variable that enables the merging of X-Ray and Datadog traces. MergeXrayTracesEnvVar = "DD_MERGE_XRAY_TRACES" + // UniversalInstrumentation is the environment variable that enables universal instrumentation with the DD Extension + UniversalInstrumentation = "DD_UNIVERSAL_INSTRUMENTATION" // DefaultSite to send API messages to. DefaultSite = "datadoghq.com" @@ -183,8 +185,9 @@ func InvokeDryRun(callback func(ctx context.Context), cfg *Config) (interface{}, func (cfg *Config) toTraceConfig() trace.Config { traceConfig := trace.Config{ - DDTraceEnabled: false, - MergeXrayTraces: false, + DDTraceEnabled: false, + MergeXrayTraces: false, + UniversalInstrumentation: false, } if cfg != nil { @@ -205,6 +208,10 @@ func (cfg *Config) toTraceConfig() trace.Config { traceConfig.MergeXrayTraces, _ = strconv.ParseBool(os.Getenv(MergeXrayTracesEnvVar)) } + if !traceConfig.UniversalInstrumentation { + traceConfig.UniversalInstrumentation, _ = strconv.ParseBool(os.Getenv(UniversalInstrumentation)) + } + return traceConfig } diff --git a/internal/trace/listener.go b/internal/trace/listener.go index 876a92d0..99312ef6 100644 --- a/internal/trace/listener.go +++ b/internal/trace/listener.go @@ -25,17 +25,19 @@ import ( type ( // Listener creates a function execution span and injects it into the context Listener struct { - ddTraceEnabled bool - mergeXrayTraces bool - extensionManager *extension.ExtensionManager - traceContextExtractor ContextExtractor + ddTraceEnabled bool + mergeXrayTraces bool + universalInstrumentation bool + extensionManager *extension.ExtensionManager + traceContextExtractor ContextExtractor } // Config gives options for how the Listener should work Config struct { - DDTraceEnabled bool - MergeXrayTraces bool - TraceContextExtractor ContextExtractor + DDTraceEnabled bool + MergeXrayTraces bool + UniversalInstrumentation bool + TraceContextExtractor ContextExtractor } ) @@ -48,10 +50,11 @@ var tracerInitialized = false func MakeListener(config Config, extensionManager *extension.ExtensionManager) Listener { return Listener{ - ddTraceEnabled: config.DDTraceEnabled, - mergeXrayTraces: config.MergeXrayTraces, - extensionManager: extensionManager, - traceContextExtractor: config.TraceContextExtractor, + ddTraceEnabled: config.DDTraceEnabled, + mergeXrayTraces: config.MergeXrayTraces, + universalInstrumentation: config.UniversalInstrumentation, + extensionManager: extensionManager, + traceContextExtractor: config.TraceContextExtractor, } } @@ -72,12 +75,13 @@ func (l *Listener) HandlerStarted(ctx context.Context, msg json.RawMessage) cont tracerInitialized = true } - functionExecutionSpan = startFunctionExecutionSpan(ctx, l.mergeXrayTraces, l.extensionManager.IsExtensionRunning()) + isDdServerlessSpan := l.universalInstrumentation && l.extensionManager.IsExtensionRunning() + functionExecutionSpan = startFunctionExecutionSpan(ctx, l.mergeXrayTraces, isDdServerlessSpan) // Add the span to the context so the user can create child spans ctx = tracer.ContextWithSpan(ctx, functionExecutionSpan) - if l.extensionManager.IsExtensionRunning() { + if l.universalInstrumentation && l.extensionManager.IsExtensionRunning() { ctx = l.extensionManager.SendStartInvocationRequest(ctx, msg) } @@ -89,7 +93,7 @@ func (l *Listener) HandlerFinished(ctx context.Context, err error) { if functionExecutionSpan != nil { functionExecutionSpan.Finish(tracer.WithError(err)) - if l.extensionManager.IsExtensionRunning() { + if l.universalInstrumentation && l.extensionManager.IsExtensionRunning() { l.extensionManager.SendEndInvocationRequest(ctx, functionExecutionSpan, err) } } @@ -99,7 +103,7 @@ func (l *Listener) HandlerFinished(ctx context.Context, err error) { // startFunctionExecutionSpan starts a span that represents the current Lambda function execution // and returns the span so that it can be finished when the function execution is complete -func startFunctionExecutionSpan(ctx context.Context, mergeXrayTraces bool, isExtensionRunning bool) tracer.Span { +func startFunctionExecutionSpan(ctx context.Context, mergeXrayTraces bool, isDdServerlessSpan bool) tracer.Span { // Extract information from context lambdaCtx, _ := lambdacontext.FromContext(ctx) rootTraceContext, ok := ctx.Value(traceContextKey).(TraceContext) @@ -119,7 +123,7 @@ func startFunctionExecutionSpan(ctx context.Context, mergeXrayTraces bool, isExt } resourceName := lambdacontext.FunctionName - if isExtensionRunning { + if isDdServerlessSpan { // The extension will drop this span, prioritizing the execution span the extension creates resourceName = string(extension.DdSeverlessSpan) } From 388731eddba07ee53682e6bb4f3b76262fb80dc1 Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Tue, 11 Oct 2022 17:12:14 -0400 Subject: [PATCH 05/12] test extension and refactor start invocation request --- internal/extension/extension.go | 18 +++++------ internal/extension/extension_test.go | 47 ++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 9 deletions(-) diff --git a/internal/extension/extension.go b/internal/extension/extension.go index 58ea4204..cc5c19a2 100644 --- a/internal/extension/extension.go +++ b/internal/extension/extension.go @@ -91,17 +91,17 @@ func (em *ExtensionManager) SendStartInvocationRequest(ctx context.Context, even if response, err := em.httpClient.Do(req); err == nil && response.StatusCode == 200 { // Propagate dd-trace context from the extension response if found in the response headers - traceId := response.Header.Values(string(DdTraceId)) - if len(traceId) > 0 { - ctx = context.WithValue(ctx, DdTraceId, traceId[0]) + traceId := response.Header.Get(string(DdTraceId)) + if traceId != "" { + ctx = context.WithValue(ctx, DdTraceId, traceId) } - parentId := response.Header.Values(string(DdParentId)) - if len(parentId) > 0 { - ctx = context.WithValue(ctx, DdParentId, parentId[0]) + parentId := response.Header.Get(string(DdParentId)) + if parentId != "" { + ctx = context.WithValue(ctx, DdParentId, parentId) } - samplingPriority := response.Header.Values(string(DdSamplingPriority)) - if len(samplingPriority) > 0 { - ctx = context.WithValue(ctx, DdSamplingPriority, samplingPriority[0]) + samplingPriority := response.Header.Get(string(DdSamplingPriority)) + if samplingPriority != "" { + ctx = context.WithValue(ctx, DdSamplingPriority, samplingPriority) } } return ctx diff --git a/internal/extension/extension_test.go b/internal/extension/extension_test.go index 5a7baad7..4392b213 100644 --- a/internal/extension/extension_test.go +++ b/internal/extension/extension_test.go @@ -9,6 +9,7 @@ package extension import ( + "context" "fmt" "net/http" "os" @@ -26,6 +27,18 @@ type ClientSuccessMock struct { type ClientSuccess202Mock struct { } +type ClientSuccessStartInvoke struct { +} + +type ClientSuccessEndInvoke struct { +} + +const ( + mockTraceId = "1" + mockParentId = "2" + mockSamplingPriority = "3" +) + func (c *ClientErrorMock) Do(req *http.Request) (*http.Response, error) { return nil, fmt.Errorf("KO") } @@ -38,10 +51,25 @@ func (c *ClientSuccess202Mock) Do(req *http.Request) (*http.Response, error) { return &http.Response{StatusCode: 202, Status: "KO"}, nil } +func (c *ClientSuccessStartInvoke) Do(req *http.Request) (*http.Response, error) { + header := http.Header{} + header.Set(string(DdTraceId), mockTraceId) + header.Set(string(DdParentId), mockParentId) + header.Set(string(DdSamplingPriority), mockSamplingPriority) + return &http.Response{StatusCode: 200, Status: "KO", Header: header}, nil +} + +func (c *ClientSuccessEndInvoke) Do(req *http.Request) (*http.Response, error) { + header := map[string][]string{} + return &http.Response{StatusCode: 200, Status: "KO", Header: header}, nil +} + func TestBuildExtensionManager(t *testing.T) { em := BuildExtensionManager() assert.Equal(t, "http://localhost:8124/lambda/hello", em.helloRoute) assert.Equal(t, "http://localhost:8124/lambda/flush", em.flushRoute) + assert.Equal(t, "http://localhost:8124/lambda/start-invocation", em.startInvocationUrl) + assert.Equal(t, "http://localhost:8124/lambda/end-invocation", em.endInvocationUrl) assert.Equal(t, "/opt/extensions/datadog-agent", em.extensionPath) assert.NotNil(t, em.httpClient) } @@ -96,3 +124,22 @@ func TestFlushSuccess(t *testing.T) { err := em.Flush() assert.Nil(t, err) } + +func TestExtensionStartInvokeWithTraceContext(t *testing.T) { + em := &ExtensionManager{ + startInvocationUrl: startInvocationUrl, + httpClient: &ClientSuccessStartInvoke{}, + } + ctx := em.SendStartInvocationRequest(context.TODO(), []byte{}) + + // Ensure that we pull the DD trace context if this is a distributed trace + traceId := ctx.Value(DdTraceId) + parentId := ctx.Value(DdParentId) + samplingPriority := ctx.Value(DdSamplingPriority) + err := em.Flush() + + assert.Nil(t, err) + assert.Equal(t, mockTraceId, traceId) + assert.Equal(t, mockParentId, parentId) + assert.Equal(t, mockSamplingPriority, samplingPriority) +} From 74214012028ddfa31d3c637eba55ea162425f415 Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Wed, 12 Oct 2022 11:20:50 -0400 Subject: [PATCH 06/12] test end invocation --- internal/extension/extension.go | 16 ++++----- internal/extension/extension_test.go | 49 +++++++++++++++++++++++----- 2 files changed, 48 insertions(+), 17 deletions(-) diff --git a/internal/extension/extension.go b/internal/extension/extension.go index cc5c19a2..e42993af 100644 --- a/internal/extension/extension.go +++ b/internal/extension/extension.go @@ -112,7 +112,7 @@ func (em *ExtensionManager) SendEndInvocationRequest(ctx context.Context, functi lambdaResponse := ctx.Value(DdLambdaResponse) content, responseErr := json.Marshal(lambdaResponse) if responseErr != nil { - content, _ = json.Marshal("{}") + content = []byte("{}") } body := bytes.NewBuffer(content) @@ -121,25 +121,25 @@ func (em *ExtensionManager) SendEndInvocationRequest(ctx context.Context, functi // Mark the invocation as an error if any if err != nil { - req.Header[string(DdInvocationError)] = append(req.Header[string(DdInvocationError)], "true") + req.Header.Set(string(DdInvocationError), "true") } // Extract the DD trace context and pass them to the extension via request headers traceId, ok := ctx.Value(DdTraceId).(string) if ok { - req.Header[string(DdTraceId)] = append(req.Header[string(DdTraceId)], traceId) + req.Header.Set(string(DdTraceId), traceId) if parentId, ok := ctx.Value(DdParentId).(string); ok { - req.Header[string(DdParentId)] = append(req.Header[string(DdParentId)], parentId) + req.Header.Set(string(DdParentId), parentId) } if spanId, ok := ctx.Value(DdSpanId).(string); ok { - req.Header[string(DdSpanId)] = append(req.Header[string(DdSpanId)], spanId) + req.Header.Set(string(DdSpanId), spanId) } if samplingPriority, ok := ctx.Value(DdSamplingPriority).(string); ok { - req.Header[string(DdSamplingPriority)] = append(req.Header[string(DdSamplingPriority)], samplingPriority) + req.Header.Set(string(DdSamplingPriority), samplingPriority) } } else { - req.Header[string(DdTraceId)] = append(req.Header[string(DdTraceId)], fmt.Sprint(functionExecutionSpan.Context().TraceID())) - req.Header[string(DdSpanId)] = append(req.Header[string(DdSpanId)], fmt.Sprint(functionExecutionSpan.Context().SpanID())) + req.Header.Set(string(DdTraceId), fmt.Sprint(functionExecutionSpan.Context().TraceID())) + req.Header.Set(string(DdSpanId), fmt.Sprint(functionExecutionSpan.Context().SpanID())) } response, err := em.httpClient.Do(req) diff --git a/internal/extension/extension_test.go b/internal/extension/extension_test.go index 4392b213..0566bd29 100644 --- a/internal/extension/extension_test.go +++ b/internal/extension/extension_test.go @@ -16,6 +16,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" ) type ClientErrorMock struct { @@ -28,6 +29,7 @@ type ClientSuccess202Mock struct { } type ClientSuccessStartInvoke struct { + headers http.Header } type ClientSuccessEndInvoke struct { @@ -52,16 +54,11 @@ func (c *ClientSuccess202Mock) Do(req *http.Request) (*http.Response, error) { } func (c *ClientSuccessStartInvoke) Do(req *http.Request) (*http.Response, error) { - header := http.Header{} - header.Set(string(DdTraceId), mockTraceId) - header.Set(string(DdParentId), mockParentId) - header.Set(string(DdSamplingPriority), mockSamplingPriority) - return &http.Response{StatusCode: 200, Status: "KO", Header: header}, nil + return &http.Response{StatusCode: 200, Status: "KO", Header: c.headers}, nil } func (c *ClientSuccessEndInvoke) Do(req *http.Request) (*http.Response, error) { - header := map[string][]string{} - return &http.Response{StatusCode: 200, Status: "KO", Header: header}, nil + return &http.Response{StatusCode: 200, Status: "KO"}, nil } func TestBuildExtensionManager(t *testing.T) { @@ -125,14 +122,36 @@ func TestFlushSuccess(t *testing.T) { assert.Nil(t, err) } -func TestExtensionStartInvokeWithTraceContext(t *testing.T) { +func TestExtensionStartInvoke(t *testing.T) { em := &ExtensionManager{ startInvocationUrl: startInvocationUrl, httpClient: &ClientSuccessStartInvoke{}, } ctx := em.SendStartInvocationRequest(context.TODO(), []byte{}) + traceId := ctx.Value(DdTraceId) + parentId := ctx.Value(DdParentId) + samplingPriority := ctx.Value(DdSamplingPriority) + err := em.Flush() - // Ensure that we pull the DD trace context if this is a distributed trace + assert.Nil(t, err) + assert.Nil(t, traceId) + assert.Nil(t, parentId) + assert.Nil(t, samplingPriority) +} + +func TestExtensionStartInvokeWithTraceContext(t *testing.T) { + headers := http.Header{} + headers.Set(string(DdTraceId), mockTraceId) + headers.Set(string(DdParentId), mockParentId) + headers.Set(string(DdSamplingPriority), mockSamplingPriority) + + em := &ExtensionManager{ + startInvocationUrl: startInvocationUrl, + httpClient: &ClientSuccessStartInvoke{ + headers: headers, + }, + } + ctx := em.SendStartInvocationRequest(context.TODO(), []byte{}) traceId := ctx.Value(DdTraceId) parentId := ctx.Value(DdParentId) samplingPriority := ctx.Value(DdSamplingPriority) @@ -143,3 +162,15 @@ func TestExtensionStartInvokeWithTraceContext(t *testing.T) { assert.Equal(t, mockParentId, parentId) assert.Equal(t, mockSamplingPriority, samplingPriority) } + +func TestExtensionEndInvocation(t *testing.T) { + em := &ExtensionManager{ + endInvocationUrl: endInvocationUrl, + httpClient: &ClientSuccessEndInvoke{}, + } + span := tracer.StartSpan("aws.lambda") + em.SendEndInvocationRequest(context.TODO(), span, nil) + err := em.Flush() + + assert.Nil(t, err) +} From fe287c8b93fddec26476cc496e99812756045fc1 Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Thu, 13 Oct 2022 14:18:53 -0400 Subject: [PATCH 07/12] end invocation tests --- internal/extension/extension.go | 8 +++----- internal/extension/extension_test.go | 28 +++++++++++++++++++++++++--- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/internal/extension/extension.go b/internal/extension/extension.go index e42993af..d9306334 100644 --- a/internal/extension/extension.go +++ b/internal/extension/extension.go @@ -115,8 +115,6 @@ func (em *ExtensionManager) SendEndInvocationRequest(ctx context.Context, functi content = []byte("{}") } body := bytes.NewBuffer(content) - - // Build the request req, _ := http.NewRequest(http.MethodPost, em.endInvocationUrl, body) // Mark the invocation as an error if any @@ -142,9 +140,9 @@ func (em *ExtensionManager) SendEndInvocationRequest(ctx context.Context, functi req.Header.Set(string(DdSpanId), fmt.Sprint(functionExecutionSpan.Context().SpanID())) } - response, err := em.httpClient.Do(req) - if response.StatusCode != 200 || err != nil { - logger.Debug("Unable to make a request to the extension's end invocation endpoint") + resp, err := em.httpClient.Do(req) + if err != nil || (resp.StatusCode >= 200 && resp.StatusCode <= 299) { + logger.Error(fmt.Errorf("could not send end invocation payload to the extension")) } } diff --git a/internal/extension/extension_test.go b/internal/extension/extension_test.go index 0566bd29..4cad97de 100644 --- a/internal/extension/extension_test.go +++ b/internal/extension/extension_test.go @@ -9,12 +9,14 @@ package extension import ( + "bytes" "context" "fmt" "net/http" "os" "testing" + "github.com/DataDog/datadog-lambda-go/internal/logger" "github.com/stretchr/testify/assert" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" ) @@ -61,6 +63,14 @@ func (c *ClientSuccessEndInvoke) Do(req *http.Request) (*http.Response, error) { return &http.Response{StatusCode: 200, Status: "KO"}, nil } +func captureLog(f func()) string { + var buf bytes.Buffer + logger.SetOutput(&buf) + f() + logger.SetOutput(os.Stdout) + return buf.String() +} + func TestBuildExtensionManager(t *testing.T) { em := BuildExtensionManager() assert.Equal(t, "http://localhost:8124/lambda/hello", em.helloRoute) @@ -169,8 +179,20 @@ func TestExtensionEndInvocation(t *testing.T) { httpClient: &ClientSuccessEndInvoke{}, } span := tracer.StartSpan("aws.lambda") - em.SendEndInvocationRequest(context.TODO(), span, nil) - err := em.Flush() + logOutput := captureLog(func() { em.SendEndInvocationRequest(context.TODO(), span, nil) }) + span.Finish() - assert.Nil(t, err) + assert.Equal(t, "", logOutput) +} + +func TestExtensionEndInvocationError(t *testing.T) { + em := &ExtensionManager{ + endInvocationUrl: endInvocationUrl, + httpClient: &ClientErrorMock{}, + } + span := tracer.StartSpan("aws.lambda") + logOutput := captureLog(func() { em.SendEndInvocationRequest(context.TODO(), span, nil) }) + span.Finish() + + assert.Contains(t, logOutput, "could not send end invocation payload to the extension") } From 79a519bf11eb6232561fe10cc82a05a67cb4d0a9 Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Thu, 13 Oct 2022 14:40:01 -0400 Subject: [PATCH 08/12] test execution span for extension --- internal/extension/extension.go | 2 +- internal/trace/listener_test.go | 20 ++++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/internal/extension/extension.go b/internal/extension/extension.go index d9306334..3992e3a1 100644 --- a/internal/extension/extension.go +++ b/internal/extension/extension.go @@ -142,7 +142,7 @@ func (em *ExtensionManager) SendEndInvocationRequest(ctx context.Context, functi resp, err := em.httpClient.Do(req) if err != nil || (resp.StatusCode >= 200 && resp.StatusCode <= 299) { - logger.Error(fmt.Errorf("could not send end invocation payload to the extension")) + logger.Error(fmt.Errorf("could not send end invocation payload to the extension: %v", err)) } } diff --git a/internal/trace/listener_test.go b/internal/trace/listener_test.go index 76bf6254..1e5176af 100644 --- a/internal/trace/listener_test.go +++ b/internal/trace/listener_test.go @@ -12,6 +12,7 @@ import ( "context" "testing" + "github.com/DataDog/datadog-lambda-go/internal/extension" "github.com/aws/aws-lambda-go/lambdacontext" "github.com/stretchr/testify/assert" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" @@ -148,3 +149,22 @@ func TestStartFunctionExecutionSpanFromEventWithMergeDisabled(t *testing.T) { assert.Equal(t, nil, finishedSpan.Tag("_dd.parent_source")) } + +func TestStartFunctionExecutionSpanWithExtension(t *testing.T) { + ctx := context.Background() + + lambdacontext.FunctionName = "MockFunctionName" + ctx = lambdacontext.NewContext(ctx, &mockLambdaContext) + ctx = context.WithValue(ctx, traceContextKey, traceContextFromEvent) + //nolint + ctx = context.WithValue(ctx, "cold_start", true) + + mt := mocktracer.Start() + defer mt.Stop() + + span := startFunctionExecutionSpan(ctx, false, true) + span.Finish() + finishedSpan := mt.FinishedSpans()[0] + + assert.Equal(t, string(extension.DdSeverlessSpan), finishedSpan.Tag("resource.name")) +} From 4fe68174de478e8f18ba1fcd999fc51c94341a5b Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Thu, 13 Oct 2022 15:03:51 -0400 Subject: [PATCH 09/12] fix test --- internal/extension/extension.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/extension/extension.go b/internal/extension/extension.go index 3992e3a1..d50e3bc7 100644 --- a/internal/extension/extension.go +++ b/internal/extension/extension.go @@ -141,7 +141,7 @@ func (em *ExtensionManager) SendEndInvocationRequest(ctx context.Context, functi } resp, err := em.httpClient.Do(req) - if err != nil || (resp.StatusCode >= 200 && resp.StatusCode <= 299) { + if err != nil || resp.StatusCode != 200 { logger.Error(fmt.Errorf("could not send end invocation payload to the extension: %v", err)) } } From 5ba3dcff3fe9e2050195ce9e6e0e2c0d7b4362db Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Tue, 25 Oct 2022 14:17:21 -0400 Subject: [PATCH 10/12] retain /hello route logic --- internal/extension/extension.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/internal/extension/extension.go b/internal/extension/extension.go index d50e3bc7..7223466d 100644 --- a/internal/extension/extension.go +++ b/internal/extension/extension.go @@ -15,6 +15,7 @@ import ( "fmt" "net/http" "os" + "strconv" "time" "github.com/DataDog/datadog-lambda-go/internal/logger" @@ -82,6 +83,15 @@ func (em *ExtensionManager) checkAgentRunning() { } else { logger.Debug("Will use the Serverless Agent") em.isExtensionRunning = true + + // Tell the extension not to create an execution span if universal instrumentation is disabled + isUniversalInstrumentation, _ := strconv.ParseBool(os.Getenv("DD_UNIVERSAL_INSTRUMENTATION")) + if !isUniversalInstrumentation { + req, _ := http.NewRequest(http.MethodGet, em.helloRoute, nil) + if response, err := em.httpClient.Do(req); err == nil && response.StatusCode == 200 { + logger.Debug("Hit the extension /hello route") + } + } } } From 047dc3b39247bb92974f8db9264b81082804ad08 Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Tue, 25 Oct 2022 17:22:24 -0400 Subject: [PATCH 11/12] clean up env var check --- ddlambda.go | 8 ++++--- internal/extension/extension.go | 35 ++++++++++++++-------------- internal/extension/extension_test.go | 3 ++- 3 files changed, 25 insertions(+), 21 deletions(-) diff --git a/ddlambda.go b/ddlambda.go index 7ad61936..a848f49a 100644 --- a/ddlambda.go +++ b/ddlambda.go @@ -220,12 +220,14 @@ func initializeListeners(cfg *Config) []wrapper.HandlerListener { if strings.EqualFold(logLevel, "debug") || (cfg != nil && cfg.DebugLogging) { logger.SetLogLevel(logger.LevelDebug) } - extensionManager := extension.BuildExtensionManager() + traceConfig := cfg.toTraceConfig() + extensionManager := extension.BuildExtensionManager(traceConfig.UniversalInstrumentation) isExtensionRunning := extensionManager.IsExtensionRunning() + metricsConfig := cfg.toMetricsConfig(isExtensionRunning) // Wrap the handler with listeners that add instrumentation for traces and metrics. - tl := trace.MakeListener(cfg.toTraceConfig(), extensionManager) - ml := metrics.MakeListener(cfg.toMetricsConfig(isExtensionRunning), extensionManager) + tl := trace.MakeListener(traceConfig, extensionManager) + ml := metrics.MakeListener(metricsConfig, extensionManager) return []wrapper.HandlerListener{ &tl, &ml, } diff --git a/internal/extension/extension.go b/internal/extension/extension.go index 7223466d..cc3c2113 100644 --- a/internal/extension/extension.go +++ b/internal/extension/extension.go @@ -15,10 +15,10 @@ import ( "fmt" "net/http" "os" - "strconv" "time" "github.com/DataDog/datadog-lambda-go/internal/logger" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" ) @@ -50,27 +50,29 @@ const ( ) type ExtensionManager struct { - helloRoute string - flushRoute string - extensionPath string - startInvocationUrl string - endInvocationUrl string - httpClient HTTPClient - isExtensionRunning bool + helloRoute string + flushRoute string + extensionPath string + startInvocationUrl string + endInvocationUrl string + httpClient HTTPClient + isExtensionRunning bool + isUniversalInstrumentation bool } type HTTPClient interface { Do(req *http.Request) (*http.Response, error) } -func BuildExtensionManager() *ExtensionManager { +func BuildExtensionManager(isUniversalInstrumentation bool) *ExtensionManager { em := &ExtensionManager{ - helloRoute: helloUrl, - flushRoute: flushUrl, - startInvocationUrl: startInvocationUrl, - endInvocationUrl: endInvocationUrl, - extensionPath: extensionPath, - httpClient: &http.Client{Timeout: timeout}, + helloRoute: helloUrl, + flushRoute: flushUrl, + startInvocationUrl: startInvocationUrl, + endInvocationUrl: endInvocationUrl, + extensionPath: extensionPath, + httpClient: &http.Client{Timeout: timeout}, + isUniversalInstrumentation: isUniversalInstrumentation, } em.checkAgentRunning() return em @@ -85,8 +87,7 @@ func (em *ExtensionManager) checkAgentRunning() { em.isExtensionRunning = true // Tell the extension not to create an execution span if universal instrumentation is disabled - isUniversalInstrumentation, _ := strconv.ParseBool(os.Getenv("DD_UNIVERSAL_INSTRUMENTATION")) - if !isUniversalInstrumentation { + if !em.isUniversalInstrumentation { req, _ := http.NewRequest(http.MethodGet, em.helloRoute, nil) if response, err := em.httpClient.Do(req); err == nil && response.StatusCode == 200 { logger.Debug("Hit the extension /hello route") diff --git a/internal/extension/extension_test.go b/internal/extension/extension_test.go index 4cad97de..3c960f08 100644 --- a/internal/extension/extension_test.go +++ b/internal/extension/extension_test.go @@ -72,12 +72,13 @@ func captureLog(f func()) string { } func TestBuildExtensionManager(t *testing.T) { - em := BuildExtensionManager() + em := BuildExtensionManager(false) assert.Equal(t, "http://localhost:8124/lambda/hello", em.helloRoute) assert.Equal(t, "http://localhost:8124/lambda/flush", em.flushRoute) assert.Equal(t, "http://localhost:8124/lambda/start-invocation", em.startInvocationUrl) assert.Equal(t, "http://localhost:8124/lambda/end-invocation", em.endInvocationUrl) assert.Equal(t, "/opt/extensions/datadog-agent", em.extensionPath) + assert.Equal(t, false, em.isUniversalInstrumentation) assert.NotNil(t, em.httpClient) } From 73a5325fb1e5ff04ffdb176224df458130701aa8 Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Thu, 27 Oct 2022 16:31:43 -0400 Subject: [PATCH 12/12] retain handling of /hello route error --- internal/extension/extension.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/extension/extension.go b/internal/extension/extension.go index cc3c2113..33befb6d 100644 --- a/internal/extension/extension.go +++ b/internal/extension/extension.go @@ -91,6 +91,9 @@ func (em *ExtensionManager) checkAgentRunning() { req, _ := http.NewRequest(http.MethodGet, em.helloRoute, nil) if response, err := em.httpClient.Do(req); err == nil && response.StatusCode == 200 { logger.Debug("Hit the extension /hello route") + } else { + logger.Debug("Will use the API since the Serverless Agent was detected but the hello route was unreachable") + em.isExtensionRunning = false } } }