From 89bbedb66de666cd7e00bbd393626d66dddb4330 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Thu, 17 Nov 2022 07:03:07 +0100 Subject: [PATCH 1/4] Handle txn registration if invocation registration is late --- accumulator/batch.go | 51 ++++++++++++++++++++++++---------- accumulator/batch_test.go | 19 +++++++------ accumulator/invocation.go | 11 ++++++-- accumulator/invocation_test.go | 30 ++++++++++++++------ apmproxy/apmserver.go | 15 ++-------- apmproxy/apmserver_test.go | 2 +- apmproxy/receiver.go | 2 +- app/run.go | 42 ++++++++++++++++++---------- logsapi/client.go | 2 +- logsapi/event.go | 8 ++++-- 10 files changed, 115 insertions(+), 67 deletions(-) diff --git a/accumulator/batch.go b/accumulator/batch.go index 07f377aa..c208a2ee 100644 --- a/accumulator/batch.go +++ b/accumulator/batch.go @@ -40,10 +40,11 @@ var ( ) var ( - maxSizeThreshold = 0.9 - zeroTime = time.Time{} - newLineSep = []byte("\n") - transactionKey = []byte("transaction") + maxSizeThreshold = 0.9 + zeroTime = time.Time{} + newLineSep = []byte("\n") + transactionKey = []byte("transaction") + waitingInvocationKey = "waiting" ) // Batch manages the data that needs to be shipped to APM Server. It holds @@ -86,28 +87,43 @@ func (b *Batch) RegisterInvocation( ) { b.mu.Lock() defer b.mu.Unlock() - b.invocations[requestID] = &Invocation{ + inc := &Invocation{ RequestID: requestID, FunctionARN: functionARN, DeadlineMs: deadlineMs, Timestamp: timestamp, } + if w, ok := b.invocations[waitingInvocationKey]; ok { + // If the agent payload is cached before the invocation is registered + // then associate the agent payload with the invocation. + inc.AgentPayload = w.AgentPayload + inc.TransactionID = w.TransactionID + delete(b.invocations, waitingInvocationKey) + } + b.invocations[requestID] = inc b.currentlyExecutingRequestID = requestID } // OnAgentInit caches the transactionID and the payload for the currently // executing invocation as reported by the agent. The traceID and transactionID // will be used to create a new transaction in an event the actual transaction -// is not reported by the agent due to unexpected termination. +// is not reported by the agent due to unexpected termination. If the current +// invocation is not registered yet then the payload is cached temporarily and +// associated with the invocation when it is registered. func (b *Batch) OnAgentInit(transactionID string, payload []byte) error { + if !isTransactionEvent(payload) { + return errors.New("invalid payload") + } b.mu.Lock() defer b.mu.Unlock() i, ok := b.invocations[b.currentlyExecutingRequestID] if !ok { - return fmt.Errorf("invocation for requestID %s does not exist", b.currentlyExecutingRequestID) - } - if !isTransactionEvent(payload) { - return errors.New("invalid payload") + // It is possible that the invocation is registered at a later time + b.invocations[waitingInvocationKey] = &Invocation{ + TransactionID: transactionID, + AgentPayload: payload, + } + return nil } i.TransactionID, i.AgentPayload = transactionID, payload return nil @@ -167,10 +183,10 @@ func (b *Batch) AddAgentData(apmData APMData) error { // OnLambdaLogRuntimeDone prepares the data for the invocation to be shipped // to APM Server. It accepts requestID and status of the invocation both of // which can be retrieved after parsing `platform.runtimeDone` event. -func (b *Batch) OnLambdaLogRuntimeDone(reqID, status string) error { +func (b *Batch) OnLambdaLogRuntimeDone(reqID, status string, time time.Time) error { b.mu.Lock() defer b.mu.Unlock() - return b.finalizeInvocation(reqID, status) + return b.finalizeInvocation(reqID, status, time) } // OnShutdown flushes the data for shipping to APM Server by finalizing all @@ -181,7 +197,12 @@ func (b *Batch) OnShutdown(status string) error { b.mu.Lock() defer b.mu.Unlock() for _, inc := range b.invocations { - if err := b.finalizeInvocation(inc.RequestID, status); err != nil { + // Assume that the transaction took all the function time. + // TODO: @lahsivjar Is it possible to tweak the extension lifecycle in + // a way that we receive the platform.report metric for a invocation + // consistently and enrich the metrics with reported values? + time := time.Unix(0, inc.DeadlineMs*int64(time.Millisecond)) + if err := b.finalizeInvocation(inc.RequestID, status, time); err != nil { return err } } @@ -235,13 +256,13 @@ func (b *Batch) ToAPMData() APMData { } } -func (b *Batch) finalizeInvocation(reqID, status string) error { +func (b *Batch) finalizeInvocation(reqID, status string, time time.Time) error { inc, ok := b.invocations[reqID] if !ok { return fmt.Errorf("invocation for requestID %s does not exist", reqID) } defer delete(b.invocations, reqID) - proxyTxn, err := inc.Finalize(status) + proxyTxn, err := inc.Finalize(status, time) if err != nil { return err } diff --git a/accumulator/batch_test.go b/accumulator/batch_test.go index be75ff39..f8e3f491 100644 --- a/accumulator/batch_test.go +++ b/accumulator/batch_test.go @@ -99,6 +99,7 @@ func TestLifecycle(t *testing.T) { lambdaData := `{"log":{"message":"this is log"}}` txnData := fmt.Sprintf(`{"transaction":{"id":"%s"}}`, txnID) ts := time.Date(2022, time.October, 1, 1, 1, 1, 0, time.UTC) + txnDur := time.Second for _, tc := range []struct { name string @@ -127,7 +128,7 @@ func TestLifecycle(t *testing.T) { expected: fmt.Sprintf( "%s\n%s\n%s", metadata, - generateCompleteTxn(t, txnData, "success", ""), + generateCompleteTxn(t, txnData, "success", "", txnDur), lambdaData, ), }, @@ -139,7 +140,7 @@ func TestLifecycle(t *testing.T) { expected: fmt.Sprintf( "%s\n%s\n%s", metadata, - generateCompleteTxn(t, txnData, "success", ""), + generateCompleteTxn(t, txnData, "success", "", txnDur), lambdaData, ), }, @@ -154,7 +155,7 @@ func TestLifecycle(t *testing.T) { "%s\n%s\n%s", metadata, lambdaData, - generateCompleteTxn(t, txnData, "failure", "failure"), + generateCompleteTxn(t, txnData, "failure", "failure", txnDur), ), }, { @@ -169,14 +170,14 @@ func TestLifecycle(t *testing.T) { "%s\n%s\n%s", metadata, lambdaData, - generateCompleteTxn(t, txnData, "timeout", "failure"), + generateCompleteTxn(t, txnData, "timeout", "failure", txnDur), ), }, } { t.Run(tc.name, func(t *testing.T) { b := NewBatch(100, time.Hour) // NEXT API response creates a new invocation cache - b.RegisterInvocation(reqID, fnARN, 100, ts) + b.RegisterInvocation(reqID, fnARN, ts.Add(txnDur).UnixMilli(), ts) // Agent creates and registers a partial transaction in the extn if tc.agentInit { require.NoError(t, b.OnAgentInit(txnID, []byte(txnData))) @@ -190,7 +191,7 @@ func TestLifecycle(t *testing.T) { Data: []byte(fmt.Sprintf( "%s\n%s", metadata, - generateCompleteTxn(t, txnData, "success", "")), + generateCompleteTxn(t, txnData, "success", "", txnDur)), ), })) } @@ -199,7 +200,7 @@ func TestLifecycle(t *testing.T) { require.NoError(t, b.AddLambdaData([]byte(lambdaData))) if tc.receiveLambdaLogRuntime { // Lambda API receives a platform.runtimeDone event - require.NoError(t, b.OnLambdaLogRuntimeDone(reqID, "failure")) + require.NoError(t, b.OnLambdaLogRuntimeDone(reqID, "failure", ts.Add(txnDur))) } // Instance shutdown require.NoError(t, b.OnShutdown("timeout")) @@ -222,10 +223,12 @@ func TestIsTransactionEvent(t *testing.T) { } } -func generateCompleteTxn(t *testing.T, src, result, outcome string) string { +func generateCompleteTxn(t *testing.T, src, result, outcome string, d time.Duration) string { t.Helper() tmp, err := sjson.SetBytes([]byte(src), "transaction.result", result) assert.NoError(t, err) + tmp, err = sjson.SetBytes(tmp, "transaction.duration", d.Milliseconds()) + assert.NoError(t, err) if outcome != "" { tmp, err = sjson.SetBytes(tmp, "transaction.outcome", outcome) assert.NoError(t, err) diff --git a/accumulator/invocation.go b/accumulator/invocation.go index 0e1b69a9..716d7cfe 100644 --- a/accumulator/invocation.go +++ b/accumulator/invocation.go @@ -59,15 +59,20 @@ func (inc *Invocation) NeedProxyTransaction() bool { // A proxy transaction will be required to be created if the agent has // registered a transaction for the invocation but has not sent the // corresponding transaction to the extension. -func (inc *Invocation) Finalize(status string) ([]byte, error) { +func (inc *Invocation) Finalize(status string, time time.Time) ([]byte, error) { if !inc.NeedProxyTransaction() { return nil, nil } - return inc.createProxyTxn(status) + return inc.createProxyTxn(status, time) } -func (inc *Invocation) createProxyTxn(status string) (txn []byte, err error) { +func (inc *Invocation) createProxyTxn(status string, time time.Time) (txn []byte, err error) { txn, err = sjson.SetBytes(inc.AgentPayload, "transaction.result", status) + // Transaction duration cannot be known in partial transaction payload. Estimate + // the duration based on the time provided. Time can be based on the runtimeDone + // log record or function deadline. + duration := time.Sub(inc.Timestamp) + txn, err = sjson.SetBytes(txn, "transaction.duration", duration.Milliseconds()) if status != "success" { txn, err = sjson.SetBytes(txn, "transaction.outcome", "failure") } diff --git a/accumulator/invocation_test.go b/accumulator/invocation_test.go index 7aaed6fe..9ab4bd4a 100644 --- a/accumulator/invocation_test.go +++ b/accumulator/invocation_test.go @@ -18,6 +18,7 @@ package accumulator import ( + "fmt" "testing" "time" @@ -25,6 +26,7 @@ import ( ) func TestFinalize(t *testing.T) { + txnDur := time.Second for _, tc := range []struct { name string txnID string @@ -39,33 +41,42 @@ func TestFinalize(t *testing.T) { { name: "txn_registered_observed", txnID: "test-txn-id", - payload: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id"}}`, + payload: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id","duration":-1}}`, txnObserved: true, runtimeDoneStatus: "success", }, { name: "txn_registered_not_observed_runtime_failure", txnID: "test-txn-id", - payload: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id"}}`, + payload: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id","duration":-1}}`, txnObserved: false, runtimeDoneStatus: "failure", - output: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id","result":"failure","outcome":"failure"}}`, + output: fmt.Sprintf( + `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id","result":"failure","outcome":"failure","duration":%d}}`, + txnDur.Milliseconds(), + ), }, { name: "txn_registered_not_observed_runtime_timeout", txnID: "test-txn-id", - payload: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id"}}`, + payload: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id","duration":-1}}`, txnObserved: false, runtimeDoneStatus: "timeout", - output: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id","result":"timeout","outcome":"failure"}}`, + output: fmt.Sprintf( + `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id","result":"timeout","outcome":"failure","duration":%d}}`, + txnDur.Milliseconds(), + ), }, { name: "txn_registered_not_observed_runtime_success", txnID: "test-txn-id", - payload: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id"}}`, + payload: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id","duration":-1}}`, txnObserved: false, runtimeDoneStatus: "success", - output: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id","result":"success"}}`, + output: fmt.Sprintf( + `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id","result":"success","duration":%d}}`, + txnDur.Milliseconds(), + ), }, } { t.Run(tc.name, func(t *testing.T) { @@ -78,7 +89,7 @@ func TestFinalize(t *testing.T) { AgentPayload: []byte(tc.payload), TransactionObserved: tc.txnObserved, } - result, err := inc.Finalize(tc.runtimeDoneStatus) + result, err := inc.Finalize(tc.runtimeDoneStatus, ts.Add(txnDur)) assert.Nil(t, err) if len(tc.output) > 0 { assert.JSONEq(t, tc.output, string(result)) @@ -91,6 +102,7 @@ func TestFinalize(t *testing.T) { func BenchmarkCreateProxyTxn(b *testing.B) { ts := time.Date(2022, time.October, 1, 1, 0, 0, 0, time.UTC) + txnDur := ts.Add(time.Second) inc := &Invocation{ Timestamp: ts, DeadlineMs: ts.Add(time.Minute).UnixMilli(), @@ -102,7 +114,7 @@ func BenchmarkCreateProxyTxn(b *testing.B) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - _, err := inc.createProxyTxn("success") + _, err := inc.createProxyTxn("success", txnDur) if err != nil { b.Fail() } diff --git a/apmproxy/apmserver.go b/apmproxy/apmserver.go index 6ae0fddf..0c0661fd 100644 --- a/apmproxy/apmserver.go +++ b/apmproxy/apmserver.go @@ -72,7 +72,7 @@ func (c *Client) ForwardApmData(ctx context.Context) error { } // FlushAPMData reads all the apm data in the apm data channel and sends it to the APM server. -func (c *Client) FlushAPMData(ctx context.Context, shutdown bool) { +func (c *Client) FlushAPMData(ctx context.Context) { if c.IsUnhealthy() { c.logger.Debug("Flush skipped - Transport failing") return @@ -87,17 +87,6 @@ func (c *Client) FlushAPMData(ctx context.Context, shutdown bool) { } } - if shutdown { - // At shutdown we can not expect platform.runtimeDone events to be reported - // for the remaining invocations. If we haven't received the transaction - // from agents at this point then it is safe to assume that the function - // timed out. We will flush all the cached agent data with no transaction - // assuming the outcome of the transaction to be `timeout`. - if err := c.batch.OnShutdown("timeout"); err != nil { - c.logger.Errorf("Error while flushing agent data from batch: %v", err) - } - } - // If metadata still not available then fail fast if c.batch == nil { c.logger.Warnf("Metadata not available at flush, skipping sending lambda data to APM Server") @@ -113,7 +102,7 @@ func (c *Client) FlushAPMData(ctx context.Context, shutdown bool) { c.logger.Errorf("Error sending to APM server, skipping: %v", err) } case <-ctx.Done(): - c.logger.Debugf("Failed to flush completely, may result in data drop") + c.logger.Debug("Failed to flush completely, may result in data drop") return default: // Flush any remaining data in batch diff --git a/apmproxy/apmserver_test.go b/apmproxy/apmserver_test.go index 99e68680..78d9f831 100644 --- a/apmproxy/apmserver_test.go +++ b/apmproxy/apmserver_test.go @@ -702,7 +702,7 @@ func BenchmarkFlushAPMData(b *testing.B) { for j := 0; j < 99; j++ { apmClient.LambdaDataChannel <- []byte(`{"log":{"message":this is test log"}}`) } - apmClient.FlushAPMData(context.Background(), false) + apmClient.FlushAPMData(context.Background()) } } diff --git a/apmproxy/receiver.go b/apmproxy/receiver.go index ed321b06..b776f78b 100644 --- a/apmproxy/receiver.go +++ b/apmproxy/receiver.go @@ -184,7 +184,7 @@ func (c *Client) handleTransactionRegistration() func(w http.ResponseWriter, r * return } if err := c.batch.OnAgentInit(txnID, rawBytes); err != nil { - c.logger.Warnf("Failed to update invocation for transaction ID %s", txnID) + c.logger.Warnf("Failed to update invocation for transaction ID %s: %v", txnID, err) w.WriteHeader(http.StatusUnprocessableEntity) return } diff --git a/app/run.go b/app/run.go index 85c3db1a..33d705c2 100644 --- a/app/run.go +++ b/app/run.go @@ -76,7 +76,7 @@ func (app *App) Run(ctx context.Context) error { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - app.apmClient.FlushAPMData(ctx, true) + app.apmClient.FlushAPMData(ctx) }() // The previous event id is used to validate the received Lambda metrics @@ -109,7 +109,7 @@ func (app *App) Run(ctx context.Context) error { // waiting for grace period if it got to unhealthy state. flushCtx, cancel := context.WithCancel(ctx) // Flush APM data now that the function invocation has completed - app.apmClient.FlushAPMData(flushCtx, false) + app.apmClient.FlushAPMData(flushCtx) cancel() } prevEvent = event @@ -131,7 +131,7 @@ func (app *App) processEvent( // call Next method of extension API. This long polling HTTP method // will block until there's an invocation of the function - app.logger.Infof("Waiting for next event...") + app.logger.Info("Waiting for next event...") event, err := app.extensionClient.NextEvent(ctx) if err != nil { app.logger.Errorf("Error: %s", err) @@ -142,21 +142,35 @@ func (app *App) processEvent( } app.logger.Infof("Exit signal sent to runtime : %s", status) - app.logger.Infof("Exiting") + app.logger.Info("Exiting") return nil, err } - app.batch.RegisterInvocation( - event.RequestID, - event.InvokedFunctionArn, - event.DeadlineMs, - event.Timestamp, - ) + // Used to compute Lambda Timeout event.Timestamp = time.Now() app.logger.Debug("Received event.") app.logger.Debugf("%v", extension.PrettyPrint(event)) - if event.EventType == extension.Shutdown { + switch event.EventType { + case extension.Invoke: + app.batch.RegisterInvocation( + event.RequestID, + event.InvokedFunctionArn, + event.DeadlineMs, + event.Timestamp, + ) + case extension.Shutdown: + // At shutdown we can not expect platform.runtimeDone events to be reported + // for the remaining invocations. If we haven't received the transaction + // from agents at this point then it is safe to assume that the function + // timed out. We will create proxy transaction for all invocations that + // haven't received a full transaction from the agent yet. If extension + // doesn't have enough CPU time it is possible that the extension might + // not receive the shutdown signal for timeouts or runtime crashes. In + // these cases we will miss the transaction. + if err := app.batch.OnShutdown("timeout"); err != nil { + app.logger.Errorf("Error finalizing invocation on shutdown: %v", err) + } return event, nil } @@ -193,7 +207,7 @@ func (app *App) processEvent( } // Calculate how long to wait for a runtimeDoneSignal or AgentDoneSignal signal - flushDeadlineMs := event.DeadlineMs - 100 + flushDeadlineMs := event.DeadlineMs - 200 durationUntilFlushDeadline := time.Until(time.Unix(flushDeadlineMs/1000, 0)) // Create a timer that expires after durationUntilFlushDeadline @@ -204,12 +218,12 @@ func (app *App) processEvent( // the lambda function and the end of the execution of processEvent() // 1) AgentDoneSignal is triggered upon reception of a `flushed=true` query from the agent // 2) [Backup 1] RuntimeDone is triggered upon reception of a Lambda log entry certifying the end of the execution of the current function - // 3) [Backup 2] If all else fails, the extension relies of the timeout of the Lambda function to interrupt itself 100 ms before the specified deadline. + // 3) [Backup 2] If all else fails, the extension relies of the timeout of the Lambda function to interrupt itself 200 ms before the specified deadline. // This time interval is large enough to attempt a last flush attempt (if SendStrategy == syncFlush) before the environment gets shut down. select { case <-app.apmClient.WaitForFlush(): - app.logger.Debug("APM client has pending flush signals") + app.logger.Debug("APM client has sent flush signal") case <-runtimeDone: app.logger.Debug("Received runtimeDone signal") case <-timer.C: diff --git a/logsapi/client.go b/logsapi/client.go index ef3a015b..9bb30096 100644 --- a/logsapi/client.go +++ b/logsapi/client.go @@ -47,7 +47,7 @@ const ( type ClientOption func(*Client) type invocationLifecycler interface { - OnLambdaLogRuntimeDone(requestID, status string) error + OnLambdaLogRuntimeDone(requestID, status string, time time.Time) error } // Client is the client used to subscribe to the Logs API. diff --git a/logsapi/event.go b/logsapi/event.go index 20290dbb..7b05698a 100644 --- a/logsapi/event.go +++ b/logsapi/event.go @@ -75,8 +75,12 @@ func (lc *Client) ProcessLogs( case PlatformStart: platformStartReqID = logEvent.Record.RequestID case PlatformRuntimeDone: - if err := lc.invocationLifecycler.OnLambdaLogRuntimeDone(logEvent.Record.RequestID, logEvent.Record.Status); err != nil { - lc.logger.Warnf("Failed to finalize invocation with request ID %s", logEvent.Record.RequestID) + if err := lc.invocationLifecycler.OnLambdaLogRuntimeDone( + logEvent.Record.RequestID, + logEvent.Record.Status, + logEvent.Time, + ); err != nil { + lc.logger.Warnf("Failed to finalize invocation with request ID %s: %v", logEvent.Record.RequestID, err) } // For the current invocation the platform.runtimeDone would be the last event if logEvent.Record.RequestID == requestID { From d39203940c380cbddcf23fa7340270bd87af813e Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Thu, 17 Nov 2022 07:57:56 +0100 Subject: [PATCH 2/4] Fix lint --- accumulator/invocation.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/accumulator/invocation.go b/accumulator/invocation.go index 716d7cfe..985a6d1b 100644 --- a/accumulator/invocation.go +++ b/accumulator/invocation.go @@ -66,15 +66,24 @@ func (inc *Invocation) Finalize(status string, time time.Time) ([]byte, error) { return inc.createProxyTxn(status, time) } -func (inc *Invocation) createProxyTxn(status string, time time.Time) (txn []byte, err error) { - txn, err = sjson.SetBytes(inc.AgentPayload, "transaction.result", status) +func (inc *Invocation) createProxyTxn(status string, time time.Time) ([]byte, error) { + txn, err := sjson.SetBytes(inc.AgentPayload, "transaction.result", status) + if err != nil { + return nil, err + } // Transaction duration cannot be known in partial transaction payload. Estimate // the duration based on the time provided. Time can be based on the runtimeDone // log record or function deadline. duration := time.Sub(inc.Timestamp) txn, err = sjson.SetBytes(txn, "transaction.duration", duration.Milliseconds()) + if err != nil { + return nil, err + } if status != "success" { txn, err = sjson.SetBytes(txn, "transaction.outcome", "failure") + if err != nil { + return nil, err + } } - return + return txn, nil } From 47291afaff43742b793485b86c171e3e66550093 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Fri, 18 Nov 2022 12:23:11 +0800 Subject: [PATCH 3/4] Accept reqID as part of the txn registration request --- accumulator/batch.go | 47 ++++++++++++++++++++------------------- accumulator/batch_test.go | 2 +- apmproxy/receiver.go | 7 +++++- 3 files changed, 31 insertions(+), 25 deletions(-) diff --git a/accumulator/batch.go b/accumulator/batch.go index c208a2ee..6104706b 100644 --- a/accumulator/batch.go +++ b/accumulator/batch.go @@ -60,11 +60,16 @@ type Batch struct { buf bytes.Buffer // invocations holds the data for a specific invocation with // request ID as the key. - invocations map[string]*Invocation - count int - age time.Time - maxSize int - maxAge time.Duration + invocations map[string]*Invocation + count int + age time.Time + maxSize int + maxAge time.Duration + // currentlyExecutingRequestID represents the request ID of the currently + // executing lambda invocation. The ID can be set either on agent init or + // when extension receives the invoke event. If the agent hooks into the + // invoke lifecycle then it is possible to receive the agent init request + // before extension invoke is registered. currentlyExecutingRequestID string } @@ -81,14 +86,14 @@ func NewBatch(maxSize int, maxAge time.Duration) *Batch { // RegisterInvocation registers a new function invocation against its request // ID. It also updates the caches for currently executing request ID. func (b *Batch) RegisterInvocation( - requestID, functionARN string, + reqID, functionARN string, deadlineMs int64, timestamp time.Time, ) { b.mu.Lock() defer b.mu.Unlock() inc := &Invocation{ - RequestID: requestID, + RequestID: reqID, FunctionARN: functionARN, DeadlineMs: deadlineMs, Timestamp: timestamp, @@ -100,32 +105,28 @@ func (b *Batch) RegisterInvocation( inc.TransactionID = w.TransactionID delete(b.invocations, waitingInvocationKey) } - b.invocations[requestID] = inc - b.currentlyExecutingRequestID = requestID + b.invocations[reqID] = inc + b.currentlyExecutingRequestID = reqID } -// OnAgentInit caches the transactionID and the payload for the currently -// executing invocation as reported by the agent. The traceID and transactionID -// will be used to create a new transaction in an event the actual transaction -// is not reported by the agent due to unexpected termination. If the current -// invocation is not registered yet then the payload is cached temporarily and -// associated with the invocation when it is registered. -func (b *Batch) OnAgentInit(transactionID string, payload []byte) error { +// OnAgentInit caches the transaction ID and the payload for the currently +// executing invocation as reported by the agent. The agent payload will be +// used to create a new transaction in an event the actual transaction is +// not reported by the agent due to unexpected termination. +func (b *Batch) OnAgentInit(reqID, txnID string, payload []byte) error { if !isTransactionEvent(payload) { return errors.New("invalid payload") } b.mu.Lock() defer b.mu.Unlock() - i, ok := b.invocations[b.currentlyExecutingRequestID] + i, ok := b.invocations[reqID] if !ok { // It is possible that the invocation is registered at a later time - b.invocations[waitingInvocationKey] = &Invocation{ - TransactionID: transactionID, - AgentPayload: payload, - } - return nil + i = &Invocation{} + b.invocations[reqID] = i } - i.TransactionID, i.AgentPayload = transactionID, payload + i.TransactionID, i.AgentPayload = txnID, payload + b.currentlyExecutingRequestID = reqID return nil } diff --git a/accumulator/batch_test.go b/accumulator/batch_test.go index f8e3f491..2836cc30 100644 --- a/accumulator/batch_test.go +++ b/accumulator/batch_test.go @@ -180,7 +180,7 @@ func TestLifecycle(t *testing.T) { b.RegisterInvocation(reqID, fnARN, ts.Add(txnDur).UnixMilli(), ts) // Agent creates and registers a partial transaction in the extn if tc.agentInit { - require.NoError(t, b.OnAgentInit(txnID, []byte(txnData))) + require.NoError(t, b.OnAgentInit(reqID, txnID, []byte(txnData))) } // Agent sends a request with metadata require.NoError(t, b.AddAgentData(APMData{ diff --git a/apmproxy/receiver.go b/apmproxy/receiver.go index b776f78b..15ebeed7 100644 --- a/apmproxy/receiver.go +++ b/apmproxy/receiver.go @@ -170,6 +170,11 @@ func (c *Client) handleTransactionRegistration() func(w http.ResponseWriter, r * w.WriteHeader(http.StatusUnsupportedMediaType) return } + reqID := r.Header.Get("x-elastic-aws-request-id") + if reqID == "" { + w.WriteHeader(http.StatusBadRequest) + return + } rawBytes, err := io.ReadAll(r.Body) defer r.Body.Close() if err != nil { @@ -183,7 +188,7 @@ func (c *Client) handleTransactionRegistration() func(w http.ResponseWriter, r * w.WriteHeader(http.StatusUnprocessableEntity) return } - if err := c.batch.OnAgentInit(txnID, rawBytes); err != nil { + if err := c.batch.OnAgentInit(reqID, txnID, rawBytes); err != nil { c.logger.Warnf("Failed to update invocation for transaction ID %s: %v", txnID, err) w.WriteHeader(http.StatusUnprocessableEntity) return From 2bb984e579363198fe34584f331ff8ccee50ae54 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Fri, 18 Nov 2022 12:32:11 +0800 Subject: [PATCH 4/4] Remove waiting key usage --- accumulator/batch.go | 31 +++++++++++++------------------ 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/accumulator/batch.go b/accumulator/batch.go index 6104706b..20d773dc 100644 --- a/accumulator/batch.go +++ b/accumulator/batch.go @@ -40,11 +40,10 @@ var ( ) var ( - maxSizeThreshold = 0.9 - zeroTime = time.Time{} - newLineSep = []byte("\n") - transactionKey = []byte("transaction") - waitingInvocationKey = "waiting" + maxSizeThreshold = 0.9 + zeroTime = time.Time{} + newLineSep = []byte("\n") + transactionKey = []byte("transaction") ) // Batch manages the data that needs to be shipped to APM Server. It holds @@ -92,20 +91,16 @@ func (b *Batch) RegisterInvocation( ) { b.mu.Lock() defer b.mu.Unlock() - inc := &Invocation{ - RequestID: reqID, - FunctionARN: functionARN, - DeadlineMs: deadlineMs, - Timestamp: timestamp, - } - if w, ok := b.invocations[waitingInvocationKey]; ok { - // If the agent payload is cached before the invocation is registered - // then associate the agent payload with the invocation. - inc.AgentPayload = w.AgentPayload - inc.TransactionID = w.TransactionID - delete(b.invocations, waitingInvocationKey) + + i, ok := b.invocations[reqID] + if !ok { + i = &Invocation{} + b.invocations[reqID] = i } - b.invocations[reqID] = inc + i.RequestID = reqID + i.FunctionARN = functionARN + i.DeadlineMs = deadlineMs + i.Timestamp = timestamp b.currentlyExecutingRequestID = reqID }