Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ https://github.com/elastic/apm-aws-lambda/compare/v1.2.0...main[View commits]

[float]
===== Features
- Create proxy transaction with error results if not reported by agent {lambda-pull}315[315]
- experimental:[] Create proxy transaction with error results if not reported by agent {lambda-pull}315[315]
- Wait for the final platform report metrics on shutdown {lambda-pull}347[347]

[float]
[[lambda-1.2.0]]
Expand Down
3 changes: 2 additions & 1 deletion apmproxy/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ func (c *Client) handleInfoRequest() (func(w http.ResponseWriter, r *http.Reques
reverseProxy.Transport = customTransport

reverseProxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
c.UpdateStatus(r.Context(), Failing)
// Don't update the status of the transport as it is possible that the extension
// is frozen while processing the request and context is canceled due to timeout.
c.logger.Errorf("Error querying version from the APM server: %v", err)
}

Expand Down
79 changes: 43 additions & 36 deletions app/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ func (app *App) Run(ctx context.Context) error {
}
}()

// Flush all data before shutting down.
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

app.apmClient.FlushAPMData(ctx)
}()

if app.logsClient != nil {
if err := app.logsClient.StartService(app.extensionClient.ExtensionID); err != nil {
app.logger.Warnf("Error while subscribing to the Logs API: %v", err)
Expand All @@ -71,22 +79,13 @@ func (app *App) Run(ctx context.Context) error {
}
}

// Flush all data before shutting down.
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

app.apmClient.FlushAPMData(ctx)
}()

// The previous event id is used to validate the received Lambda metrics
var prevEvent *extension.NextEventResponse

for {
select {
case <-ctx.Done():
app.logger.Info("Received a signal, exiting...")

return nil
default:
// Use a wait group to ensure the background go routine sending to the APM server
Expand All @@ -96,13 +95,12 @@ func (app *App) Run(ctx context.Context) error {
if err != nil {
return err
}

app.logger.Debug("Waiting for background data send to end")
backgroundDataSendWg.Wait()
if event.EventType == extension.Shutdown {
app.logger.Infof("Received shutdown event: %s. Exiting...", event.ShutdownReason)
app.logger.Infof("Exiting due to shutdown event with reason %s", event.ShutdownReason)
return nil
}
app.logger.Debug("Waiting for background data send to end")
backgroundDataSendWg.Wait()
if app.apmClient.ShouldFlush() {
// Use a new cancellable context for flushing APM data to make sure
// that the underlying transport is reset for next invocation without
Expand Down Expand Up @@ -163,15 +161,27 @@ func (app *App) processEvent(
// At shutdown we can not expect platform.runtimeDone events to be reported
// for the remaining invocations. If we haven't received the transaction
// from agents at this point then it is safe to assume that the function
// timed out. We will create proxy transaction for all invocations that
// failed. We will create proxy transaction for all invocations that
// haven't received a full transaction from the agent yet. If extension
// doesn't have enough CPU time it is possible that the extension might
// not receive the shutdown signal for timeouts or runtime crashes. In
// these cases we will miss the transaction.
if err := app.batch.OnShutdown("timeout"); err != nil {
app.logger.Errorf("Error finalizing invocation on shutdown: %v", err)
app.logger.Debugf("Received shutdown event with reason %s", event.ShutdownReason)
defer func() {
if err := app.batch.OnShutdown(event.ShutdownReason); err != nil {
app.logger.Errorf("Error finalizing invocation on shutdown: %v", err)
}
}()

// platform.report metric (and some other metrics) might not have been
// reported by the logs API even till shutdown. At shutdown we will make
// a last attempt to collect and report these metrics. However, it is
// also possible that lambda has init a few execution env preemptively,
// for such cases the extension will see only a SHUTDOWN event and
// there is no need to wait for any log event.
if prevEvent == nil {
return event, nil
}
return event, nil
}

// APM Data Processing
Expand All @@ -185,25 +195,22 @@ func (app *App) processEvent(

// Lambda Service Logs Processing, also used to extract metrics from APM logs
// This goroutine should not be started if subscription failed
runtimeDone := make(chan struct{})
logProcessingDone := make(chan struct{})
if app.logsClient != nil {
go func() {
if err := app.logsClient.ProcessLogs(
defer close(logProcessingDone)
app.logsClient.ProcessLogs(
invocationCtx,
event.RequestID,
event.InvokedFunctionArn,
app.apmClient.LambdaDataChannel,
runtimeDone,
prevEvent,
); err != nil {
app.logger.Errorf("Error while processing Lambda Logs ; %v", err)
} else {
close(runtimeDone)
}
event.EventType == extension.Shutdown,
)
}()
} else {
app.logger.Warn("Logs collection not started due to earlier subscription failure")
close(runtimeDone)
close(logProcessingDone)
}

// Calculate how long to wait for a runtimeDoneSignal or AgentDoneSignal signal
Expand All @@ -214,21 +221,21 @@ func (app *App) processEvent(
timer := time.NewTimer(durationUntilFlushDeadline)
defer timer.Stop()

// The extension relies on 3 independent mechanisms to minimize the time interval between the end of the execution of
// the lambda function and the end of the execution of processEvent()
// 1) AgentDoneSignal is triggered upon reception of a `flushed=true` query from the agent
// 2) [Backup 1] RuntimeDone is triggered upon reception of a Lambda log entry certifying the end of the execution of the current function
// 3) [Backup 2] If all else fails, the extension relies of the timeout of the Lambda function to interrupt itself 200 ms before the specified deadline.
// This time interval is large enough to attempt a last flush attempt (if SendStrategy == syncFlush) before the environment gets shut down.

// The extension relies on 3 independent mechanisms to minimize the time interval
// between the end of the execution of the lambda function and the end of the
// execution of processEvent():
// 1) AgentDoneSignal triggered upon reception of a `flushed=true` query from the agent
// 2) [Backup 1] All expected log events are processed.
// 3) [Backup 2] If all else fails, the extension relies of the timeout of the Lambda
// function to interrupt itself 200ms before the specified deadline to give the extension
// time to flush data before shutdown.
select {
case <-app.apmClient.WaitForFlush():
app.logger.Debug("APM client has sent flush signal")
case <-runtimeDone:
case <-logProcessingDone:
app.logger.Debug("Received runtimeDone signal")
case <-timer.C:
app.logger.Info("Time expired waiting for agent signal or runtimeDone event")
app.logger.Info("Time expired while waiting for agent done signal or final log event")
}

return event, nil
}
46 changes: 28 additions & 18 deletions logsapi/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,27 @@ type LogEventRecord struct {
Metrics PlatformMetrics `json:"metrics"`
}

// ProcessLogs consumes events until a RuntimeDone event corresponding
// to requestID is received, or ctx is canceled, and then returns.
// ProcessLogs consumes log events until there are no more log events that
// can be consumed or ctx is cancelled. For INVOKE event this state is
// reached when runtimeDone event for the current requestID is processed
// whereas for SHUTDOWN event this state is reached when the platformReport
// event for the previous requestID is processed.
func (lc *Client) ProcessLogs(
ctx context.Context,
requestID string,
invokedFnArn string,
dataChan chan []byte,
runtimeDoneSignal chan struct{},
prevEvent *extension.NextEventResponse,
) error {
isShutdown bool,
) {
// platformStartReqID is to identify the requestID for the function
// logs under the assumption that function logs for a specific request
// ID will be bounded by PlatformStart and PlatformEnd events.
var platformStartReqID string
for {
select {
case logEvent := <-lc.logsChannel:
lc.logger.Debugf("Received log event %v", logEvent.Type)
lc.logger.Debugf("Received log event %v for request ID %s", logEvent.Type, logEvent.Record.RequestID)
switch logEvent.Type {
case PlatformStart:
platformStartReqID = logEvent.Record.RequestID
Expand All @@ -82,18 +85,18 @@ func (lc *Client) ProcessLogs(
); err != nil {
lc.logger.Warnf("Failed to finalize invocation with request ID %s: %v", logEvent.Record.RequestID, err)
}
// For the current invocation the platform.runtimeDone would be the last event
if logEvent.Record.RequestID == requestID {
lc.logger.Info("Received runtimeDone event for this function invocation")
runtimeDoneSignal <- struct{}{}
return nil
// For invocation events the platform.runtimeDone would be the last possible event.
if !isShutdown && logEvent.Record.RequestID == requestID {
lc.logger.Debugf(
"Processed runtime done event for reqID %s as the last log event for the invocation",
logEvent.Record.RequestID,
)
return
}
lc.logger.Debug("Log API runtimeDone event request id didn't match")
// Check if the logEvent contains metrics and verify that they can be linked to the previous invocation
case PlatformReport:
// TODO: @lahsivjar Refactor usage of prevEvent.RequestID (should now query the batch?)
if prevEvent != nil && logEvent.Record.RequestID == prevEvent.RequestID {
lc.logger.Debug("Received platform report for the previous function invocation")
lc.logger.Debugf("Received platform report for %s", logEvent.Record.RequestID)
processedMetrics, err := ProcessPlatformReport(prevEvent, logEvent)
if err != nil {
lc.logger.Errorf("Error processing Lambda platform metrics: %v", err)
Expand All @@ -103,21 +106,28 @@ func (lc *Client) ProcessLogs(
case <-ctx.Done():
}
}
// For shutdown event the platform report metrics for the previous log event
// would be the last possible log event.
if isShutdown {
lc.logger.Debugf(
"Processed platform report event for reqID %s as the last log event before shutdown",
logEvent.Record.RequestID,
)
return
}
} else {
lc.logger.Warn("report event request id didn't match the previous event id")
lc.logger.Debug("Log API runtimeDone event request id didn't match")
lc.logger.Warn("Report event request id didn't match the previous event id")
}
case PlatformLogsDropped:
lc.logger.Warnf("Logs dropped due to extension falling behind: %v", logEvent.Record)
case FunctionLog:
lc.logger.Debug("Received function log")
processedLog, err := ProcessFunctionLog(
platformStartReqID,
invokedFnArn,
logEvent,
)
if err != nil {
lc.logger.Errorf("Error processing function log : %v", err)
lc.logger.Warnf("Error processing function log : %v", err)
} else {
select {
case dataChan <- processedLog:
Expand All @@ -127,7 +137,7 @@ func (lc *Client) ProcessLogs(
}
case <-ctx.Done():
lc.logger.Debug("Current invocation over. Interrupting logs processing goroutine")
return nil
return
}
}
}