Skip to content
Merged
20 changes: 19 additions & 1 deletion apm-lambda-extension/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@

package app

import "elastic/apm-lambda-extension/extension"
import (
"elastic/apm-lambda-extension/extension"
"elastic/apm-lambda-extension/logsapi"
"fmt"
)

// App is the main application.
type App struct {
extensionName string
extensionClient *extension.Client
logsClient *logsapi.Client
}

// New returns an App or an error if the
Expand All @@ -39,5 +44,18 @@ func New(opts ...configOption) (*App, error) {
extensionClient: extension.NewClient(c.awsLambdaRuntimeAPI),
}

if !c.disableLogsAPI {
lc, err := logsapi.NewClient(
logsapi.WithLogsAPIBaseURL(fmt.Sprintf("http://%s", c.awsLambdaRuntimeAPI)),
logsapi.WithListenerAddress("sandbox:0"),
logsapi.WithLogBuffer(100),
)
if err != nil {
return nil, err
}

app.logsClient = lc
}

return app, nil
}
8 changes: 8 additions & 0 deletions apm-lambda-extension/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package app
type appConfig struct {
awsLambdaRuntimeAPI string
extensionName string
disableLogsAPI bool
}

type configOption func(*appConfig)
Expand All @@ -39,3 +40,10 @@ func WithExtensionName(name string) configOption {
c.extensionName = name
}
}

// WithoutLogsAPI disables the logs api.
func WithoutLogsAPI() configOption {
return func(c *appConfig) {
c.disableLogsAPI = true
}
}
25 changes: 18 additions & 7 deletions apm-lambda-extension/app/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,20 @@ func (app *App) Run(ctx context.Context) error {
// Use a wait group to ensure the background go routine sending to the APM server
// completes before signaling that the extension is ready for the next invocation.

logsTransport, err := logsapi.Subscribe(ctx, app.extensionClient.ExtensionID, []logsapi.EventType{logsapi.Platform})
if err != nil {
extension.Log.Warnf("Error while subscribing to the Logs API: %v", err)
if app.logsClient != nil {
if err := app.logsClient.StartService([]logsapi.EventType{logsapi.Platform}, app.extensionClient.ExtensionID); err != nil {
extension.Log.Warnf("Error while subscribing to the Logs API: %v", err)

// disable logs API if the service failed to start
app.logsClient = nil
} else {
// Remember to shutdown the log service if available.
defer func() {
if err := app.logsClient.Shutdown(); err != nil {
extension.Log.Warnf("failed to shutdown the log service: %v", err)
}
}()
}
}

// The previous event id is used to validate the received Lambda metrics
Expand All @@ -81,10 +92,11 @@ func (app *App) Run(ctx context.Context) error {
select {
case <-ctx.Done():
extension.Log.Info("Received a signal, exiting...")

return nil
default:
var backgroundDataSendWg sync.WaitGroup
event := app.processEvent(ctx, apmServerTransport, logsTransport, &backgroundDataSendWg, prevEvent, &metadataContainer)
event := app.processEvent(ctx, apmServerTransport, &backgroundDataSendWg, prevEvent, &metadataContainer)
if event.EventType == extension.Shutdown {
extension.Log.Info("Received shutdown event, exiting...")
return nil
Expand All @@ -103,7 +115,6 @@ func (app *App) Run(ctx context.Context) error {
func (app *App) processEvent(
ctx context.Context,
apmServerTransport *extension.ApmServerTransport,
logsTransport *logsapi.LogsTransport,
backgroundDataSendWg *sync.WaitGroup,
prevEvent *extension.NextEventResponse,
metadataContainer *extension.MetadataContainer,
Expand Down Expand Up @@ -151,9 +162,9 @@ func (app *App) processEvent(
// Lambda Service Logs Processing, also used to extract metrics from APM logs
// This goroutine should not be started if subscription failed
runtimeDone := make(chan struct{})
if logsTransport != nil {
if app.logsClient != nil {
go func() {
if err := logsapi.ProcessLogs(invocationCtx, event.RequestID, apmServerTransport, logsTransport, metadataContainer, runtimeDone, prevEvent); err != nil {
if err := app.logsClient.ProcessLogs(invocationCtx, event.RequestID, apmServerTransport, metadataContainer, runtimeDone, prevEvent); err != nil {
extension.Log.Errorf("Error while processing Lambda Logs ; %v", err)
} else {
close(runtimeDone)
Expand Down
205 changes: 54 additions & 151 deletions apm-lambda-extension/logsapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,186 +18,89 @@
package logsapi

import (
"bytes"
"encoding/json"
"context"
"elastic/apm-lambda-extension/extension"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"time"
)

const lambdaAgentIdentifierHeaderKey string = "Lambda-Extension-Identifier"
// ClientOption is a config option for a Client.
type ClientOption func(*Client)

// Client is the client used to subscribe to the Logs API
// Client is the client used to subscribe to the Logs API.
type Client struct {
httpClient *http.Client
logsAPIBaseUrl string
logsAPIBaseURL string
logsChannel chan LogEvent
listenerAddr string
server *http.Server
}

// NewClient returns a new Client with the given URL
func NewClient(logsAPIBaseUrl string) (*Client, error) {
return &Client{
httpClient: &http.Client{},
logsAPIBaseUrl: logsAPIBaseUrl,
}, nil
}

// EventType represents the type of logs in Lambda
type EventType string

const (
// Platform is to receive logs emitted by the platform
Platform EventType = "platform"
// Function is to receive logs emitted by the function
Function EventType = "function"
// Extension is to receive logs emitted by the extension
Extension EventType = "extension"
)

// SubEventType is a Logs API sub event type
type SubEventType string

const (
// RuntimeDone event is sent when lambda function is finished it's execution
RuntimeDone SubEventType = "platform.runtimeDone"
Fault SubEventType = "platform.fault"
Report SubEventType = "platform.report"
Start SubEventType = "platform.start"
)

// BufferingCfg is the configuration set for receiving logs from Logs API. Whichever of the conditions below is met first, the logs will be sent
type BufferingCfg struct {
// MaxItems is the maximum number of events to be buffered in memory. (default: 10000, minimum: 1000, maximum: 10000)
MaxItems uint32 `json:"maxItems"`
// MaxBytes is the maximum size in bytes of the logs to be buffered in memory. (default: 262144, minimum: 262144, maximum: 1048576)
MaxBytes uint32 `json:"maxBytes"`
// TimeoutMS is the maximum time (in milliseconds) for a batch to be buffered. (default: 1000, minimum: 100, maximum: 30000)
TimeoutMS uint32 `json:"timeoutMs"`
}

// URI is used to set the endpoint where the logs will be sent to
type URI string

// HttpMethod represents the HTTP method used to receive logs from Logs API
type HttpMethod string

const (
//HttpPost is to receive logs through POST.
HttpPost HttpMethod = "POST"
// HttpPut is to receive logs through PUT.
HttpPut HttpMethod = "PUT"
)

// HttpProtocol is used to specify the protocol when subscribing to Logs API for HTTP
type HttpProtocol string

const (
HttpProto HttpProtocol = "HTTP"
)

// HttpEncoding denotes what the content is encoded in
type HttpEncoding string

const (
JSON HttpEncoding = "JSON"
)
// NewClient returns a new Client with the given URL.
func NewClient(opts ...ClientOption) (*Client, error) {
c := Client{
server: &http.Server{},
httpClient: &http.Client{},
}

// Destination is the configuration for listeners who would like to receive logs with HTTP
type Destination struct {
Protocol HttpProtocol `json:"protocol"`
URI URI `json:"URI"`
HttpMethod HttpMethod `json:"method"`
Encoding HttpEncoding `json:"encoding"`
}
for _, opt := range opts {
opt(&c)
}

// SchemaVersion is the Lambda runtime API schema version
type SchemaVersion string
mux := http.NewServeMux()
mux.HandleFunc("/", handleLogEventsRequest(c.logsChannel))

const (
SchemaVersion20210318 = "2021-03-18"
SchemaVersionLatest = SchemaVersion20210318
)
c.server.Handler = mux

// SubscribeRequest is the request body that is sent to Logs API on subscribe
type SubscribeRequest struct {
SchemaVersion SchemaVersion `json:"schemaVersion"`
EventTypes []EventType `json:"types"`
BufferingCfg BufferingCfg `json:"buffering"`
Destination Destination `json:"destination"`
}
if c.logsAPIBaseURL == "" {
return nil, errors.New("logs api base url cannot be empty")
}

// SubscribeResponse is the response body that is received from Logs API on subscribe
type SubscribeResponse struct {
body string
return &c, nil
}

// Subscribe calls the Logs API to subscribe for the log events.
func (c *Client) Subscribe(types []EventType, destinationURI URI, extensionId string) (*SubscribeResponse, error) {
bufferingCfg := BufferingCfg{
MaxItems: 10000,
MaxBytes: 262144,
TimeoutMS: 25,
}
destination := Destination{
Protocol: HttpProto,
URI: destinationURI,
HttpMethod: HttpPost,
Encoding: JSON,
}
data, err := json.Marshal(
&SubscribeRequest{
SchemaVersion: SchemaVersionLatest,
EventTypes: types,
BufferingCfg: bufferingCfg,
Destination: destination,
})
// StartService starts the HTTP server listening for log events and subscribes to the Logs API.
func (lc *Client) StartService(eventTypes []EventType, extensionID string) error {
addr, err := lc.startHTTPServer()
if err != nil {
return nil, fmt.Errorf("failed to marshal SubscribeRequest: %w", err)
return err
}

headers := make(map[string]string)
headers[lambdaAgentIdentifierHeaderKey] = extensionId
url := fmt.Sprintf("%s/2020-08-15/logs", c.logsAPIBaseUrl)
resp, err := httpPutWithHeaders(c.httpClient, url, data, &headers)
_, port, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode == http.StatusAccepted {
return nil, errors.New("Logs API is not supported in this environment")
} else if resp.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("%s failed: %d[%s]", url, resp.StatusCode, resp.Status)
if err := lc.Shutdown(); err != nil {
extension.Log.Warnf("failed to shutdown the server: %v", err)
}

return nil, fmt.Errorf("%s failed: %d[%s] %s", url, resp.StatusCode, resp.Status, string(body))
return fmt.Errorf("failed to retrieve port from address %s: %w", addr, err)
}

body, _ := ioutil.ReadAll(resp.Body)

return &SubscribeResponse{string(body)}, nil
}

func httpPutWithHeaders(client *http.Client, url string, data []byte, headers *map[string]string) (*http.Response, error) {
req, err := http.NewRequest("PUT", url, bytes.NewBuffer(data))
host, _, err := net.SplitHostPort(lc.listenerAddr)
if err != nil {
return nil, err
if err := lc.Shutdown(); err != nil {
extension.Log.Warnf("failed to shutdown the server: %v", err)
}
return fmt.Errorf("failed to retrieve host from address %s: %w", lc.listenerAddr, err)
}

contentType := "application/json"
req.Header.Set("Content-Type", contentType)
if headers != nil {
for k, v := range *headers {
req.Header.Set(k, v)
uri := fmt.Sprintf("http://%s", net.JoinHostPort(host, port))

if err := lc.subscribe(eventTypes, extensionID, uri); err != nil {
if err := lc.Shutdown(); err != nil {
extension.Log.Warnf("failed to shutdown the server: %v", err)
}
return err
}

resp, err := client.Do(req)
if err != nil {
return nil, err
}
return nil
}

// Shutdown shutdowns the log service gracefully.
func (lc *Client) Shutdown() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

return resp, nil
return lc.server.Shutdown(ctx)
}
Loading