Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
3d9680e
First iteration, minimal metadata
jlvoiseux Feb 18, 2022
5d73ee7
Comma fix for http_listener unit test
jlvoiseux Feb 18, 2022
89772ef
Remove label processing code
jlvoiseux Feb 22, 2022
21df4c4
Metadata addition and Model refactor
jlvoiseux Feb 22, 2022
02611e9
Add faas fields and version number
jlvoiseux Feb 28, 2022
0e1e448
Merge branch 'main' into lambda-platform-metrics
jlvoiseux Mar 10, 2022
f9eb3e6
Merge main and take refactor into account
jlvoiseux Mar 10, 2022
c8e23cb
Replace UnixMills by UnixNano to enforce Go 1.15 compatibility
jlvoiseux Mar 10, 2022
577e296
Replace UnixMicro by UnixNano to enforce Go 1.15 compatibility
jlvoiseux Mar 10, 2022
9b4e37c
Merge branch 'main' into lambda-platform-metrics
jlvoiseux Mar 23, 2022
dd18ed6
Merge branch 'main' into lambda-platform-metrics
jlvoiseux Apr 20, 2022
e26f662
Refactor metrics model
jlvoiseux Apr 20, 2022
d13ca5d
Fix e2e tests config
jlvoiseux Apr 21, 2022
81ee964
Merge branch 'main' into lambda-platform-metrics
jlvoiseux Apr 28, 2022
66dd5d4
FIx linting
jlvoiseux Apr 29, 2022
ffd2ce5
Comment extension name upsertion
jlvoiseux May 23, 2022
674ee2d
Merge branch 'main' into lambda-platform-metrics
jlvoiseux May 23, 2022
2e61ae2
Populate metadatacontainer in the APM Server transport goroutine
jlvoiseux May 30, 2022
dfccc6c
Update config field usage
jlvoiseux May 30, 2022
be64157
Refactor metrics processing
jlvoiseux May 30, 2022
220efca
Move back logs API data model to the logsapi package
jlvoiseux May 30, 2022
f4fa34f
Refactor logs processing in main.go
jlvoiseux May 30, 2022
af20235
Add end to end metrics and metadata testing
jlvoiseux May 30, 2022
95efb4b
Various fixes
jlvoiseux May 30, 2022
304f752
Merge branch 'main' into lambda-platform-metrics
jlvoiseux May 30, 2022
080c7c6
Improve error handling
jlvoiseux May 31, 2022
a17a88b
Using extension-defined logger
jlvoiseux May 31, 2022
91caa52
Simplify main tests
jlvoiseux May 31, 2022
18d50ad
Fix linting-related issues
jlvoiseux May 31, 2022
6905a8b
Correct extension name
jlvoiseux May 31, 2022
4bfd75a
Check nil prevevent
jlvoiseux May 31, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 3 additions & 31 deletions apm-lambda-extension/e2e-testing/e2e_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ package e2eTesting
import (
"archive/zip"
"bufio"
"bytes"
"compress/gzip"
"compress/zlib"
"elastic/apm-lambda-extension/extension"
"fmt"
"io"
"io/ioutil"
Expand All @@ -33,6 +29,8 @@ import (
"os/exec"
"path/filepath"
"strings"

"elastic/apm-lambda-extension/extension"
)

// GetEnvVarValueOrSetDefault retrieves the environment variable envVarName.
Expand Down Expand Up @@ -157,33 +155,7 @@ func GetDecompressedBytesFromRequest(req *http.Request) ([]byte, error) {
if req.Body != nil {
rawBytes, _ = ioutil.ReadAll(req.Body)
}

switch req.Header.Get("Content-Encoding") {
case "deflate":
reader := bytes.NewReader(rawBytes)
zlibreader, err := zlib.NewReader(reader)
if err != nil {
return nil, fmt.Errorf("could not create zlib.NewReader: %v", err)
}
bodyBytes, err := ioutil.ReadAll(zlibreader)
if err != nil {
return nil, fmt.Errorf("could not read from zlib reader using ioutil.ReadAll: %v", err)
}
return bodyBytes, nil
case "gzip":
reader := bytes.NewReader(rawBytes)
zlibreader, err := gzip.NewReader(reader)
if err != nil {
return nil, fmt.Errorf("could not create gzip.NewReader: %v", err)
}
bodyBytes, err := ioutil.ReadAll(zlibreader)
if err != nil {
return nil, fmt.Errorf("could not read from gzip reader using ioutil.ReadAll: %v", err)
}
return bodyBytes, nil
default:
return rawBytes, nil
}
return extension.GetUncompressedBytes(rawBytes, req.Header.Get("Content-Encoding"))
}

// GetFreePort is a function that queries the kernel and obtains an unused port.
Expand Down
22 changes: 14 additions & 8 deletions apm-lambda-extension/extension/apm_server_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const (
type ApmServerTransport struct {
sync.Mutex
bufferPool sync.Pool
config *extensionConfig
config *Config
AgentDoneSignal chan struct{}
dataChannel chan AgentData
client *http.Client
Expand All @@ -56,7 +56,7 @@ type ApmServerTransport struct {
gracePeriodTimer *time.Timer
}

func InitApmServerTransport(config *extensionConfig) *ApmServerTransport {
func InitApmServerTransport(config *Config) *ApmServerTransport {
var transport ApmServerTransport
transport.bufferPool = sync.Pool{New: func() interface{} {
return &bytes.Buffer{}
Expand All @@ -75,7 +75,7 @@ func InitApmServerTransport(config *extensionConfig) *ApmServerTransport {
// StartBackgroundApmDataForwarding Receive agent data as it comes in and post it to the APM server.
// Stop checking for, and sending agent data when the function invocation
// has completed, signaled via a channel.
func (transport *ApmServerTransport) ForwardApmData(ctx context.Context) error {
func (transport *ApmServerTransport) ForwardApmData(ctx context.Context, metadataContainer *MetadataContainer) error {
if transport.status == Failing {
return nil
}
Expand All @@ -85,6 +85,12 @@ func (transport *ApmServerTransport) ForwardApmData(ctx context.Context) error {
Log.Debug("Invocation context cancelled, not processing any more agent data")
return nil
case agentData := <-transport.dataChannel:
if metadataContainer.Metadata == nil {
err := ProcessMetadata(agentData, metadataContainer)
if err != nil {
Log.Errorf("Error extracting metadata from agent payload %v", err)
}
}
if err := transport.PostToApmServer(ctx, agentData); err != nil {
return fmt.Errorf("error sending to APM server, skipping: %v", err)
}
Expand Down Expand Up @@ -151,16 +157,16 @@ func (transport *ApmServerTransport) PostToApmServer(ctx context.Context, agentD
r = buf
}

req, err := http.NewRequest("POST", transport.config.apmServerUrl+endpointURI, r)
req, err := http.NewRequest("POST", transport.config.ApmServerUrl+endpointURI, r)
if err != nil {
return fmt.Errorf("failed to create a new request when posting to APM server: %v", err)
}
req.Header.Add("Content-Encoding", encoding)
req.Header.Add("Content-Type", "application/x-ndjson")
if transport.config.apmServerApiKey != "" {
req.Header.Add("Authorization", "ApiKey "+transport.config.apmServerApiKey)
} else if transport.config.apmServerSecretToken != "" {
req.Header.Add("Authorization", "Bearer "+transport.config.apmServerSecretToken)
if transport.config.ApmServerApiKey != "" {
req.Header.Add("Authorization", "ApiKey "+transport.config.ApmServerApiKey)
} else if transport.config.ApmServerSecretToken != "" {
req.Header.Add("Authorization", "Bearer "+transport.config.ApmServerSecretToken)
}

Log.Debug("Sending data chunk to APM server")
Expand Down
43 changes: 22 additions & 21 deletions apm-lambda-extension/extension/apm_server_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ package extension
import (
"compress/gzip"
"context"
"github.com/stretchr/testify/assert"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
)

func TestPostToApmServerDataCompressed(t *testing.T) {
Expand Down Expand Up @@ -66,8 +67,8 @@ func TestPostToApmServerDataCompressed(t *testing.T) {
}))
defer apmServer.Close()

config := extensionConfig{
apmServerUrl: apmServer.URL + "/",
config := Config{
ApmServerUrl: apmServer.URL + "/",
}
transport := InitApmServerTransport(&config)
err := transport.PostToApmServer(context.Background(), agentData)
Expand Down Expand Up @@ -111,16 +112,16 @@ func TestPostToApmServerDataNotCompressed(t *testing.T) {
}))
defer apmServer.Close()

config := extensionConfig{
apmServerUrl: apmServer.URL + "/",
config := Config{
ApmServerUrl: apmServer.URL + "/",
}
transport := InitApmServerTransport(&config)
err := transport.PostToApmServer(context.Background(), agentData)
assert.Equal(t, nil, err)
}

func TestGracePeriod(t *testing.T) {
transport := InitApmServerTransport(&extensionConfig{})
transport := InitApmServerTransport(&Config{})

transport.reconnectionCount = 0
val0 := transport.computeGracePeriod().Seconds()
Expand Down Expand Up @@ -156,7 +157,7 @@ func TestGracePeriod(t *testing.T) {
}

func TestSetHealthyTransport(t *testing.T) {
transport := InitApmServerTransport(&extensionConfig{})
transport := InitApmServerTransport(&Config{})
transport.SetApmServerTransportState(context.Background(), Healthy)
assert.True(t, transport.status == Healthy)
assert.Equal(t, transport.reconnectionCount, -1)
Expand All @@ -165,15 +166,15 @@ func TestSetHealthyTransport(t *testing.T) {
func TestSetFailingTransport(t *testing.T) {
// By explicitly setting the reconnection count to 0, we ensure that the grace period will not be 0
// and avoid a race between reaching the pending status and the test assertion.
transport := InitApmServerTransport(&extensionConfig{})
transport := InitApmServerTransport(&Config{})
transport.reconnectionCount = 0
transport.SetApmServerTransportState(context.Background(), Failing)
assert.True(t, transport.status == Failing)
assert.Equal(t, transport.reconnectionCount, 1)
}

func TestSetPendingTransport(t *testing.T) {
transport := InitApmServerTransport(&extensionConfig{})
transport := InitApmServerTransport(&Config{})
transport.SetApmServerTransportState(context.Background(), Healthy)
transport.SetApmServerTransportState(context.Background(), Failing)
for {
Expand All @@ -186,15 +187,15 @@ func TestSetPendingTransport(t *testing.T) {
}

func TestSetPendingTransportExplicitly(t *testing.T) {
transport := InitApmServerTransport(&extensionConfig{})
transport := InitApmServerTransport(&Config{})
transport.SetApmServerTransportState(context.Background(), Healthy)
transport.SetApmServerTransportState(context.Background(), Pending)
assert.True(t, transport.status == Healthy)
assert.Equal(t, transport.reconnectionCount, -1)
}

func TestSetInvalidTransport(t *testing.T) {
transport := InitApmServerTransport(&extensionConfig{})
transport := InitApmServerTransport(&Config{})
transport.SetApmServerTransportState(context.Background(), Healthy)
transport.SetApmServerTransportState(context.Background(), "Invalid")
assert.True(t, transport.status == Healthy)
Expand Down Expand Up @@ -233,8 +234,8 @@ func TestEnterBackoffFromHealthy(t *testing.T) {
return
}
}))
config := extensionConfig{
apmServerUrl: apmServer.URL + "/",
config := Config{
ApmServerUrl: apmServer.URL + "/",
}
transport := InitApmServerTransport(&config)
transport.SetApmServerTransportState(context.Background(), Healthy)
Expand Down Expand Up @@ -286,8 +287,8 @@ func TestEnterBackoffFromFailing(t *testing.T) {
// Close the APM server early so that POST requests fail and that backoff is enabled
apmServer.Close()

config := extensionConfig{
apmServerUrl: apmServer.URL + "/",
config := Config{
ApmServerUrl: apmServer.URL + "/",
}

transport := InitApmServerTransport(&config)
Expand Down Expand Up @@ -339,8 +340,8 @@ func TestAPMServerRecovery(t *testing.T) {
}))
defer apmServer.Close()

config := extensionConfig{
apmServerUrl: apmServer.URL + "/",
config := Config{
ApmServerUrl: apmServer.URL + "/",
}

transport := InitApmServerTransport(&config)
Expand Down Expand Up @@ -392,8 +393,8 @@ func TestContinuedAPMServerFailure(t *testing.T) {
}))
apmServer.Close()

config := extensionConfig{
apmServerUrl: apmServer.URL + "/",
config := Config{
ApmServerUrl: apmServer.URL + "/",
}

transport := InitApmServerTransport(&config)
Expand Down Expand Up @@ -425,8 +426,8 @@ func BenchmarkPostToAPM(b *testing.B) {
return
}
}))
config := extensionConfig{
apmServerUrl: apmServer.URL + "/",
config := Config{
ApmServerUrl: apmServer.URL + "/",
}

transport := InitApmServerTransport(&config)
Expand Down
2 changes: 2 additions & 0 deletions apm-lambda-extension/extension/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"time"
)

// RegisterResponse is the body of the response for /register
Expand All @@ -34,6 +35,7 @@ type RegisterResponse struct {

// NextEventResponse is the response for /event/next
type NextEventResponse struct {
Timestamp time.Time `json:"timestamp,omitempty"`
EventType EventType `json:"eventType"`
DeadlineMs int64 `json:"deadlineMs"`
RequestID string `json:"requestId"`
Expand Down
4 changes: 2 additions & 2 deletions apm-lambda-extension/extension/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ func StartHttpServer(ctx context.Context, transport *ApmServerTransport) (agentD
mux := http.NewServeMux()
mux.HandleFunc("/", handleInfoRequest(ctx, transport))
mux.HandleFunc("/intake/v2/events", handleIntakeV2Events(transport))
timeout := time.Duration(transport.config.dataReceiverTimeoutSeconds) * time.Second
timeout := time.Duration(transport.config.DataReceiverTimeoutSeconds) * time.Second
server := &http.Server{
Addr: transport.config.dataReceiverServerPort,
Addr: transport.config.DataReceiverServerPort,
Handler: mux,
ReadTimeout: timeout,
WriteTimeout: timeout,
Expand Down
60 changes: 30 additions & 30 deletions apm-lambda-extension/extension/http_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ func TestInfoProxy(t *testing.T) {
defer apmServer.Close()

// Create extension config and start the server
config := extensionConfig{
apmServerUrl: apmServer.URL,
apmServerSecretToken: "foo",
apmServerApiKey: "bar",
dataReceiverServerPort: ":1234",
dataReceiverTimeoutSeconds: 15,
config := Config{
ApmServerUrl: apmServer.URL,
ApmServerSecretToken: "foo",
ApmServerApiKey: "bar",
DataReceiverServerPort: ":1234",
DataReceiverTimeoutSeconds: 15,
}
transport := InitApmServerTransport(&config)
agentDataServer, err := StartHttpServer(context.Background(), transport)
Expand Down Expand Up @@ -100,12 +100,12 @@ func TestInfoProxyErrorStatusCode(t *testing.T) {
defer apmServer.Close()

// Create extension config and start the server
config := extensionConfig{
apmServerUrl: apmServer.URL,
apmServerSecretToken: "foo",
apmServerApiKey: "bar",
dataReceiverServerPort: ":1234",
dataReceiverTimeoutSeconds: 15,
config := Config{
ApmServerUrl: apmServer.URL,
ApmServerSecretToken: "foo",
ApmServerApiKey: "bar",
DataReceiverServerPort: ":1234",
DataReceiverTimeoutSeconds: 15,
}
transport := InitApmServerTransport(&config)

Expand Down Expand Up @@ -145,11 +145,11 @@ func Test_handleInfoRequest(t *testing.T) {
`

// Create extension config
config := extensionConfig{
apmServerSecretToken: "foo",
apmServerApiKey: "bar",
dataReceiverServerPort: ":1234",
dataReceiverTimeoutSeconds: 15,
config := Config{
ApmServerSecretToken: "foo",
ApmServerApiKey: "bar",
DataReceiverServerPort: ":1234",
DataReceiverTimeoutSeconds: 15,
}
transport := InitApmServerTransport(&config)

Expand Down Expand Up @@ -191,7 +191,7 @@ func (errReader) Read(_ []byte) (int, error) {
}

func Test_handleInfoRequestInvalidBody(t *testing.T) {
transport := InitApmServerTransport(&extensionConfig{})
transport := InitApmServerTransport(&Config{})
mux := http.NewServeMux()
urlPath := "/intake/v2/events"
mux.HandleFunc(urlPath, handleIntakeV2Events(transport))
Expand All @@ -211,10 +211,10 @@ func Test_handleIntakeV2EventsQueryParam(t *testing.T) {
defer apmServer.Close()

// Create extension config and start the server
config := extensionConfig{
apmServerUrl: apmServer.URL,
dataReceiverServerPort: ":1234",
dataReceiverTimeoutSeconds: 15,
config := Config{
ApmServerUrl: apmServer.URL,
DataReceiverServerPort: ":1234",
DataReceiverTimeoutSeconds: 15,
}
transport := InitApmServerTransport(&config)
transport.AgentDoneSignal = make(chan struct{}, 1)
Expand Down Expand Up @@ -264,10 +264,10 @@ func Test_handleIntakeV2EventsNoQueryParam(t *testing.T) {
defer apmServer.Close()

// Create extension config and start the server
config := extensionConfig{
apmServerUrl: apmServer.URL,
dataReceiverServerPort: ":1234",
dataReceiverTimeoutSeconds: 15,
config := Config{
ApmServerUrl: apmServer.URL,
DataReceiverServerPort: ":1234",
DataReceiverTimeoutSeconds: 15,
}
transport := InitApmServerTransport(&config)
transport.AgentDoneSignal = make(chan struct{}, 1)
Expand Down Expand Up @@ -307,10 +307,10 @@ func Test_handleIntakeV2EventsQueryParamEmptyData(t *testing.T) {
defer apmServer.Close()

// Create extension config and start the server
config := extensionConfig{
apmServerUrl: apmServer.URL,
dataReceiverServerPort: ":1234",
dataReceiverTimeoutSeconds: 15,
config := Config{
ApmServerUrl: apmServer.URL,
DataReceiverServerPort: ":1234",
DataReceiverTimeoutSeconds: 15,
}
transport := InitApmServerTransport(&config)
transport.AgentDoneSignal = make(chan struct{}, 1)
Expand Down
Loading