From c8df576e38d66aefb564f27555e693b71f1189c5 Mon Sep 17 00:00:00 2001 From: kruskal <99559985+kruskall@users.noreply.github.com> Date: Mon, 18 Jul 2022 18:24:40 +0200 Subject: [PATCH 1/2] refactor: move main loop to a separate package and avoid global state Create an app struct to hold the state of the application and use functional options for flexibility. Remove global extension client and avoid mixing test code with app logic. This is part of a bigger effort to improve testing and decouple different components. --- apm-lambda-extension/app/app.go | 43 +++++++ apm-lambda-extension/app/config.go | 40 ++++++ apm-lambda-extension/app/run.go | 191 +++++++++++++++++++++++++++++ apm-lambda-extension/main.go | 171 ++------------------------ apm-lambda-extension/main_test.go | 1 - 5 files changed, 282 insertions(+), 164 deletions(-) create mode 100644 apm-lambda-extension/app/app.go create mode 100644 apm-lambda-extension/app/config.go create mode 100644 apm-lambda-extension/app/run.go diff --git a/apm-lambda-extension/app/app.go b/apm-lambda-extension/app/app.go new file mode 100644 index 00000000..9b3b4b82 --- /dev/null +++ b/apm-lambda-extension/app/app.go @@ -0,0 +1,43 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package app + +import "elastic/apm-lambda-extension/extension" + +// App is the main application. +type App struct { + extensionName string + extensionClient *extension.Client +} + +// New returns an App or an error if the +// creation failed. +func New(opts ...configOption) (*App, error) { + c := appConfig{} + + for _, opt := range opts { + opt(&c) + } + + app := &App{ + extensionName: c.extensionName, + extensionClient: extension.NewClient(c.awsLambdaRuntimeAPI), + } + + return app, nil +} diff --git a/apm-lambda-extension/app/config.go b/apm-lambda-extension/app/config.go new file mode 100644 index 00000000..663e76e8 --- /dev/null +++ b/apm-lambda-extension/app/config.go @@ -0,0 +1,40 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package app + +type appConfig struct { + awsLambdaRuntimeAPI string + extensionName string +} + +type configOption func(*appConfig) + +// WithLambdaRuntimeAPI sets the AWS Lambda Runtime API +// used by the AWS client. +func WithLambdaRuntimeAPI(api string) configOption { + return func(c *appConfig) { + c.awsLambdaRuntimeAPI = api + } +} + +// WithExtensionName sets the extension name. 
+func WithExtensionName(name string) configOption { + return func(c *appConfig) { + c.extensionName = name + } +} diff --git a/apm-lambda-extension/app/run.go b/apm-lambda-extension/app/run.go new file mode 100644 index 00000000..3cf4be98 --- /dev/null +++ b/apm-lambda-extension/app/run.go @@ -0,0 +1,191 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package app + +import ( + "context" + "elastic/apm-lambda-extension/extension" + "elastic/apm-lambda-extension/logsapi" + "sync" + "time" + + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/secretsmanager" +) + +// Run runs the app. 
+func (app *App) Run(ctx context.Context) error { + cfg, err := config.LoadDefaultConfig(ctx) + if err != nil { + extension.Log.Fatalf("failed to load default config: %v", err) + } + manager := secretsmanager.NewFromConfig(cfg) + // pulls ELASTIC_ env variable into globals for easy access + config := extension.ProcessEnv(manager) + extension.Log.Level.SetLevel(config.LogLevel) + + // register extension with AWS Extension API + res, err := app.extensionClient.Register(ctx, app.extensionName) + if err != nil { + extension.Log.Errorf("Error: %s", err) + + status, errRuntime := app.extensionClient.InitError(ctx, err.Error()) + if errRuntime != nil { + return errRuntime + } + + extension.Log.Infof("Init error signal sent to runtime : %s", status) + extension.Log.Infof("Exiting") + return err + } + extension.Log.Debugf("Register response: %v", extension.PrettyPrint(res)) + + // Init APM Server Transport struct and start http server to receive data from agent + apmServerTransport := extension.InitApmServerTransport(config) + agentDataServer, err := extension.StartHttpServer(ctx, apmServerTransport) + if err != nil { + extension.Log.Errorf("Could not start APM data receiver : %v", err) + } + defer agentDataServer.Close() + + // Use a wait group to ensure the background go routine sending to the APM server + // completes before signaling that the extension is ready for the next invocation. + + logsTransport, err := logsapi.Subscribe(ctx, app.extensionClient.ExtensionID, []logsapi.EventType{logsapi.Platform}) + if err != nil { + extension.Log.Warnf("Error while subscribing to the Logs API: %v", err) + } + + // The previous event id is used to validate the received Lambda metrics + var prevEvent *extension.NextEventResponse + // This data structure contains metadata tied to the current Lambda instance. 
If empty, it is populated once for each + // active Lambda environment + metadataContainer := extension.MetadataContainer{} + + for { + select { + case <-ctx.Done(): + extension.Log.Info("Received a signal, exiting...") + return nil + default: + var backgroundDataSendWg sync.WaitGroup + event := app.processEvent(ctx, apmServerTransport, logsTransport, &backgroundDataSendWg, prevEvent, &metadataContainer) + if event.EventType == extension.Shutdown { + extension.Log.Info("Received shutdown event, exiting...") + return nil + } + extension.Log.Debug("Waiting for background data send to end") + backgroundDataSendWg.Wait() + if config.SendStrategy == extension.SyncFlush { + // Flush APM data now that the function invocation has completed + apmServerTransport.FlushAPMData(ctx) + } + prevEvent = event + } + } +} + +func (app *App) processEvent( + ctx context.Context, + apmServerTransport *extension.ApmServerTransport, + logsTransport *logsapi.LogsTransport, + backgroundDataSendWg *sync.WaitGroup, + prevEvent *extension.NextEventResponse, + metadataContainer *extension.MetadataContainer, +) *extension.NextEventResponse { + + // Invocation context + invocationCtx, invocationCancel := context.WithCancel(ctx) + defer invocationCancel() + + // call Next method of extension API. 
This long polling HTTP method + // will block until there's an invocation of the function + extension.Log.Infof("Waiting for next event...") + event, err := app.extensionClient.NextEvent(ctx) + if err != nil { + status, err := app.extensionClient.ExitError(ctx, err.Error()) + if err != nil { + panic(err) + } + extension.Log.Errorf("Error: %s", err) + extension.Log.Infof("Exit signal sent to runtime : %s", status) + extension.Log.Infof("Exiting") + return nil + } + + // Used to compute Lambda Timeout + event.Timestamp = time.Now() + extension.Log.Debug("Received event.") + extension.Log.Debugf("%v", extension.PrettyPrint(event)) + + if event.EventType == extension.Shutdown { + return event + } + + // APM Data Processing + apmServerTransport.AgentDoneSignal = make(chan struct{}) + defer close(apmServerTransport.AgentDoneSignal) + backgroundDataSendWg.Add(1) + go func() { + defer backgroundDataSendWg.Done() + if err := apmServerTransport.ForwardApmData(invocationCtx, metadataContainer); err != nil { + extension.Log.Error(err) + } + }() + + // Lambda Service Logs Processing, also used to extract metrics from APM logs + // This goroutine should not be started if subscription failed + runtimeDone := make(chan struct{}) + if logsTransport != nil { + go func() { + if err := logsapi.ProcessLogs(invocationCtx, event.RequestID, apmServerTransport, logsTransport, metadataContainer, runtimeDone, prevEvent); err != nil { + extension.Log.Errorf("Error while processing Lambda Logs ; %v", err) + } else { + close(runtimeDone) + } + }() + } else { + extension.Log.Warn("Logs collection not started due to earlier subscription failure") + close(runtimeDone) + } + + // Calculate how long to wait for a runtimeDoneSignal or AgentDoneSignal signal + flushDeadlineMs := event.DeadlineMs - 100 + durationUntilFlushDeadline := time.Until(time.Unix(flushDeadlineMs/1000, 0)) + + // Create a timer that expires after durationUntilFlushDeadline + timer := time.NewTimer(durationUntilFlushDeadline) + 
defer timer.Stop() + + // The extension relies on 3 independent mechanisms to minimize the time interval between the end of the execution of + // the lambda function and the end of the execution of processEvent() + // 1) AgentDoneSignal is triggered upon reception of a `flushed=true` query from the agent + // 2) [Backup 1] RuntimeDone is triggered upon reception of a Lambda log entry certifying the end of the execution of the current function + // 3) [Backup 2] If all else fails, the extension relies on the timeout of the Lambda function to interrupt itself 100 ms before the specified deadline. + // This time interval is large enough to make a last flush attempt (if SendStrategy == syncFlush) before the environment gets shut down. + select { + case <-apmServerTransport.AgentDoneSignal: + extension.Log.Debug("Received agent done signal") + case <-runtimeDone: + extension.Log.Debug("Received runtimeDone signal") + case <-timer.C: + extension.Log.Info("Time expired waiting for agent signal or runtimeDone event") + } + + return event +} diff --git a/apm-lambda-extension/main.go b/apm-lambda-extension/main.go index 3715b2de..2a5460f5 100644 --- a/apm-lambda-extension/main.go +++ b/apm-lambda-extension/main.go @@ -22,182 +22,27 @@ import ( "os" "os/signal" "path/filepath" - "sync" "syscall" - "time" + "elastic/apm-lambda-extension/app" "elastic/apm-lambda-extension/extension" - "elastic/apm-lambda-extension/logsapi" - - "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/service/secretsmanager" -) - -var ( - extensionName = filepath.Base(os.Args[0]) // extension name has to match the filename - extensionClient = extension.NewClient(os.Getenv("AWS_LAMBDA_RUNTIME_API")) ) -/* --- elastic vars --- */ - func main() { - // Global context ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) defer cancel() - cfg, err := config.LoadDefaultConfig(ctx) - if err != nil { - extension.Log.Fatalf("failed to load default 
config: %v", err) - } - manager := secretsmanager.NewFromConfig(cfg) - // pulls ELASTIC_ env variable into globals for easy access - config := extension.ProcessEnv(manager) - extension.Log.Level.SetLevel(config.LogLevel) - - // register extension with AWS Extension API - res, err := extensionClient.Register(ctx, extensionName) - if err != nil { - status, errRuntime := extensionClient.InitError(ctx, err.Error()) - if errRuntime != nil { - panic(errRuntime) - } - extension.Log.Errorf("Error: %s", err) - extension.Log.Infof("Init error signal sent to runtime : %s", status) - extension.Log.Infof("Exiting") - return - } - extension.Log.Debugf("Register response: %v", extension.PrettyPrint(res)) - - // Init APM Server Transport struct and start http server to receive data from agent - apmServerTransport := extension.InitApmServerTransport(config) - agentDataServer, err := extension.StartHttpServer(ctx, apmServerTransport) - if err != nil { - extension.Log.Errorf("Could not start APM data receiver : %v", err) - } - defer agentDataServer.Close() - - // Use a wait group to ensure the background go routine sending to the APM server - // completes before signaling that the extension is ready for the next invocation. - - logsTransport, err := logsapi.Subscribe(ctx, extensionClient.ExtensionID, []logsapi.EventType{logsapi.Platform}) - if err != nil { - extension.Log.Warnf("Error while subscribing to the Logs API: %v", err) - } - - // The previous event id is used to validate the received Lambda metrics - var prevEvent *extension.NextEventResponse - // This data structure contains metadata tied to the current Lambda instance. 
If empty, it is populated once for each - // active Lambda environment - metadataContainer := extension.MetadataContainer{} - - for { - select { - case <-ctx.Done(): - extension.Log.Info("Received a signal, exiting...") - return - default: - var backgroundDataSendWg sync.WaitGroup - event := processEvent(ctx, cancel, apmServerTransport, logsTransport, &backgroundDataSendWg, prevEvent, &metadataContainer) - extension.Log.Debug("Waiting for background data send to end") - backgroundDataSendWg.Wait() - if config.SendStrategy == extension.SyncFlush { - // Flush APM data now that the function invocation has completed - apmServerTransport.FlushAPMData(ctx) - } - prevEvent = event - } - } -} - -func processEvent( - ctx context.Context, - cancel context.CancelFunc, - apmServerTransport *extension.ApmServerTransport, - logsTransport *logsapi.LogsTransport, - backgroundDataSendWg *sync.WaitGroup, - prevEvent *extension.NextEventResponse, - metadataContainer *extension.MetadataContainer, -) *extension.NextEventResponse { - - // Invocation context - invocationCtx, invocationCancel := context.WithCancel(ctx) - defer invocationCancel() - - // call Next method of extension API. 
This long polling HTTP method - // will block until there's an invocation of the function - extension.Log.Infof("Waiting for next event...") - event, err := extensionClient.NextEvent(ctx) + app, err := app.New( + app.WithExtensionName(filepath.Base(os.Args[0])), + app.WithLambdaRuntimeAPI(os.Getenv("AWS_LAMBDA_RUNTIME_API")), + ) if err != nil { - status, err := extensionClient.ExitError(ctx, err.Error()) - if err != nil { - panic(err) - } - extension.Log.Errorf("Error: %s", err) - extension.Log.Infof("Exit signal sent to runtime : %s", status) - extension.Log.Infof("Exiting") - return nil + extension.Log.Fatalf("failed to create the app: %v", err) } - // Used to compute Lambda Timeout - event.Timestamp = time.Now() - extension.Log.Debug("Received event.") - extension.Log.Debugf("%v", extension.PrettyPrint(event)) - - if event.EventType == extension.Shutdown { - cancel() - return event - } - - // APM Data Processing - apmServerTransport.AgentDoneSignal = make(chan struct{}) - defer close(apmServerTransport.AgentDoneSignal) - backgroundDataSendWg.Add(1) - go func() { - defer backgroundDataSendWg.Done() - if err := apmServerTransport.ForwardApmData(invocationCtx, metadataContainer); err != nil { - extension.Log.Error(err) - } - }() - - // Lambda Service Logs Processing, also used to extract metrics from APM logs - // This goroutine should not be started if subscription failed - runtimeDone := make(chan struct{}) - if logsTransport != nil { - go func() { - if err := logsapi.ProcessLogs(invocationCtx, event.RequestID, apmServerTransport, logsTransport, metadataContainer, runtimeDone, prevEvent); err != nil { - extension.Log.Errorf("Error while processing Lambda Logs ; %v", err) - } else { - close(runtimeDone) - } - }() - } else { - extension.Log.Warn("Logs collection not started due to earlier subscription failure") - close(runtimeDone) - } - - // Calculate how long to wait for a runtimeDoneSignal or AgentDoneSignal signal - flushDeadlineMs := event.DeadlineMs - 100 - 
durationUntilFlushDeadline := time.Until(time.Unix(flushDeadlineMs/1000, 0)) - - // Create a timer that expires after durationUntilFlushDeadline - timer := time.NewTimer(durationUntilFlushDeadline) - defer timer.Stop() - - // The extension relies on 3 independent mechanisms to minimize the time interval between the end of the execution of - // the lambda function and the end of the execution of processEvent() - // 1) AgentDoneSignal is triggered upon reception of a `flushed=true` query from the agent - // 2) [Backup 1] RuntimeDone is triggered upon reception of a Lambda log entry certifying the end of the execution of the current function - // 3) [Backup 2] If all else fails, the extension relies of the timeout of the Lambda function to interrupt itself 100 ms before the specified deadline. - // This time interval is large enough to attempt a last flush attempt (if SendStrategy == syncFlush) before the environment gets shut down. - select { - case <-apmServerTransport.AgentDoneSignal: - extension.Log.Debug("Received agent done signal") - case <-runtimeDone: - extension.Log.Debug("Received runtimeDone signal") - case <-timer.C: - extension.Log.Info("Time expired waiting for agent signal or runtimeDone event") + if err := app.Run(ctx); err != nil { + extension.Log.Fatalf("error while running: %v", err) } - return event } diff --git a/apm-lambda-extension/main_test.go b/apm-lambda-extension/main_test.go index 42b0c0ab..7617242b 100644 --- a/apm-lambda-extension/main_test.go +++ b/apm-lambda-extension/main_test.go @@ -182,7 +182,6 @@ func newMockLambdaServer(t *testing.T, eventsChannel chan MockEvent) *MockServer slicedLambdaURL := strings.Split(lambdaServer.URL, "//") strippedLambdaURL := slicedLambdaURL[1] t.Setenv("AWS_LAMBDA_RUNTIME_API", strippedLambdaURL) - extensionClient = extension.NewClient(os.Getenv("AWS_LAMBDA_RUNTIME_API")) // Find unused port for the extension to listen to extensionPort, err := e2eTesting.GetFreePort() From 
c3b404897c1cf461939a42425ddd85f46552805e Mon Sep 17 00:00:00 2001 From: Joseph Kruskal <99559985+kruskall@users.noreply.github.com> Date: Wed, 20 Jul 2022 11:37:30 +0200 Subject: [PATCH 2/2] docs: improve lambdaruntimeapi comment Co-authored-by: Andrew Wilkins --- apm-lambda-extension/app/config.go | 1 + 1 file changed, 1 insertion(+) diff --git a/apm-lambda-extension/app/config.go b/apm-lambda-extension/app/config.go index 663e76e8..a21b1588 100644 --- a/apm-lambda-extension/app/config.go +++ b/apm-lambda-extension/app/config.go @@ -25,6 +25,7 @@ type appConfig struct { type configOption func(*appConfig) // WithLambdaRuntimeAPI sets the AWS Lambda Runtime API +// endpoint (normally taken from $AWS_LAMBDA_RUNTIME_API), // used by the AWS client. func WithLambdaRuntimeAPI(api string) configOption { return func(c *appConfig) {