Skip to content

Commit 2fad63b

Browse files
committed
Wait for the final platform report metrics on shutdown
1 parent bd81367 commit 2fad63b

File tree

3 files changed

+79
-60
lines changed

3 files changed

+79
-60
lines changed

apmproxy/receiver.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ func (c *Client) handleInfoRequest() (func(w http.ResponseWriter, r *http.Reques
8888
reverseProxy.Transport = customTransport
8989

9090
reverseProxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
91-
c.UpdateStatus(r.Context(), Failing)
91+
// Don't update the status of the transport as it is possible that the extension
92+
// is frozen while processing the request and context is canceled due to timeout.
9293
c.logger.Errorf("Error querying version from the APM server: %v", err)
9394
}
9495

app/run.go

Lines changed: 42 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,14 @@ func (app *App) Run(ctx context.Context) error {
5555
}
5656
}()
5757

58+
// Flush all data before shutting down.
59+
defer func() {
60+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
61+
defer cancel()
62+
63+
app.apmClient.FlushAPMData(ctx)
64+
}()
65+
5866
if app.logsClient != nil {
5967
if err := app.logsClient.StartService(app.extensionClient.ExtensionID); err != nil {
6068
app.logger.Warnf("Error while subscribing to the Logs API: %v", err)
@@ -71,22 +79,13 @@ func (app *App) Run(ctx context.Context) error {
7179
}
7280
}
7381

74-
// Flush all data before shutting down.
75-
defer func() {
76-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
77-
defer cancel()
78-
79-
app.apmClient.FlushAPMData(ctx)
80-
}()
81-
8282
// The previous event id is used to validate the received Lambda metrics
8383
var prevEvent *extension.NextEventResponse
8484

8585
for {
8686
select {
8787
case <-ctx.Done():
8888
app.logger.Info("Received a signal, exiting...")
89-
9089
return nil
9190
default:
9291
// Use a wait group to ensure the background go routine sending to the APM server
@@ -96,13 +95,12 @@ func (app *App) Run(ctx context.Context) error {
9695
if err != nil {
9796
return err
9897
}
99-
98+
app.logger.Debug("Waiting for background data send to end")
99+
backgroundDataSendWg.Wait()
100100
if event.EventType == extension.Shutdown {
101-
app.logger.Infof("Received shutdown event: %s. Exiting...", event.ShutdownReason)
101+
app.logger.Infof("Exiting due to shutdown event with reason %s", event.ShutdownReason)
102102
return nil
103103
}
104-
app.logger.Debug("Waiting for background data send to end")
105-
backgroundDataSendWg.Wait()
106104
if app.apmClient.ShouldFlush() {
107105
// Use a new cancellable context for flushing APM data to make sure
108106
// that the underlying transport is reset for next invocation without
@@ -163,15 +161,27 @@ func (app *App) processEvent(
163161
// At shutdown we can not expect platform.runtimeDone events to be reported
164162
// for the remaining invocations. If we haven't received the transaction
165163
// from agents at this point then it is safe to assume that the function
166-
// timed out. We will create proxy transaction for all invocations that
164+
// failed. We will create proxy transaction for all invocations that
167165
// haven't received a full transaction from the agent yet. If extension
168166
// doesn't have enough CPU time it is possible that the extension might
169167
// not receive the shutdown signal for timeouts or runtime crashes. In
170168
// these cases we will miss the transaction.
171-
if err := app.batch.OnShutdown("timeout"); err != nil {
172-
app.logger.Errorf("Error finalizing invocation on shutdown: %v", err)
169+
app.logger.Debugf("Received shutdown event with reason %s", event.ShutdownReason)
170+
defer func() {
171+
if err := app.batch.OnShutdown(event.ShutdownReason); err != nil {
172+
app.logger.Errorf("Error finalizing invocation on shutdown: %v", err)
173+
}
174+
}()
175+
176+
// platform.report metric (and some other metrics) might not have been
177+
// reported by the logs API even till shutdown. At shutdown we will make
178+
// a last attempt to collect and report these metrics. However, it is
179+
// also possible that lambda has init a few execution env preemptively,
180+
// for such cases the extension will see only a SHUTDOWN event and
181+
// there is no need to wait for any log event.
182+
if prevEvent == nil {
183+
return event, nil
173184
}
174-
return event, nil
175185
}
176186

177187
// APM Data Processing
@@ -185,50 +195,49 @@ func (app *App) processEvent(
185195

186196
// Lambda Service Logs Processing, also used to extract metrics from APM logs
187197
// This goroutine should not be started if subscription failed
188-
runtimeDone := make(chan struct{})
198+
logProcessingDone := make(chan struct{})
189199
if app.logsClient != nil {
190200
go func() {
201+
defer close(logProcessingDone)
191202
if err := app.logsClient.ProcessLogs(
192203
invocationCtx,
193204
event.RequestID,
194205
event.InvokedFunctionArn,
195206
app.apmClient.LambdaDataChannel,
196-
runtimeDone,
197207
prevEvent,
208+
event.EventType == extension.Shutdown,
198209
); err != nil {
199210
app.logger.Errorf("Error while processing Lambda Logs ; %v", err)
200-
} else {
201-
close(runtimeDone)
202211
}
203212
}()
204213
} else {
205214
app.logger.Warn("Logs collection not started due to earlier subscription failure")
206-
close(runtimeDone)
215+
close(logProcessingDone)
207216
}
208217

209218
// Calculate how long to wait for a runtimeDoneSignal or AgentDoneSignal signal
210-
flushDeadlineMs := event.DeadlineMs - 200
219+
flushDeadlineMs := event.DeadlineMs - 50
211220
durationUntilFlushDeadline := time.Until(time.Unix(flushDeadlineMs/1000, 0))
212221

213222
// Create a timer that expires after durationUntilFlushDeadline
214223
timer := time.NewTimer(durationUntilFlushDeadline)
215224
defer timer.Stop()
216225

217-
// The extension relies on 3 independent mechanisms to minimize the time interval between the end of the execution of
218-
// the lambda function and the end of the execution of processEvent()
219-
// 1) AgentDoneSignal is triggered upon reception of a `flushed=true` query from the agent
220-
// 2) [Backup 1] RuntimeDone is triggered upon reception of a Lambda log entry certifying the end of the execution of the current function
221-
// 3) [Backup 2] If all else fails, the extension relies of the timeout of the Lambda function to interrupt itself 200 ms before the specified deadline.
222-
// This time interval is large enough to attempt a last flush attempt (if SendStrategy == syncFlush) before the environment gets shut down.
223-
226+
// The extension relies on 3 independent mechanisms to minimize the time interval
227+
// between the end of the execution of the lambda function and the end of the
228+
// execution of processEvent():
229+
// 1) AgentDoneSignal triggered upon reception of a `flushed=true` query from the agent
230+
// 2) [Backup 1] All expected log events are processed.
231+
// 3) [Backup 2] If all else fails, the extension relies of the timeout of the Lambda
232+
// function to interrupt itself 200ms before the specified deadline to give the extension
233+
// time to flush data before shutdown.
224234
select {
225235
case <-app.apmClient.WaitForFlush():
226236
app.logger.Debug("APM client has sent flush signal")
227-
case <-runtimeDone:
237+
case <-logProcessingDone:
228238
app.logger.Debug("Received runtimeDone signal")
229239
case <-timer.C:
230-
app.logger.Info("Time expired waiting for agent signal or runtimeDone event")
240+
app.logger.Info("Time expired while waiting for final log event")
231241
}
232-
233242
return event, nil
234243
}

logsapi/event.go

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package logsapi
1919

2020
import (
2121
"context"
22+
"fmt"
2223
"time"
2324

2425
"github.com/elastic/apm-aws-lambda/extension"
@@ -53,15 +54,18 @@ type LogEventRecord struct {
5354
Metrics PlatformMetrics `json:"metrics"`
5455
}
5556

56-
// ProcessLogs consumes events until a RuntimeDone event corresponding
57-
// to requestID is received, or ctx is canceled, and then returns.
57+
// ProcessLogs consumes log events until there are no more log events that
58+
// can be consumed or ctx is cancelled. For INVOKE event this state is
59+
// reached when runtimeDone event for the current requestID is processed
60+
// whereas for SHUTDOWN event this state is reached when the platformReport
61+
// event for the previous requestID is processed.
5862
func (lc *Client) ProcessLogs(
5963
ctx context.Context,
6064
requestID string,
6165
invokedFnArn string,
6266
dataChan chan []byte,
63-
runtimeDoneSignal chan struct{},
6467
prevEvent *extension.NextEventResponse,
68+
isShutdown bool,
6569
) error {
6670
// platformStartReqID is to identify the requestID for the function
6771
// logs under the assumption that function logs for a specific request
@@ -70,7 +74,7 @@ func (lc *Client) ProcessLogs(
7074
for {
7175
select {
7276
case logEvent := <-lc.logsChannel:
73-
lc.logger.Debugf("Received log event %v", logEvent.Type)
77+
lc.logger.Debugf("Received log event %v for request ID %s", logEvent.Type, logEvent.Record.RequestID)
7478
switch logEvent.Type {
7579
case PlatformStart:
7680
platformStartReqID = logEvent.Record.RequestID
@@ -82,47 +86,52 @@ func (lc *Client) ProcessLogs(
8286
); err != nil {
8387
lc.logger.Warnf("Failed to finalize invocation with request ID %s: %v", logEvent.Record.RequestID, err)
8488
}
85-
// For the current invocation the platform.runtimeDone would be the last event
86-
if logEvent.Record.RequestID == requestID {
87-
lc.logger.Info("Received runtimeDone event for this function invocation")
88-
runtimeDoneSignal <- struct{}{}
89+
// For invocation events the platform.runtimeDone would be the last possible event.
90+
if !isShutdown && logEvent.Record.RequestID == requestID {
91+
lc.logger.Debugf(
92+
"Processed runtime done event for reqID %s as the last log event for the invocation",
93+
logEvent.Record.RequestID,
94+
)
8995
return nil
9096
}
91-
lc.logger.Debug("Log API runtimeDone event request id didn't match")
92-
// Check if the logEvent contains metrics and verify that they can be linked to the previous invocation
9397
case PlatformReport:
9498
// TODO: @lahsivjar Refactor usage of prevEvent.RequestID (should now query the batch?)
9599
if prevEvent != nil && logEvent.Record.RequestID == prevEvent.RequestID {
96-
lc.logger.Debug("Received platform report for the previous function invocation")
100+
lc.logger.Debugf("Received platform report for %s", logEvent.Record.RequestID)
97101
processedMetrics, err := ProcessPlatformReport(prevEvent, logEvent)
98102
if err != nil {
99-
lc.logger.Errorf("Error processing Lambda platform metrics: %v", err)
100-
} else {
101-
select {
102-
case dataChan <- processedMetrics:
103-
case <-ctx.Done():
104-
}
103+
return fmt.Errorf("Error processing Lambda platform metrics: %v", err)
104+
}
105+
select {
106+
case dataChan <- processedMetrics:
107+
case <-ctx.Done():
108+
}
109+
// For shutdown event the platform report metrics for the previous log event
110+
// would be the last possible log event.
111+
if isShutdown {
112+
lc.logger.Debugf(
113+
"Processed platform report event for reqID %s as the last log event before shutdown",
114+
logEvent.Record.RequestID,
115+
)
116+
return nil
105117
}
106118
} else {
107-
lc.logger.Warn("report event request id didn't match the previous event id")
108-
lc.logger.Debug("Log API runtimeDone event request id didn't match")
119+
lc.logger.Warn("Report event request id didn't match the previous event id")
109120
}
110121
case PlatformLogsDropped:
111122
lc.logger.Warnf("Logs dropped due to extension falling behind: %v", logEvent.Record)
112123
case FunctionLog:
113-
lc.logger.Debug("Received function log")
114124
processedLog, err := ProcessFunctionLog(
115125
platformStartReqID,
116126
invokedFnArn,
117127
logEvent,
118128
)
119129
if err != nil {
120-
lc.logger.Errorf("Error processing function log : %v", err)
121-
} else {
122-
select {
123-
case dataChan <- processedLog:
124-
case <-ctx.Done():
125-
}
130+
return fmt.Errorf("Error processing function log : %v", err)
131+
}
132+
select {
133+
case dataChan <- processedLog:
134+
case <-ctx.Done():
126135
}
127136
}
128137
case <-ctx.Done():

0 commit comments

Comments
 (0)