Skip to content
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ https://github.com/elastic/apm-aws-lambda/compare/v1.1.0...main[View commits]
[float]
===== Features
- Disable CGO to prevent libc/ABI compatibility issues {lambda-pull}292[292]
- Add support for collecting and shipping function logs to APM Server {lambda-pull}303[303]

[float]
===== Bug fixes
Expand Down
12 changes: 9 additions & 3 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type App struct {

// New returns an App or an error if the
// creation failed.
func New(ctx context.Context, opts ...configOption) (*App, error) {
func New(ctx context.Context, opts ...ConfigOption) (*App, error) {
c := appConfig{}

for _, opt := range opts {
Expand All @@ -62,7 +62,7 @@ func New(ctx context.Context, opts ...configOption) (*App, error) {
return nil, err
}

apmServerApiKey, apmServerSecretToken, err := loadAWSOptions(ctx, c.awsConfig, app.logger)
apmServerAPIKey, apmServerSecretToken, err := loadAWSOptions(ctx, c.awsConfig, app.logger)
if err != nil {
return nil, err
}
Expand All @@ -75,11 +75,17 @@ func New(ctx context.Context, opts ...configOption) (*App, error) {
addr = c.logsapiAddr
}

subscriptionLogStreams := []logsapi.SubscriptionType{logsapi.Platform}
if c.enableFunctionLogSubscription {
subscriptionLogStreams = append(subscriptionLogStreams, logsapi.Function)
}

lc, err := logsapi.NewClient(
logsapi.WithLogsAPIBaseURL(fmt.Sprintf("http://%s", c.awsLambdaRuntimeAPI)),
logsapi.WithListenerAddress(addr),
logsapi.WithLogBuffer(100),
logsapi.WithLogger(app.logger),
logsapi.WithSubscriptionTypes(subscriptionLogStreams...),
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -124,7 +130,7 @@ func New(ctx context.Context, opts ...configOption) (*App, error) {
apmOpts = append(apmOpts,
apmproxy.WithURL(os.Getenv("ELASTIC_APM_LAMBDA_APM_SERVER")),
apmproxy.WithLogger(app.logger),
apmproxy.WithAPIKey(apmServerApiKey),
apmproxy.WithAPIKey(apmServerAPIKey),
apmproxy.WithSecretToken(apmServerSecretToken),
)

Expand Down
37 changes: 24 additions & 13 deletions app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,56 +20,67 @@ package app
import "github.com/aws/aws-sdk-go-v2/aws"

type appConfig struct {
awsLambdaRuntimeAPI string
awsConfig aws.Config
extensionName string
disableLogsAPI bool
logLevel string
logsapiAddr string
awsLambdaRuntimeAPI string
awsConfig aws.Config
extensionName string
disableLogsAPI bool
enableFunctionLogSubscription bool
logLevel string
logsapiAddr string
}

type configOption func(*appConfig)
// ConfigOption is used to configure the lambda extension
type ConfigOption func(*appConfig)

// WithLambdaRuntimeAPI sets the AWS Lambda Runtime API
// endpoint (normally taken from $AWS_LAMBDA_RUNTIME_API),
// used by the AWS client.
func WithLambdaRuntimeAPI(api string) configOption {
func WithLambdaRuntimeAPI(api string) ConfigOption {
return func(c *appConfig) {
c.awsLambdaRuntimeAPI = api
}
}

// WithExtensionName sets the extension name.
func WithExtensionName(name string) configOption {
func WithExtensionName(name string) ConfigOption {
return func(c *appConfig) {
c.extensionName = name
}
}

// WithoutLogsAPI disables the logs api.
func WithoutLogsAPI() configOption {
func WithoutLogsAPI() ConfigOption {
return func(c *appConfig) {
c.disableLogsAPI = true
}
}

// WithFunctionLogSubscription enables the logs api subscription
// to function log stream. This option will only work if LogsAPI
// is not disabled by the WithoutLogsAPI config option.
func WithFunctionLogSubscription() ConfigOption {
return func(c *appConfig) {
c.enableFunctionLogSubscription = true
}
}

// WithLogLevel sets the log level.
func WithLogLevel(level string) configOption {
func WithLogLevel(level string) ConfigOption {
return func(c *appConfig) {
c.logLevel = level
}
}

// WithLogsapiAddress sets the listener address of the
// server listening for logs event.
func WithLogsapiAddress(s string) configOption {
func WithLogsapiAddress(s string) ConfigOption {
return func(c *appConfig) {
c.logsapiAddr = s
}
}

// WithAWSConfig sets the AWS config.
func WithAWSConfig(awsConfig aws.Config) configOption {
func WithAWSConfig(awsConfig aws.Config) ConfigOption {
return func(c *appConfig) {
c.awsConfig = awsConfig
}
Expand Down
18 changes: 13 additions & 5 deletions app/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ package app

import (
"context"
"github.com/elastic/apm-aws-lambda/apmproxy"
"github.com/elastic/apm-aws-lambda/extension"
"github.com/elastic/apm-aws-lambda/logsapi"
"fmt"
"sync"
"time"

"github.com/elastic/apm-aws-lambda/apmproxy"
"github.com/elastic/apm-aws-lambda/extension"
)

// Run runs the app.
Expand Down Expand Up @@ -57,7 +57,7 @@ func (app *App) Run(ctx context.Context) error {
}()

if app.logsClient != nil {
if err := app.logsClient.StartService([]logsapi.EventType{logsapi.Platform}, app.extensionClient.ExtensionID); err != nil {
if err := app.logsClient.StartService(app.extensionClient.ExtensionID); err != nil {
app.logger.Warnf("Error while subscribing to the Logs API: %v", err)

// disable logs API if the service failed to start
Expand Down Expand Up @@ -169,7 +169,15 @@ func (app *App) processEvent(
runtimeDone := make(chan struct{})
if app.logsClient != nil {
go func() {
if err := app.logsClient.ProcessLogs(invocationCtx, event.RequestID, app.apmClient, metadataContainer, runtimeDone, prevEvent); err != nil {
if err := app.logsClient.ProcessLogs(
invocationCtx,
event.RequestID,
event.InvokedFunctionArn,
app.apmClient,
metadataContainer,
runtimeDone,
prevEvent,
); err != nil {
app.logger.Errorf("Error while processing Lambda Logs ; %v", err)
} else {
close(runtimeDone)
Expand Down
32 changes: 24 additions & 8 deletions logsapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,33 @@ import (
"go.uber.org/zap"
)

// SubscriptionType represents the log streams that the Lambda Logs API
// provides for subscription
type SubscriptionType string

const (
// Platform logstream records events and errors related to
// invocations and extensions
Platform SubscriptionType = "platform"
// Function logstream records logs written by lambda function
// to stderr or stdout
Function SubscriptionType = "function"
// Extension logstream records logs generated by extension
Extension SubscriptionType = "extension"
)

// ClientOption is a config option for a Client.
type ClientOption func(*Client)

// Client is the client used to subscribe to the Logs API.
type Client struct {
httpClient *http.Client
logsAPIBaseURL string
logsChannel chan LogEvent
listenerAddr string
server *http.Server
logger *zap.SugaredLogger
httpClient *http.Client
logsAPIBaseURL string
logsAPISubscriptionTypes []SubscriptionType
logsChannel chan LogEvent
listenerAddr string
server *http.Server
logger *zap.SugaredLogger
}

// NewClient returns a new Client with the given URL.
Expand Down Expand Up @@ -69,7 +85,7 @@ func NewClient(opts ...ClientOption) (*Client, error) {
}

// StartService starts the HTTP server listening for log events and subscribes to the Logs API.
func (lc *Client) StartService(eventTypes []EventType, extensionID string) error {
func (lc *Client) StartService(extensionID string) error {
addr, err := lc.startHTTPServer()
if err != nil {
return err
Expand All @@ -93,7 +109,7 @@ func (lc *Client) StartService(eventTypes []EventType, extensionID string) error

uri := fmt.Sprintf("http://%s", net.JoinHostPort(host, port))

if err := lc.subscribe(eventTypes, extensionID, uri); err != nil {
if err := lc.subscribe(lc.logsAPISubscriptionTypes, extensionID, uri); err != nil {
if err := lc.Shutdown(); err != nil {
lc.logger.Warnf("failed to shutdown the server: %v", err)
}
Expand Down
20 changes: 14 additions & 6 deletions logsapi/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ package logsapi_test

import (
"bytes"
"github.com/elastic/apm-aws-lambda/logsapi"
"encoding/json"
"net/http"
"net/http/httptest"
"net/url"
"testing"

"github.com/elastic/apm-aws-lambda/logsapi"

"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
Expand Down Expand Up @@ -103,13 +104,14 @@ func TestSubscribe(t *testing.T) {
}))
defer s.Close()

c, err := logsapi.NewClient(append(tc.opts, logsapi.WithLogsAPIBaseURL(s.URL))...)
cOpts := append(tc.opts, logsapi.WithLogsAPIBaseURL(s.URL), logsapi.WithSubscriptionTypes(logsapi.Platform))
c, err := logsapi.NewClient(cOpts...)
require.NoError(t, err)

if tc.expectedErr {
require.Error(t, c.StartService([]logsapi.EventType{logsapi.Platform}, "foo"))
require.Error(t, c.StartService("foo"))
} else {
require.NoError(t, c.StartService([]logsapi.EventType{logsapi.Platform}, "foo"))
require.NoError(t, c.StartService("foo"))
}

require.NoError(t, c.Shutdown())
Expand Down Expand Up @@ -141,9 +143,15 @@ func TestSubscribeAWSRequest(t *testing.T) {
}))
defer s.Close()

c, err := logsapi.NewClient(append(tc.opts, logsapi.WithLogsAPIBaseURL(s.URL), logsapi.WithLogBuffer(1))...)
cOpts := append(
tc.opts,
logsapi.WithLogsAPIBaseURL(s.URL),
logsapi.WithLogBuffer(1),
logsapi.WithSubscriptionTypes(logsapi.Platform, logsapi.Function),
)
c, err := logsapi.NewClient(cOpts...)
require.NoError(t, err)
require.NoError(t, c.StartService([]logsapi.EventType{logsapi.Platform}, "testID"))
require.NoError(t, c.StartService("testID"))

// Create a request to send to the logs listener
platformDoneEvent := `{
Expand Down
56 changes: 34 additions & 22 deletions logsapi/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,33 +25,23 @@ import (
"github.com/elastic/apm-aws-lambda/extension"
)

// EventType represents the type of logs in Lambda
type EventType string
// LogEventType represents the log type that is received in the log messages
type LogEventType string

const (
// Platform is to receive logs emitted by the platform
Platform EventType = "platform"
// Function is to receive logs emitted by the function
Function EventType = "function"
// Extension is to receive logs emitted by the extension
Extension EventType = "extension"
)

// SubEventType is a Logs API sub event type
type SubEventType string

const (
// RuntimeDone event is sent when lambda function is finished it's execution
RuntimeDone SubEventType = "platform.runtimeDone"
Fault SubEventType = "platform.fault"
Report SubEventType = "platform.report"
Start SubEventType = "platform.start"
// PlatformRuntimeDone event is sent when lambda function is finished it's execution
PlatformRuntimeDone LogEventType = "platform.runtimeDone"
PlatformFault LogEventType = "platform.fault"
PlatformReport LogEventType = "platform.report"
PlatformStart LogEventType = "platform.start"
PlatformEnd LogEventType = "platform.end"
FunctionLog LogEventType = "function"
)

// LogEvent represents an event received from the Logs API
type LogEvent struct {
Time time.Time `json:"time"`
Type SubEventType `json:"type"`
Type LogEventType `json:"type"`
StringRecord string
Record LogEventRecord
}
Expand All @@ -68,19 +58,26 @@ type LogEventRecord struct {
func (lc *Client) ProcessLogs(
ctx context.Context,
requestID string,
invokedFnArn string,
apmClient *apmproxy.Client,
metadataContainer *apmproxy.MetadataContainer,
runtimeDoneSignal chan struct{},
prevEvent *extension.NextEventResponse,
) error {
// platformStartReqID is to identify the requestID for the function
// logs under the assumption that function logs for a specific request
// ID will be bounded by PlatformStart and PlatformEnd events.
var platformStartReqID string
for {
select {
case logEvent := <-lc.logsChannel:
lc.logger.Debugf("Received log event %v", logEvent.Type)
switch logEvent.Type {
case PlatformStart:
platformStartReqID = logEvent.Record.RequestID
// Check the logEvent for runtimeDone and compare the RequestID
// to the id that came in via the Next API
case RuntimeDone:
case PlatformRuntimeDone:
if logEvent.Record.RequestID == requestID {
lc.logger.Info("Received runtimeDone event for this function invocation")
runtimeDoneSignal <- struct{}{}
Expand All @@ -89,7 +86,7 @@ func (lc *Client) ProcessLogs(

lc.logger.Debug("Log API runtimeDone event request id didn't match")
// Check if the logEvent contains metrics and verify that they can be linked to the previous invocation
case Report:
case PlatformReport:
if prevEvent != nil && logEvent.Record.RequestID == prevEvent.RequestID {
lc.logger.Debug("Received platform report for the previous function invocation")
processedMetrics, err := ProcessPlatformReport(metadataContainer, prevEvent, logEvent)
Expand All @@ -102,6 +99,21 @@ func (lc *Client) ProcessLogs(
lc.logger.Warn("report event request id didn't match the previous event id")
lc.logger.Debug("Log API runtimeDone event request id didn't match")
}
case FunctionLog:
// TODO: @lahsivjar Buffer logs and send batches of data to APM-Server.
// Buffering should account for metadata being available before sending.
lc.logger.Debug("Received function log")
processedLog, err := ProcessFunctionLog(
metadataContainer,
platformStartReqID,
invokedFnArn,
logEvent,
)
if err != nil {
lc.logger.Errorf("Error processing function log : %v", err)
} else {
apmClient.EnqueueAPMData(processedLog)
}
}
case <-ctx.Done():
lc.logger.Debug("Current invocation over. Interrupting logs processing goroutine")
Expand Down
Loading