From 7d0a003c59d54f5973cc219c4ec89a62a7728eb5 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Tue, 13 Sep 2022 12:11:55 +0800 Subject: [PATCH 01/11] Add support for collecting and sending function logs to APM Server --- app/run.go | 12 +++-- logsapi/client.go | 2 +- logsapi/client_test.go | 9 ++-- logsapi/event.go | 51 +++++++++++------- logsapi/functionlogs.go | 99 ++++++++++++++++++++++++++++++++++ logsapi/functionlogs_test.go | 70 ++++++++++++++++++++++++ logsapi/route_handlers.go | 2 +- logsapi/route_handlers_test.go | 10 ++-- logsapi/subscribe.go | 4 +- main_test.go | 10 ++-- 10 files changed, 228 insertions(+), 41 deletions(-) create mode 100644 logsapi/functionlogs.go create mode 100644 logsapi/functionlogs_test.go diff --git a/app/run.go b/app/run.go index a9f0fba9..d1c8951b 100644 --- a/app/run.go +++ b/app/run.go @@ -19,12 +19,13 @@ package app import ( "context" - "github.com/elastic/apm-aws-lambda/apmproxy" - "github.com/elastic/apm-aws-lambda/extension" - "github.com/elastic/apm-aws-lambda/logsapi" "fmt" "sync" "time" + + "github.com/elastic/apm-aws-lambda/apmproxy" + "github.com/elastic/apm-aws-lambda/extension" + "github.com/elastic/apm-aws-lambda/logsapi" ) // Run runs the app. @@ -57,7 +58,10 @@ func (app *App) Run(ctx context.Context) error { }() if app.logsClient != nil { - if err := app.logsClient.StartService([]logsapi.EventType{logsapi.Platform}, app.extensionClient.ExtensionID); err != nil { + if err := app.logsClient.StartService( + []logsapi.LogType{logsapi.Platform, logsapi.Function}, + app.extensionClient.ExtensionID, + ); err != nil { app.logger.Warnf("Error while subscribing to the Logs API: %v", err) // disable logs API if the service failed to start diff --git a/logsapi/client.go b/logsapi/client.go index 1f747bbf..36d0fe11 100644 --- a/logsapi/client.go +++ b/logsapi/client.go @@ -69,7 +69,7 @@ func NewClient(opts ...ClientOption) (*Client, error) { } // StartService starts the HTTP server listening for log events and subscribes to the Logs API. -func (lc *Client) StartService(eventTypes []EventType, extensionID string) error { +func (lc *Client) StartService(eventTypes []LogType, extensionID string) error { addr, err := lc.startHTTPServer() if err != nil { return err diff --git a/logsapi/client_test.go b/logsapi/client_test.go index d3878528..d0847e56 100644 --- a/logsapi/client_test.go +++ b/logsapi/client_test.go @@ -19,13 +19,14 @@ package logsapi_test import ( "bytes" - "github.com/elastic/apm-aws-lambda/logsapi" "encoding/json" "net/http" "net/http/httptest" "net/url" "testing" + "github.com/elastic/apm-aws-lambda/logsapi" + "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" ) @@ -107,9 +108,9 @@ func TestSubscribe(t *testing.T) { require.NoError(t, err) if tc.expectedErr { - require.Error(t, c.StartService([]logsapi.EventType{logsapi.Platform}, "foo")) + require.Error(t, c.StartService([]logsapi.LogType{logsapi.Platform}, "foo")) } else { - require.NoError(t, c.StartService([]logsapi.EventType{logsapi.Platform}, "foo")) + require.NoError(t, c.StartService([]logsapi.LogType{logsapi.Platform}, "foo")) } require.NoError(t, c.Shutdown()) @@ -143,7 +144,7 @@ func TestSubscribeAWSRequest(t *testing.T) { c, err := logsapi.NewClient(append(tc.opts, logsapi.WithLogsAPIBaseURL(s.URL), logsapi.WithLogBuffer(1))...) require.NoError(t, err) - require.NoError(t, c.StartService([]logsapi.EventType{logsapi.Platform}, "testID")) + require.NoError(t, c.StartService([]logsapi.LogType{logsapi.Platform}, "testID")) // Create a request to send to the logs listener platformDoneEvent := `{ diff --git a/logsapi/event.go b/logsapi/event.go index 4c65af88..2f76a67b 100644 --- a/logsapi/event.go +++ b/logsapi/event.go @@ -25,33 +25,38 @@ import ( "github.com/elastic/apm-aws-lambda/extension" ) -// EventType represents the type of logs in Lambda -type EventType string +// LogType represents the log streams that the Lambda Logs API +// provides for subscription +type LogType string const ( - // Platform is to receive logs emitted by the platform - Platform EventType = "platform" - // Function is to receive logs emitted by the function - Function EventType = "function" - // Extension is to receive logs emitted by the extension - Extension EventType = "extension" + // Platform log stream records events and errors related to + // invocations and extensions + Platform LogType = "platform" + // Function log stream records logs written by lambda function + // to stderr or stdout + Function LogType = "function" + // Extension log stream records logs generated by extension + Extension LogType = "extension" ) -// SubEventType is a Logs API sub event type -type SubEventType string +// SubLogType represents the subtype for each log type that is +// received in the log messages +type SubLogType string const ( - // RuntimeDone event is sent when lambda function is finished it's execution - RuntimeDone SubEventType = "platform.runtimeDone" - Fault SubEventType = "platform.fault" - Report SubEventType = "platform.report" - Start SubEventType = "platform.start" + // PlatformRuntimeDone event is sent when lambda function is finished it's execution + PlatformRuntimeDone SubLogType = "platform.runtimeDone" + PlatformFault SubLogType = "platform.fault" + PlatformReport SubLogType = "platform.report" + PlatformStart SubLogType = "platform.start" + FunctionLog SubLogType = "function" ) // LogEvent represents an event received from the Logs API type LogEvent struct { - Time time.Time `json:"time"` - Type SubEventType `json:"type"` + Time time.Time `json:"time"` + Type SubLogType `json:"type"` StringRecord string Record LogEventRecord } @@ -80,7 +85,7 @@ func (lc *Client) ProcessLogs( switch logEvent.Type { // Check the logEvent for runtimeDone and compare the RequestID // to the id that came in via the Next API - case RuntimeDone: + case PlatformRuntimeDone: if logEvent.Record.RequestID == requestID { lc.logger.Info("Received runtimeDone event for this function invocation") runtimeDoneSignal <- struct{}{} @@ -89,7 +94,7 @@ func (lc *Client) ProcessLogs( lc.logger.Debug("Log API runtimeDone event request id didn't match") // Check if the logEvent contains metrics and verify that they can be linked to the previous invocation - case Report: + case PlatformReport: if prevEvent != nil && logEvent.Record.RequestID == prevEvent.RequestID { lc.logger.Debug("Received platform report for the previous function invocation") processedMetrics, err := ProcessPlatformReport(metadataContainer, prevEvent, logEvent) @@ -102,6 +107,14 @@ func (lc *Client) ProcessLogs( lc.logger.Warn("report event request id didn't match the previous event id") lc.logger.Debug("Log API runtimeDone event request id didn't match") } + case FunctionLog: + lc.logger.Debug("Received function log") + processedLog, err := ProcessFunctionLog(metadataContainer, prevEvent, logEvent) + if err != nil { + lc.logger.Errorf("Error processing function log : %v", err) + } else { + apmClient.EnqueueAPMData(processedLog) + } } case <-ctx.Done(): lc.logger.Debug("Current invocation over. Interrupting logs processing goroutine") diff --git a/logsapi/functionlogs.go b/logsapi/functionlogs.go new file mode 100644 index 00000000..3625d217 --- /dev/null +++ b/logsapi/functionlogs.go @@ -0,0 +1,99 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package logsapi + +import ( + "github.com/elastic/apm-aws-lambda/apmproxy" + "github.com/elastic/apm-aws-lambda/extension" + "go.elastic.co/apm/v2/model" + "go.elastic.co/fastjson" +) + +type LogContainer struct { + Log *logLine +} + +type logLine struct { + Timestamp model.Time + Message string + FAAS *model.FAAS +} + +func (l *logLine) MarshalFastJSON(w *fastjson.Writer) error { + var firstErr error + w.RawByte('{') + w.RawString("\"message\":") + w.String(l.Message) + w.RawString(",\"@timestamp\":") + if err := l.Timestamp.MarshalFastJSON(w); err != nil && firstErr == nil { + firstErr = err + } + if l.FAAS != nil { + w.RawString(",\"faas\":") + if err := l.FAAS.MarshalFastJSON(w); err != nil && firstErr == nil { + firstErr = err + } + } + w.RawByte('}') + return firstErr +} + +func (lc LogContainer) MarshalFastJSON(json *fastjson.Writer) error { + json.RawString(`{"log":`) + if err := lc.Log.MarshalFastJSON(json); err != nil { + return err + } + json.RawString(`}`) + return nil +} + +// ProcessFunctionLog consumes extension event, agent metadata and log +// event from Lambda logs API to create a payload for APM server +func ProcessFunctionLog( + metadataContainer *apmproxy.MetadataContainer, + functionData *extension.NextEventResponse, + log LogEvent, +) (apmproxy.AgentData, error) { + lc := LogContainer{ + Log: &logLine{ + Timestamp: model.Time(log.Time), + Message: log.StringRecord, + }, + } + + if functionData != nil { + // FaaS Fields + lc.Log.FAAS = &model.FAAS{ + Execution: functionData.RequestID, + ID: functionData.InvokedFunctionArn, + } + } + + var jsonWriter fastjson.Writer + if err := lc.MarshalFastJSON(&jsonWriter); err != nil { + return apmproxy.AgentData{}, err + } + + var logData []byte + if metadataContainer.Metadata != nil { + logData = append(metadataContainer.Metadata, []byte("\n")...) + } + + logData = append(logData, jsonWriter.Bytes()...) + return apmproxy.AgentData{Data: logData}, nil +} diff --git a/logsapi/functionlogs_test.go b/logsapi/functionlogs_test.go new file mode 100644 index 00000000..64d4312f --- /dev/null +++ b/logsapi/functionlogs_test.go @@ -0,0 +1,70 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package logsapi + +import ( + "fmt" + "testing" + "time" + + "github.com/elastic/apm-aws-lambda/apmproxy" + "github.com/elastic/apm-aws-lambda/extension" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestProcessFunctionLog(t *testing.T) { + metadataContainer := &apmproxy.MetadataContainer{ + Metadata: []byte(`{"metadata":{"service":{"agent":{"name":"apm-lambda-extension","version":"1.1.0"},"framework":{"name":"AWS Lambda","version":""},"language":{"name":"python","version":"3.9.8"},"runtime":{"name":"","version":""},"node":{}},"user":{},"process":{"pid":0},"system":{"container":{"id":""},"kubernetes":{"node":{},"pod":{}}},"cloud":{"provider":"","instance":{},"machine":{},"account":{},"project":{},"service":{}}}}`), + } + event := LogEvent{ + Time: time.Date(2022, 11, 12, 0, 0, 0, 0, time.UTC), + Type: FunctionLog, + StringRecord: "ERROR encountered. Stack trace:\nmy-function (line 10)\n", + } + + t.Run("without_faas", func(t *testing.T) { + expectedEventJSON := `{"log":{"message":"ERROR encountered. Stack trace:\nmy-function (line 10)\n","@timestamp":1668211200000000}}` + + apmData, err := ProcessFunctionLog(metadataContainer, nil, event) + + require.NoError(t, err) + assert.Equal( + t, + fmt.Sprintf("%s\n%s", metadataContainer.Metadata, expectedEventJSON), + string(apmData.Data), + ) + }) + + t.Run("with_faas", func(t *testing.T) { + nextEventResp := &extension.NextEventResponse{ + RequestID: "8476a536-e9f4-11e8-9739-2dfe598c3fcd", + InvokedFunctionArn: "arn:aws:lambda:us-east-2:123456789012:function:custom-runtime", + } + expectedEventJSON := `{"log":{"message":"ERROR encountered. Stack trace:\nmy-function (line 10)\n","@timestamp":1668211200000000,"faas":{"coldstart":false,"execution":"8476a536-e9f4-11e8-9739-2dfe598c3fcd","id":"arn:aws:lambda:us-east-2:123456789012:function:custom-runtime"}}}` + + apmData, err := ProcessFunctionLog(metadataContainer, nextEventResp, event) + + require.NoError(t, err) + assert.Equal( + t, + fmt.Sprintf("%s\n%s", metadataContainer.Metadata, expectedEventJSON), + string(apmData.Data), + ) + }) +} diff --git a/logsapi/route_handlers.go b/logsapi/route_handlers.go index 4ffe2c83..73fc2024 100644 --- a/logsapi/route_handlers.go +++ b/logsapi/route_handlers.go @@ -48,7 +48,7 @@ func handleLogEventsRequest(logger *zap.SugaredLogger, logsChannel chan LogEvent func (le *LogEvent) UnmarshalJSON(data []byte) error { b := struct { Time time.Time `json:"time"` - Type SubEventType `json:"type"` + Type SubLogType `json:"type"` Record json.RawMessage `json:"record"` }{} diff --git a/logsapi/route_handlers_test.go b/logsapi/route_handlers_test.go index dc077e4a..ac40fe7a 100644 --- a/logsapi/route_handlers_test.go +++ b/logsapi/route_handlers_test.go @@ -43,7 +43,7 @@ func TestLogEventUnmarshalReport(t *testing.T) { err := le.UnmarshalJSON(reportJSON) require.NoError(t, err) - assert.Equal(t, SubEventType("platform.report"), le.Type) + assert.Equal(t, SubLogType("platform.report"), le.Type) assert.Equal(t, "2020-08-20T12:31:32.123Z", le.Time.Format(time.RFC3339Nano)) rec := LogEventRecord{ RequestID: "6f7f0961f83442118a7af6fe80b88d56", @@ -70,7 +70,7 @@ func TestLogEventUnmarshalFault(t *testing.T) { err := le.UnmarshalJSON(reportJSON) require.NoError(t, err) - assert.Equal(t, SubEventType("platform.fault"), le.Type) + assert.Equal(t, SubLogType("platform.fault"), le.Type) assert.Equal(t, "2020-08-20T12:31:32.123Z", le.Time.Format(time.RFC3339Nano)) rec := "RequestId: d783b35e-a91d-4251-af17-035953428a2c Process exited before completing request" assert.Equal(t, rec, le.StringRecord) @@ -92,7 +92,7 @@ func Test_unmarshalRuntimeDoneRecordObject(t *testing.T) { err := le.UnmarshalJSON(jsonBytes) require.NoError(t, err) - assert.Equal(t, SubEventType("platform.runtimeDone"), le.Type) + assert.Equal(t, SubLogType("platform.runtimeDone"), le.Type) assert.Equal(t, "2021-02-04T20:00:05.123Z", le.Time.Format(time.RFC3339Nano)) rec := LogEventRecord{ RequestID: "6f7f0961f83442118a7af6fe80b88", @@ -113,7 +113,7 @@ func Test_unmarshalRuntimeDoneRecordString(t *testing.T) { err := le.UnmarshalJSON(jsonBytes) require.NoError(t, err) - assert.Equal(t, SubEventType("platform.runtimeDone"), le.Type) + assert.Equal(t, SubLogType("platform.runtimeDone"), le.Type) assert.Equal(t, "2021-02-04T20:00:05.123Z", le.Time.Format(time.RFC3339Nano)) assert.Equal(t, "Unknown application error occurred", le.StringRecord) } @@ -134,6 +134,6 @@ func Test_unmarshalRuntimeDoneFaultRecordString(t *testing.T) { timeValue, _ := time.Parse(time.RFC3339, "2021-02-04T20:00:05.123Z") assert.Equal(t, timeValue, le.Time) - assert.Equal(t, Fault, le.Type) + assert.Equal(t, PlatformFault, le.Type) assert.Equal(t, "Unknown application error occurred", le.StringRecord) } diff --git a/logsapi/subscribe.go b/logsapi/subscribe.go index 9cf40f75..0dd6e161 100644 --- a/logsapi/subscribe.go +++ b/logsapi/subscribe.go @@ -30,7 +30,7 @@ import ( // SubscribeRequest is the request body that is sent to Logs API on subscribe type SubscribeRequest struct { SchemaVersion SchemaVersion `json:"schemaVersion"` - EventTypes []EventType `json:"types"` + EventTypes []LogType `json:"types"` BufferingCfg BufferingCfg `json:"buffering"` Destination Destination `json:"destination"` } @@ -80,7 +80,7 @@ func (lc *Client) startHTTPServer() (string, error) { return addr, nil } -func (lc *Client) subscribe(types []EventType, extensionID string, uri string) error { +func (lc *Client) subscribe(types []LogType, extensionID string, uri string) error { data, err := json.Marshal(&SubscribeRequest{ SchemaVersion: SchemaVersionLatest, EventTypes: types, diff --git a/main_test.go b/main_test.go index 90ea4ff3..78028b54 100644 --- a/main_test.go +++ b/main_test.go @@ -216,7 +216,7 @@ func newTestStructs(t *testing.T) chan MockEvent { } func processMockEvent(currId string, event MockEvent, extensionPort string, logsapiAddr string, internals *MockServerInternals, l *zap.SugaredLogger) { - sendLogEvent(logsapiAddr, currId, logsapi.Start, l) + sendLogEvent(logsapiAddr, currId, logsapi.PlatformStart, l) client := http.Client{} // Use a custom transport with a low timeout @@ -311,10 +311,10 @@ func processMockEvent(currId string, event MockEvent, extensionPort string, logs case Shutdown: } if sendRuntimeDone { - sendLogEvent(logsapiAddr, currId, logsapi.RuntimeDone, l) + sendLogEvent(logsapiAddr, currId, logsapi.PlatformRuntimeDone, l) } if sendMetrics { - sendLogEvent(logsapiAddr, currId, logsapi.Report, l) + sendLogEvent(logsapiAddr, currId, logsapi.PlatformReport, l) } } @@ -335,11 +335,11 @@ func sendNextEventInfo(w http.ResponseWriter, id string, event MockEvent, l *zap } } -func sendLogEvent(logsapiAddr string, requestId string, logEventType logsapi.SubEventType, l *zap.SugaredLogger) { +func sendLogEvent(logsapiAddr string, requestId string, logEventType logsapi.SubLogType, l *zap.SugaredLogger) { record := logsapi.LogEventRecord{ RequestID: requestId, } - if logEventType == logsapi.Report { + if logEventType == logsapi.PlatformReport { record.Metrics = logsapi.PlatformMetrics{ BilledDurationMs: 60, DurationMs: 59.9, From edacd527e49f1ccec941bf3e857b7813e347fea1 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Tue, 13 Sep 2022 12:34:47 +0800 Subject: [PATCH 02/11] Add changelog --- CHANGELOG.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 55a0b33b..69e7ee54 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -29,6 +29,7 @@ https://github.com/elastic/apm-aws-lambda/compare/v1.1.0...main[View commits] [float] ===== Features - Disable CGO to prevent libc/ABI compatibility issues {lambda-pull}292[292] +- Add support for collecting and shipping function logs to APM Server {lambda-pull}303[303] [float] ===== Bug fixes From 25b061f02ce5e3ef28b4e4223ffe45f11f6c9bfd Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Tue, 13 Sep 2022 15:57:26 +0800 Subject: [PATCH 03/11] Minor refactoring --- logsapi/functionlogs.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/logsapi/functionlogs.go b/logsapi/functionlogs.go index 3625d217..984b15f7 100644 --- a/logsapi/functionlogs.go +++ b/logsapi/functionlogs.go @@ -24,7 +24,7 @@ import ( "go.elastic.co/fastjson" ) -type LogContainer struct { +type logContainer struct { Log *logLine } @@ -53,12 +53,12 @@ func (l *logLine) MarshalFastJSON(w *fastjson.Writer) error { return firstErr } -func (lc LogContainer) MarshalFastJSON(json *fastjson.Writer) error { +func (lc logContainer) MarshalFastJSON(json *fastjson.Writer) error { json.RawString(`{"log":`) if err := lc.Log.MarshalFastJSON(json); err != nil { return err } - json.RawString(`}`) + json.RawByte('}') return nil } @@ -69,7 +69,7 @@ func ProcessFunctionLog( functionData *extension.NextEventResponse, log LogEvent, ) (apmproxy.AgentData, error) { - lc := LogContainer{ + lc := logContainer{ Log: &logLine{ Timestamp: model.Time(log.Time), Message: log.StringRecord, From 3687e67b3ce1ba2e5d4bbf1b68527bdb572765e8 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Wed, 14 Sep 2022 10:05:11 +0800 Subject: [PATCH 04/11] Fix handling of metadata and next event resp for fn logs --- app/run.go | 10 ++++++- logsapi/event.go | 15 +++++++---- logsapi/functionlogs.go | 13 ++++----- logsapi/functionlogs_test.go | 51 ++++++++++++++---------------------- 4 files changed, 43 insertions(+), 46 deletions(-) diff --git a/app/run.go b/app/run.go index d1c8951b..28ca4756 100644 --- a/app/run.go +++ b/app/run.go @@ -173,7 +173,15 @@ func (app *App) processEvent( runtimeDone := make(chan struct{}) if app.logsClient != nil { go func() { - if err := app.logsClient.ProcessLogs(invocationCtx, event.RequestID, app.apmClient, metadataContainer, runtimeDone, prevEvent); err != nil { + if err := app.logsClient.ProcessLogs( + invocationCtx, + event.RequestID, + event.InvokedFunctionArn, + app.apmClient, + metadataContainer, + runtimeDone, + prevEvent, + ); err != nil { app.logger.Errorf("Error while processing Lambda Logs ; %v", err) } else { close(runtimeDone) diff --git a/logsapi/event.go b/logsapi/event.go index 2f76a67b..59f7445e 100644 --- a/logsapi/event.go +++ b/logsapi/event.go @@ -73,6 +73,7 @@ type LogEventRecord struct { func (lc *Client) ProcessLogs( ctx context.Context, requestID string, + invokedFnArn string, apmClient *apmproxy.Client, metadataContainer *apmproxy.MetadataContainer, runtimeDoneSignal chan struct{}, @@ -108,12 +109,16 @@ func (lc *Client) ProcessLogs( lc.logger.Debug("Log API runtimeDone event request id didn't match") } case FunctionLog: - lc.logger.Debug("Received function log") - processedLog, err := ProcessFunctionLog(metadataContainer, prevEvent, logEvent) - if err != nil { - lc.logger.Errorf("Error processing function log : %v", err) + if metadataContainer != nil && len(metadataContainer.Metadata) > 0 { + lc.logger.Debug("Received function log") + processedLog, err := ProcessFunctionLog(metadataContainer, requestID, invokedFnArn, logEvent) + if err != nil { + lc.logger.Errorf("Error processing function log : %v", err) + } else { + apmClient.EnqueueAPMData(processedLog) + } } else { - apmClient.EnqueueAPMData(processedLog) + lc.logger.Warn("Function log received before metadata populated, quietly dropping log") } } case <-ctx.Done(): diff --git a/logsapi/functionlogs.go b/logsapi/functionlogs.go index 984b15f7..618a86c4 100644 --- a/logsapi/functionlogs.go +++ b/logsapi/functionlogs.go @@ -19,7 +19,6 @@ package logsapi import ( "github.com/elastic/apm-aws-lambda/apmproxy" - "github.com/elastic/apm-aws-lambda/extension" "go.elastic.co/apm/v2/model" "go.elastic.co/fastjson" ) @@ -66,7 +65,8 @@ func (lc logContainer) MarshalFastJSON(json *fastjson.Writer) error { // event from Lambda logs API to create a payload for APM server func ProcessFunctionLog( metadataContainer *apmproxy.MetadataContainer, - functionData *extension.NextEventResponse, + requestID string, + invokedFnArn string, log LogEvent, ) (apmproxy.AgentData, error) { lc := logContainer{ @@ -76,12 +76,9 @@ func ProcessFunctionLog( }, } - if functionData != nil { - // FaaS Fields - lc.Log.FAAS = &model.FAAS{ - Execution: functionData.RequestID, - ID: functionData.InvokedFunctionArn, - } + lc.Log.FAAS = &model.FAAS{ + ID: invokedFnArn, + Execution: requestID, } var jsonWriter fastjson.Writer diff --git a/logsapi/functionlogs_test.go b/logsapi/functionlogs_test.go index 64d4312f..9bd1d8e8 100644 --- a/logsapi/functionlogs_test.go +++ b/logsapi/functionlogs_test.go @@ -23,7 +23,6 @@ import ( "time" "github.com/elastic/apm-aws-lambda/apmproxy" - "github.com/elastic/apm-aws-lambda/extension" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -35,36 +34,24 @@ func TestProcessFunctionLog(t *testing.T) { event := LogEvent{ Time: time.Date(2022, 11, 12, 0, 0, 0, 0, time.UTC), Type: FunctionLog, - StringRecord: "ERROR encountered. Stack trace:\nmy-function (line 10)\n", + StringRecord: "ERROR encountered. Stack trace:my-function (line 10)", } - - t.Run("without_faas", func(t *testing.T) { - expectedEventJSON := `{"log":{"message":"ERROR encountered. Stack trace:\nmy-function (line 10)\n","@timestamp":1668211200000000}}` - - apmData, err := ProcessFunctionLog(metadataContainer, nil, event) - - require.NoError(t, err) - assert.Equal( - t, - fmt.Sprintf("%s\n%s", metadataContainer.Metadata, expectedEventJSON), - string(apmData.Data), - ) - }) - - t.Run("with_faas", func(t *testing.T) { - nextEventResp := &extension.NextEventResponse{ - RequestID: "8476a536-e9f4-11e8-9739-2dfe598c3fcd", - InvokedFunctionArn: "arn:aws:lambda:us-east-2:123456789012:function:custom-runtime", - } - expectedEventJSON := `{"log":{"message":"ERROR encountered. Stack trace:\nmy-function (line 10)\n","@timestamp":1668211200000000,"faas":{"coldstart":false,"execution":"8476a536-e9f4-11e8-9739-2dfe598c3fcd","id":"arn:aws:lambda:us-east-2:123456789012:function:custom-runtime"}}}` - - apmData, err := ProcessFunctionLog(metadataContainer, nextEventResp, event) - - require.NoError(t, err) - assert.Equal( - t, - fmt.Sprintf("%s\n%s", metadataContainer.Metadata, expectedEventJSON), - string(apmData.Data), - ) - }) + reqID := "8476a536-e9f4-11e8-9739-2dfe598c3fcd" + invokedFnArn := "arn:aws:lambda:us-east-2:123456789012:function:custom-runtime" + expectedEventJSON := fmt.Sprintf( + `{"log":{"message":"%s","@timestamp":%d,"faas":{"coldstart":false,"execution":"%s","id":"%s"}}}`, + event.StringRecord, + event.Time.UnixNano()/int64(time.Microsecond), + reqID, + invokedFnArn, + ) + + apmData, err := ProcessFunctionLog(metadataContainer, reqID, invokedFnArn, event) + + require.NoError(t, err) + assert.Equal( + t, + fmt.Sprintf("%s\n%s", metadataContainer.Metadata, expectedEventJSON), + string(apmData.Data), + ) } From 365c08a5fa55863b5f2a4de832389335ffe9663e Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Wed, 14 Sep 2022 10:09:45 +0800 Subject: [PATCH 05/11] Minor refactor --- logsapi/subscribe.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/logsapi/subscribe.go b/logsapi/subscribe.go index 0dd6e161..f22dfe42 100644 --- a/logsapi/subscribe.go +++ b/logsapi/subscribe.go @@ -30,7 +30,7 @@ import ( // SubscribeRequest is the request body that is sent to Logs API on subscribe type SubscribeRequest struct { SchemaVersion SchemaVersion `json:"schemaVersion"` - EventTypes []LogType `json:"types"` + LogTypes []LogType `json:"types"` BufferingCfg BufferingCfg `json:"buffering"` Destination Destination `json:"destination"` } @@ -83,7 +83,7 @@ func (lc *Client) startHTTPServer() (string, error) { func (lc *Client) subscribe(types []LogType, extensionID string, uri string) error { data, err := json.Marshal(&SubscribeRequest{ SchemaVersion: SchemaVersionLatest, - EventTypes: types, + LogTypes: types, BufferingCfg: BufferingCfg{ MaxItems: 10000, MaxBytes: 262144, From f800f4347287435aeca6d5054854ef8c9720e60a Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Wed, 14 Sep 2022 10:24:51 +0800 Subject: [PATCH 06/11] Refactor function logs test --- logsapi/functionlogs_test.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/logsapi/functionlogs_test.go b/logsapi/functionlogs_test.go index 9bd1d8e8..0c25ded4 100644 --- a/logsapi/functionlogs_test.go +++ b/logsapi/functionlogs_test.go @@ -38,8 +38,9 @@ func TestProcessFunctionLog(t *testing.T) { } reqID := "8476a536-e9f4-11e8-9739-2dfe598c3fcd" invokedFnArn := "arn:aws:lambda:us-east-2:123456789012:function:custom-runtime" - expectedEventJSON := fmt.Sprintf( - `{"log":{"message":"%s","@timestamp":%d,"faas":{"coldstart":false,"execution":"%s","id":"%s"}}}`, + expectedData := fmt.Sprintf( + "%s\n{\"log\":{\"message\":\"%s\",\"@timestamp\":%d,\"faas\":{\"coldstart\":false,\"execution\":\"%s\",\"id\":\"%s\"}}}", + metadataContainer.Metadata, event.StringRecord, event.Time.UnixNano()/int64(time.Microsecond), reqID, @@ -49,9 +50,5 @@ func TestProcessFunctionLog(t *testing.T) { apmData, err := ProcessFunctionLog(metadataContainer, reqID, invokedFnArn, event) require.NoError(t, err) - assert.Equal( - t, - fmt.Sprintf("%s\n%s", metadataContainer.Metadata, expectedEventJSON), - string(apmData.Data), - ) + assert.Equal(t, expectedData, string(apmData.Data)) } From 7a2358a8d6e1f0a278d1388e679a36c802d467f7 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Wed, 14 Sep 2022 17:20:42 +0800 Subject: [PATCH 07/11] Use platform start event to find function log request ID --- logsapi/event.go | 14 +++++++++++++- logsapi/functionlogs.go | 30 ++++++++++++++++++++++++++---- logsapi/functionlogs_test.go | 4 ++-- 3 files changed, 41 insertions(+), 7 deletions(-) diff --git a/logsapi/event.go b/logsapi/event.go index 59f7445e..40c402cb 100644 --- a/logsapi/event.go +++ b/logsapi/event.go @@ -50,6 +50,7 @@ const ( PlatformFault SubLogType = "platform.fault" PlatformReport SubLogType = "platform.report" PlatformStart SubLogType = "platform.start" + PlatformEnd SubLogType = "platform.end" FunctionLog SubLogType = "function" ) @@ -79,11 +80,17 @@ func (lc *Client) ProcessLogs( runtimeDoneSignal chan struct{}, prevEvent *extension.NextEventResponse, ) error { + // platformStartReqID is to identify the requestID for the function + // logs under the assumption that function logs for a specific request + // ID will be interleaved b/w its PlatformStart and PlatformEnd events. + var platformStartReqID string for { select { case logEvent := <-lc.logsChannel: lc.logger.Debugf("Received log event %v", logEvent.Type) switch logEvent.Type { + case PlatformStart: + platformStartReqID = logEvent.Record.RequestID // Check the logEvent for runtimeDone and compare the RequestID // to the id that came in via the Next API case PlatformRuntimeDone: @@ -111,7 +118,12 @@ func (lc *Client) ProcessLogs( case FunctionLog: if metadataContainer != nil && len(metadataContainer.Metadata) > 0 { lc.logger.Debug("Received function log") - processedLog, err := ProcessFunctionLog(metadataContainer, requestID, invokedFnArn, logEvent) + processedLog, err := ProcessFunctionLog( + metadataContainer, + platformStartReqID, + invokedFnArn, + logEvent, + ) if err != nil { lc.logger.Errorf("Error processing function log : %v", err) } else { diff --git a/logsapi/functionlogs.go b/logsapi/functionlogs.go index 618a86c4..732a4ee2 100644 --- a/logsapi/functionlogs.go +++ b/logsapi/functionlogs.go @@ -30,13 +30,12 @@ type logContainer struct { type logLine struct { Timestamp model.Time Message string - FAAS *model.FAAS + FAAS *faas } func (l *logLine) MarshalFastJSON(w *fastjson.Writer) error { var firstErr error - w.RawByte('{') - w.RawString("\"message\":") + w.RawString("{\"message\":") w.String(l.Message) w.RawString(",\"@timestamp\":") if err := l.Timestamp.MarshalFastJSON(w); err != nil && firstErr == nil { @@ -52,6 +51,29 @@ func (l *logLine) MarshalFastJSON(w *fastjson.Writer) error { return firstErr } +// faas struct is a subset of go.elastic.co/apm/v2/model#FAAS +// +// The purpose of having a separate struct is to have a custom +// marshalling logic that is targeted for the faas fields +// available for function logs. For example: `coldstart` value +// cannot be inferred for function logs so this struct drops +// the field entirely. +type faas struct { + // ID holds a unique identifier of the invoked serverless function. + ID string `json:"id,omitempty"` + // Execution holds the request ID of the function invocation. + Execution string `json:"execution,omitempty"` +} + +func (f *faas) MarshalFastJSON(w *fastjson.Writer) error { + w.RawString("{\"id\":") + w.String(f.ID) + w.RawString(",\"execution\":") + w.String(f.Execution) + w.RawByte('}') + return nil +} + func (lc logContainer) MarshalFastJSON(json *fastjson.Writer) error { json.RawString(`{"log":`) if err := lc.Log.MarshalFastJSON(json); err != nil { @@ -76,7 +98,7 @@ func ProcessFunctionLog( }, } - lc.Log.FAAS = &model.FAAS{ + lc.Log.FAAS = &faas{ ID: invokedFnArn, Execution: requestID, } diff --git a/logsapi/functionlogs_test.go b/logsapi/functionlogs_test.go index 0c25ded4..5aa4034b 100644 --- a/logsapi/functionlogs_test.go +++ b/logsapi/functionlogs_test.go @@ -39,12 +39,12 @@ func TestProcessFunctionLog(t *testing.T) { reqID := "8476a536-e9f4-11e8-9739-2dfe598c3fcd" invokedFnArn := "arn:aws:lambda:us-east-2:123456789012:function:custom-runtime" expectedData := fmt.Sprintf( - "%s\n{\"log\":{\"message\":\"%s\",\"@timestamp\":%d,\"faas\":{\"coldstart\":false,\"execution\":\"%s\",\"id\":\"%s\"}}}", + "%s\n{\"log\":{\"message\":\"%s\",\"@timestamp\":%d,\"faas\":{\"id\":\"%s\",\"execution\":\"%s\"}}}", metadataContainer.Metadata, event.StringRecord, event.Time.UnixNano()/int64(time.Microsecond), - reqID, invokedFnArn, + reqID, ) apmData, err := ProcessFunctionLog(metadataContainer, reqID, invokedFnArn, event) From a39b91d69420ce2ebf358ad47a4fe920f0127cb0 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Wed, 14 Sep 2022 19:02:02 +0800 Subject: [PATCH 08/11] Add config option to disable function logs subscription --- app/app.go | 12 ++++++++--- app/config.go | 37 ++++++++++++++++++++++------------ app/run.go | 6 +----- logsapi/client.go | 32 +++++++++++++++++++++-------- logsapi/client_test.go | 17 +++++++++++----- logsapi/event.go | 36 +++++++++------------------------ logsapi/option.go | 7 +++++++ logsapi/route_handlers.go | 2 +- logsapi/route_handlers_test.go | 8 ++++---- logsapi/subscribe.go | 10 ++++----- main.go | 12 +++++++++-- main_test.go | 2 +- 12 files changed, 108 insertions(+), 73 deletions(-) diff --git a/app/app.go b/app/app.go index 1663f463..1048a571 100644 --- a/app/app.go +++ b/app/app.go @@ -45,7 +45,7 @@ type App struct { // New returns an App or an error if the // creation failed. -func New(ctx context.Context, opts ...configOption) (*App, error) { +func New(ctx context.Context, opts ...ConfigOption) (*App, error) { c := appConfig{} for _, opt := range opts { @@ -62,7 +62,7 @@ func New(ctx context.Context, opts ...configOption) (*App, error) { return nil, err } - apmServerApiKey, apmServerSecretToken, err := loadAWSOptions(ctx, c.awsConfig, app.logger) + apmServerAPIKey, apmServerSecretToken, err := loadAWSOptions(ctx, c.awsConfig, app.logger) if err != nil { return nil, err } @@ -75,11 +75,17 @@ func New(ctx context.Context, opts ...configOption) (*App, error) { addr = c.logsapiAddr } + subscriptionLogStreams := []logsapi.SubscriptionType{logsapi.Platform} + if !c.disableFunctionLogSubscription { + subscriptionLogStreams = append(subscriptionLogStreams, logsapi.Function) + } + lc, err := logsapi.NewClient( logsapi.WithLogsAPIBaseURL(fmt.Sprintf("http://%s", c.awsLambdaRuntimeAPI)), logsapi.WithListenerAddress(addr), logsapi.WithLogBuffer(100), logsapi.WithLogger(app.logger), + logsapi.WithLogsAPISubscriptionTypes(subscriptionLogStreams...), ) if err != nil { return nil, err @@ -124,7 +130,7 @@ func New(ctx context.Context, opts ...configOption) (*App, error) { apmOpts = append(apmOpts, apmproxy.WithURL(os.Getenv("ELASTIC_APM_LAMBDA_APM_SERVER")), apmproxy.WithLogger(app.logger), - apmproxy.WithAPIKey(apmServerApiKey), + apmproxy.WithAPIKey(apmServerAPIKey), apmproxy.WithSecretToken(apmServerSecretToken), ) diff --git a/app/config.go b/app/config.go index 14566ad1..3fdf6302 100644 --- a/app/config.go +++ b/app/config.go @@ -20,41 +20,52 @@ package app import "github.com/aws/aws-sdk-go-v2/aws" type appConfig struct { - awsLambdaRuntimeAPI string - awsConfig aws.Config - extensionName string - disableLogsAPI bool - logLevel string - logsapiAddr string + awsLambdaRuntimeAPI string + awsConfig aws.Config + extensionName string + disableLogsAPI bool + disableFunctionLogSubscription bool + logLevel string + logsapiAddr string } -type configOption func(*appConfig) +// ConfigOption is used to configure the lambda extension +type ConfigOption func(*appConfig) // WithLambdaRuntimeAPI sets the AWS Lambda Runtime API // endpoint (normally taken from $AWS_LAMBDA_RUNTIME_API), // used by the AWS client. -func WithLambdaRuntimeAPI(api string) configOption { +func WithLambdaRuntimeAPI(api string) ConfigOption { return func(c *appConfig) { c.awsLambdaRuntimeAPI = api } } // WithExtensionName sets the extension name. -func WithExtensionName(name string) configOption { +func WithExtensionName(name string) ConfigOption { return func(c *appConfig) { c.extensionName = name } } // WithoutLogsAPI disables the logs api. -func WithoutLogsAPI() configOption { +func WithoutLogsAPI() ConfigOption { return func(c *appConfig) { c.disableLogsAPI = true } } +// WithoutFunctionLogSubscription disables the logs api subscription +// to function log stream. This option will only work if LogsAPI is +// not disabled by the WithoutLogsAPI config option. +func WithoutFunctionLogSubscription() ConfigOption { + return func(c *appConfig) { + c.disableFunctionLogSubscription = true + } +} + // WithLogLevel sets the log level. -func WithLogLevel(level string) configOption { +func WithLogLevel(level string) ConfigOption { return func(c *appConfig) { c.logLevel = level } @@ -62,14 +73,14 @@ func WithLogLevel(level string) configOption { // WithLogsapiAddress sets the listener address of the // server listening for logs event. -func WithLogsapiAddress(s string) configOption { +func WithLogsapiAddress(s string) ConfigOption { return func(c *appConfig) { c.logsapiAddr = s } } // WithAWSConfig sets the AWS config. -func WithAWSConfig(awsConfig aws.Config) configOption { +func WithAWSConfig(awsConfig aws.Config) ConfigOption { return func(c *appConfig) { c.awsConfig = awsConfig } diff --git a/app/run.go b/app/run.go index 28ca4756..8a9a1438 100644 --- a/app/run.go +++ b/app/run.go @@ -25,7 +25,6 @@ import ( "github.com/elastic/apm-aws-lambda/apmproxy" "github.com/elastic/apm-aws-lambda/extension" - "github.com/elastic/apm-aws-lambda/logsapi" ) // Run runs the app. @@ -58,10 +57,7 @@ func (app *App) Run(ctx context.Context) error { }() if app.logsClient != nil { - if err := app.logsClient.StartService( - []logsapi.LogType{logsapi.Platform, logsapi.Function}, - app.extensionClient.ExtensionID, - ); err != nil { + if err := app.logsClient.StartService(app.extensionClient.ExtensionID); err != nil { app.logger.Warnf("Error while subscribing to the Logs API: %v", err) // disable logs API if the service failed to start diff --git a/logsapi/client.go b/logsapi/client.go index 36d0fe11..b7d18ac0 100644 --- a/logsapi/client.go +++ b/logsapi/client.go @@ -28,17 +28,33 @@ import ( "go.uber.org/zap" ) +// SubscriptionType represents the log streams that the Lambda Logs API +// provides for subscription +type SubscriptionType string + +const ( + // Platform logstream records events and errors related to + // invocations and extensions + Platform SubscriptionType = "platform" + // Function logstream records logs written by lambda function + // to stderr or stdout + Function SubscriptionType = "function" + // Extension logstream records logs generated by extension + Extension SubscriptionType = "extension" +) + // ClientOption is a config option for a Client. type ClientOption func(*Client) // Client is the client used to subscribe to the Logs API. type Client struct { - httpClient *http.Client - logsAPIBaseURL string - logsChannel chan LogEvent - listenerAddr string - server *http.Server - logger *zap.SugaredLogger + httpClient *http.Client + logsAPIBaseURL string + logsAPISubscriptionTypes []SubscriptionType + logsChannel chan LogEvent + listenerAddr string + server *http.Server + logger *zap.SugaredLogger } // NewClient returns a new Client with the given URL. @@ -69,7 +85,7 @@ func NewClient(opts ...ClientOption) (*Client, error) { } // StartService starts the HTTP server listening for log events and subscribes to the Logs API. -func (lc *Client) StartService(eventTypes []LogType, extensionID string) error { +func (lc *Client) StartService(extensionID string) error { addr, err := lc.startHTTPServer() if err != nil { return err @@ -93,7 +109,7 @@ func (lc *Client) StartService(eventTypes []LogType, extensionID string) error { uri := fmt.Sprintf("http://%s", net.JoinHostPort(host, port)) - if err := lc.subscribe(eventTypes, extensionID, uri); err != nil { + if err := lc.subscribe(lc.logsAPISubscriptionTypes, extensionID, uri); err != nil { if err := lc.Shutdown(); err != nil { lc.logger.Warnf("failed to shutdown the server: %v", err) } diff --git a/logsapi/client_test.go b/logsapi/client_test.go index d0847e56..76a1ad5c 100644 --- a/logsapi/client_test.go +++ b/logsapi/client_test.go @@ -104,13 +104,14 @@ func TestSubscribe(t *testing.T) { })) defer s.Close() - c, err := logsapi.NewClient(append(tc.opts, logsapi.WithLogsAPIBaseURL(s.URL))...) + cOpts := append(tc.opts, logsapi.WithLogsAPIBaseURL(s.URL), logsapi.WithLogsAPISubscriptionTypes(logsapi.Platform)) + c, err := logsapi.NewClient(cOpts...) require.NoError(t, err) if tc.expectedErr { - require.Error(t, c.StartService([]logsapi.LogType{logsapi.Platform}, "foo")) + require.Error(t, c.StartService("foo")) } else { - require.NoError(t, c.StartService([]logsapi.LogType{logsapi.Platform}, "foo")) + require.NoError(t, c.StartService("foo")) } require.NoError(t, c.Shutdown()) @@ -142,9 +143,15 @@ func TestSubscribeAWSRequest(t *testing.T) { })) defer s.Close() - c, err := logsapi.NewClient(append(tc.opts, logsapi.WithLogsAPIBaseURL(s.URL), logsapi.WithLogBuffer(1))...) + cOpts := append( + tc.opts, + logsapi.WithLogsAPIBaseURL(s.URL), + logsapi.WithLogBuffer(1), + logsapi.WithLogsAPISubscriptionTypes(logsapi.Platform, logsapi.Function), + ) + c, err := logsapi.NewClient(cOpts...) require.NoError(t, err) - require.NoError(t, c.StartService([]logsapi.LogType{logsapi.Platform}, "testID")) + require.NoError(t, c.StartService("testID")) // Create a request to send to the logs listener platformDoneEvent := `{ diff --git a/logsapi/event.go b/logsapi/event.go index 40c402cb..64055c1b 100644 --- a/logsapi/event.go +++ b/logsapi/event.go @@ -25,39 +25,23 @@ import ( "github.com/elastic/apm-aws-lambda/extension" ) -// LogType represents the log streams that the Lambda Logs API -// provides for subscription -type LogType string - -const ( - // Platform log stream records events and errors related to - // invocations and extensions - Platform LogType = "platform" - // Function log stream records logs written by lambda function - // to stderr or stdout - Function LogType = "function" - // Extension log stream records logs generated by extension - Extension LogType = "extension" -) - -// SubLogType represents the subtype for each log type that is -// received in the log messages -type SubLogType string +// LogEventType represents the log type that is received in the log messages +type LogEventType string const ( // PlatformRuntimeDone event is sent when lambda function is finished it's execution - PlatformRuntimeDone SubLogType = "platform.runtimeDone" - PlatformFault SubLogType = "platform.fault" - PlatformReport SubLogType = "platform.report" - PlatformStart SubLogType = "platform.start" - PlatformEnd SubLogType = "platform.end" - FunctionLog SubLogType = "function" + PlatformRuntimeDone LogEventType = "platform.runtimeDone" + PlatformFault LogEventType = "platform.fault" + PlatformReport LogEventType = "platform.report" + PlatformStart LogEventType = "platform.start" + PlatformEnd LogEventType = "platform.end" + FunctionLog LogEventType = "function" ) // LogEvent represents an event received from the Logs API type LogEvent struct { - Time time.Time `json:"time"` - Type SubLogType `json:"type"` + Time time.Time `json:"time"` + Type LogEventType `json:"type"` StringRecord string Record LogEventRecord } diff --git a/logsapi/option.go b/logsapi/option.go index 0c889879..9caf374e 100644 --- a/logsapi/option.go +++ b/logsapi/option.go @@ -48,3 +48,10 @@ func WithLogger(logger *zap.SugaredLogger) ClientOption { c.logger = logger } } + +// WithLogsAPISubscriptionTypes sets the logstreams that the Logs API will subscribe to. +func WithLogsAPISubscriptionTypes(types ...SubscriptionType) ClientOption { + return func(c *Client) { + c.logsAPISubscriptionTypes = types + } +} diff --git a/logsapi/route_handlers.go b/logsapi/route_handlers.go index 73fc2024..0981d497 100644 --- a/logsapi/route_handlers.go +++ b/logsapi/route_handlers.go @@ -48,7 +48,7 @@ func handleLogEventsRequest(logger *zap.SugaredLogger, logsChannel chan LogEvent func (le *LogEvent) UnmarshalJSON(data []byte) error { b := struct { Time time.Time `json:"time"` - Type SubLogType `json:"type"` + Type LogEventType `json:"type"` Record json.RawMessage `json:"record"` }{} diff --git a/logsapi/route_handlers_test.go b/logsapi/route_handlers_test.go index ac40fe7a..c64069ef 100644 --- a/logsapi/route_handlers_test.go +++ b/logsapi/route_handlers_test.go @@ -43,7 +43,7 @@ func TestLogEventUnmarshalReport(t *testing.T) { err := le.UnmarshalJSON(reportJSON) require.NoError(t, err) - assert.Equal(t, SubLogType("platform.report"), le.Type) + assert.Equal(t, LogEventType("platform.report"), le.Type) assert.Equal(t, "2020-08-20T12:31:32.123Z", le.Time.Format(time.RFC3339Nano)) rec := LogEventRecord{ RequestID: "6f7f0961f83442118a7af6fe80b88d56", @@ -70,7 +70,7 @@ func TestLogEventUnmarshalFault(t *testing.T) { err := le.UnmarshalJSON(reportJSON) require.NoError(t, err) - assert.Equal(t, SubLogType("platform.fault"), le.Type) + assert.Equal(t, LogEventType("platform.fault"), le.Type) assert.Equal(t, "2020-08-20T12:31:32.123Z", le.Time.Format(time.RFC3339Nano)) rec := "RequestId: d783b35e-a91d-4251-af17-035953428a2c Process exited before completing request" assert.Equal(t, rec, le.StringRecord) @@ -92,7 +92,7 @@ func Test_unmarshalRuntimeDoneRecordObject(t *testing.T) { err := le.UnmarshalJSON(jsonBytes) require.NoError(t, err) - assert.Equal(t, SubLogType("platform.runtimeDone"), le.Type) + assert.Equal(t, LogEventType("platform.runtimeDone"), le.Type) assert.Equal(t, "2021-02-04T20:00:05.123Z", le.Time.Format(time.RFC3339Nano)) rec := LogEventRecord{ RequestID: "6f7f0961f83442118a7af6fe80b88", @@ -113,7 +113,7 @@ func Test_unmarshalRuntimeDoneRecordString(t *testing.T) { err := le.UnmarshalJSON(jsonBytes) require.NoError(t, err) - assert.Equal(t, SubLogType("platform.runtimeDone"), le.Type) + assert.Equal(t, LogEventType("platform.runtimeDone"), le.Type) assert.Equal(t, "2021-02-04T20:00:05.123Z", le.Time.Format(time.RFC3339Nano)) assert.Equal(t, "Unknown application error occurred", le.StringRecord) } diff --git a/logsapi/subscribe.go b/logsapi/subscribe.go index f22dfe42..9e96a096 100644 --- a/logsapi/subscribe.go +++ b/logsapi/subscribe.go @@ -29,10 +29,10 @@ import ( // SubscribeRequest is the request body that is sent to Logs API on subscribe type SubscribeRequest struct { - SchemaVersion SchemaVersion `json:"schemaVersion"` - LogTypes []LogType `json:"types"` - BufferingCfg BufferingCfg `json:"buffering"` - Destination Destination `json:"destination"` + SchemaVersion SchemaVersion `json:"schemaVersion"` + LogTypes []SubscriptionType `json:"types"` + BufferingCfg BufferingCfg `json:"buffering"` + Destination Destination `json:"destination"` } // SchemaVersion is the Lambda runtime API schema version @@ -80,7 +80,7 @@ func (lc *Client) startHTTPServer() (string, error) { return addr, nil } -func (lc *Client) subscribe(types []LogType, extensionID string, uri string) error { +func (lc *Client) subscribe(types []SubscriptionType, extensionID string, uri string) error { data, err := json.Marshal(&SubscribeRequest{ SchemaVersion: SchemaVersionLatest, LogTypes: types, diff --git a/main.go b/main.go index 3f766037..e23311f1 100644 --- a/main.go +++ b/main.go @@ -23,6 +23,7 @@ import ( "os" "os/signal" "path/filepath" + "strconv" "syscall" "github.com/aws/aws-sdk-go-v2/config" @@ -39,12 +40,19 @@ func main() { log.Fatalf("failed to load AWS default config: %v", err) } - app, err := app.New(ctx, + appConfigs := []app.ConfigOption{ app.WithExtensionName(filepath.Base(os.Args[0])), app.WithLambdaRuntimeAPI(os.Getenv("AWS_LAMBDA_RUNTIME_API")), app.WithLogLevel(os.Getenv("ELASTIC_APM_LOG_LEVEL")), app.WithAWSConfig(cfg), - ) + } + + disableFnLog, _ := strconv.ParseBool(os.Getenv("ELASTIC_DISABLE_FUNCTION_LOG_SUBSCRIPTION")) + if disableFnLog { + appConfigs = append(appConfigs, app.WithoutFunctionLogSubscription()) + } + + app, err := app.New(ctx, appConfigs...) if err != nil { log.Fatalf("failed to create the app: %v", err) } diff --git a/main_test.go b/main_test.go index 78028b54..72609e82 100644 --- a/main_test.go +++ b/main_test.go @@ -335,7 +335,7 @@ func sendNextEventInfo(w http.ResponseWriter, id string, event MockEvent, l *zap } } -func sendLogEvent(logsapiAddr string, requestId string, logEventType logsapi.SubLogType, l *zap.SugaredLogger) { +func sendLogEvent(logsapiAddr string, requestId string, logEventType logsapi.LogEventType, l *zap.SugaredLogger) { record := logsapi.LogEventRecord{ RequestID: requestId, } From 27abd78b37e18e9590bc09d969de44a121bd9251 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Wed, 14 Sep 2022 19:10:13 +0800 Subject: [PATCH 09/11] Fix comment typo --- logsapi/event.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logsapi/event.go b/logsapi/event.go index 64055c1b..4777e1eb 100644 --- a/logsapi/event.go +++ b/logsapi/event.go @@ -66,7 +66,7 @@ func (lc *Client) ProcessLogs( ) error { // platformStartReqID is to identify the requestID for the function // logs under the assumption that function logs for a specific request - // ID will be interleaved b/w its PlatformStart and PlatformEnd events. + // ID will be bounded by PlatformStart and PlatformEnd events. var platformStartReqID string for { select { From 7e6865cb61e2b52e29f09cc2944ee186bfeaa7e0 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Mon, 19 Sep 2022 11:05:24 +0800 Subject: [PATCH 10/11] Address review comments --- app/app.go | 4 ++-- app/config.go | 24 ++++++++++++------------ logsapi/client_test.go | 4 ++-- logsapi/event.go | 26 ++++++++++++-------------- logsapi/functionlogs.go | 18 ++++++++++++------ logsapi/option.go | 4 ++-- main.go | 11 ++++++++--- 7 files changed, 50 insertions(+), 41 deletions(-) diff --git a/app/app.go b/app/app.go index 1048a571..d2b9f808 100644 --- a/app/app.go +++ b/app/app.go @@ -76,7 +76,7 @@ func New(ctx context.Context, opts ...ConfigOption) (*App, error) { } subscriptionLogStreams := []logsapi.SubscriptionType{logsapi.Platform} - if !c.disableFunctionLogSubscription { + if c.enableFunctionLogSubscription { subscriptionLogStreams = append(subscriptionLogStreams, logsapi.Function) } @@ -85,7 +85,7 @@ func New(ctx context.Context, opts ...ConfigOption) (*App, error) { logsapi.WithListenerAddress(addr), logsapi.WithLogBuffer(100), logsapi.WithLogger(app.logger), - logsapi.WithLogsAPISubscriptionTypes(subscriptionLogStreams...), + logsapi.WithSubscriptionTypes(subscriptionLogStreams...), ) if err != nil { return nil, err diff --git a/app/config.go b/app/config.go index 3fdf6302..4311de5a 100644 --- a/app/config.go +++ b/app/config.go @@ -20,13 +20,13 @@ package app import "github.com/aws/aws-sdk-go-v2/aws" type appConfig struct { - awsLambdaRuntimeAPI string - awsConfig aws.Config - extensionName string - disableLogsAPI bool - disableFunctionLogSubscription bool - logLevel string - logsapiAddr string + awsLambdaRuntimeAPI string + awsConfig aws.Config + extensionName string + disableLogsAPI bool + enableFunctionLogSubscription bool + logLevel string + logsapiAddr string } // ConfigOption is used to configure the lambda extension @@ -55,12 +55,12 @@ func WithoutLogsAPI() ConfigOption { } } -// WithoutFunctionLogSubscription disables the logs api subscription -// to function log stream. This option will only work if LogsAPI is -// not disabled by the WithoutLogsAPI config option. -func WithoutFunctionLogSubscription() ConfigOption { +// WithFunctionLogSubscription enables the logs api subscription +// to function log stream. This option will only work if LogsAPI +// is not disabled by the WithoutLogsAPI config option. +func WithFunctionLogSubscription() ConfigOption { return func(c *appConfig) { - c.disableFunctionLogSubscription = true + c.enableFunctionLogSubscription = true } } diff --git a/logsapi/client_test.go b/logsapi/client_test.go index 76a1ad5c..b29d8cb8 100644 --- a/logsapi/client_test.go +++ b/logsapi/client_test.go @@ -104,7 +104,7 @@ func TestSubscribe(t *testing.T) { })) defer s.Close() - cOpts := append(tc.opts, logsapi.WithLogsAPIBaseURL(s.URL), logsapi.WithLogsAPISubscriptionTypes(logsapi.Platform)) + cOpts := append(tc.opts, logsapi.WithLogsAPIBaseURL(s.URL), logsapi.WithSubscriptionTypes(logsapi.Platform)) c, err := logsapi.NewClient(cOpts...) require.NoError(t, err) @@ -147,7 +147,7 @@ func TestSubscribeAWSRequest(t *testing.T) { tc.opts, logsapi.WithLogsAPIBaseURL(s.URL), logsapi.WithLogBuffer(1), - logsapi.WithLogsAPISubscriptionTypes(logsapi.Platform, logsapi.Function), + logsapi.WithSubscriptionTypes(logsapi.Platform, logsapi.Function), ) c, err := logsapi.NewClient(cOpts...) require.NoError(t, err) diff --git a/logsapi/event.go b/logsapi/event.go index 4777e1eb..0cefa6be 100644 --- a/logsapi/event.go +++ b/logsapi/event.go @@ -100,21 +100,19 @@ func (lc *Client) ProcessLogs( lc.logger.Debug("Log API runtimeDone event request id didn't match") } case FunctionLog: - if metadataContainer != nil && len(metadataContainer.Metadata) > 0 { - lc.logger.Debug("Received function log") - processedLog, err := ProcessFunctionLog( - metadataContainer, - platformStartReqID, - invokedFnArn, - logEvent, - ) - if err != nil { - lc.logger.Errorf("Error processing function log : %v", err) - } else { - apmClient.EnqueueAPMData(processedLog) - } + // TODO: @lahsivjar Buffer logs and send batches of data to APM-Server. + // Buffering should account for metadata being available before sending. + lc.logger.Debug("Received function log") + processedLog, err := ProcessFunctionLog( + metadataContainer, + platformStartReqID, + invokedFnArn, + logEvent, + ) + if err != nil { + lc.logger.Errorf("Error processing function log : %v", err) } else { - lc.logger.Warn("Function log received before metadata populated, quietly dropping log") + apmClient.EnqueueAPMData(processedLog) } } case <-ctx.Done(): diff --git a/logsapi/functionlogs.go b/logsapi/functionlogs.go index 732a4ee2..77298890 100644 --- a/logsapi/functionlogs.go +++ b/logsapi/functionlogs.go @@ -18,6 +18,8 @@ package logsapi import ( + "errors" + "github.com/elastic/apm-aws-lambda/apmproxy" "go.elastic.co/apm/v2/model" "go.elastic.co/fastjson" @@ -83,14 +85,18 @@ func (lc logContainer) MarshalFastJSON(json *fastjson.Writer) error { return nil } -// ProcessFunctionLog consumes extension event, agent metadata and log -// event from Lambda logs API to create a payload for APM server +// ProcessFunctionLog consumes agent metadata and log event from Lambda +// logs API to create a payload for APM server. func ProcessFunctionLog( metadataContainer *apmproxy.MetadataContainer, requestID string, invokedFnArn string, log LogEvent, ) (apmproxy.AgentData, error) { + if metadataContainer == nil || len(metadataContainer.Metadata) == 0 { + return apmproxy.AgentData{}, errors.New("metadata is required") + } + lc := logContainer{ Log: &logLine{ Timestamp: model.Time(log.Time), @@ -108,11 +114,11 @@ func ProcessFunctionLog( return apmproxy.AgentData{}, err } - var logData []byte - if metadataContainer.Metadata != nil { - logData = append(metadataContainer.Metadata, []byte("\n")...) - } + capacity := len(metadataContainer.Metadata) + jsonWriter.Size() + 1 + logData := make([]byte, len(metadataContainer.Metadata), capacity) + copy(logData, metadataContainer.Metadata) + logData = append(logData, '\n') logData = append(logData, jsonWriter.Bytes()...) return apmproxy.AgentData{Data: logData}, nil } diff --git a/logsapi/option.go b/logsapi/option.go index 9caf374e..e431477e 100644 --- a/logsapi/option.go +++ b/logsapi/option.go @@ -49,8 +49,8 @@ func WithLogger(logger *zap.SugaredLogger) ClientOption { } } -// WithLogsAPISubscriptionTypes sets the logstreams that the Logs API will subscribe to. -func WithLogsAPISubscriptionTypes(types ...SubscriptionType) ClientOption { +// WithSubscriptionTypes sets the logstreams that the Logs API will subscribe to. +func WithSubscriptionTypes(types ...SubscriptionType) ClientOption { return func(c *Client) { c.logsAPISubscriptionTypes = types } diff --git a/main.go b/main.go index e23311f1..6e43626e 100644 --- a/main.go +++ b/main.go @@ -47,9 +47,14 @@ func main() { app.WithAWSConfig(cfg), } - disableFnLog, _ := strconv.ParseBool(os.Getenv("ELASTIC_DISABLE_FUNCTION_LOG_SUBSCRIPTION")) - if disableFnLog { - appConfigs = append(appConfigs, app.WithoutFunctionLogSubscription()) + captureLogs, err := strconv.ParseBool(os.Getenv("ELASTIC_APM_LAMBDA_CAPTURE_LOGS")) + // Default capture function logs to true + if err != nil { + captureLogs = true + } + + if captureLogs { + appConfigs = append(appConfigs, app.WithFunctionLogSubscription()) } app, err := app.New(ctx, appConfigs...) From 7006c07a298f17e5591a05fa3f5b784935583a00 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Mon, 19 Sep 2022 11:26:11 +0800 Subject: [PATCH 11/11] Fix tests with latest code --- logsapi/metrics_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/logsapi/metrics_test.go b/logsapi/metrics_test.go index 1eab6781..7ca086a5 100644 --- a/logsapi/metrics_test.go +++ b/logsapi/metrics_test.go @@ -161,7 +161,7 @@ func TestProcessPlatformReport_DataCorruption(t *testing.T) { timestamp := time.Date(2022, 11, 12, 0, 0, 0, 0, time.UTC) logEvent := LogEvent{ Time: timestamp, - Type: Report, + Type: PlatformReport, Record: LogEventRecord{ RequestID: reqID, Metrics: PlatformMetrics{ @@ -203,7 +203,7 @@ func BenchmarkPlatformReport(b *testing.B) { timestamp := time.Date(2022, 11, 12, 0, 0, 0, 0, time.UTC) logEvent := LogEvent{ Time: timestamp, - Type: Report, + Type: PlatformReport, Record: LogEventRecord{ RequestID: reqID, Metrics: PlatformMetrics{