Skip to content

Commit f2b6b0d

Browse files
committed
Address review comments
1 parent 5f0ac89 commit f2b6b0d

File tree

8 files changed

+54
-45
lines changed

8 files changed

+54
-45
lines changed

app/app.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func New(ctx context.Context, opts ...ConfigOption) (*App, error) {
7676
}
7777

7878
subscriptionLogStreams := []logsapi.SubscriptionType{logsapi.Platform}
79-
if !c.disableFunctionLogSubscription {
79+
if c.enableFunctionLogSubscription {
8080
subscriptionLogStreams = append(subscriptionLogStreams, logsapi.Function)
8181
}
8282

@@ -85,7 +85,7 @@ func New(ctx context.Context, opts ...ConfigOption) (*App, error) {
8585
logsapi.WithListenerAddress(addr),
8686
logsapi.WithLogBuffer(100),
8787
logsapi.WithLogger(app.logger),
88-
logsapi.WithLogsAPISubscriptionTypes(subscriptionLogStreams...),
88+
logsapi.WithSubscriptionTypes(subscriptionLogStreams...),
8989
)
9090
if err != nil {
9191
return nil, err

app/config.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@ package app
2020
import "github.com/aws/aws-sdk-go-v2/aws"
2121

2222
type appConfig struct {
23-
awsLambdaRuntimeAPI string
24-
awsConfig aws.Config
25-
extensionName string
26-
disableLogsAPI bool
27-
disableFunctionLogSubscription bool
28-
logLevel string
29-
logsapiAddr string
23+
awsLambdaRuntimeAPI string
24+
awsConfig aws.Config
25+
extensionName string
26+
disableLogsAPI bool
27+
enableFunctionLogSubscription bool
28+
logLevel string
29+
logsapiAddr string
3030
}
3131

3232
// ConfigOption is used to configure the lambda extension
@@ -55,12 +55,12 @@ func WithoutLogsAPI() ConfigOption {
5555
}
5656
}
5757

58-
// WithoutFunctionLogSubscription disables the logs api subscription
59-
// to function log stream. This option will only work if LogsAPI is
60-
// not disabled by the WithoutLogsAPI config option.
61-
func WithoutFunctionLogSubscription() ConfigOption {
58+
// WithFunctionLogSubscription enables the logs api subscription
59+
// to function log stream. This option will only work if LogsAPI
60+
// is not disabled by the WithoutLogsAPI config option.
61+
func WithFunctionLogSubscription() ConfigOption {
6262
return func(c *appConfig) {
63-
c.disableFunctionLogSubscription = true
63+
c.enableFunctionLogSubscription = true
6464
}
6565
}
6666

logsapi/client_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func TestSubscribe(t *testing.T) {
104104
}))
105105
defer s.Close()
106106

107-
cOpts := append(tc.opts, logsapi.WithLogsAPIBaseURL(s.URL), logsapi.WithLogsAPISubscriptionTypes(logsapi.Platform))
107+
cOpts := append(tc.opts, logsapi.WithLogsAPIBaseURL(s.URL), logsapi.WithSubscriptionTypes(logsapi.Platform))
108108
c, err := logsapi.NewClient(cOpts...)
109109
require.NoError(t, err)
110110

@@ -147,7 +147,7 @@ func TestSubscribeAWSRequest(t *testing.T) {
147147
tc.opts,
148148
logsapi.WithLogsAPIBaseURL(s.URL),
149149
logsapi.WithLogBuffer(1),
150-
logsapi.WithLogsAPISubscriptionTypes(logsapi.Platform, logsapi.Function),
150+
logsapi.WithSubscriptionTypes(logsapi.Platform, logsapi.Function),
151151
)
152152
c, err := logsapi.NewClient(cOpts...)
153153
require.NoError(t, err)

logsapi/event.go

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -100,21 +100,19 @@ func (lc *Client) ProcessLogs(
100100
lc.logger.Debug("Log API runtimeDone event request id didn't match")
101101
}
102102
case FunctionLog:
103-
if metadataContainer != nil && len(metadataContainer.Metadata) > 0 {
104-
lc.logger.Debug("Received function log")
105-
processedLog, err := ProcessFunctionLog(
106-
metadataContainer,
107-
platformStartReqID,
108-
invokedFnArn,
109-
logEvent,
110-
)
111-
if err != nil {
112-
lc.logger.Errorf("Error processing function log : %v", err)
113-
} else {
114-
apmClient.EnqueueAPMData(processedLog)
115-
}
103+
// TODO: @lahsivjar Buffer logs and send batches of data to APM-Server.
104+
// Buffering should account for metadata being available before sending.
105+
lc.logger.Debug("Received function log")
106+
processedLog, err := ProcessFunctionLog(
107+
metadataContainer,
108+
platformStartReqID,
109+
invokedFnArn,
110+
logEvent,
111+
)
112+
if err != nil {
113+
lc.logger.Errorf("Error processing function log : %v", err)
116114
} else {
117-
lc.logger.Warn("Function log received before metadata populated, quietly dropping log")
115+
apmClient.EnqueueAPMData(processedLog)
118116
}
119117
}
120118
case <-ctx.Done():

logsapi/functionlogs.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package logsapi
1919

2020
import (
21+
"errors"
22+
2123
"github.com/elastic/apm-aws-lambda/apmproxy"
2224
"go.elastic.co/apm/v2/model"
2325
"go.elastic.co/fastjson"
@@ -83,14 +85,18 @@ func (lc logContainer) MarshalFastJSON(json *fastjson.Writer) error {
8385
return nil
8486
}
8587

86-
// ProcessFunctionLog consumes extension event, agent metadata and log
87-
// event from Lambda logs API to create a payload for APM server
88+
// ProcessFunctionLog consumes agent metadata and log event from Lambda
89+
// logs API to create a payload for APM server.
8890
func ProcessFunctionLog(
8991
metadataContainer *apmproxy.MetadataContainer,
9092
requestID string,
9193
invokedFnArn string,
9294
log LogEvent,
9395
) (apmproxy.AgentData, error) {
96+
if metadataContainer == nil || len(metadataContainer.Metadata) == 0 {
97+
return apmproxy.AgentData{}, errors.New("metadata is required")
98+
}
99+
94100
lc := logContainer{
95101
Log: &logLine{
96102
Timestamp: model.Time(log.Time),
@@ -108,11 +114,11 @@ func ProcessFunctionLog(
108114
return apmproxy.AgentData{}, err
109115
}
110116

111-
var logData []byte
112-
if metadataContainer.Metadata != nil {
113-
logData = append(metadataContainer.Metadata, []byte("\n")...)
114-
}
117+
capacity := len(metadataContainer.Metadata) + jsonWriter.Size() + 1
118+
logData := make([]byte, len(metadataContainer.Metadata), capacity)
119+
copy(logData, metadataContainer.Metadata)
115120

121+
logData = append(logData, '\n')
116122
logData = append(logData, jsonWriter.Bytes()...)
117123
return apmproxy.AgentData{Data: logData}, nil
118124
}

logsapi/metrics.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ func (mc MetricsContainer) MarshalFastJSON(json *fastjson.Writer) error {
6363
}
6464

6565
func ProcessPlatformReport(metadataContainer *apmproxy.MetadataContainer, functionData *extension.NextEventResponse, platformReport LogEvent) (apmproxy.AgentData, error) {
66-
var metricsData []byte
6766
metricsContainer := MetricsContainer{
6867
Metrics: &model.Metrics{},
6968
}
@@ -101,10 +100,11 @@ func ProcessPlatformReport(metadataContainer *apmproxy.MetadataContainer, functi
101100
return apmproxy.AgentData{}, err
102101
}
103102

104-
if metadataContainer.Metadata != nil {
105-
metricsData = append(metadataContainer.Metadata, []byte("\n")...)
106-
}
103+
capacity := len(metadataContainer.Metadata) + jsonWriter.Size() + 1
104+
metricsData := make([]byte, len(metadataContainer.Metadata), capacity)
105+
copy(metricsData, metadataContainer.Metadata)
107106

107+
metricsData = append(metricsData, '\n')
108108
metricsData = append(metricsData, jsonWriter.Bytes()...)
109109
return apmproxy.AgentData{Data: metricsData}, nil
110110
}

logsapi/option.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ func WithLogger(logger *zap.SugaredLogger) ClientOption {
4949
}
5050
}
5151

52-
// WithLogsAPISubscriptionTypes sets the logstreams that the Logs API will subscribe to.
53-
func WithLogsAPISubscriptionTypes(types ...SubscriptionType) ClientOption {
52+
// WithSubscriptionTypes sets the logstreams that the Logs API will subscribe to.
53+
func WithSubscriptionTypes(types ...SubscriptionType) ClientOption {
5454
return func(c *Client) {
5555
c.logsAPISubscriptionTypes = types
5656
}

main.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,14 @@ func main() {
4747
app.WithAWSConfig(cfg),
4848
}
4949

50-
disableFnLog, _ := strconv.ParseBool(os.Getenv("ELASTIC_DISABLE_FUNCTION_LOG_SUBSCRIPTION"))
51-
if disableFnLog {
52-
appConfigs = append(appConfigs, app.WithoutFunctionLogSubscription())
50+
captureLogs, err := strconv.ParseBool(os.Getenv("ELASTIC_APM_LAMBDA_CAPTURE_LOGS"))
51+
// Default capture function logs to true
52+
if err != nil {
53+
captureLogs = true
54+
}
55+
56+
if captureLogs {
57+
appConfigs = append(appConfigs, app.WithFunctionLogSubscription())
5358
}
5459

5560
app, err := app.New(ctx, appConfigs...)

0 commit comments

Comments
 (0)