diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index f907f1f9..94c06894 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,6 +26,7 @@ https://github.com/elastic/apm-aws-lambda/compare/v1.2.0...main[View commits]
 ===== Features
 - experimental:[] Create proxy transaction with error results if not reported by agent {lambda-pull}315[315]
 - Wait for the final platform report metrics on shutdown {lambda-pull}347[347]
+- Process platform report metrics when extension is lagging {lambda-pull}358[358]
 
 [float]
 [[lambda-1.2.0]]
diff --git a/accumulator/batch.go b/accumulator/batch.go
index 20d773dc..02ddb5a0 100644
--- a/accumulator/batch.go
+++ b/accumulator/batch.go
@@ -82,6 +82,13 @@ func NewBatch(maxSize int, maxAge time.Duration) *Batch {
 	}
 }
 
+// Size returns the number of invocations cached in the batch.
+func (b *Batch) Size() int {
+	b.mu.RLock()
+	defer b.mu.RUnlock()
+	return len(b.invocations)
+}
+
 // RegisterInvocation registers a new function invocation against its request
 // ID. It also updates the caches for currently executing request ID.
 func (b *Batch) RegisterInvocation(
@@ -185,6 +192,21 @@ func (b *Batch) OnLambdaLogRuntimeDone(reqID, status string, time time.Time) err
 	return b.finalizeInvocation(reqID, status, time)
 }
 
+// OnPlatformReport should be called on the platform.report event, which is the
+// last event for a request ID. On receiving the event the batch cleans up any
+// data structures for the request ID. It returns some of the function metadata
+// to allow the caller to enrich the report metrics.
+func (b *Batch) OnPlatformReport(reqID string) (string, int64, time.Time, error) {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	inc, ok := b.invocations[reqID]
+	if !ok {
+		return "", 0, time.Time{}, fmt.Errorf("invocation for requestID %s does not exist", reqID)
+	}
+	delete(b.invocations, reqID)
+	return inc.FunctionARN, inc.DeadlineMs, inc.Timestamp, nil
+}
+
 // OnShutdown flushes the data for shipping to APM Server by finalizing all
 // the invocation in the batch. If we haven't received a platform.runtimeDone
 // event for an invocation so far we won't be able to recieve it in time thus
@@ -201,6 +223,7 @@ func (b *Batch) OnShutdown(status string) error {
 		if err := b.finalizeInvocation(inc.RequestID, status, time); err != nil {
 			return err
 		}
+		delete(b.invocations, inc.RequestID)
 	}
 	return nil
 }
@@ -257,12 +280,16 @@ func (b *Batch) finalizeInvocation(reqID, status string, time time.Time) error {
 	if !ok {
 		return fmt.Errorf("invocation for requestID %s does not exist", reqID)
 	}
-	defer delete(b.invocations, reqID)
-	proxyTxn, err := inc.Finalize(status, time)
+	proxyTxn, err := inc.CreateProxyTxn(status, time)
 	if err != nil {
 		return err
 	}
-	return b.addData(proxyTxn)
+	err = b.addData(proxyTxn)
+	if err != nil {
+		return err
+	}
+	inc.Finalized = true
+	return nil
 }
 
 func (b *Batch) addData(data []byte) error {
diff --git a/accumulator/invocation.go b/accumulator/invocation.go
index 985a6d1b..451f3a2e 100644
--- a/accumulator/invocation.go
+++ b/accumulator/invocation.go
@@ -47,26 +47,24 @@ type Invocation struct {
 	// TransactionObserved is true if the root transaction ID for the
 	// invocation is observed by the extension.
 	TransactionObserved bool
+	// Finalized tracks whether the invocation has been finalized.
+	Finalized bool
 }
 
 // NeedProxyTransaction returns true if a proxy transaction needs to be
 // created based on the information available.
 func (inc *Invocation) NeedProxyTransaction() bool {
-	return inc.TransactionID != "" && !inc.TransactionObserved
+	return !inc.Finalized && inc.TransactionID != "" && !inc.TransactionObserved
 }
 
-// Finalize creates a proxy transaction for an invocation if required.
+// CreateProxyTxn creates a proxy transaction for an invocation if required.
 // A proxy transaction will be required to be created if the agent has
 // registered a transaction for the invocation but has not sent the
 // corresponding transaction to the extension.
-func (inc *Invocation) Finalize(status string, time time.Time) ([]byte, error) {
+func (inc *Invocation) CreateProxyTxn(status string, time time.Time) ([]byte, error) {
 	if !inc.NeedProxyTransaction() {
 		return nil, nil
 	}
-	return inc.createProxyTxn(status, time)
-}
-
-func (inc *Invocation) createProxyTxn(status string, time time.Time) ([]byte, error) {
 	txn, err := sjson.SetBytes(inc.AgentPayload, "transaction.result", status)
 	if err != nil {
 		return nil, err
diff --git a/accumulator/invocation_test.go b/accumulator/invocation_test.go
index 9ab4bd4a..19ceab39 100644
--- a/accumulator/invocation_test.go
+++ b/accumulator/invocation_test.go
@@ -25,7 +25,7 @@ import (
 	"github.com/stretchr/testify/assert"
 )
 
-func TestFinalize(t *testing.T) {
+func TestCreateProxyTransaction(t *testing.T) {
 	txnDur := time.Second
 	for _, tc := range []struct {
 		name              string
@@ -89,7 +89,7 @@ func TestFinalize(t *testing.T) {
 				AgentPayload:        []byte(tc.payload),
 				TransactionObserved: tc.txnObserved,
 			}
-			result, err := inc.Finalize(tc.runtimeDoneStatus, ts.Add(txnDur))
+			result, err := inc.CreateProxyTxn(tc.runtimeDoneStatus, ts.Add(txnDur))
 			assert.Nil(t, err)
 			if len(tc.output) > 0 {
 				assert.JSONEq(t, tc.output, string(result))
@@ -114,7 +114,7 @@ func BenchmarkCreateProxyTxn(b *testing.B) {
 	b.ReportAllocs()
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		_, err := inc.createProxyTxn("success", txnDur)
+		_, err := inc.CreateProxyTxn("success", txnDur)
 		if err != nil {
 			b.Fail()
 		}
diff --git a/app/run.go b/app/run.go
index 9a8f032b..9fa2d08f 100644
--- a/app/run.go
+++ b/app/run.go
@@ -79,9 +79,6 @@ func (app *App) Run(ctx context.Context) error {
 		}
 	}
 
-	// The previous event id is used to validate the received Lambda metrics
-	var prevEvent *extension.NextEventResponse
-
 	for {
 		select {
 		case <-ctx.Done():
@@ -91,7 +88,7 @@ func (app *App) Run(ctx context.Context) error {
 			// Use a wait group to ensure the background go routine sending to the APM server
 			// completes before signaling that the extension is ready for the next invocation.
 			var backgroundDataSendWg sync.WaitGroup
-			event, err := app.processEvent(ctx, &backgroundDataSendWg, prevEvent)
+			event, err := app.processEvent(ctx, &backgroundDataSendWg)
 			if err != nil {
 				return err
 			}
@@ -110,7 +107,6 @@ func (app *App) Run(ctx context.Context) error {
 				app.apmClient.FlushAPMData(flushCtx)
 				cancel()
 			}
-			prevEvent = event
 		}
 	}
 }
@@ -118,7 +114,6 @@ func (app *App) Run(ctx context.Context) error {
 func (app *App) processEvent(
 	ctx context.Context,
 	backgroundDataSendWg *sync.WaitGroup,
-	prevEvent *extension.NextEventResponse,
 ) (*extension.NextEventResponse, error) {
 	// Reset flush state for future events.
 	defer app.apmClient.ResetFlush()
@@ -179,7 +174,7 @@ func (app *App) processEvent(
 		// also possible that lambda has init a few execution env preemptively,
 		// for such cases the extension will see only a SHUTDOWN event and
 		// there is no need to wait for any log event.
-		if prevEvent == nil {
+		if app.batch.Size() == 0 {
 			return event, nil
 		}
 	}
@@ -204,7 +199,6 @@ func (app *App) processEvent(
 			event.RequestID,
 			event.InvokedFunctionArn,
 			app.apmClient.LambdaDataChannel,
-			prevEvent,
 			event.EventType == extension.Shutdown,
 		)
 	}()
diff --git a/logsapi/client.go b/logsapi/client.go
index 9bb30096..2ba01454 100644
--- a/logsapi/client.go
+++ b/logsapi/client.go
@@ -48,6 +48,9 @@ type ClientOption func(*Client)
 
 type invocationLifecycler interface {
 	OnLambdaLogRuntimeDone(requestID, status string, time time.Time) error
+	OnPlatformReport(reqID string) (fnARN string, deadlineMs int64, ts time.Time, err error)
+	// Size should return the number of invocations waiting on platform.report
+	Size() int
 }
 
 // Client is the client used to subscribe to the Logs API.
diff --git a/logsapi/event.go b/logsapi/event.go
index cb3c5f8b..54fc9be7 100644
--- a/logsapi/event.go
+++ b/logsapi/event.go
@@ -20,8 +20,6 @@ package logsapi
 import (
 	"context"
 	"time"
-
-	"github.com/elastic/apm-aws-lambda/extension"
 )
 
 // LogEventType represents the log type that is received in the log messages
@@ -63,7 +61,6 @@ func (lc *Client) ProcessLogs(
 	requestID string,
 	invokedFnArn string,
 	dataChan chan []byte,
-	prevEvent *extension.NextEventResponse,
 	isShutdown bool,
 ) {
 	// platformStartReqID is to identify the requestID for the function
@@ -94,10 +91,12 @@ func (lc *Client) ProcessLogs(
 					return
 				}
 			case PlatformReport:
-				// TODO: @lahsivjar Refactor usage of prevEvent.RequestID (should now query the batch?)
-				if prevEvent != nil && logEvent.Record.RequestID == prevEvent.RequestID {
+				fnARN, deadlineMs, ts, err := lc.invocationLifecycler.OnPlatformReport(logEvent.Record.RequestID)
+				if err != nil {
+					lc.logger.Warnf("Failed to process platform report: %v", err)
+				} else {
 					lc.logger.Debugf("Received platform report for %s", logEvent.Record.RequestID)
-					processedMetrics, err := ProcessPlatformReport(prevEvent, logEvent)
+					processedMetrics, err := ProcessPlatformReport(fnARN, deadlineMs, ts, logEvent)
 					if err != nil {
 						lc.logger.Errorf("Error processing Lambda platform metrics: %v", err)
 					} else {
@@ -106,17 +105,16 @@ func (lc *Client) ProcessLogs(
 					case <-ctx.Done():
 					}
 				}
-				// For shutdown event the platform report metrics for the previous log event
-				// would be the last possible log event.
-				if isShutdown {
-					lc.logger.Debugf(
-						"Processed platform report event for reqID %s as the last log event before shutdown",
-						logEvent.Record.RequestID,
-					)
-					return
-				}
-			} else {
-				lc.logger.Warn("Report event request id didn't match the previous event id")
+			}
+			// For a shutdown event, the platform report metrics for the previous log
+			// event will be the last possible log event. After processing this metric,
+			// the invocation lifecycler's cache should be empty.
+			if isShutdown && lc.invocationLifecycler.Size() == 0 {
+				lc.logger.Debugf(
+					"Processed platform report event for reqID %s as the last log event before shutdown",
+					logEvent.Record.RequestID,
+				)
+				return
 			}
 		case PlatformLogsDropped:
 			lc.logger.Warnf("Logs dropped due to extension falling behind: %v", logEvent.Record)
diff --git a/logsapi/metrics.go b/logsapi/metrics.go
index 70b2b227..2a26aa48 100644
--- a/logsapi/metrics.go
+++ b/logsapi/metrics.go
@@ -19,8 +19,8 @@ package logsapi
 import (
 	"math"
+	"time"
 
-	"github.com/elastic/apm-aws-lambda/extension"
 	"go.elastic.co/apm/v2/model"
 	"go.elastic.co/fastjson"
 )
 
@@ -64,7 +64,7 @@ func (mc MetricsContainer) MarshalFastJSON(json *fastjson.Writer) error {
 // ProcessPlatformReport processes the `platform.report` log line from lambda logs API and
 // returns a byte array containing the JSON body for the extracted platform metrics. A non
 // nil error is returned when marshaling of platform metrics into JSON fails.
-func ProcessPlatformReport(functionData *extension.NextEventResponse, platformReport LogEvent) ([]byte, error) {
+func ProcessPlatformReport(fnARN string, deadlineMs int64, ts time.Time, platformReport LogEvent) ([]byte, error) {
 	metricsContainer := MetricsContainer{
 		Metrics: &model.Metrics{},
 	}
@@ -78,7 +78,7 @@ func ProcessPlatformReport(functionData *extension.NextEventResponse, platformRe
 	// FaaS Fields
 	metricsContainer.Metrics.FAAS = &model.FAAS{
 		Execution: platformReport.Record.RequestID,
-		ID:        functionData.InvokedFunctionArn,
+		ID:        fnARN,
 		Coldstart: platformReportMetrics.InitDurationMs > 0,
 	}
 
@@ -95,7 +95,7 @@ func ProcessPlatformReport(functionData *extension.NextEventResponse, platformRe
 	// - The epoch corresponding to the end of the current invocation (its "deadline")
 	// - The epoch corresponding to the start of the current invocation
 	// - The multiplication / division then rounds the value to obtain a number of ms that can be expressed a multiple of 1000 (see initial assumption)
-	metricsContainer.Add("faas.timeout", math.Ceil(float64(functionData.DeadlineMs-functionData.Timestamp.UnixMilli())/1e3)*1e3) // Unit : Milliseconds
+	metricsContainer.Add("faas.timeout", math.Ceil(float64(deadlineMs-ts.UnixMilli())/1e3)*1e3) // Unit : Milliseconds
 
 	var jsonWriter fastjson.Writer
 	if err := metricsContainer.MarshalFastJSON(&jsonWriter); err != nil {
diff --git a/logsapi/metrics_test.go b/logsapi/metrics_test.go
index 1aa17a04..468dbc9e 100644
--- a/logsapi/metrics_test.go
+++ b/logsapi/metrics_test.go
@@ -66,7 +66,7 @@ func TestProcessPlatformReport_Coldstart(t *testing.T) {
 	desiredOutputMetrics :=
 		fmt.Sprintf(`{"metricset":{"samples":{"faas.coldstart_duration":{"value":422.9700012207031},"faas.timeout":{"value":5000},"system.memory.total":{"value":1.34217728e+08},"system.memory.actual.free":{"value":5.4525952e+07},"faas.duration":{"value":182.42999267578125},"faas.billed_duration":{"value":183}},"timestamp":%d,"faas":{"coldstart":true,"execution":"6f7f0961f83442118a7af6fe80b88d56","id":"arn:aws:lambda:us-east-2:123456789012:function:custom-runtime"}}}`, timestamp.UnixNano()/1e3)
 
-	data, err := ProcessPlatformReport(&event, logEvent)
+	data, err := ProcessPlatformReport(event.InvokedFunctionArn, event.DeadlineMs, event.Timestamp, logEvent)
 	require.NoError(t, err)
 
 	assert.JSONEq(t, desiredOutputMetrics, string(data))
@@ -110,7 +110,7 @@ func TestProcessPlatformReport_NoColdstart(t *testing.T) {
 	desiredOutputMetrics :=
 		fmt.Sprintf(`{"metricset":{"samples":{"faas.coldstart_duration":{"value":0},"faas.timeout":{"value":5000},"system.memory.total":{"value":1.34217728e+08},"system.memory.actual.free":{"value":5.4525952e+07},"faas.duration":{"value":182.42999267578125},"faas.billed_duration":{"value":183}},"timestamp":%d,"faas":{"coldstart":false,"execution":"6f7f0961f83442118a7af6fe80b88d56","id":"arn:aws:lambda:us-east-2:123456789012:function:custom-runtime"}}}`, timestamp.UnixNano()/1e3)
 
-	data, err := ProcessPlatformReport(&event, logEvent)
+	data, err := ProcessPlatformReport(event.InvokedFunctionArn, event.DeadlineMs, event.Timestamp, logEvent)
 	require.NoError(t, err)
 
 	assert.JSONEq(t, desiredOutputMetrics, string(data))
@@ -142,7 +142,12 @@ func BenchmarkPlatformReport(b *testing.B) {
 	}
 
 	for n := 0; n < b.N; n++ {
-		_, err := ProcessPlatformReport(nextEventResp, logEvent)
+		_, err := ProcessPlatformReport(
+			nextEventResp.InvokedFunctionArn,
+			nextEventResp.DeadlineMs,
+			nextEventResp.Timestamp,
+			logEvent,
+		)
 		require.NoError(b, err)
 	}
 }
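
For illustration, a minimal, self-contained sketch of the pattern this change moves to: platform.report handling is keyed on a per-request cache instead of the previous NEXT event, so a report that arrives late (because the extension is lagging) can still be matched and enriched. The names below (cache, onPlatformReport, req-1, the demo ARN) are assumed stand-ins for accumulator.Batch and its new methods, not code from the repository.

package main

import (
	"fmt"
	"time"
)

// invocation models the metadata Batch.OnPlatformReport returns in the diff
// above (function ARN, deadline in ms, invocation start timestamp).
type invocation struct {
	functionARN string
	deadlineMs  int64
	timestamp   time.Time
}

// cache is an illustrative stand-in for accumulator.Batch: invocations stay
// cached until their platform.report event is seen, no matter how many newer
// invocations have been registered since.
type cache struct {
	invocations map[string]invocation
}

// onPlatformReport mirrors the shape of Batch.OnPlatformReport: look up the
// request ID, drop the entry, and hand back the metadata needed to enrich the
// report metrics.
func (c *cache) onPlatformReport(reqID string) (invocation, error) {
	inc, ok := c.invocations[reqID]
	if !ok {
		return invocation{}, fmt.Errorf("invocation for requestID %s does not exist", reqID)
	}
	delete(c.invocations, reqID)
	return inc, nil
}

// size mirrors Batch.Size: the number of invocations still waiting on their
// platform.report event.
func (c *cache) size() int { return len(c.invocations) }

func main() {
	start := time.Now()
	c := &cache{invocations: map[string]invocation{
		// Two invocations are still waiting for platform.report, i.e. the
		// extension is lagging behind the runtime.
		"req-1": {functionARN: "arn:aws:lambda:us-east-2:123456789012:function:demo", deadlineMs: start.Add(5 * time.Second).UnixMilli(), timestamp: start},
		"req-2": {functionARN: "arn:aws:lambda:us-east-2:123456789012:function:demo", deadlineMs: start.Add(6 * time.Second).UnixMilli(), timestamp: start.Add(time.Second)},
	}}

	// A platform.report for req-1 arrives after req-2 has already become the
	// "previous" event. Matching against prevEvent.RequestID would skip it;
	// looking it up in the cache still succeeds.
	inc, err := c.onPlatformReport("req-1")
	if err != nil {
		fmt.Println("report dropped:", err)
		return
	}
	timeoutMs := inc.deadlineMs - inc.timestamp.UnixMilli()
	fmt.Printf("enrich metrics: faas.id=%s faas.timeout~%dms pending=%d\n",
		inc.functionARN, timeoutMs, c.size())
}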
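
Similarly, the faas.timeout rounding that now operates on the plain deadlineMs and ts arguments can be sanity-checked in isolation. A small standalone snippet with assumed values, not repository code:

package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	// Assume an invocation that started now with roughly 4.95s of budget left.
	ts := time.Now()
	deadlineMs := ts.Add(4950 * time.Millisecond).UnixMilli()

	// Same expression as metricsContainer.Add("faas.timeout", ...) above:
	// divide by 1e3, round up to a whole second, convert back to milliseconds.
	timeout := math.Ceil(float64(deadlineMs-ts.UnixMilli())/1e3) * 1e3

	fmt.Println(timeout) // 5000, i.e. the configured timeout as a multiple of 1000ms
}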