Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions apm-lambda-extension/app/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package app

import "elastic/apm-lambda-extension/extension"

// App is the main application.
type App struct {
extensionName string
extensionClient *extension.Client
}

// New returns an App or an error if the
// creation failed.
func New(opts ...configOption) (*App, error) {
c := appConfig{}

for _, opt := range opts {
opt(&c)
}

app := &App{
extensionName: c.extensionName,
extensionClient: extension.NewClient(c.awsLambdaRuntimeAPI),
}

return app, nil
}
41 changes: 41 additions & 0 deletions apm-lambda-extension/app/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package app

type appConfig struct {
awsLambdaRuntimeAPI string
extensionName string
}

type configOption func(*appConfig)

// WithLambdaRuntimeAPI sets the AWS Lambda Runtime API
// endpoint (normally taken from $AWS_LAMBDA_RUNTIME_API),
// used by the AWS client.
func WithLambdaRuntimeAPI(api string) configOption {
return func(c *appConfig) {
c.awsLambdaRuntimeAPI = api
}
}

// WithExtensionName sets the extension name.
func WithExtensionName(name string) configOption {
return func(c *appConfig) {
c.extensionName = name
}
}
191 changes: 191 additions & 0 deletions apm-lambda-extension/app/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package app

import (
"context"
"elastic/apm-lambda-extension/extension"
"elastic/apm-lambda-extension/logsapi"
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/secretsmanager"
)

// Run runs the app.
func (app *App) Run(ctx context.Context) error {
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
extension.Log.Fatalf("failed to load default config: %v", err)
}
manager := secretsmanager.NewFromConfig(cfg)
// pulls ELASTIC_ env variable into globals for easy access
config := extension.ProcessEnv(manager)
extension.Log.Level.SetLevel(config.LogLevel)

// register extension with AWS Extension API
res, err := app.extensionClient.Register(ctx, app.extensionName)
if err != nil {
extension.Log.Errorf("Error: %s", err)

status, errRuntime := app.extensionClient.InitError(ctx, err.Error())
if errRuntime != nil {
return errRuntime
}

extension.Log.Infof("Init error signal sent to runtime : %s", status)
extension.Log.Infof("Exiting")
return err
}
extension.Log.Debugf("Register response: %v", extension.PrettyPrint(res))

// Init APM Server Transport struct and start http server to receive data from agent
apmServerTransport := extension.InitApmServerTransport(config)
agentDataServer, err := extension.StartHttpServer(ctx, apmServerTransport)
if err != nil {
extension.Log.Errorf("Could not start APM data receiver : %v", err)
}
defer agentDataServer.Close()

// Use a wait group to ensure the background go routine sending to the APM server
// completes before signaling that the extension is ready for the next invocation.

logsTransport, err := logsapi.Subscribe(ctx, app.extensionClient.ExtensionID, []logsapi.EventType{logsapi.Platform})
if err != nil {
extension.Log.Warnf("Error while subscribing to the Logs API: %v", err)
}

// The previous event id is used to validate the received Lambda metrics
var prevEvent *extension.NextEventResponse
// This data structure contains metadata tied to the current Lambda instance. If empty, it is populated once for each
// active Lambda environment
metadataContainer := extension.MetadataContainer{}

for {
select {
case <-ctx.Done():
extension.Log.Info("Received a signal, exiting...")
return nil
default:
var backgroundDataSendWg sync.WaitGroup
event := app.processEvent(ctx, apmServerTransport, logsTransport, &backgroundDataSendWg, prevEvent, &metadataContainer)
if event.EventType == extension.Shutdown {
extension.Log.Info("Received shutdown event, exiting...")
return nil
}
extension.Log.Debug("Waiting for background data send to end")
backgroundDataSendWg.Wait()
if config.SendStrategy == extension.SyncFlush {
// Flush APM data now that the function invocation has completed
apmServerTransport.FlushAPMData(ctx)
}
prevEvent = event
}
}
}

func (app *App) processEvent(
ctx context.Context,
apmServerTransport *extension.ApmServerTransport,
logsTransport *logsapi.LogsTransport,
backgroundDataSendWg *sync.WaitGroup,
prevEvent *extension.NextEventResponse,
metadataContainer *extension.MetadataContainer,
) *extension.NextEventResponse {

// Invocation context
invocationCtx, invocationCancel := context.WithCancel(ctx)
defer invocationCancel()

// call Next method of extension API. This long polling HTTP method
// will block until there's an invocation of the function
extension.Log.Infof("Waiting for next event...")
event, err := app.extensionClient.NextEvent(ctx)
if err != nil {
status, err := app.extensionClient.ExitError(ctx, err.Error())
if err != nil {
panic(err)
}
extension.Log.Errorf("Error: %s", err)
extension.Log.Infof("Exit signal sent to runtime : %s", status)
extension.Log.Infof("Exiting")
return nil
}

// Used to compute Lambda Timeout
event.Timestamp = time.Now()
extension.Log.Debug("Received event.")
extension.Log.Debugf("%v", extension.PrettyPrint(event))

if event.EventType == extension.Shutdown {
return event
}

// APM Data Processing
apmServerTransport.AgentDoneSignal = make(chan struct{})
defer close(apmServerTransport.AgentDoneSignal)
backgroundDataSendWg.Add(1)
go func() {
defer backgroundDataSendWg.Done()
if err := apmServerTransport.ForwardApmData(invocationCtx, metadataContainer); err != nil {
extension.Log.Error(err)
}
}()

// Lambda Service Logs Processing, also used to extract metrics from APM logs
// This goroutine should not be started if subscription failed
runtimeDone := make(chan struct{})
if logsTransport != nil {
go func() {
if err := logsapi.ProcessLogs(invocationCtx, event.RequestID, apmServerTransport, logsTransport, metadataContainer, runtimeDone, prevEvent); err != nil {
extension.Log.Errorf("Error while processing Lambda Logs ; %v", err)
} else {
close(runtimeDone)
}
}()
} else {
extension.Log.Warn("Logs collection not started due to earlier subscription failure")
close(runtimeDone)
}

// Calculate how long to wait for a runtimeDoneSignal or AgentDoneSignal signal
flushDeadlineMs := event.DeadlineMs - 100
durationUntilFlushDeadline := time.Until(time.Unix(flushDeadlineMs/1000, 0))

// Create a timer that expires after durationUntilFlushDeadline
timer := time.NewTimer(durationUntilFlushDeadline)
defer timer.Stop()

// The extension relies on 3 independent mechanisms to minimize the time interval between the end of the execution of
// the lambda function and the end of the execution of processEvent()
// 1) AgentDoneSignal is triggered upon reception of a `flushed=true` query from the agent
// 2) [Backup 1] RuntimeDone is triggered upon reception of a Lambda log entry certifying the end of the execution of the current function
// 3) [Backup 2] If all else fails, the extension relies of the timeout of the Lambda function to interrupt itself 100 ms before the specified deadline.
// This time interval is large enough to attempt a last flush attempt (if SendStrategy == syncFlush) before the environment gets shut down.
select {
case <-apmServerTransport.AgentDoneSignal:
extension.Log.Debug("Received agent done signal")
case <-runtimeDone:
extension.Log.Debug("Received runtimeDone signal")
case <-timer.C:
extension.Log.Info("Time expired waiting for agent signal or runtimeDone event")
}

return event
}
Loading