diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 55a0b33b..69e7ee54 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -29,6 +29,7 @@ https://github.com/elastic/apm-aws-lambda/compare/v1.1.0...main[View commits] [float] ===== Features - Disable CGO to prevent libc/ABI compatibility issues {lambda-pull}292[292] +- Add support for collecting and shipping function logs to APM Server {lambda-pull}303[303] [float] ===== Bug fixes diff --git a/app/app.go b/app/app.go index 1663f463..d2b9f808 100644 --- a/app/app.go +++ b/app/app.go @@ -45,7 +45,7 @@ type App struct { // New returns an App or an error if the // creation failed. -func New(ctx context.Context, opts ...configOption) (*App, error) { +func New(ctx context.Context, opts ...ConfigOption) (*App, error) { c := appConfig{} for _, opt := range opts { @@ -62,7 +62,7 @@ func New(ctx context.Context, opts ...configOption) (*App, error) { return nil, err } - apmServerApiKey, apmServerSecretToken, err := loadAWSOptions(ctx, c.awsConfig, app.logger) + apmServerAPIKey, apmServerSecretToken, err := loadAWSOptions(ctx, c.awsConfig, app.logger) if err != nil { return nil, err } @@ -75,11 +75,17 @@ func New(ctx context.Context, opts ...configOption) (*App, error) { addr = c.logsapiAddr } + subscriptionLogStreams := []logsapi.SubscriptionType{logsapi.Platform} + if c.enableFunctionLogSubscription { + subscriptionLogStreams = append(subscriptionLogStreams, logsapi.Function) + } + lc, err := logsapi.NewClient( logsapi.WithLogsAPIBaseURL(fmt.Sprintf("http://%s", c.awsLambdaRuntimeAPI)), logsapi.WithListenerAddress(addr), logsapi.WithLogBuffer(100), logsapi.WithLogger(app.logger), + logsapi.WithSubscriptionTypes(subscriptionLogStreams...), ) if err != nil { return nil, err @@ -124,7 +130,7 @@ func New(ctx context.Context, opts ...configOption) (*App, error) { apmOpts = append(apmOpts, apmproxy.WithURL(os.Getenv("ELASTIC_APM_LAMBDA_APM_SERVER")), apmproxy.WithLogger(app.logger), - apmproxy.WithAPIKey(apmServerApiKey), + apmproxy.WithAPIKey(apmServerAPIKey), apmproxy.WithSecretToken(apmServerSecretToken), ) diff --git a/app/config.go b/app/config.go index 14566ad1..4311de5a 100644 --- a/app/config.go +++ b/app/config.go @@ -20,41 +20,52 @@ package app import "github.com/aws/aws-sdk-go-v2/aws" type appConfig struct { - awsLambdaRuntimeAPI string - awsConfig aws.Config - extensionName string - disableLogsAPI bool - logLevel string - logsapiAddr string + awsLambdaRuntimeAPI string + awsConfig aws.Config + extensionName string + disableLogsAPI bool + enableFunctionLogSubscription bool + logLevel string + logsapiAddr string } -type configOption func(*appConfig) +// ConfigOption is used to configure the lambda extension +type ConfigOption func(*appConfig) // WithLambdaRuntimeAPI sets the AWS Lambda Runtime API // endpoint (normally taken from $AWS_LAMBDA_RUNTIME_API), // used by the AWS client. -func WithLambdaRuntimeAPI(api string) configOption { +func WithLambdaRuntimeAPI(api string) ConfigOption { return func(c *appConfig) { c.awsLambdaRuntimeAPI = api } } // WithExtensionName sets the extension name. -func WithExtensionName(name string) configOption { +func WithExtensionName(name string) ConfigOption { return func(c *appConfig) { c.extensionName = name } } // WithoutLogsAPI disables the logs api. -func WithoutLogsAPI() configOption { +func WithoutLogsAPI() ConfigOption { return func(c *appConfig) { c.disableLogsAPI = true } } +// WithFunctionLogSubscription enables the logs api subscription +// to function log stream. This option will only work if LogsAPI +// is not disabled by the WithoutLogsAPI config option. +func WithFunctionLogSubscription() ConfigOption { + return func(c *appConfig) { + c.enableFunctionLogSubscription = true + } +} + // WithLogLevel sets the log level. -func WithLogLevel(level string) configOption { +func WithLogLevel(level string) ConfigOption { return func(c *appConfig) { c.logLevel = level } @@ -62,14 +73,14 @@ func WithLogLevel(level string) configOption { // WithLogsapiAddress sets the listener address of the // server listening for logs event. -func WithLogsapiAddress(s string) configOption { +func WithLogsapiAddress(s string) ConfigOption { return func(c *appConfig) { c.logsapiAddr = s } } // WithAWSConfig sets the AWS config. -func WithAWSConfig(awsConfig aws.Config) configOption { +func WithAWSConfig(awsConfig aws.Config) ConfigOption { return func(c *appConfig) { c.awsConfig = awsConfig } diff --git a/app/run.go b/app/run.go index a9f0fba9..8a9a1438 100644 --- a/app/run.go +++ b/app/run.go @@ -19,12 +19,12 @@ package app import ( "context" - "github.com/elastic/apm-aws-lambda/apmproxy" - "github.com/elastic/apm-aws-lambda/extension" - "github.com/elastic/apm-aws-lambda/logsapi" "fmt" "sync" "time" + + "github.com/elastic/apm-aws-lambda/apmproxy" + "github.com/elastic/apm-aws-lambda/extension" ) // Run runs the app. @@ -57,7 +57,7 @@ func (app *App) Run(ctx context.Context) error { }() if app.logsClient != nil { - if err := app.logsClient.StartService([]logsapi.EventType{logsapi.Platform}, app.extensionClient.ExtensionID); err != nil { + if err := app.logsClient.StartService(app.extensionClient.ExtensionID); err != nil { app.logger.Warnf("Error while subscribing to the Logs API: %v", err) // disable logs API if the service failed to start @@ -169,7 +169,15 @@ func (app *App) processEvent( runtimeDone := make(chan struct{}) if app.logsClient != nil { go func() { - if err := app.logsClient.ProcessLogs(invocationCtx, event.RequestID, app.apmClient, metadataContainer, runtimeDone, prevEvent); err != nil { + if err := app.logsClient.ProcessLogs( + invocationCtx, + event.RequestID, + event.InvokedFunctionArn, + app.apmClient, + metadataContainer, + runtimeDone, + prevEvent, + ); err != nil { app.logger.Errorf("Error while processing Lambda Logs ; %v", err) } else { close(runtimeDone) diff --git a/logsapi/client.go b/logsapi/client.go index 1f747bbf..b7d18ac0 100644 --- a/logsapi/client.go +++ b/logsapi/client.go @@ -28,17 +28,33 @@ import ( "go.uber.org/zap" ) +// SubscriptionType represents the log streams that the Lambda Logs API +// provides for subscription +type SubscriptionType string + +const ( + // Platform logstream records events and errors related to + // invocations and extensions + Platform SubscriptionType = "platform" + // Function logstream records logs written by lambda function + // to stderr or stdout + Function SubscriptionType = "function" + // Extension logstream records logs generated by extension + Extension SubscriptionType = "extension" +) + // ClientOption is a config option for a Client. type ClientOption func(*Client) // Client is the client used to subscribe to the Logs API. type Client struct { - httpClient *http.Client - logsAPIBaseURL string - logsChannel chan LogEvent - listenerAddr string - server *http.Server - logger *zap.SugaredLogger + httpClient *http.Client + logsAPIBaseURL string + logsAPISubscriptionTypes []SubscriptionType + logsChannel chan LogEvent + listenerAddr string + server *http.Server + logger *zap.SugaredLogger } // NewClient returns a new Client with the given URL. @@ -69,7 +85,7 @@ func NewClient(opts ...ClientOption) (*Client, error) { } // StartService starts the HTTP server listening for log events and subscribes to the Logs API. -func (lc *Client) StartService(eventTypes []EventType, extensionID string) error { +func (lc *Client) StartService(extensionID string) error { addr, err := lc.startHTTPServer() if err != nil { return err @@ -93,7 +109,7 @@ func (lc *Client) StartService(eventTypes []EventType, extensionID string) error uri := fmt.Sprintf("http://%s", net.JoinHostPort(host, port)) - if err := lc.subscribe(eventTypes, extensionID, uri); err != nil { + if err := lc.subscribe(lc.logsAPISubscriptionTypes, extensionID, uri); err != nil { if err := lc.Shutdown(); err != nil { lc.logger.Warnf("failed to shutdown the server: %v", err) } diff --git a/logsapi/client_test.go b/logsapi/client_test.go index d3878528..b29d8cb8 100644 --- a/logsapi/client_test.go +++ b/logsapi/client_test.go @@ -19,13 +19,14 @@ package logsapi_test import ( "bytes" - "github.com/elastic/apm-aws-lambda/logsapi" "encoding/json" "net/http" "net/http/httptest" "net/url" "testing" + "github.com/elastic/apm-aws-lambda/logsapi" + "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" ) @@ -103,13 +104,14 @@ func TestSubscribe(t *testing.T) { })) defer s.Close() - c, err := logsapi.NewClient(append(tc.opts, logsapi.WithLogsAPIBaseURL(s.URL))...) + cOpts := append(tc.opts, logsapi.WithLogsAPIBaseURL(s.URL), logsapi.WithSubscriptionTypes(logsapi.Platform)) + c, err := logsapi.NewClient(cOpts...) require.NoError(t, err) if tc.expectedErr { - require.Error(t, c.StartService([]logsapi.EventType{logsapi.Platform}, "foo")) + require.Error(t, c.StartService("foo")) } else { - require.NoError(t, c.StartService([]logsapi.EventType{logsapi.Platform}, "foo")) + require.NoError(t, c.StartService("foo")) } require.NoError(t, c.Shutdown()) @@ -141,9 +143,15 @@ func TestSubscribeAWSRequest(t *testing.T) { })) defer s.Close() - c, err := logsapi.NewClient(append(tc.opts, logsapi.WithLogsAPIBaseURL(s.URL), logsapi.WithLogBuffer(1))...) + cOpts := append( + tc.opts, + logsapi.WithLogsAPIBaseURL(s.URL), + logsapi.WithLogBuffer(1), + logsapi.WithSubscriptionTypes(logsapi.Platform, logsapi.Function), + ) + c, err := logsapi.NewClient(cOpts...) require.NoError(t, err) - require.NoError(t, c.StartService([]logsapi.EventType{logsapi.Platform}, "testID")) + require.NoError(t, c.StartService("testID")) // Create a request to send to the logs listener platformDoneEvent := `{ diff --git a/logsapi/event.go b/logsapi/event.go index 4c65af88..0cefa6be 100644 --- a/logsapi/event.go +++ b/logsapi/event.go @@ -25,33 +25,23 @@ import ( "github.com/elastic/apm-aws-lambda/extension" ) -// EventType represents the type of logs in Lambda -type EventType string +// LogEventType represents the log type that is received in the log messages +type LogEventType string const ( - // Platform is to receive logs emitted by the platform - Platform EventType = "platform" - // Function is to receive logs emitted by the function - Function EventType = "function" - // Extension is to receive logs emitted by the extension - Extension EventType = "extension" -) - -// SubEventType is a Logs API sub event type -type SubEventType string - -const ( - // RuntimeDone event is sent when lambda function is finished it's execution - RuntimeDone SubEventType = "platform.runtimeDone" - Fault SubEventType = "platform.fault" - Report SubEventType = "platform.report" - Start SubEventType = "platform.start" + // PlatformRuntimeDone event is sent when lambda function is finished it's execution + PlatformRuntimeDone LogEventType = "platform.runtimeDone" + PlatformFault LogEventType = "platform.fault" + PlatformReport LogEventType = "platform.report" + PlatformStart LogEventType = "platform.start" + PlatformEnd LogEventType = "platform.end" + FunctionLog LogEventType = "function" ) // LogEvent represents an event received from the Logs API type LogEvent struct { Time time.Time `json:"time"` - Type SubEventType `json:"type"` + Type LogEventType `json:"type"` StringRecord string Record LogEventRecord } @@ -68,19 +58,26 @@ type LogEventRecord struct { func (lc *Client) ProcessLogs( ctx context.Context, requestID string, + invokedFnArn string, apmClient *apmproxy.Client, metadataContainer *apmproxy.MetadataContainer, runtimeDoneSignal chan struct{}, prevEvent *extension.NextEventResponse, ) error { + // platformStartReqID is to identify the requestID for the function + // logs under the assumption that function logs for a specific request + // ID will be bounded by PlatformStart and PlatformEnd events. + var platformStartReqID string for { select { case logEvent := <-lc.logsChannel: lc.logger.Debugf("Received log event %v", logEvent.Type) switch logEvent.Type { + case PlatformStart: + platformStartReqID = logEvent.Record.RequestID // Check the logEvent for runtimeDone and compare the RequestID // to the id that came in via the Next API - case RuntimeDone: + case PlatformRuntimeDone: if logEvent.Record.RequestID == requestID { lc.logger.Info("Received runtimeDone event for this function invocation") runtimeDoneSignal <- struct{}{} @@ -89,7 +86,7 @@ func (lc *Client) ProcessLogs( lc.logger.Debug("Log API runtimeDone event request id didn't match") // Check if the logEvent contains metrics and verify that they can be linked to the previous invocation - case Report: + case PlatformReport: if prevEvent != nil && logEvent.Record.RequestID == prevEvent.RequestID { lc.logger.Debug("Received platform report for the previous function invocation") processedMetrics, err := ProcessPlatformReport(metadataContainer, prevEvent, logEvent) @@ -102,6 +99,21 @@ func (lc *Client) ProcessLogs( lc.logger.Warn("report event request id didn't match the previous event id") lc.logger.Debug("Log API runtimeDone event request id didn't match") } + case FunctionLog: + // TODO: @lahsivjar Buffer logs and send batches of data to APM-Server. + // Buffering should account for metadata being available before sending. + lc.logger.Debug("Received function log") + processedLog, err := ProcessFunctionLog( + metadataContainer, + platformStartReqID, + invokedFnArn, + logEvent, + ) + if err != nil { + lc.logger.Errorf("Error processing function log : %v", err) + } else { + apmClient.EnqueueAPMData(processedLog) + } } case <-ctx.Done(): lc.logger.Debug("Current invocation over. Interrupting logs processing goroutine") diff --git a/logsapi/functionlogs.go b/logsapi/functionlogs.go new file mode 100644 index 00000000..77298890 --- /dev/null +++ b/logsapi/functionlogs.go @@ -0,0 +1,124 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package logsapi + +import ( + "errors" + + "github.com/elastic/apm-aws-lambda/apmproxy" + "go.elastic.co/apm/v2/model" + "go.elastic.co/fastjson" +) + +type logContainer struct { + Log *logLine +} + +type logLine struct { + Timestamp model.Time + Message string + FAAS *faas +} + +func (l *logLine) MarshalFastJSON(w *fastjson.Writer) error { + var firstErr error + w.RawString("{\"message\":") + w.String(l.Message) + w.RawString(",\"@timestamp\":") + if err := l.Timestamp.MarshalFastJSON(w); err != nil && firstErr == nil { + firstErr = err + } + if l.FAAS != nil { + w.RawString(",\"faas\":") + if err := l.FAAS.MarshalFastJSON(w); err != nil && firstErr == nil { + firstErr = err + } + } + w.RawByte('}') + return firstErr +} + +// faas struct is a subset of go.elastic.co/apm/v2/model#FAAS +// +// The purpose of having a separate struct is to have a custom +// marshalling logic that is targeted for the faas fields +// available for function logs. For example: `coldstart` value +// cannot be inferred for function logs so this struct drops +// the field entirely. +type faas struct { + // ID holds a unique identifier of the invoked serverless function. + ID string `json:"id,omitempty"` + // Execution holds the request ID of the function invocation. + Execution string `json:"execution,omitempty"` +} + +func (f *faas) MarshalFastJSON(w *fastjson.Writer) error { + w.RawString("{\"id\":") + w.String(f.ID) + w.RawString(",\"execution\":") + w.String(f.Execution) + w.RawByte('}') + return nil +} + +func (lc logContainer) MarshalFastJSON(json *fastjson.Writer) error { + json.RawString(`{"log":`) + if err := lc.Log.MarshalFastJSON(json); err != nil { + return err + } + json.RawByte('}') + return nil +} + +// ProcessFunctionLog consumes agent metadata and log event from Lambda +// logs API to create a payload for APM server. +func ProcessFunctionLog( + metadataContainer *apmproxy.MetadataContainer, + requestID string, + invokedFnArn string, + log LogEvent, +) (apmproxy.AgentData, error) { + if metadataContainer == nil || len(metadataContainer.Metadata) == 0 { + return apmproxy.AgentData{}, errors.New("metadata is required") + } + + lc := logContainer{ + Log: &logLine{ + Timestamp: model.Time(log.Time), + Message: log.StringRecord, + }, + } + + lc.Log.FAAS = &faas{ + ID: invokedFnArn, + Execution: requestID, + } + + var jsonWriter fastjson.Writer + if err := lc.MarshalFastJSON(&jsonWriter); err != nil { + return apmproxy.AgentData{}, err + } + + capacity := len(metadataContainer.Metadata) + jsonWriter.Size() + 1 + logData := make([]byte, len(metadataContainer.Metadata), capacity) + copy(logData, metadataContainer.Metadata) + + logData = append(logData, '\n') + logData = append(logData, jsonWriter.Bytes()...) + return apmproxy.AgentData{Data: logData}, nil +} diff --git a/logsapi/functionlogs_test.go b/logsapi/functionlogs_test.go new file mode 100644 index 00000000..5aa4034b --- /dev/null +++ b/logsapi/functionlogs_test.go @@ -0,0 +1,54 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package logsapi + +import ( + "fmt" + "testing" + "time" + + "github.com/elastic/apm-aws-lambda/apmproxy" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestProcessFunctionLog(t *testing.T) { + metadataContainer := &apmproxy.MetadataContainer{ + Metadata: []byte(`{"metadata":{"service":{"agent":{"name":"apm-lambda-extension","version":"1.1.0"},"framework":{"name":"AWS Lambda","version":""},"language":{"name":"python","version":"3.9.8"},"runtime":{"name":"","version":""},"node":{}},"user":{},"process":{"pid":0},"system":{"container":{"id":""},"kubernetes":{"node":{},"pod":{}}},"cloud":{"provider":"","instance":{},"machine":{},"account":{},"project":{},"service":{}}}}`), + } + event := LogEvent{ + Time: time.Date(2022, 11, 12, 0, 0, 0, 0, time.UTC), + Type: FunctionLog, + StringRecord: "ERROR encountered. Stack trace:my-function (line 10)", + } + reqID := "8476a536-e9f4-11e8-9739-2dfe598c3fcd" + invokedFnArn := "arn:aws:lambda:us-east-2:123456789012:function:custom-runtime" + expectedData := fmt.Sprintf( + "%s\n{\"log\":{\"message\":\"%s\",\"@timestamp\":%d,\"faas\":{\"id\":\"%s\",\"execution\":\"%s\"}}}", + metadataContainer.Metadata, + event.StringRecord, + event.Time.UnixNano()/int64(time.Microsecond), + invokedFnArn, + reqID, + ) + + apmData, err := ProcessFunctionLog(metadataContainer, reqID, invokedFnArn, event) + + require.NoError(t, err) + assert.Equal(t, expectedData, string(apmData.Data)) +} diff --git a/logsapi/metrics_test.go b/logsapi/metrics_test.go index 1eab6781..7ca086a5 100644 --- a/logsapi/metrics_test.go +++ b/logsapi/metrics_test.go @@ -161,7 +161,7 @@ func TestProcessPlatformReport_DataCorruption(t *testing.T) { timestamp := time.Date(2022, 11, 12, 0, 0, 0, 0, time.UTC) logEvent := LogEvent{ Time: timestamp, - Type: Report, + Type: PlatformReport, Record: LogEventRecord{ RequestID: reqID, Metrics: PlatformMetrics{ @@ -203,7 +203,7 @@ func BenchmarkPlatformReport(b *testing.B) { timestamp := time.Date(2022, 11, 12, 0, 0, 0, 0, time.UTC) logEvent := LogEvent{ Time: timestamp, - Type: Report, + Type: PlatformReport, Record: LogEventRecord{ RequestID: reqID, Metrics: PlatformMetrics{ diff --git a/logsapi/option.go b/logsapi/option.go index 0c889879..e431477e 100644 --- a/logsapi/option.go +++ b/logsapi/option.go @@ -48,3 +48,10 @@ func WithLogger(logger *zap.SugaredLogger) ClientOption { c.logger = logger } } + +// WithSubscriptionTypes sets the logstreams that the Logs API will subscribe to. +func WithSubscriptionTypes(types ...SubscriptionType) ClientOption { + return func(c *Client) { + c.logsAPISubscriptionTypes = types + } +} diff --git a/logsapi/route_handlers.go b/logsapi/route_handlers.go index 4ffe2c83..0981d497 100644 --- a/logsapi/route_handlers.go +++ b/logsapi/route_handlers.go @@ -48,7 +48,7 @@ func handleLogEventsRequest(logger *zap.SugaredLogger, logsChannel chan LogEvent func (le *LogEvent) UnmarshalJSON(data []byte) error { b := struct { Time time.Time `json:"time"` - Type SubEventType `json:"type"` + Type LogEventType `json:"type"` Record json.RawMessage `json:"record"` }{} diff --git a/logsapi/route_handlers_test.go b/logsapi/route_handlers_test.go index dc077e4a..c64069ef 100644 --- a/logsapi/route_handlers_test.go +++ b/logsapi/route_handlers_test.go @@ -43,7 +43,7 @@ func TestLogEventUnmarshalReport(t *testing.T) { err := le.UnmarshalJSON(reportJSON) require.NoError(t, err) - assert.Equal(t, SubEventType("platform.report"), le.Type) + assert.Equal(t, LogEventType("platform.report"), le.Type) assert.Equal(t, "2020-08-20T12:31:32.123Z", le.Time.Format(time.RFC3339Nano)) rec := LogEventRecord{ RequestID: "6f7f0961f83442118a7af6fe80b88d56", @@ -70,7 +70,7 @@ func TestLogEventUnmarshalFault(t *testing.T) { err := le.UnmarshalJSON(reportJSON) require.NoError(t, err) - assert.Equal(t, SubEventType("platform.fault"), le.Type) + assert.Equal(t, LogEventType("platform.fault"), le.Type) assert.Equal(t, "2020-08-20T12:31:32.123Z", le.Time.Format(time.RFC3339Nano)) rec := "RequestId: d783b35e-a91d-4251-af17-035953428a2c Process exited before completing request" assert.Equal(t, rec, le.StringRecord) @@ -92,7 +92,7 @@ func Test_unmarshalRuntimeDoneRecordObject(t *testing.T) { err := le.UnmarshalJSON(jsonBytes) require.NoError(t, err) - assert.Equal(t, SubEventType("platform.runtimeDone"), le.Type) + assert.Equal(t, LogEventType("platform.runtimeDone"), le.Type) assert.Equal(t, "2021-02-04T20:00:05.123Z", le.Time.Format(time.RFC3339Nano)) rec := LogEventRecord{ RequestID: "6f7f0961f83442118a7af6fe80b88", @@ -113,7 +113,7 @@ func Test_unmarshalRuntimeDoneRecordString(t *testing.T) { err := le.UnmarshalJSON(jsonBytes) require.NoError(t, err) - assert.Equal(t, SubEventType("platform.runtimeDone"), le.Type) + assert.Equal(t, LogEventType("platform.runtimeDone"), le.Type) assert.Equal(t, "2021-02-04T20:00:05.123Z", le.Time.Format(time.RFC3339Nano)) assert.Equal(t, "Unknown application error occurred", le.StringRecord) } @@ -134,6 +134,6 @@ func Test_unmarshalRuntimeDoneFaultRecordString(t *testing.T) { timeValue, _ := time.Parse(time.RFC3339, "2021-02-04T20:00:05.123Z") assert.Equal(t, timeValue, le.Time) - assert.Equal(t, Fault, le.Type) + assert.Equal(t, PlatformFault, le.Type) assert.Equal(t, "Unknown application error occurred", le.StringRecord) } diff --git a/logsapi/subscribe.go b/logsapi/subscribe.go index 9cf40f75..9e96a096 100644 --- a/logsapi/subscribe.go +++ b/logsapi/subscribe.go @@ -29,10 +29,10 @@ import ( // SubscribeRequest is the request body that is sent to Logs API on subscribe type SubscribeRequest struct { - SchemaVersion SchemaVersion `json:"schemaVersion"` - EventTypes []EventType `json:"types"` - BufferingCfg BufferingCfg `json:"buffering"` - Destination Destination `json:"destination"` + SchemaVersion SchemaVersion `json:"schemaVersion"` + LogTypes []SubscriptionType `json:"types"` + BufferingCfg BufferingCfg `json:"buffering"` + Destination Destination `json:"destination"` } // SchemaVersion is the Lambda runtime API schema version @@ -80,10 +80,10 @@ func (lc *Client) startHTTPServer() (string, error) { return addr, nil } -func (lc *Client) subscribe(types []EventType, extensionID string, uri string) error { +func (lc *Client) subscribe(types []SubscriptionType, extensionID string, uri string) error { data, err := json.Marshal(&SubscribeRequest{ SchemaVersion: SchemaVersionLatest, - EventTypes: types, + LogTypes: types, BufferingCfg: BufferingCfg{ MaxItems: 10000, MaxBytes: 262144, diff --git a/main.go b/main.go index 3f766037..6e43626e 100644 --- a/main.go +++ b/main.go @@ -23,6 +23,7 @@ import ( "os" "os/signal" "path/filepath" + "strconv" "syscall" "github.com/aws/aws-sdk-go-v2/config" @@ -39,12 +40,24 @@ func main() { log.Fatalf("failed to load AWS default config: %v", err) } - app, err := app.New(ctx, + appConfigs := []app.ConfigOption{ app.WithExtensionName(filepath.Base(os.Args[0])), app.WithLambdaRuntimeAPI(os.Getenv("AWS_LAMBDA_RUNTIME_API")), app.WithLogLevel(os.Getenv("ELASTIC_APM_LOG_LEVEL")), app.WithAWSConfig(cfg), - ) + } + + captureLogs, err := strconv.ParseBool(os.Getenv("ELASTIC_APM_LAMBDA_CAPTURE_LOGS")) + // Default capture function logs to true + if err != nil { + captureLogs = true + } + + if captureLogs { + appConfigs = append(appConfigs, app.WithFunctionLogSubscription()) + } + + app, err := app.New(ctx, appConfigs...) if err != nil { log.Fatalf("failed to create the app: %v", err) } diff --git a/main_test.go b/main_test.go index 90ea4ff3..72609e82 100644 --- a/main_test.go +++ b/main_test.go @@ -216,7 +216,7 @@ func newTestStructs(t *testing.T) chan MockEvent { } func processMockEvent(currId string, event MockEvent, extensionPort string, logsapiAddr string, internals *MockServerInternals, l *zap.SugaredLogger) { - sendLogEvent(logsapiAddr, currId, logsapi.Start, l) + sendLogEvent(logsapiAddr, currId, logsapi.PlatformStart, l) client := http.Client{} // Use a custom transport with a low timeout @@ -311,10 +311,10 @@ func processMockEvent(currId string, event MockEvent, extensionPort string, logs case Shutdown: } if sendRuntimeDone { - sendLogEvent(logsapiAddr, currId, logsapi.RuntimeDone, l) + sendLogEvent(logsapiAddr, currId, logsapi.PlatformRuntimeDone, l) } if sendMetrics { - sendLogEvent(logsapiAddr, currId, logsapi.Report, l) + sendLogEvent(logsapiAddr, currId, logsapi.PlatformReport, l) } } @@ -335,11 +335,11 @@ func sendNextEventInfo(w http.ResponseWriter, id string, event MockEvent, l *zap } } -func sendLogEvent(logsapiAddr string, requestId string, logEventType logsapi.SubEventType, l *zap.SugaredLogger) { +func sendLogEvent(logsapiAddr string, requestId string, logEventType logsapi.LogEventType, l *zap.SugaredLogger) { record := logsapi.LogEventRecord{ RequestID: requestId, } - if logEventType == logsapi.Report { + if logEventType == logsapi.PlatformReport { record.Metrics = logsapi.PlatformMetrics{ BilledDurationMs: 60, DurationMs: 59.9,