Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions apm-lambda-extension/app/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"elastic/apm-lambda-extension/apmproxy"
"elastic/apm-lambda-extension/extension"
"elastic/apm-lambda-extension/logsapi"
"fmt"
"sync"
"time"

Expand All @@ -33,7 +34,7 @@ import (
func (app *App) Run(ctx context.Context) error {
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
extension.Log.Fatalf("failed to load default config: %v", err)
return fmt.Errorf("failed to load AWS default config: %w", err)
}
manager := secretsmanager.NewFromConfig(cfg)
// pulls ELASTIC_ env variable into globals for easy access
Expand Down Expand Up @@ -63,7 +64,7 @@ func (app *App) Run(ctx context.Context) error {
// start http server to receive data from agent
err = app.apmClient.StartReceiver()
if err != nil {
extension.Log.Errorf("Could not start APM data receiver : %v", err)
return fmt.Errorf("failed to start the APM data receiver : %w", err)
}
defer func() {
if err := app.apmClient.Shutdown(); err != nil {
Expand Down Expand Up @@ -103,7 +104,11 @@ func (app *App) Run(ctx context.Context) error {
// Use a wait group to ensure the background go routine sending to the APM server
// completes before signaling that the extension is ready for the next invocation.
var backgroundDataSendWg sync.WaitGroup
event := app.processEvent(ctx, &backgroundDataSendWg, prevEvent, &metadataContainer)
event, err := app.processEvent(ctx, &backgroundDataSendWg, prevEvent, &metadataContainer)
if err != nil {
return err
}

if event.EventType == extension.Shutdown {
extension.Log.Info("Received shutdown event, exiting...")
return nil
Expand All @@ -124,7 +129,7 @@ func (app *App) processEvent(
backgroundDataSendWg *sync.WaitGroup,
prevEvent *extension.NextEventResponse,
metadataContainer *apmproxy.MetadataContainer,
) *extension.NextEventResponse {
) (*extension.NextEventResponse, error) {

// Invocation context
invocationCtx, invocationCancel := context.WithCancel(ctx)
Expand All @@ -135,14 +140,16 @@ func (app *App) processEvent(
extension.Log.Infof("Waiting for next event...")
event, err := app.extensionClient.NextEvent(ctx)
if err != nil {
status, err := app.extensionClient.ExitError(ctx, err.Error())
if err != nil {
panic(err)
}
extension.Log.Errorf("Error: %s", err)

status, errRuntime := app.extensionClient.ExitError(ctx, err.Error())
if errRuntime != nil {
return nil, errRuntime
}

extension.Log.Infof("Exit signal sent to runtime : %s", status)
extension.Log.Infof("Exiting")
return nil
return nil, err
}

// Used to compute Lambda Timeout
Expand All @@ -151,7 +158,7 @@ func (app *App) processEvent(
extension.Log.Debugf("%v", extension.PrettyPrint(event))

if event.EventType == extension.Shutdown {
return event
return event, nil
}

// APM Data Processing
Expand Down Expand Up @@ -204,5 +211,5 @@ func (app *App) processEvent(
extension.Log.Info("Time expired waiting for agent signal or runtimeDone event")
}

return event
return event, nil
}