Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 45 additions & 28 deletions accumulator/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,16 @@ type Batch struct {
buf bytes.Buffer
// invocations holds the data for a specific invocation with
// request ID as the key.
invocations map[string]*Invocation
count int
age time.Time
maxSize int
maxAge time.Duration
invocations map[string]*Invocation
count int
age time.Time
maxSize int
maxAge time.Duration
// currentlyExecutingRequestID represents the request ID of the currently
// executing lambda invocation. The ID can be set either on agent init or
// when extension receives the invoke event. If the agent hooks into the
// invoke lifecycle then it is possible to receive the agent init request
// before extension invoke is registered.
currentlyExecutingRequestID string
}

Expand All @@ -80,36 +85,43 @@ func NewBatch(maxSize int, maxAge time.Duration) *Batch {
// RegisterInvocation registers a new function invocation against its request
// ID. It also updates the caches for currently executing request ID.
func (b *Batch) RegisterInvocation(
requestID, functionARN string,
reqID, functionARN string,
deadlineMs int64,
timestamp time.Time,
) {
b.mu.Lock()
defer b.mu.Unlock()
b.invocations[requestID] = &Invocation{
RequestID: requestID,
FunctionARN: functionARN,
DeadlineMs: deadlineMs,
Timestamp: timestamp,

i, ok := b.invocations[reqID]
if !ok {
i = &Invocation{}
b.invocations[reqID] = i
}
b.currentlyExecutingRequestID = requestID
i.RequestID = reqID
i.FunctionARN = functionARN
i.DeadlineMs = deadlineMs
i.Timestamp = timestamp
b.currentlyExecutingRequestID = reqID
}

// OnAgentInit caches the transactionID and the payload for the currently
// executing invocation as reported by the agent. The traceID and transactionID
// will be used to create a new transaction in an event the actual transaction
// is not reported by the agent due to unexpected termination.
func (b *Batch) OnAgentInit(transactionID string, payload []byte) error {
// OnAgentInit caches the transaction ID and the payload for the currently
// executing invocation as reported by the agent. The agent payload will be
// used to create a new transaction in an event the actual transaction is
// not reported by the agent due to unexpected termination.
func (b *Batch) OnAgentInit(reqID, txnID string, payload []byte) error {
if !isTransactionEvent(payload) {
return errors.New("invalid payload")
}
b.mu.Lock()
defer b.mu.Unlock()
i, ok := b.invocations[b.currentlyExecutingRequestID]
i, ok := b.invocations[reqID]
if !ok {
return fmt.Errorf("invocation for requestID %s does not exist", b.currentlyExecutingRequestID)
}
if !isTransactionEvent(payload) {
return errors.New("invalid payload")
// It is possible that the invocation is registered at a later time
i = &Invocation{}
b.invocations[reqID] = i
}
i.TransactionID, i.AgentPayload = transactionID, payload
i.TransactionID, i.AgentPayload = txnID, payload
b.currentlyExecutingRequestID = reqID
return nil
}

Expand Down Expand Up @@ -167,10 +179,10 @@ func (b *Batch) AddAgentData(apmData APMData) error {
// OnLambdaLogRuntimeDone prepares the data for the invocation to be shipped
// to APM Server. It accepts requestID and status of the invocation both of
// which can be retrieved after parsing `platform.runtimeDone` event.
func (b *Batch) OnLambdaLogRuntimeDone(reqID, status string) error {
func (b *Batch) OnLambdaLogRuntimeDone(reqID, status string, time time.Time) error {
b.mu.Lock()
defer b.mu.Unlock()
return b.finalizeInvocation(reqID, status)
return b.finalizeInvocation(reqID, status, time)
}

// OnShutdown flushes the data for shipping to APM Server by finalizing all
Expand All @@ -181,7 +193,12 @@ func (b *Batch) OnShutdown(status string) error {
b.mu.Lock()
defer b.mu.Unlock()
for _, inc := range b.invocations {
if err := b.finalizeInvocation(inc.RequestID, status); err != nil {
// Assume that the transaction took all the function time.
// TODO: @lahsivjar Is it possible to tweak the extension lifecycle in
// a way that we receive the platform.report metric for a invocation
// consistently and enrich the metrics with reported values?
time := time.Unix(0, inc.DeadlineMs*int64(time.Millisecond))
if err := b.finalizeInvocation(inc.RequestID, status, time); err != nil {
return err
}
}
Expand Down Expand Up @@ -235,13 +252,13 @@ func (b *Batch) ToAPMData() APMData {
}
}

func (b *Batch) finalizeInvocation(reqID, status string) error {
func (b *Batch) finalizeInvocation(reqID, status string, time time.Time) error {
inc, ok := b.invocations[reqID]
if !ok {
return fmt.Errorf("invocation for requestID %s does not exist", reqID)
}
defer delete(b.invocations, reqID)
proxyTxn, err := inc.Finalize(status)
proxyTxn, err := inc.Finalize(status, time)
if err != nil {
return err
}
Expand Down
21 changes: 12 additions & 9 deletions accumulator/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func TestLifecycle(t *testing.T) {
lambdaData := `{"log":{"message":"this is log"}}`
txnData := fmt.Sprintf(`{"transaction":{"id":"%s"}}`, txnID)
ts := time.Date(2022, time.October, 1, 1, 1, 1, 0, time.UTC)
txnDur := time.Second

for _, tc := range []struct {
name string
Expand Down Expand Up @@ -127,7 +128,7 @@ func TestLifecycle(t *testing.T) {
expected: fmt.Sprintf(
"%s\n%s\n%s",
metadata,
generateCompleteTxn(t, txnData, "success", ""),
generateCompleteTxn(t, txnData, "success", "", txnDur),
lambdaData,
),
},
Expand All @@ -139,7 +140,7 @@ func TestLifecycle(t *testing.T) {
expected: fmt.Sprintf(
"%s\n%s\n%s",
metadata,
generateCompleteTxn(t, txnData, "success", ""),
generateCompleteTxn(t, txnData, "success", "", txnDur),
lambdaData,
),
},
Expand All @@ -154,7 +155,7 @@ func TestLifecycle(t *testing.T) {
"%s\n%s\n%s",
metadata,
lambdaData,
generateCompleteTxn(t, txnData, "failure", "failure"),
generateCompleteTxn(t, txnData, "failure", "failure", txnDur),
),
},
{
Expand All @@ -169,17 +170,17 @@ func TestLifecycle(t *testing.T) {
"%s\n%s\n%s",
metadata,
lambdaData,
generateCompleteTxn(t, txnData, "timeout", "failure"),
generateCompleteTxn(t, txnData, "timeout", "failure", txnDur),
),
},
} {
t.Run(tc.name, func(t *testing.T) {
b := NewBatch(100, time.Hour)
// NEXT API response creates a new invocation cache
b.RegisterInvocation(reqID, fnARN, 100, ts)
b.RegisterInvocation(reqID, fnARN, ts.Add(txnDur).UnixMilli(), ts)
// Agent creates and registers a partial transaction in the extn
if tc.agentInit {
require.NoError(t, b.OnAgentInit(txnID, []byte(txnData)))
require.NoError(t, b.OnAgentInit(reqID, txnID, []byte(txnData)))
}
// Agent sends a request with metadata
require.NoError(t, b.AddAgentData(APMData{
Expand All @@ -190,7 +191,7 @@ func TestLifecycle(t *testing.T) {
Data: []byte(fmt.Sprintf(
"%s\n%s",
metadata,
generateCompleteTxn(t, txnData, "success", "")),
generateCompleteTxn(t, txnData, "success", "", txnDur)),
),
}))
}
Expand All @@ -199,7 +200,7 @@ func TestLifecycle(t *testing.T) {
require.NoError(t, b.AddLambdaData([]byte(lambdaData)))
if tc.receiveLambdaLogRuntime {
// Lambda API receives a platform.runtimeDone event
require.NoError(t, b.OnLambdaLogRuntimeDone(reqID, "failure"))
require.NoError(t, b.OnLambdaLogRuntimeDone(reqID, "failure", ts.Add(txnDur)))
}
// Instance shutdown
require.NoError(t, b.OnShutdown("timeout"))
Expand All @@ -222,10 +223,12 @@ func TestIsTransactionEvent(t *testing.T) {
}
}

func generateCompleteTxn(t *testing.T, src, result, outcome string) string {
func generateCompleteTxn(t *testing.T, src, result, outcome string, d time.Duration) string {
t.Helper()
tmp, err := sjson.SetBytes([]byte(src), "transaction.result", result)
assert.NoError(t, err)
tmp, err = sjson.SetBytes(tmp, "transaction.duration", d.Milliseconds())
assert.NoError(t, err)
if outcome != "" {
tmp, err = sjson.SetBytes(tmp, "transaction.outcome", outcome)
assert.NoError(t, err)
Expand Down
24 changes: 19 additions & 5 deletions accumulator/invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,31 @@ func (inc *Invocation) NeedProxyTransaction() bool {
// A proxy transaction will be required to be created if the agent has
// registered a transaction for the invocation but has not sent the
// corresponding transaction to the extension.
func (inc *Invocation) Finalize(status string) ([]byte, error) {
func (inc *Invocation) Finalize(status string, time time.Time) ([]byte, error) {
if !inc.NeedProxyTransaction() {
return nil, nil
}
return inc.createProxyTxn(status)
return inc.createProxyTxn(status, time)
}

func (inc *Invocation) createProxyTxn(status string) (txn []byte, err error) {
txn, err = sjson.SetBytes(inc.AgentPayload, "transaction.result", status)
func (inc *Invocation) createProxyTxn(status string, time time.Time) ([]byte, error) {
txn, err := sjson.SetBytes(inc.AgentPayload, "transaction.result", status)
if err != nil {
return nil, err
}
// Transaction duration cannot be known in partial transaction payload. Estimate
// the duration based on the time provided. Time can be based on the runtimeDone
// log record or function deadline.
duration := time.Sub(inc.Timestamp)
txn, err = sjson.SetBytes(txn, "transaction.duration", duration.Milliseconds())
if err != nil {
return nil, err
}
if status != "success" {
txn, err = sjson.SetBytes(txn, "transaction.outcome", "failure")
if err != nil {
return nil, err
}
}
return
return txn, nil
}
30 changes: 21 additions & 9 deletions accumulator/invocation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
package accumulator

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestFinalize(t *testing.T) {
txnDur := time.Second
for _, tc := range []struct {
name string
txnID string
Expand All @@ -39,33 +41,42 @@ func TestFinalize(t *testing.T) {
{
name: "txn_registered_observed",
txnID: "test-txn-id",
payload: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id"}}`,
payload: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id","duration":-1}}`,
txnObserved: true,
runtimeDoneStatus: "success",
},
{
name: "txn_registered_not_observed_runtime_failure",
txnID: "test-txn-id",
payload: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id"}}`,
payload: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id","duration":-1}}`,
txnObserved: false,
runtimeDoneStatus: "failure",
output: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id","result":"failure","outcome":"failure"}}`,
output: fmt.Sprintf(
`{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id","result":"failure","outcome":"failure","duration":%d}}`,
txnDur.Milliseconds(),
),
},
{
name: "txn_registered_not_observed_runtime_timeout",
txnID: "test-txn-id",
payload: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id"}}`,
payload: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id","duration":-1}}`,
txnObserved: false,
runtimeDoneStatus: "timeout",
output: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id","result":"timeout","outcome":"failure"}}`,
output: fmt.Sprintf(
`{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id","result":"timeout","outcome":"failure","duration":%d}}`,
txnDur.Milliseconds(),
),
},
{
name: "txn_registered_not_observed_runtime_success",
txnID: "test-txn-id",
payload: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id"}}`,
payload: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id","duration":-1}}`,
txnObserved: false,
runtimeDoneStatus: "success",
output: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id","result":"success"}}`,
output: fmt.Sprintf(
`{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id","result":"success","duration":%d}}`,
txnDur.Milliseconds(),
),
},
} {
t.Run(tc.name, func(t *testing.T) {
Expand All @@ -78,7 +89,7 @@ func TestFinalize(t *testing.T) {
AgentPayload: []byte(tc.payload),
TransactionObserved: tc.txnObserved,
}
result, err := inc.Finalize(tc.runtimeDoneStatus)
result, err := inc.Finalize(tc.runtimeDoneStatus, ts.Add(txnDur))
assert.Nil(t, err)
if len(tc.output) > 0 {
assert.JSONEq(t, tc.output, string(result))
Expand All @@ -91,6 +102,7 @@ func TestFinalize(t *testing.T) {

func BenchmarkCreateProxyTxn(b *testing.B) {
ts := time.Date(2022, time.October, 1, 1, 0, 0, 0, time.UTC)
txnDur := ts.Add(time.Second)
inc := &Invocation{
Timestamp: ts,
DeadlineMs: ts.Add(time.Minute).UnixMilli(),
Expand All @@ -102,7 +114,7 @@ func BenchmarkCreateProxyTxn(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := inc.createProxyTxn("success")
_, err := inc.createProxyTxn("success", txnDur)
if err != nil {
b.Fail()
}
Expand Down
15 changes: 2 additions & 13 deletions apmproxy/apmserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (c *Client) ForwardApmData(ctx context.Context) error {
}

// FlushAPMData reads all the apm data in the apm data channel and sends it to the APM server.
func (c *Client) FlushAPMData(ctx context.Context, shutdown bool) {
func (c *Client) FlushAPMData(ctx context.Context) {
if c.IsUnhealthy() {
c.logger.Debug("Flush skipped - Transport failing")
return
Expand All @@ -87,17 +87,6 @@ func (c *Client) FlushAPMData(ctx context.Context, shutdown bool) {
}
}

if shutdown {
// At shutdown we can not expect platform.runtimeDone events to be reported
// for the remaining invocations. If we haven't received the transaction
// from agents at this point then it is safe to assume that the function
// timed out. We will flush all the cached agent data with no transaction
// assuming the outcome of the transaction to be `timeout`.
if err := c.batch.OnShutdown("timeout"); err != nil {
c.logger.Errorf("Error while flushing agent data from batch: %v", err)
}
}

// If metadata still not available then fail fast
if c.batch == nil {
c.logger.Warnf("Metadata not available at flush, skipping sending lambda data to APM Server")
Expand All @@ -113,7 +102,7 @@ func (c *Client) FlushAPMData(ctx context.Context, shutdown bool) {
c.logger.Errorf("Error sending to APM server, skipping: %v", err)
}
case <-ctx.Done():
c.logger.Debugf("Failed to flush completely, may result in data drop")
c.logger.Debug("Failed to flush completely, may result in data drop")
return
default:
// Flush any remaining data in batch
Expand Down
2 changes: 1 addition & 1 deletion apmproxy/apmserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ func BenchmarkFlushAPMData(b *testing.B) {
for j := 0; j < 99; j++ {
apmClient.LambdaDataChannel <- []byte(`{"log":{"message":this is test log"}}`)
}
apmClient.FlushAPMData(context.Background(), false)
apmClient.FlushAPMData(context.Background())
}
}

Expand Down
Loading