Skip to content

Commit 3963b5a

Browse files
authored
Process platform report metrics when extn is lagging (#358)
1 parent 5cf6e87 commit 3963b5a

File tree

9 files changed

+71
-45
lines changed

9 files changed

+71
-45
lines changed

CHANGELOG.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ https://github.com/elastic/apm-aws-lambda/compare/v1.2.0...main[View commits]
2626
===== Features
2727
- experimental:[] Create proxy transaction with error results if not reported by agent {lambda-pull}315[315]
2828
- Wait for the final platform report metrics on shutdown {lambda-pull}347[347]
29+
- Process platform report metrics when extension is lagging {lambda-pull}358[358]
2930
3031
[float]
3132
[[lambda-1.2.0]]

accumulator/batch.go

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,13 @@ func NewBatch(maxSize int, maxAge time.Duration) *Batch {
8282
}
8383
}
8484

85+
// Size returns the number of invocations cached in the batch.
86+
func (b *Batch) Size() int {
87+
b.mu.RLock()
88+
defer b.mu.RUnlock()
89+
return len(b.invocations)
90+
}
91+
8592
// RegisterInvocation registers a new function invocation against its request
8693
// ID. It also updates the caches for currently executing request ID.
8794
func (b *Batch) RegisterInvocation(
@@ -185,6 +192,21 @@ func (b *Batch) OnLambdaLogRuntimeDone(reqID, status string, time time.Time) err
185192
return b.finalizeInvocation(reqID, status, time)
186193
}
187194

195+
// OnPlatformReport should be the last event for a request ID. On receiving the
196+
// platform.report event the batch will cleanup any datastructure for the request
197+
// ID. It will return some of the function metadata to allow the caller to enrich
198+
// the report metrics.
199+
func (b *Batch) OnPlatformReport(reqID string) (string, int64, time.Time, error) {
200+
b.mu.Lock()
201+
defer b.mu.Unlock()
202+
inc, ok := b.invocations[reqID]
203+
if !ok {
204+
return "", 0, time.Time{}, fmt.Errorf("invocation for requestID %s does not exist", reqID)
205+
}
206+
delete(b.invocations, reqID)
207+
return inc.FunctionARN, inc.DeadlineMs, inc.Timestamp, nil
208+
}
209+
188210
// OnShutdown flushes the data for shipping to APM Server by finalizing all
189211
// the invocation in the batch. If we haven't received a platform.runtimeDone
190212
// event for an invocation so far we won't be able to recieve it in time thus
@@ -201,6 +223,7 @@ func (b *Batch) OnShutdown(status string) error {
201223
if err := b.finalizeInvocation(inc.RequestID, status, time); err != nil {
202224
return err
203225
}
226+
delete(b.invocations, inc.RequestID)
204227
}
205228
return nil
206229
}
@@ -257,12 +280,16 @@ func (b *Batch) finalizeInvocation(reqID, status string, time time.Time) error {
257280
if !ok {
258281
return fmt.Errorf("invocation for requestID %s does not exist", reqID)
259282
}
260-
defer delete(b.invocations, reqID)
261-
proxyTxn, err := inc.Finalize(status, time)
283+
proxyTxn, err := inc.CreateProxyTxn(status, time)
262284
if err != nil {
263285
return err
264286
}
265-
return b.addData(proxyTxn)
287+
err = b.addData(proxyTxn)
288+
if err != nil {
289+
return err
290+
}
291+
inc.Finalized = true
292+
return nil
266293
}
267294

268295
func (b *Batch) addData(data []byte) error {

accumulator/invocation.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,26 +47,24 @@ type Invocation struct {
4747
// TransactionObserved is true if the root transaction ID for the
4848
// invocation is observed by the extension.
4949
TransactionObserved bool
50+
// Finalized tracks if the invocation has been finalized or not.
51+
Finalized bool
5052
}
5153

5254
// NeedProxyTransaction returns true if a proxy transaction needs to be
5355
// created based on the information available.
5456
func (inc *Invocation) NeedProxyTransaction() bool {
55-
return inc.TransactionID != "" && !inc.TransactionObserved
57+
return !inc.Finalized && inc.TransactionID != "" && !inc.TransactionObserved
5658
}
5759

58-
// Finalize creates a proxy transaction for an invocation if required.
60+
// CreateProxyTxn creates a proxy transaction for an invocation if required.
5961
// A proxy transaction will be required to be created if the agent has
6062
// registered a transaction for the invocation but has not sent the
6163
// corresponding transaction to the extension.
62-
func (inc *Invocation) Finalize(status string, time time.Time) ([]byte, error) {
64+
func (inc *Invocation) CreateProxyTxn(status string, time time.Time) ([]byte, error) {
6365
if !inc.NeedProxyTransaction() {
6466
return nil, nil
6567
}
66-
return inc.createProxyTxn(status, time)
67-
}
68-
69-
func (inc *Invocation) createProxyTxn(status string, time time.Time) ([]byte, error) {
7068
txn, err := sjson.SetBytes(inc.AgentPayload, "transaction.result", status)
7169
if err != nil {
7270
return nil, err

accumulator/invocation_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525
"github.com/stretchr/testify/assert"
2626
)
2727

28-
func TestFinalize(t *testing.T) {
28+
func TestCreateProxyTransaction(t *testing.T) {
2929
txnDur := time.Second
3030
for _, tc := range []struct {
3131
name string
@@ -89,7 +89,7 @@ func TestFinalize(t *testing.T) {
8989
AgentPayload: []byte(tc.payload),
9090
TransactionObserved: tc.txnObserved,
9191
}
92-
result, err := inc.Finalize(tc.runtimeDoneStatus, ts.Add(txnDur))
92+
result, err := inc.CreateProxyTxn(tc.runtimeDoneStatus, ts.Add(txnDur))
9393
assert.Nil(t, err)
9494
if len(tc.output) > 0 {
9595
assert.JSONEq(t, tc.output, string(result))
@@ -114,7 +114,7 @@ func BenchmarkCreateProxyTxn(b *testing.B) {
114114
b.ReportAllocs()
115115
b.ResetTimer()
116116
for i := 0; i < b.N; i++ {
117-
_, err := inc.createProxyTxn("success", txnDur)
117+
_, err := inc.CreateProxyTxn("success", txnDur)
118118
if err != nil {
119119
b.Fail()
120120
}

app/run.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,6 @@ func (app *App) Run(ctx context.Context) error {
7979
}
8080
}
8181

82-
// The previous event id is used to validate the received Lambda metrics
83-
var prevEvent *extension.NextEventResponse
84-
8582
for {
8683
select {
8784
case <-ctx.Done():
@@ -91,7 +88,7 @@ func (app *App) Run(ctx context.Context) error {
9188
// Use a wait group to ensure the background go routine sending to the APM server
9289
// completes before signaling that the extension is ready for the next invocation.
9390
var backgroundDataSendWg sync.WaitGroup
94-
event, err := app.processEvent(ctx, &backgroundDataSendWg, prevEvent)
91+
event, err := app.processEvent(ctx, &backgroundDataSendWg)
9592
if err != nil {
9693
return err
9794
}
@@ -110,15 +107,13 @@ func (app *App) Run(ctx context.Context) error {
110107
app.apmClient.FlushAPMData(flushCtx)
111108
cancel()
112109
}
113-
prevEvent = event
114110
}
115111
}
116112
}
117113

118114
func (app *App) processEvent(
119115
ctx context.Context,
120116
backgroundDataSendWg *sync.WaitGroup,
121-
prevEvent *extension.NextEventResponse,
122117
) (*extension.NextEventResponse, error) {
123118
// Reset flush state for future events.
124119
defer app.apmClient.ResetFlush()
@@ -179,7 +174,7 @@ func (app *App) processEvent(
179174
// also possible that lambda has init a few execution env preemptively,
180175
// for such cases the extension will see only a SHUTDOWN event and
181176
// there is no need to wait for any log event.
182-
if prevEvent == nil {
177+
if app.batch.Size() == 0 {
183178
return event, nil
184179
}
185180
}
@@ -204,7 +199,6 @@ func (app *App) processEvent(
204199
event.RequestID,
205200
event.InvokedFunctionArn,
206201
app.apmClient.LambdaDataChannel,
207-
prevEvent,
208202
event.EventType == extension.Shutdown,
209203
)
210204
}()

logsapi/client.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ type ClientOption func(*Client)
4848

4949
type invocationLifecycler interface {
5050
OnLambdaLogRuntimeDone(requestID, status string, time time.Time) error
51+
OnPlatformReport(reqID string) (fnARN string, deadlineMs int64, ts time.Time, err error)
52+
// Size should return the number of invocations waiting on platform.report
53+
Size() int
5154
}
5255

5356
// Client is the client used to subscribe to the Logs API.

logsapi/event.go

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ package logsapi
2020
import (
2121
"context"
2222
"time"
23-
24-
"github.com/elastic/apm-aws-lambda/extension"
2523
)
2624

2725
// LogEventType represents the log type that is received in the log messages
@@ -63,7 +61,6 @@ func (lc *Client) ProcessLogs(
6361
requestID string,
6462
invokedFnArn string,
6563
dataChan chan []byte,
66-
prevEvent *extension.NextEventResponse,
6764
isShutdown bool,
6865
) {
6966
// platformStartReqID is to identify the requestID for the function
@@ -94,10 +91,12 @@ func (lc *Client) ProcessLogs(
9491
return
9592
}
9693
case PlatformReport:
97-
// TODO: @lahsivjar Refactor usage of prevEvent.RequestID (should now query the batch?)
98-
if prevEvent != nil && logEvent.Record.RequestID == prevEvent.RequestID {
94+
fnARN, deadlineMs, ts, err := lc.invocationLifecycler.OnPlatformReport(logEvent.Record.RequestID)
95+
if err != nil {
96+
lc.logger.Warnf("Failed to process platform report: %v", err)
97+
} else {
9998
lc.logger.Debugf("Received platform report for %s", logEvent.Record.RequestID)
100-
processedMetrics, err := ProcessPlatformReport(prevEvent, logEvent)
99+
processedMetrics, err := ProcessPlatformReport(fnARN, deadlineMs, ts, logEvent)
101100
if err != nil {
102101
lc.logger.Errorf("Error processing Lambda platform metrics: %v", err)
103102
} else {
@@ -106,17 +105,16 @@ func (lc *Client) ProcessLogs(
106105
case <-ctx.Done():
107106
}
108107
}
109-
// For shutdown event the platform report metrics for the previous log event
110-
// would be the last possible log event.
111-
if isShutdown {
112-
lc.logger.Debugf(
113-
"Processed platform report event for reqID %s as the last log event before shutdown",
114-
logEvent.Record.RequestID,
115-
)
116-
return
117-
}
118-
} else {
119-
lc.logger.Warn("Report event request id didn't match the previous event id")
108+
}
109+
// For shutdown event the platform report metrics for the previous log event
110+
// would be the last possible log event. After processing this metric the
111+
// invocation lifecycler's cache should be empty.
112+
if isShutdown && lc.invocationLifecycler.Size() == 0 {
113+
lc.logger.Debugf(
114+
"Processed platform report event for reqID %s as the last log event before shutdown",
115+
logEvent.Record.RequestID,
116+
)
117+
return
120118
}
121119
case PlatformLogsDropped:
122120
lc.logger.Warnf("Logs dropped due to extension falling behind: %v", logEvent.Record)

logsapi/metrics.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ package logsapi
1919

2020
import (
2121
"math"
22+
"time"
2223

23-
"github.com/elastic/apm-aws-lambda/extension"
2424
"go.elastic.co/apm/v2/model"
2525
"go.elastic.co/fastjson"
2626
)
@@ -64,7 +64,7 @@ func (mc MetricsContainer) MarshalFastJSON(json *fastjson.Writer) error {
6464
// ProcessPlatformReport processes the `platform.report` log line from lambda logs API and
6565
// returns a byte array containing the JSON body for the extracted platform metrics. A non
6666
// nil error is returned when marshaling of platform metrics into JSON fails.
67-
func ProcessPlatformReport(functionData *extension.NextEventResponse, platformReport LogEvent) ([]byte, error) {
67+
func ProcessPlatformReport(fnARN string, deadlineMs int64, ts time.Time, platformReport LogEvent) ([]byte, error) {
6868
metricsContainer := MetricsContainer{
6969
Metrics: &model.Metrics{},
7070
}
@@ -78,7 +78,7 @@ func ProcessPlatformReport(functionData *extension.NextEventResponse, platformRe
7878
// FaaS Fields
7979
metricsContainer.Metrics.FAAS = &model.FAAS{
8080
Execution: platformReport.Record.RequestID,
81-
ID: functionData.InvokedFunctionArn,
81+
ID: fnARN,
8282
Coldstart: platformReportMetrics.InitDurationMs > 0,
8383
}
8484

@@ -95,7 +95,7 @@ func ProcessPlatformReport(functionData *extension.NextEventResponse, platformRe
9595
// - The epoch corresponding to the end of the current invocation (its "deadline")
9696
// - The epoch corresponding to the start of the current invocation
9797
// - The multiplication / division then rounds the value to obtain a number of ms that can be expressed a multiple of 1000 (see initial assumption)
98-
metricsContainer.Add("faas.timeout", math.Ceil(float64(functionData.DeadlineMs-functionData.Timestamp.UnixMilli())/1e3)*1e3) // Unit : Milliseconds
98+
metricsContainer.Add("faas.timeout", math.Ceil(float64(deadlineMs-ts.UnixMilli())/1e3)*1e3) // Unit : Milliseconds
9999

100100
var jsonWriter fastjson.Writer
101101
if err := metricsContainer.MarshalFastJSON(&jsonWriter); err != nil {

logsapi/metrics_test.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func TestProcessPlatformReport_Coldstart(t *testing.T) {
6666

6767
desiredOutputMetrics := fmt.Sprintf(`{"metricset":{"samples":{"faas.coldstart_duration":{"value":422.9700012207031},"faas.timeout":{"value":5000},"system.memory.total":{"value":1.34217728e+08},"system.memory.actual.free":{"value":5.4525952e+07},"faas.duration":{"value":182.42999267578125},"faas.billed_duration":{"value":183}},"timestamp":%d,"faas":{"coldstart":true,"execution":"6f7f0961f83442118a7af6fe80b88d56","id":"arn:aws:lambda:us-east-2:123456789012:function:custom-runtime"}}}`, timestamp.UnixNano()/1e3)
6868

69-
data, err := ProcessPlatformReport(&event, logEvent)
69+
data, err := ProcessPlatformReport(event.InvokedFunctionArn, event.DeadlineMs, event.Timestamp, logEvent)
7070
require.NoError(t, err)
7171

7272
assert.JSONEq(t, desiredOutputMetrics, string(data))
@@ -110,7 +110,7 @@ func TestProcessPlatformReport_NoColdstart(t *testing.T) {
110110

111111
desiredOutputMetrics := fmt.Sprintf(`{"metricset":{"samples":{"faas.coldstart_duration":{"value":0},"faas.timeout":{"value":5000},"system.memory.total":{"value":1.34217728e+08},"system.memory.actual.free":{"value":5.4525952e+07},"faas.duration":{"value":182.42999267578125},"faas.billed_duration":{"value":183}},"timestamp":%d,"faas":{"coldstart":false,"execution":"6f7f0961f83442118a7af6fe80b88d56","id":"arn:aws:lambda:us-east-2:123456789012:function:custom-runtime"}}}`, timestamp.UnixNano()/1e3)
112112

113-
data, err := ProcessPlatformReport(&event, logEvent)
113+
data, err := ProcessPlatformReport(event.InvokedFunctionArn, event.DeadlineMs, event.Timestamp, logEvent)
114114
require.NoError(t, err)
115115

116116
assert.JSONEq(t, desiredOutputMetrics, string(data))
@@ -142,7 +142,12 @@ func BenchmarkPlatformReport(b *testing.B) {
142142
}
143143

144144
for n := 0; n < b.N; n++ {
145-
_, err := ProcessPlatformReport(nextEventResp, logEvent)
145+
_, err := ProcessPlatformReport(
146+
nextEventResp.InvokedFunctionArn,
147+
nextEventResp.DeadlineMs,
148+
nextEventResp.Timestamp,
149+
logEvent,
150+
)
146151
require.NoError(b, err)
147152
}
148153
}

0 commit comments

Comments
 (0)