diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 7f8087eb..f907f1f9 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -24,7 +24,8 @@ https://github.com/elastic/apm-aws-lambda/compare/v1.2.0...main[View commits] [float] ===== Features -- Create proxy transaction with error results if not reported by agent {lambda-pull}315[315] +- experimental:[] Create proxy transaction with error results if not reported by agent {lambda-pull}315[315] +- Wait for the final platform report metrics on shutdown {lambda-pull}347[347] [float] [[lambda-1.2.0]] diff --git a/apmproxy/receiver.go b/apmproxy/receiver.go index 15ebeed7..cad3fdbf 100644 --- a/apmproxy/receiver.go +++ b/apmproxy/receiver.go @@ -88,7 +88,8 @@ func (c *Client) handleInfoRequest() (func(w http.ResponseWriter, r *http.Reques reverseProxy.Transport = customTransport reverseProxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) { - c.UpdateStatus(r.Context(), Failing) + // Don't update the status of the transport as it is possible that the extension + // is frozen while processing the request and context is canceled due to timeout. c.logger.Errorf("Error querying version from the APM server: %v", err) } diff --git a/app/run.go b/app/run.go index 33d705c2..9a8f032b 100644 --- a/app/run.go +++ b/app/run.go @@ -55,6 +55,14 @@ func (app *App) Run(ctx context.Context) error { } }() + // Flush all data before shutting down. + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + app.apmClient.FlushAPMData(ctx) + }() + if app.logsClient != nil { if err := app.logsClient.StartService(app.extensionClient.ExtensionID); err != nil { app.logger.Warnf("Error while subscribing to the Logs API: %v", err) @@ -71,14 +79,6 @@ func (app *App) Run(ctx context.Context) error { } } - // Flush all data before shutting down. - defer func() { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - app.apmClient.FlushAPMData(ctx) - }() - // The previous event id is used to validate the received Lambda metrics var prevEvent *extension.NextEventResponse @@ -86,7 +86,6 @@ func (app *App) Run(ctx context.Context) error { select { case <-ctx.Done(): app.logger.Info("Received a signal, exiting...") - return nil default: // Use a wait group to ensure the background go routine sending to the APM server @@ -96,13 +95,12 @@ func (app *App) Run(ctx context.Context) error { if err != nil { return err } - + app.logger.Debug("Waiting for background data send to end") + backgroundDataSendWg.Wait() if event.EventType == extension.Shutdown { - app.logger.Infof("Received shutdown event: %s. Exiting...", event.ShutdownReason) + app.logger.Infof("Exiting due to shutdown event with reason %s", event.ShutdownReason) return nil } - app.logger.Debug("Waiting for background data send to end") - backgroundDataSendWg.Wait() if app.apmClient.ShouldFlush() { // Use a new cancellable context for flushing APM data to make sure // that the underlying transport is reset for next invocation without @@ -163,15 +161,27 @@ func (app *App) processEvent( // At shutdown we can not expect platform.runtimeDone events to be reported // for the remaining invocations. If we haven't received the transaction // from agents at this point then it is safe to assume that the function - // timed out. We will create proxy transaction for all invocations that + // failed. We will create proxy transaction for all invocations that // haven't received a full transaction from the agent yet. If extension // doesn't have enough CPU time it is possible that the extension might // not receive the shutdown signal for timeouts or runtime crashes. In // these cases we will miss the transaction. - if err := app.batch.OnShutdown("timeout"); err != nil { - app.logger.Errorf("Error finalizing invocation on shutdown: %v", err) + app.logger.Debugf("Received shutdown event with reason %s", event.ShutdownReason) + defer func() { + if err := app.batch.OnShutdown(event.ShutdownReason); err != nil { + app.logger.Errorf("Error finalizing invocation on shutdown: %v", err) + } + }() + + // platform.report metric (and some other metrics) might not have been + // reported by the logs API even till shutdown. At shutdown we will make + // a last attempt to collect and report these metrics. However, it is + // also possible that lambda has init a few execution env preemptively, + // for such cases the extension will see only a SHUTDOWN event and + // there is no need to wait for any log event. + if prevEvent == nil { + return event, nil } - return event, nil } // APM Data Processing @@ -185,25 +195,22 @@ func (app *App) processEvent( // Lambda Service Logs Processing, also used to extract metrics from APM logs // This goroutine should not be started if subscription failed - runtimeDone := make(chan struct{}) + logProcessingDone := make(chan struct{}) if app.logsClient != nil { go func() { - if err := app.logsClient.ProcessLogs( + defer close(logProcessingDone) + app.logsClient.ProcessLogs( invocationCtx, event.RequestID, event.InvokedFunctionArn, app.apmClient.LambdaDataChannel, - runtimeDone, prevEvent, - ); err != nil { - app.logger.Errorf("Error while processing Lambda Logs ; %v", err) - } else { - close(runtimeDone) - } + event.EventType == extension.Shutdown, + ) }() } else { app.logger.Warn("Logs collection not started due to earlier subscription failure") - close(runtimeDone) + close(logProcessingDone) } // Calculate how long to wait for a runtimeDoneSignal or AgentDoneSignal signal @@ -214,21 +221,21 @@ func (app *App) processEvent( timer := time.NewTimer(durationUntilFlushDeadline) defer timer.Stop() - // The extension relies on 3 independent mechanisms to minimize the time interval between the end of the execution of - // the lambda function and the end of the execution of processEvent() - // 1) AgentDoneSignal is triggered upon reception of a `flushed=true` query from the agent - // 2) [Backup 1] RuntimeDone is triggered upon reception of a Lambda log entry certifying the end of the execution of the current function - // 3) [Backup 2] If all else fails, the extension relies of the timeout of the Lambda function to interrupt itself 200 ms before the specified deadline. - // This time interval is large enough to attempt a last flush attempt (if SendStrategy == syncFlush) before the environment gets shut down. - + // The extension relies on 3 independent mechanisms to minimize the time interval + // between the end of the execution of the lambda function and the end of the + // execution of processEvent(): + // 1) AgentDoneSignal triggered upon reception of a `flushed=true` query from the agent + // 2) [Backup 1] All expected log events are processed. + // 3) [Backup 2] If all else fails, the extension relies of the timeout of the Lambda + // function to interrupt itself 200ms before the specified deadline to give the extension + // time to flush data before shutdown. select { case <-app.apmClient.WaitForFlush(): app.logger.Debug("APM client has sent flush signal") - case <-runtimeDone: + case <-logProcessingDone: app.logger.Debug("Received runtimeDone signal") case <-timer.C: - app.logger.Info("Time expired waiting for agent signal or runtimeDone event") + app.logger.Info("Time expired while waiting for agent done signal or final log event") } - return event, nil } diff --git a/logsapi/event.go b/logsapi/event.go index 7b05698a..cb3c5f8b 100644 --- a/logsapi/event.go +++ b/logsapi/event.go @@ -53,16 +53,19 @@ type LogEventRecord struct { Metrics PlatformMetrics `json:"metrics"` } -// ProcessLogs consumes events until a RuntimeDone event corresponding -// to requestID is received, or ctx is canceled, and then returns. +// ProcessLogs consumes log events until there are no more log events that +// can be consumed or ctx is cancelled. For INVOKE event this state is +// reached when runtimeDone event for the current requestID is processed +// whereas for SHUTDOWN event this state is reached when the platformReport +// event for the previous requestID is processed. func (lc *Client) ProcessLogs( ctx context.Context, requestID string, invokedFnArn string, dataChan chan []byte, - runtimeDoneSignal chan struct{}, prevEvent *extension.NextEventResponse, -) error { + isShutdown bool, +) { // platformStartReqID is to identify the requestID for the function // logs under the assumption that function logs for a specific request // ID will be bounded by PlatformStart and PlatformEnd events. @@ -70,7 +73,7 @@ func (lc *Client) ProcessLogs( for { select { case logEvent := <-lc.logsChannel: - lc.logger.Debugf("Received log event %v", logEvent.Type) + lc.logger.Debugf("Received log event %v for request ID %s", logEvent.Type, logEvent.Record.RequestID) switch logEvent.Type { case PlatformStart: platformStartReqID = logEvent.Record.RequestID @@ -82,18 +85,18 @@ func (lc *Client) ProcessLogs( ); err != nil { lc.logger.Warnf("Failed to finalize invocation with request ID %s: %v", logEvent.Record.RequestID, err) } - // For the current invocation the platform.runtimeDone would be the last event - if logEvent.Record.RequestID == requestID { - lc.logger.Info("Received runtimeDone event for this function invocation") - runtimeDoneSignal <- struct{}{} - return nil + // For invocation events the platform.runtimeDone would be the last possible event. + if !isShutdown && logEvent.Record.RequestID == requestID { + lc.logger.Debugf( + "Processed runtime done event for reqID %s as the last log event for the invocation", + logEvent.Record.RequestID, + ) + return } - lc.logger.Debug("Log API runtimeDone event request id didn't match") - // Check if the logEvent contains metrics and verify that they can be linked to the previous invocation case PlatformReport: // TODO: @lahsivjar Refactor usage of prevEvent.RequestID (should now query the batch?) if prevEvent != nil && logEvent.Record.RequestID == prevEvent.RequestID { - lc.logger.Debug("Received platform report for the previous function invocation") + lc.logger.Debugf("Received platform report for %s", logEvent.Record.RequestID) processedMetrics, err := ProcessPlatformReport(prevEvent, logEvent) if err != nil { lc.logger.Errorf("Error processing Lambda platform metrics: %v", err) @@ -103,21 +106,28 @@ func (lc *Client) ProcessLogs( case <-ctx.Done(): } } + // For shutdown event the platform report metrics for the previous log event + // would be the last possible log event. + if isShutdown { + lc.logger.Debugf( + "Processed platform report event for reqID %s as the last log event before shutdown", + logEvent.Record.RequestID, + ) + return + } } else { - lc.logger.Warn("report event request id didn't match the previous event id") - lc.logger.Debug("Log API runtimeDone event request id didn't match") + lc.logger.Warn("Report event request id didn't match the previous event id") } case PlatformLogsDropped: lc.logger.Warnf("Logs dropped due to extension falling behind: %v", logEvent.Record) case FunctionLog: - lc.logger.Debug("Received function log") processedLog, err := ProcessFunctionLog( platformStartReqID, invokedFnArn, logEvent, ) if err != nil { - lc.logger.Errorf("Error processing function log : %v", err) + lc.logger.Warnf("Error processing function log : %v", err) } else { select { case dataChan <- processedLog: @@ -127,7 +137,7 @@ func (lc *Client) ProcessLogs( } case <-ctx.Done(): lc.logger.Debug("Current invocation over. Interrupting logs processing goroutine") - return nil + return } } }