From 2fad63bb63fe130683cbd7396bf87a66ec11f515 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Tue, 22 Nov 2022 16:26:02 +0800 Subject: [PATCH 1/5] Wait for the final platform report metrics on shutdown --- apmproxy/receiver.go | 3 +- app/run.go | 75 +++++++++++++++++++++++++------------------- logsapi/event.go | 61 ++++++++++++++++++++--------------- 3 files changed, 79 insertions(+), 60 deletions(-) diff --git a/apmproxy/receiver.go b/apmproxy/receiver.go index 15ebeed7..cad3fdbf 100644 --- a/apmproxy/receiver.go +++ b/apmproxy/receiver.go @@ -88,7 +88,8 @@ func (c *Client) handleInfoRequest() (func(w http.ResponseWriter, r *http.Reques reverseProxy.Transport = customTransport reverseProxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) { - c.UpdateStatus(r.Context(), Failing) + // Don't update the status of the transport as it is possible that the extension + // is frozen while processing the request and context is canceled due to timeout. c.logger.Errorf("Error querying version from the APM server: %v", err) } diff --git a/app/run.go b/app/run.go index 33d705c2..24dfa2d6 100644 --- a/app/run.go +++ b/app/run.go @@ -55,6 +55,14 @@ func (app *App) Run(ctx context.Context) error { } }() + // Flush all data before shutting down. + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + app.apmClient.FlushAPMData(ctx) + }() + if app.logsClient != nil { if err := app.logsClient.StartService(app.extensionClient.ExtensionID); err != nil { app.logger.Warnf("Error while subscribing to the Logs API: %v", err) @@ -71,14 +79,6 @@ func (app *App) Run(ctx context.Context) error { } } - // Flush all data before shutting down. 
- defer func() { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - app.apmClient.FlushAPMData(ctx) - }() - // The previous event id is used to validate the received Lambda metrics var prevEvent *extension.NextEventResponse @@ -86,7 +86,6 @@ func (app *App) Run(ctx context.Context) error { select { case <-ctx.Done(): app.logger.Info("Received a signal, exiting...") - return nil default: // Use a wait group to ensure the background go routine sending to the APM server @@ -96,13 +95,12 @@ func (app *App) Run(ctx context.Context) error { if err != nil { return err } - + app.logger.Debug("Waiting for background data send to end") + backgroundDataSendWg.Wait() if event.EventType == extension.Shutdown { - app.logger.Infof("Received shutdown event: %s. Exiting...", event.ShutdownReason) + app.logger.Infof("Exiting due to shutdown event with reason %s", event.ShutdownReason) return nil } - app.logger.Debug("Waiting for background data send to end") - backgroundDataSendWg.Wait() if app.apmClient.ShouldFlush() { // Use a new cancellable context for flushing APM data to make sure // that the underlying transport is reset for next invocation without @@ -163,15 +161,27 @@ func (app *App) processEvent( // At shutdown we can not expect platform.runtimeDone events to be reported // for the remaining invocations. If we haven't received the transaction // from agents at this point then it is safe to assume that the function - // timed out. We will create proxy transaction for all invocations that + // failed. We will create proxy transaction for all invocations that // haven't received a full transaction from the agent yet. If extension // doesn't have enough CPU time it is possible that the extension might // not receive the shutdown signal for timeouts or runtime crashes. In // these cases we will miss the transaction. 
- if err := app.batch.OnShutdown("timeout"); err != nil { - app.logger.Errorf("Error finalizing invocation on shutdown: %v", err) + app.logger.Debugf("Received shutdown event with reason %s", event.ShutdownReason) + defer func() { + if err := app.batch.OnShutdown(event.ShutdownReason); err != nil { + app.logger.Errorf("Error finalizing invocation on shutdown: %v", err) + } + }() + + // platform.report metric (and some other metrics) might not have been + // reported by the logs API even by the time of shutdown. At shutdown we will make + // a last attempt to collect and report these metrics. However, it is + // also possible that Lambda has initialized a few execution environments preemptively; + // in such cases the extension will see only a SHUTDOWN event and + // there is no need to wait for any log event. + if prevEvent == nil { + return event, nil } - return event, nil } // APM Data Processing @@ -185,50 +195,49 @@ func (app *App) processEvent( // Lambda Service Logs Processing, also used to extract metrics from APM logs // This goroutine should not be started if subscription failed - runtimeDone := make(chan struct{}) + logProcessingDone := make(chan struct{}) if app.logsClient != nil { go func() { + defer close(logProcessingDone) if err := app.logsClient.ProcessLogs( invocationCtx, event.RequestID, event.InvokedFunctionArn, app.apmClient.LambdaDataChannel, - runtimeDone, prevEvent, + event.EventType == extension.Shutdown, ); err != nil { app.logger.Errorf("Error while processing Lambda Logs ; %v", err) - } else { - close(runtimeDone) } }() } else { app.logger.Warn("Logs collection not started due to earlier subscription failure") - close(runtimeDone) + close(logProcessingDone) } // Calculate how long to wait for a runtimeDoneSignal or AgentDoneSignal signal - flushDeadlineMs := event.DeadlineMs - 200 + flushDeadlineMs := event.DeadlineMs - 50 durationUntilFlushDeadline := time.Until(time.Unix(flushDeadlineMs/1000, 0)) // Create a timer that expires after durationUntilFlushDeadline 
timer := time.NewTimer(durationUntilFlushDeadline) defer timer.Stop() - // The extension relies on 3 independent mechanisms to minimize the time interval between the end of the execution of - // the lambda function and the end of the execution of processEvent() - // 1) AgentDoneSignal is triggered upon reception of a `flushed=true` query from the agent - // 2) [Backup 1] RuntimeDone is triggered upon reception of a Lambda log entry certifying the end of the execution of the current function - // 3) [Backup 2] If all else fails, the extension relies of the timeout of the Lambda function to interrupt itself 200 ms before the specified deadline. - // This time interval is large enough to attempt a last flush attempt (if SendStrategy == syncFlush) before the environment gets shut down. - + // The extension relies on 3 independent mechanisms to minimize the time interval + // between the end of the execution of the lambda function and the end of the + // execution of processEvent(): + // 1) AgentDoneSignal is triggered upon reception of a `flushed=true` query from the agent + // 2) [Backup 1] All expected log events are processed. + // 3) [Backup 2] If all else fails, the extension relies on the timeout of the Lambda + // function to interrupt itself 200ms before the specified deadline to give the extension + // time to flush data before shutdown. 
select { case <-app.apmClient.WaitForFlush(): app.logger.Debug("APM client has sent flush signal") - case <-runtimeDone: + case <-logProcessingDone: app.logger.Debug("Received runtimeDone signal") case <-timer.C: - app.logger.Info("Time expired waiting for agent signal or runtimeDone event") + app.logger.Info("Time expired while waiting for final log event") } - return event, nil } diff --git a/logsapi/event.go b/logsapi/event.go index 7b05698a..7a8d79ca 100644 --- a/logsapi/event.go +++ b/logsapi/event.go @@ -19,6 +19,7 @@ package logsapi import ( "context" + "fmt" "time" "github.com/elastic/apm-aws-lambda/extension" @@ -53,15 +54,18 @@ type LogEventRecord struct { Metrics PlatformMetrics `json:"metrics"` } -// ProcessLogs consumes events until a RuntimeDone event corresponding -// to requestID is received, or ctx is canceled, and then returns. +// ProcessLogs consumes log events until there are no more log events that +// can be consumed or ctx is cancelled. For INVOKE event this state is +// reached when runtimeDone event for the current requestID is processed +// whereas for SHUTDOWN event this state is reached when the platformReport +// event for the previous requestID is processed. 
func (lc *Client) ProcessLogs( ctx context.Context, requestID string, invokedFnArn string, dataChan chan []byte, - runtimeDoneSignal chan struct{}, prevEvent *extension.NextEventResponse, + isShutdown bool, ) error { // platformStartReqID is to identify the requestID for the function // logs under the assumption that function logs for a specific request @@ -70,7 +74,7 @@ func (lc *Client) ProcessLogs( for { select { case logEvent := <-lc.logsChannel: - lc.logger.Debugf("Received log event %v", logEvent.Type) + lc.logger.Debugf("Received log event %v for request ID %s", logEvent.Type, logEvent.Record.RequestID) switch logEvent.Type { case PlatformStart: platformStartReqID = logEvent.Record.RequestID @@ -82,47 +86,52 @@ func (lc *Client) ProcessLogs( ); err != nil { lc.logger.Warnf("Failed to finalize invocation with request ID %s: %v", logEvent.Record.RequestID, err) } - // For the current invocation the platform.runtimeDone would be the last event - if logEvent.Record.RequestID == requestID { - lc.logger.Info("Received runtimeDone event for this function invocation") - runtimeDoneSignal <- struct{}{} + // For invocation events the platform.runtimeDone would be the last possible event. + if !isShutdown && logEvent.Record.RequestID == requestID { + lc.logger.Debugf( + "Processed runtime done event for reqID %s as the last log event for the invocation", + logEvent.Record.RequestID, + ) return nil } - lc.logger.Debug("Log API runtimeDone event request id didn't match") - // Check if the logEvent contains metrics and verify that they can be linked to the previous invocation case PlatformReport: // TODO: @lahsivjar Refactor usage of prevEvent.RequestID (should now query the batch?) 
if prevEvent != nil && logEvent.Record.RequestID == prevEvent.RequestID { - lc.logger.Debug("Received platform report for the previous function invocation") + lc.logger.Debugf("Received platform report for %s", logEvent.Record.RequestID) processedMetrics, err := ProcessPlatformReport(prevEvent, logEvent) if err != nil { - lc.logger.Errorf("Error processing Lambda platform metrics: %v", err) - } else { - select { - case dataChan <- processedMetrics: - case <-ctx.Done(): - } + return fmt.Errorf("Error processing Lambda platform metrics: %v", err) + } + select { + case dataChan <- processedMetrics: + case <-ctx.Done(): + } + // For shutdown event the platform report metrics for the previous log event + // would be the last possible log event. + if isShutdown { + lc.logger.Debugf( + "Processed platform report event for reqID %s as the last log event before shutdown", + logEvent.Record.RequestID, + ) + return nil } } else { - lc.logger.Warn("report event request id didn't match the previous event id") - lc.logger.Debug("Log API runtimeDone event request id didn't match") + lc.logger.Warn("Report event request id didn't match the previous event id") } case PlatformLogsDropped: lc.logger.Warnf("Logs dropped due to extension falling behind: %v", logEvent.Record) case FunctionLog: - lc.logger.Debug("Received function log") processedLog, err := ProcessFunctionLog( platformStartReqID, invokedFnArn, logEvent, ) if err != nil { - lc.logger.Errorf("Error processing function log : %v", err) - } else { - select { - case dataChan <- processedLog: - case <-ctx.Done(): - } + return fmt.Errorf("Error processing function log : %v", err) + } + select { + case dataChan <- processedLog: + case <-ctx.Done(): } } case <-ctx.Done(): From 0897019aeeb3a1b3ee20acdb6a680c65da2ae31c Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Thu, 24 Nov 2022 20:18:02 +0800 Subject: [PATCH 2/5] Fix log message --- app/run.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/run.go 
b/app/run.go index 24dfa2d6..444bd0cb 100644 --- a/app/run.go +++ b/app/run.go @@ -237,7 +237,7 @@ func (app *App) processEvent( case <-logProcessingDone: app.logger.Debug("Received runtimeDone signal") case <-timer.C: - app.logger.Info("Time expired while waiting for final log event") + app.logger.Info("Time expired while waiting for agent done signal or final log event") } return event, nil } From ad3ebdc5f2c6355ff8e0a7b3a37db47c7ae64e79 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Thu, 24 Nov 2022 21:43:27 +0800 Subject: [PATCH 3/5] Revert flush buffer to 200ms --- app/run.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/run.go b/app/run.go index 444bd0cb..8f20f0f6 100644 --- a/app/run.go +++ b/app/run.go @@ -216,7 +216,7 @@ func (app *App) processEvent( } // Calculate how long to wait for a runtimeDoneSignal or AgentDoneSignal signal - flushDeadlineMs := event.DeadlineMs - 50 + flushDeadlineMs := event.DeadlineMs - 200 durationUntilFlushDeadline := time.Until(time.Unix(flushDeadlineMs/1000, 0)) // Create a timer that expires after durationUntilFlushDeadline From 11a68d48e41e0c253c4609b4f0fe78e8f2d60587 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Fri, 25 Nov 2022 10:42:28 +0800 Subject: [PATCH 4/5] Refactor process logs to not return an error --- app/run.go | 6 ++---- logsapi/event.go | 31 ++++++++++++++++--------------- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/app/run.go b/app/run.go index 8f20f0f6..9a8f032b 100644 --- a/app/run.go +++ b/app/run.go @@ -199,16 +199,14 @@ func (app *App) processEvent( if app.logsClient != nil { go func() { defer close(logProcessingDone) - if err := app.logsClient.ProcessLogs( + app.logsClient.ProcessLogs( invocationCtx, event.RequestID, event.InvokedFunctionArn, app.apmClient.LambdaDataChannel, prevEvent, event.EventType == extension.Shutdown, - ); err != nil { - app.logger.Errorf("Error while processing Lambda Logs ; %v", err) - } + ) }() } else { app.logger.Warn("Logs 
collection not started due to earlier subscription failure") diff --git a/logsapi/event.go b/logsapi/event.go index 7a8d79ca..cb3c5f8b 100644 --- a/logsapi/event.go +++ b/logsapi/event.go @@ -19,7 +19,6 @@ package logsapi import ( "context" - "fmt" "time" "github.com/elastic/apm-aws-lambda/extension" @@ -66,7 +65,7 @@ func (lc *Client) ProcessLogs( dataChan chan []byte, prevEvent *extension.NextEventResponse, isShutdown bool, -) error { +) { // platformStartReqID is to identify the requestID for the function // logs under the assumption that function logs for a specific request // ID will be bounded by PlatformStart and PlatformEnd events. @@ -92,7 +91,7 @@ func (lc *Client) ProcessLogs( "Processed runtime done event for reqID %s as the last log event for the invocation", logEvent.Record.RequestID, ) - return nil + return } case PlatformReport: // TODO: @lahsivjar Refactor usage of prevEvent.RequestID (should now query the batch?) @@ -100,11 +99,12 @@ func (lc *Client) ProcessLogs( lc.logger.Debugf("Received platform report for %s", logEvent.Record.RequestID) processedMetrics, err := ProcessPlatformReport(prevEvent, logEvent) if err != nil { - return fmt.Errorf("Error processing Lambda platform metrics: %v", err) - } - select { - case dataChan <- processedMetrics: - case <-ctx.Done(): + lc.logger.Errorf("Error processing Lambda platform metrics: %v", err) + } else { + select { + case dataChan <- processedMetrics: + case <-ctx.Done(): + } } // For shutdown event the platform report metrics for the previous log event // would be the last possible log event. 
@@ -113,7 +113,7 @@ func (lc *Client) ProcessLogs( "Processed platform report event for reqID %s as the last log event before shutdown", logEvent.Record.RequestID, ) - return nil + return } } else { lc.logger.Warn("Report event request id didn't match the previous event id") @@ -127,16 +127,17 @@ func (lc *Client) ProcessLogs( logEvent, ) if err != nil { - return fmt.Errorf("Error processing function log : %v", err) - } - select { - case dataChan <- processedLog: - case <-ctx.Done(): + lc.logger.Warnf("Error processing function log : %v", err) + } else { + select { + case dataChan <- processedLog: + case <-ctx.Done(): + } } } case <-ctx.Done(): lc.logger.Debug("Current invocation over. Interrupting logs processing goroutine") - return nil + return } } } From 68357b5363d120e60df62d60045bff294ee1490c Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Mon, 28 Nov 2022 17:32:00 +0800 Subject: [PATCH 5/5] Add changelog --- CHANGELOG.asciidoc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 7f8087eb..f907f1f9 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -24,7 +24,8 @@ https://github.com/elastic/apm-aws-lambda/compare/v1.2.0...main[View commits] [float] ===== Features -- Create proxy transaction with error results if not reported by agent {lambda-pull}315[315] +- experimental:[] Create proxy transaction with error results if not reported by agent {lambda-pull}315[315] +- Wait for the final platform report metrics on shutdown {lambda-pull}347[347] [float] [[lambda-1.2.0]]