From 27c83f31b23f1295a8d2946599c340d90c6d9fea Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Tue, 10 Jan 2023 12:44:14 +0530 Subject: [PATCH 1/4] Process platform report metrics when extn is lagging --- accumulator/batch.go | 26 +++++++++++++++++--- accumulator/invocation.go | 12 ++++------ accumulator/invocation_test.go | 6 ++--- logsapi/client.go | 1 + logsapi/event.go | 44 +++++++++++++++++----------------- logsapi/metrics.go | 8 +++---- logsapi/metrics_test.go | 11 ++++++--- 7 files changed, 66 insertions(+), 42 deletions(-) diff --git a/accumulator/batch.go b/accumulator/batch.go index 20d773dc..96d13fae 100644 --- a/accumulator/batch.go +++ b/accumulator/batch.go @@ -185,6 +185,21 @@ func (b *Batch) OnLambdaLogRuntimeDone(reqID, status string, time time.Time) err return b.finalizeInvocation(reqID, status, time) } +// OnPlatformReport should be the last event for a request ID. On receiving the +// platform.report event the batch will cleanup any datastructure for the request +// ID. It will return some of the function metadata to allow the caller to enrich +// the report metrics. +func (b *Batch) OnPlatformReport(reqID string) (string, int64, time.Time, error) { + b.mu.Lock() + defer b.mu.Unlock() + inc, ok := b.invocations[reqID] + if !ok { + return "", 0, time.Time{}, fmt.Errorf("invocation for requestID %s does not exist", reqID) + } + delete(b.invocations, reqID) + return inc.FunctionARN, inc.DeadlineMs, inc.Timestamp, nil +} + // OnShutdown flushes the data for shipping to APM Server by finalizing all // the invocation in the batch. If we haven't received a platform.runtimeDone // event for an invocation so far we won't be able to recieve it in time thus @@ -201,6 +216,7 @@ func (b *Batch) OnShutdown(status string) error { if err := b.finalizeInvocation(inc.RequestID, status, time); err != nil { return err } + delete(b.invocations, inc.RequestID) } return nil } @@ -257,12 +273,16 @@ func (b *Batch) finalizeInvocation(reqID, status string, time time.Time) error { if !ok { return fmt.Errorf("invocation for requestID %s does not exist", reqID) } - defer delete(b.invocations, reqID) - proxyTxn, err := inc.Finalize(status, time) + proxyTxn, err := inc.CreateProxyTxn(status, time) if err != nil { return err } - return b.addData(proxyTxn) + err = b.addData(proxyTxn) + if err != nil { + return err + } + inc.Finalized = true + return nil } func (b *Batch) addData(data []byte) error { diff --git a/accumulator/invocation.go b/accumulator/invocation.go index 985a6d1b..451f3a2e 100644 --- a/accumulator/invocation.go +++ b/accumulator/invocation.go @@ -47,26 +47,24 @@ type Invocation struct { // TransactionObserved is true if the root transaction ID for the // invocation is observed by the extension. TransactionObserved bool + // Finalized tracks if the invocation has been finalized or not. + Finalized bool } // NeedProxyTransaction returns true if a proxy transaction needs to be // created based on the information available. func (inc *Invocation) NeedProxyTransaction() bool { - return inc.TransactionID != "" && !inc.TransactionObserved + return !inc.Finalized && inc.TransactionID != "" && !inc.TransactionObserved } -// Finalize creates a proxy transaction for an invocation if required. +// CreateProxyTxn creates a proxy transaction for an invocation if required. // A proxy transaction will be required to be created if the agent has // registered a transaction for the invocation but has not sent the // corresponding transaction to the extension. -func (inc *Invocation) Finalize(status string, time time.Time) ([]byte, error) { +func (inc *Invocation) CreateProxyTxn(status string, time time.Time) ([]byte, error) { if !inc.NeedProxyTransaction() { return nil, nil } - return inc.createProxyTxn(status, time) -} - -func (inc *Invocation) createProxyTxn(status string, time time.Time) ([]byte, error) { txn, err := sjson.SetBytes(inc.AgentPayload, "transaction.result", status) if err != nil { return nil, err diff --git a/accumulator/invocation_test.go b/accumulator/invocation_test.go index 9ab4bd4a..19ceab39 100644 --- a/accumulator/invocation_test.go +++ b/accumulator/invocation_test.go @@ -25,7 +25,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestFinalize(t *testing.T) { +func TestCreateProxyTransaction(t *testing.T) { txnDur := time.Second for _, tc := range []struct { name string @@ -89,7 +89,7 @@ func TestFinalize(t *testing.T) { AgentPayload: []byte(tc.payload), TransactionObserved: tc.txnObserved, } - result, err := inc.Finalize(tc.runtimeDoneStatus, ts.Add(txnDur)) + result, err := inc.CreateProxyTxn(tc.runtimeDoneStatus, ts.Add(txnDur)) assert.Nil(t, err) if len(tc.output) > 0 { assert.JSONEq(t, tc.output, string(result)) @@ -114,7 +114,7 @@ func BenchmarkCreateProxyTxn(b *testing.B) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - _, err := inc.createProxyTxn("success", txnDur) + _, err := inc.CreateProxyTxn("success", txnDur) if err != nil { b.Fail() } diff --git a/logsapi/client.go b/logsapi/client.go index 9bb30096..d8e036db 100644 --- a/logsapi/client.go +++ b/logsapi/client.go @@ -48,6 +48,7 @@ type ClientOption func(*Client) type invocationLifecycler interface { OnLambdaLogRuntimeDone(requestID, status string, time time.Time) error + OnPlatformReport(reqID string) (fnARN string, deadlineMs int64, ts time.Time, err error) } // Client is the client used to subscribe to the Logs API. diff --git a/logsapi/event.go b/logsapi/event.go index cb3c5f8b..c53eedc0 100644 --- a/logsapi/event.go +++ b/logsapi/event.go @@ -94,29 +94,29 @@ func (lc *Client) ProcessLogs( return } case PlatformReport: - // TODO: @lahsivjar Refactor usage of prevEvent.RequestID (should now query the batch?) - if prevEvent != nil && logEvent.Record.RequestID == prevEvent.RequestID { - lc.logger.Debugf("Received platform report for %s", logEvent.Record.RequestID) - processedMetrics, err := ProcessPlatformReport(prevEvent, logEvent) - if err != nil { - lc.logger.Errorf("Error processing Lambda platform metrics: %v", err) - } else { - select { - case dataChan <- processedMetrics: - case <-ctx.Done(): - } - } - // For shutdown event the platform report metrics for the previous log event - // would be the last possible log event. - if isShutdown { - lc.logger.Debugf( - "Processed platform report event for reqID %s as the last log event before shutdown", - logEvent.Record.RequestID, - ) - return - } + fnARN, deadlineMs, ts, err := lc.invocationLifecycler.OnPlatformReport(logEvent.Record.RequestID) + if err != nil { + lc.logger.Warnf("Failed to process platform report: %v", err) + continue + } + lc.logger.Debugf("Received platform report for %s", logEvent.Record.RequestID) + processedMetrics, err := ProcessPlatformReport(fnARN, deadlineMs, ts, logEvent) + if err != nil { + lc.logger.Errorf("Error processing Lambda platform metrics: %v", err) } else { - lc.logger.Warn("Report event request id didn't match the previous event id") + select { + case dataChan <- processedMetrics: + case <-ctx.Done(): + } + } + // For shutdown event the platform report metrics for the previous log event + // would be the last possible log event. + if isShutdown { + lc.logger.Debugf( + "Processed platform report event for reqID %s as the last log event before shutdown", + logEvent.Record.RequestID, + ) + return } case PlatformLogsDropped: lc.logger.Warnf("Logs dropped due to extension falling behind: %v", logEvent.Record) diff --git a/logsapi/metrics.go b/logsapi/metrics.go index 70b2b227..2a26aa48 100644 --- a/logsapi/metrics.go +++ b/logsapi/metrics.go @@ -19,8 +19,8 @@ package logsapi import ( "math" + "time" - "github.com/elastic/apm-aws-lambda/extension" "go.elastic.co/apm/v2/model" "go.elastic.co/fastjson" ) @@ -64,7 +64,7 @@ func (mc MetricsContainer) MarshalFastJSON(json *fastjson.Writer) error { // ProcessPlatformReport processes the `platform.report` log line from lambda logs API and // returns a byte array containing the JSON body for the extracted platform metrics. A non // nil error is returned when marshaling of platform metrics into JSON fails. -func ProcessPlatformReport(functionData *extension.NextEventResponse, platformReport LogEvent) ([]byte, error) { +func ProcessPlatformReport(fnARN string, deadlineMs int64, ts time.Time, platformReport LogEvent) ([]byte, error) { metricsContainer := MetricsContainer{ Metrics: &model.Metrics{}, } @@ -78,7 +78,7 @@ func ProcessPlatformReport(functionData *extension.NextEventResponse, platformRe // FaaS Fields metricsContainer.Metrics.FAAS = &model.FAAS{ Execution: platformReport.Record.RequestID, - ID: functionData.InvokedFunctionArn, + ID: fnARN, Coldstart: platformReportMetrics.InitDurationMs > 0, } @@ -95,7 +95,7 @@ func ProcessPlatformReport(functionData *extension.NextEventResponse, platformRe // - The epoch corresponding to the end of the current invocation (its "deadline") // - The epoch corresponding to the start of the current invocation // - The multiplication / division then rounds the value to obtain a number of ms that can be expressed a multiple of 1000 (see initial assumption) - metricsContainer.Add("faas.timeout", math.Ceil(float64(functionData.DeadlineMs-functionData.Timestamp.UnixMilli())/1e3)*1e3) // Unit : Milliseconds + metricsContainer.Add("faas.timeout", math.Ceil(float64(deadlineMs-ts.UnixMilli())/1e3)*1e3) // Unit : Milliseconds var jsonWriter fastjson.Writer if err := metricsContainer.MarshalFastJSON(&jsonWriter); err != nil { diff --git a/logsapi/metrics_test.go b/logsapi/metrics_test.go index 1aa17a04..468dbc9e 100644 --- a/logsapi/metrics_test.go +++ b/logsapi/metrics_test.go @@ -66,7 +66,7 @@ func TestProcessPlatformReport_Coldstart(t *testing.T) { desiredOutputMetrics := fmt.Sprintf(`{"metricset":{"samples":{"faas.coldstart_duration":{"value":422.9700012207031},"faas.timeout":{"value":5000},"system.memory.total":{"value":1.34217728e+08},"system.memory.actual.free":{"value":5.4525952e+07},"faas.duration":{"value":182.42999267578125},"faas.billed_duration":{"value":183}},"timestamp":%d,"faas":{"coldstart":true,"execution":"6f7f0961f83442118a7af6fe80b88d56","id":"arn:aws:lambda:us-east-2:123456789012:function:custom-runtime"}}}`, timestamp.UnixNano()/1e3) - data, err := ProcessPlatformReport(&event, logEvent) + data, err := ProcessPlatformReport(event.InvokedFunctionArn, event.DeadlineMs, event.Timestamp, logEvent) require.NoError(t, err) assert.JSONEq(t, desiredOutputMetrics, string(data)) @@ -110,7 +110,7 @@ func TestProcessPlatformReport_NoColdstart(t *testing.T) { desiredOutputMetrics := fmt.Sprintf(`{"metricset":{"samples":{"faas.coldstart_duration":{"value":0},"faas.timeout":{"value":5000},"system.memory.total":{"value":1.34217728e+08},"system.memory.actual.free":{"value":5.4525952e+07},"faas.duration":{"value":182.42999267578125},"faas.billed_duration":{"value":183}},"timestamp":%d,"faas":{"coldstart":false,"execution":"6f7f0961f83442118a7af6fe80b88d56","id":"arn:aws:lambda:us-east-2:123456789012:function:custom-runtime"}}}`, timestamp.UnixNano()/1e3) - data, err := ProcessPlatformReport(&event, logEvent) + data, err := ProcessPlatformReport(event.InvokedFunctionArn, event.DeadlineMs, event.Timestamp, logEvent) require.NoError(t, err) assert.JSONEq(t, desiredOutputMetrics, string(data)) @@ -142,7 +142,12 @@ func BenchmarkPlatformReport(b *testing.B) { } for n := 0; n < b.N; n++ { - _, err := ProcessPlatformReport(nextEventResp, logEvent) + _, err := ProcessPlatformReport( + nextEventResp.InvokedFunctionArn, + nextEventResp.DeadlineMs, + nextEventResp.Timestamp, + logEvent, + ) require.NoError(b, err) } } From 94620cfe8941c5695690540b5685bead77268709 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Tue, 10 Jan 2023 13:23:40 +0530 Subject: [PATCH 2/4] Remove prevEvent usage --- accumulator/batch.go | 7 +++++++ app/run.go | 10 ++-------- logsapi/event.go | 3 --- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/accumulator/batch.go b/accumulator/batch.go index 96d13fae..02ddb5a0 100644 --- a/accumulator/batch.go +++ b/accumulator/batch.go @@ -82,6 +82,13 @@ func NewBatch(maxSize int, maxAge time.Duration) *Batch { } } +// Size returns the number of invocations cached in the batch. +func (b *Batch) Size() int { + b.mu.RLock() + defer b.mu.RUnlock() + return len(b.invocations) +} + // RegisterInvocation registers a new function invocation against its request // ID. It also updates the caches for currently executing request ID. func (b *Batch) RegisterInvocation( diff --git a/app/run.go b/app/run.go index 9a8f032b..9fa2d08f 100644 --- a/app/run.go +++ b/app/run.go @@ -79,9 +79,6 @@ func (app *App) Run(ctx context.Context) error { } } - // The previous event id is used to validate the received Lambda metrics - var prevEvent *extension.NextEventResponse - for { select { case <-ctx.Done(): @@ -91,7 +88,7 @@ func (app *App) Run(ctx context.Context) error { // Use a wait group to ensure the background go routine sending to the APM server // completes before signaling that the extension is ready for the next invocation. var backgroundDataSendWg sync.WaitGroup - event, err := app.processEvent(ctx, &backgroundDataSendWg, prevEvent) + event, err := app.processEvent(ctx, &backgroundDataSendWg) if err != nil { return err } @@ -110,7 +107,6 @@ func (app *App) Run(ctx context.Context) error { app.apmClient.FlushAPMData(flushCtx) cancel() } - prevEvent = event } } } @@ -118,7 +114,6 @@ func (app *App) Run(ctx context.Context) error { func (app *App) processEvent( ctx context.Context, backgroundDataSendWg *sync.WaitGroup, - prevEvent *extension.NextEventResponse, ) (*extension.NextEventResponse, error) { // Reset flush state for future events. defer app.apmClient.ResetFlush() @@ -179,7 +174,7 @@ func (app *App) processEvent( // also possible that lambda has init a few execution env preemptively, // for such cases the extension will see only a SHUTDOWN event and // there is no need to wait for any log event. - if prevEvent == nil { + if app.batch.Size() == 0 { return event, nil } } @@ -204,7 +199,6 @@ func (app *App) processEvent( event.RequestID, event.InvokedFunctionArn, app.apmClient.LambdaDataChannel, - prevEvent, event.EventType == extension.Shutdown, ) }() diff --git a/logsapi/event.go b/logsapi/event.go index c53eedc0..f19037c8 100644 --- a/logsapi/event.go +++ b/logsapi/event.go @@ -20,8 +20,6 @@ package logsapi import ( "context" "time" - - "github.com/elastic/apm-aws-lambda/extension" ) // LogEventType represents the log type that is received in the log messages @@ -63,7 +61,6 @@ func (lc *Client) ProcessLogs( requestID string, invokedFnArn string, dataChan chan []byte, - prevEvent *extension.NextEventResponse, isShutdown bool, ) { // platformStartReqID is to identify the requestID for the function From 5f8b341f25f3fbb7600fe30d87424ee381afe5bd Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Tue, 10 Jan 2023 15:31:18 +0530 Subject: [PATCH 3/4] Fix handling shutdown event --- logsapi/client.go | 2 ++ logsapi/event.go | 23 ++++++++++++----------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/logsapi/client.go b/logsapi/client.go index d8e036db..2ba01454 100644 --- a/logsapi/client.go +++ b/logsapi/client.go @@ -49,6 +49,8 @@ type ClientOption func(*Client) type invocationLifecycler interface { OnLambdaLogRuntimeDone(requestID, status string, time time.Time) error OnPlatformReport(reqID string) (fnARN string, deadlineMs int64, ts time.Time, err error) + // Size should return the number of invocations waiting on platform.report + Size() int } // Client is the client used to subscribe to the Logs API. diff --git a/logsapi/event.go b/logsapi/event.go index f19037c8..54fc9be7 100644 --- a/logsapi/event.go +++ b/logsapi/event.go @@ -94,21 +94,22 @@ func (lc *Client) ProcessLogs( fnARN, deadlineMs, ts, err := lc.invocationLifecycler.OnPlatformReport(logEvent.Record.RequestID) if err != nil { lc.logger.Warnf("Failed to process platform report: %v", err) - continue - } - lc.logger.Debugf("Received platform report for %s", logEvent.Record.RequestID) - processedMetrics, err := ProcessPlatformReport(fnARN, deadlineMs, ts, logEvent) - if err != nil { - lc.logger.Errorf("Error processing Lambda platform metrics: %v", err) } else { - select { - case dataChan <- processedMetrics: - case <-ctx.Done(): + lc.logger.Debugf("Received platform report for %s", logEvent.Record.RequestID) + processedMetrics, err := ProcessPlatformReport(fnARN, deadlineMs, ts, logEvent) + if err != nil { + lc.logger.Errorf("Error processing Lambda platform metrics: %v", err) + } else { + select { + case dataChan <- processedMetrics: + case <-ctx.Done(): + } } } // For shutdown event the platform report metrics for the previous log event - // would be the last possible log event. - if isShutdown { + // would be the last possible log event. After processing this metric the + // invocation lifecycler's cache should be empty. + if isShutdown && lc.invocationLifecycler.Size() == 0 { lc.logger.Debugf( "Processed platform report event for reqID %s as the last log event before shutdown", logEvent.Record.RequestID, From 883f682b90dd2c10b16430f2dd474f1f904e2741 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Thu, 12 Jan 2023 11:31:07 +0530 Subject: [PATCH 4/4] Add changelog --- CHANGELOG.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index f907f1f9..94c06894 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -26,6 +26,7 @@ https://github.com/elastic/apm-aws-lambda/compare/v1.2.0...main[View commits] ===== Features - experimental:[] Create proxy transaction with error results if not reported by agent {lambda-pull}315[315] - Wait for the final platform report metrics on shutdown {lambda-pull}347[347] +- Process platform report metrics when extension is lagging {lambda-pull}358[358] [float] [[lambda-1.2.0]]