diff --git a/apm-lambda-extension/e2e-testing/e2e_util.go b/apm-lambda-extension/e2e-testing/e2e_util.go index 485c097a..c88f3089 100644 --- a/apm-lambda-extension/e2e-testing/e2e_util.go +++ b/apm-lambda-extension/e2e-testing/e2e_util.go @@ -20,10 +20,6 @@ package e2eTesting import ( "archive/zip" "bufio" - "bytes" - "compress/gzip" - "compress/zlib" - "elastic/apm-lambda-extension/extension" "fmt" "io" "io/ioutil" @@ -33,6 +29,8 @@ import ( "os/exec" "path/filepath" "strings" + + "elastic/apm-lambda-extension/extension" ) // GetEnvVarValueOrSetDefault retrieves the environment variable envVarName. @@ -157,33 +155,7 @@ func GetDecompressedBytesFromRequest(req *http.Request) ([]byte, error) { if req.Body != nil { rawBytes, _ = ioutil.ReadAll(req.Body) } - - switch req.Header.Get("Content-Encoding") { - case "deflate": - reader := bytes.NewReader(rawBytes) - zlibreader, err := zlib.NewReader(reader) - if err != nil { - return nil, fmt.Errorf("could not create zlib.NewReader: %v", err) - } - bodyBytes, err := ioutil.ReadAll(zlibreader) - if err != nil { - return nil, fmt.Errorf("could not read from zlib reader using ioutil.ReadAll: %v", err) - } - return bodyBytes, nil - case "gzip": - reader := bytes.NewReader(rawBytes) - zlibreader, err := gzip.NewReader(reader) - if err != nil { - return nil, fmt.Errorf("could not create gzip.NewReader: %v", err) - } - bodyBytes, err := ioutil.ReadAll(zlibreader) - if err != nil { - return nil, fmt.Errorf("could not read from gzip reader using ioutil.ReadAll: %v", err) - } - return bodyBytes, nil - default: - return rawBytes, nil - } + return extension.GetUncompressedBytes(rawBytes, req.Header.Get("Content-Encoding")) } // GetFreePort is a function that queries the kernel and obtains an unused port. diff --git a/apm-lambda-extension/extension/apm_server_transport.go b/apm-lambda-extension/extension/apm_server_transport.go index 9b519098..87d5dd58 100644 --- a/apm-lambda-extension/extension/apm_server_transport.go +++ b/apm-lambda-extension/extension/apm_server_transport.go @@ -47,7 +47,7 @@ const ( type ApmServerTransport struct { sync.Mutex bufferPool sync.Pool - config *extensionConfig + config *Config AgentDoneSignal chan struct{} dataChannel chan AgentData client *http.Client @@ -56,7 +56,7 @@ type ApmServerTransport struct { gracePeriodTimer *time.Timer } -func InitApmServerTransport(config *extensionConfig) *ApmServerTransport { +func InitApmServerTransport(config *Config) *ApmServerTransport { var transport ApmServerTransport transport.bufferPool = sync.Pool{New: func() interface{} { return &bytes.Buffer{} @@ -75,7 +75,7 @@ func InitApmServerTransport(config *extensionConfig) *ApmServerTransport { // StartBackgroundApmDataForwarding Receive agent data as it comes in and post it to the APM server. // Stop checking for, and sending agent data when the function invocation // has completed, signaled via a channel. -func (transport *ApmServerTransport) ForwardApmData(ctx context.Context) error { +func (transport *ApmServerTransport) ForwardApmData(ctx context.Context, metadataContainer *MetadataContainer) error { if transport.status == Failing { return nil } @@ -85,6 +85,12 @@ func (transport *ApmServerTransport) ForwardApmData(ctx context.Context) error { Log.Debug("Invocation context cancelled, not processing any more agent data") return nil case agentData := <-transport.dataChannel: + if metadataContainer.Metadata == nil { + err := ProcessMetadata(agentData, metadataContainer) + if err != nil { + Log.Errorf("Error extracting metadata from agent payload %v", err) + } + } if err := transport.PostToApmServer(ctx, agentData); err != nil { return fmt.Errorf("error sending to APM server, skipping: %v", err) } @@ -151,16 +157,16 @@ func (transport *ApmServerTransport) PostToApmServer(ctx context.Context, agentD r = buf } - req, err := http.NewRequest("POST", transport.config.apmServerUrl+endpointURI, r) + req, err := http.NewRequest("POST", transport.config.ApmServerUrl+endpointURI, r) if err != nil { return fmt.Errorf("failed to create a new request when posting to APM server: %v", err) } req.Header.Add("Content-Encoding", encoding) req.Header.Add("Content-Type", "application/x-ndjson") - if transport.config.apmServerApiKey != "" { - req.Header.Add("Authorization", "ApiKey "+transport.config.apmServerApiKey) - } else if transport.config.apmServerSecretToken != "" { - req.Header.Add("Authorization", "Bearer "+transport.config.apmServerSecretToken) + if transport.config.ApmServerApiKey != "" { + req.Header.Add("Authorization", "ApiKey "+transport.config.ApmServerApiKey) + } else if transport.config.ApmServerSecretToken != "" { + req.Header.Add("Authorization", "Bearer "+transport.config.ApmServerSecretToken) } Log.Debug("Sending data chunk to APM server") diff --git a/apm-lambda-extension/extension/apm_server_transport_test.go b/apm-lambda-extension/extension/apm_server_transport_test.go index 5362e588..82217baf 100644 --- a/apm-lambda-extension/extension/apm_server_transport_test.go +++ b/apm-lambda-extension/extension/apm_server_transport_test.go @@ -20,12 +20,13 @@ package extension import ( "compress/gzip" "context" - "github.com/stretchr/testify/assert" "io" "io/ioutil" "net/http" "net/http/httptest" "testing" + + "github.com/stretchr/testify/assert" ) func TestPostToApmServerDataCompressed(t *testing.T) { @@ -66,8 +67,8 @@ func TestPostToApmServerDataCompressed(t *testing.T) { })) defer apmServer.Close() - config := extensionConfig{ - apmServerUrl: apmServer.URL + "/", + config := Config{ + ApmServerUrl: apmServer.URL + "/", } transport := InitApmServerTransport(&config) err := transport.PostToApmServer(context.Background(), agentData) @@ -111,8 +112,8 @@ func TestPostToApmServerDataNotCompressed(t *testing.T) { })) defer apmServer.Close() - config := extensionConfig{ - apmServerUrl: apmServer.URL + "/", + config := Config{ + ApmServerUrl: apmServer.URL + "/", } transport := InitApmServerTransport(&config) err := transport.PostToApmServer(context.Background(), agentData) @@ -120,7 +121,7 @@ func TestPostToApmServerDataNotCompressed(t *testing.T) { } func TestGracePeriod(t *testing.T) { - transport := InitApmServerTransport(&extensionConfig{}) + transport := InitApmServerTransport(&Config{}) transport.reconnectionCount = 0 val0 := transport.computeGracePeriod().Seconds() @@ -156,7 +157,7 @@ func TestGracePeriod(t *testing.T) { } func TestSetHealthyTransport(t *testing.T) { - transport := InitApmServerTransport(&extensionConfig{}) + transport := InitApmServerTransport(&Config{}) transport.SetApmServerTransportState(context.Background(), Healthy) assert.True(t, transport.status == Healthy) assert.Equal(t, transport.reconnectionCount, -1) @@ -165,7 +166,7 @@ func TestSetHealthyTransport(t *testing.T) { func TestSetFailingTransport(t *testing.T) { // By explicitly setting the reconnection count to 0, we ensure that the grace period will not be 0 // and avoid a race between reaching the pending status and the test assertion. - transport := InitApmServerTransport(&extensionConfig{}) + transport := InitApmServerTransport(&Config{}) transport.reconnectionCount = 0 transport.SetApmServerTransportState(context.Background(), Failing) assert.True(t, transport.status == Failing) @@ -173,7 +174,7 @@ func TestSetFailingTransport(t *testing.T) { } func TestSetPendingTransport(t *testing.T) { - transport := InitApmServerTransport(&extensionConfig{}) + transport := InitApmServerTransport(&Config{}) transport.SetApmServerTransportState(context.Background(), Healthy) transport.SetApmServerTransportState(context.Background(), Failing) for { @@ -186,7 +187,7 @@ func TestSetPendingTransport(t *testing.T) { } func TestSetPendingTransportExplicitly(t *testing.T) { - transport := InitApmServerTransport(&extensionConfig{}) + transport := InitApmServerTransport(&Config{}) transport.SetApmServerTransportState(context.Background(), Healthy) transport.SetApmServerTransportState(context.Background(), Pending) assert.True(t, transport.status == Healthy) @@ -194,7 +195,7 @@ func TestSetPendingTransportExplicitly(t *testing.T) { } func TestSetInvalidTransport(t *testing.T) { - transport := InitApmServerTransport(&extensionConfig{}) + transport := InitApmServerTransport(&Config{}) transport.SetApmServerTransportState(context.Background(), Healthy) transport.SetApmServerTransportState(context.Background(), "Invalid") assert.True(t, transport.status == Healthy) @@ -233,8 +234,8 @@ func TestEnterBackoffFromHealthy(t *testing.T) { return } })) - config := extensionConfig{ - apmServerUrl: apmServer.URL + "/", + config := Config{ + ApmServerUrl: apmServer.URL + "/", } transport := InitApmServerTransport(&config) transport.SetApmServerTransportState(context.Background(), Healthy) @@ -286,8 +287,8 @@ func TestEnterBackoffFromFailing(t *testing.T) { // Close the APM server early so that POST requests fail and that backoff is enabled apmServer.Close() - config := extensionConfig{ - apmServerUrl: apmServer.URL + "/", + config := Config{ + ApmServerUrl: apmServer.URL + "/", } transport := InitApmServerTransport(&config) @@ -339,8 +340,8 @@ func TestAPMServerRecovery(t *testing.T) { })) defer apmServer.Close() - config := extensionConfig{ - apmServerUrl: apmServer.URL + "/", + config := Config{ + ApmServerUrl: apmServer.URL + "/", } transport := InitApmServerTransport(&config) @@ -392,8 +393,8 @@ func TestContinuedAPMServerFailure(t *testing.T) { })) apmServer.Close() - config := extensionConfig{ - apmServerUrl: apmServer.URL + "/", + config := Config{ + ApmServerUrl: apmServer.URL + "/", } transport := InitApmServerTransport(&config) @@ -425,8 +426,8 @@ func BenchmarkPostToAPM(b *testing.B) { return } })) - config := extensionConfig{ - apmServerUrl: apmServer.URL + "/", + config := Config{ + ApmServerUrl: apmServer.URL + "/", } transport := InitApmServerTransport(&config) diff --git a/apm-lambda-extension/extension/client.go b/apm-lambda-extension/extension/client.go index fd8d41d5..0c6c78b1 100644 --- a/apm-lambda-extension/extension/client.go +++ b/apm-lambda-extension/extension/client.go @@ -23,6 +23,7 @@ import ( "encoding/json" "fmt" "net/http" + "time" ) // RegisterResponse is the body of the response for /register @@ -34,6 +35,7 @@ type RegisterResponse struct { // NextEventResponse is the response for /event/next type NextEventResponse struct { + Timestamp time.Time `json:"timestamp,omitempty"` EventType EventType `json:"eventType"` DeadlineMs int64 `json:"deadlineMs"` RequestID string `json:"requestId"` diff --git a/apm-lambda-extension/extension/http_server.go b/apm-lambda-extension/extension/http_server.go index 6794f5ff..b4f46208 100644 --- a/apm-lambda-extension/extension/http_server.go +++ b/apm-lambda-extension/extension/http_server.go @@ -29,9 +29,9 @@ func StartHttpServer(ctx context.Context, transport *ApmServerTransport) (agentD mux := http.NewServeMux() mux.HandleFunc("/", handleInfoRequest(ctx, transport)) mux.HandleFunc("/intake/v2/events", handleIntakeV2Events(transport)) - timeout := time.Duration(transport.config.dataReceiverTimeoutSeconds) * time.Second + timeout := time.Duration(transport.config.DataReceiverTimeoutSeconds) * time.Second server := &http.Server{ - Addr: transport.config.dataReceiverServerPort, + Addr: transport.config.DataReceiverServerPort, Handler: mux, ReadTimeout: timeout, WriteTimeout: timeout, diff --git a/apm-lambda-extension/extension/http_server_test.go b/apm-lambda-extension/extension/http_server_test.go index 190a65fe..4e8c5435 100644 --- a/apm-lambda-extension/extension/http_server_test.go +++ b/apm-lambda-extension/extension/http_server_test.go @@ -51,12 +51,12 @@ func TestInfoProxy(t *testing.T) { defer apmServer.Close() // Create extension config and start the server - config := extensionConfig{ - apmServerUrl: apmServer.URL, - apmServerSecretToken: "foo", - apmServerApiKey: "bar", - dataReceiverServerPort: ":1234", - dataReceiverTimeoutSeconds: 15, + config := Config{ + ApmServerUrl: apmServer.URL, + ApmServerSecretToken: "foo", + ApmServerApiKey: "bar", + DataReceiverServerPort: ":1234", + DataReceiverTimeoutSeconds: 15, } transport := InitApmServerTransport(&config) agentDataServer, err := StartHttpServer(context.Background(), transport) @@ -100,12 +100,12 @@ func TestInfoProxyErrorStatusCode(t *testing.T) { defer apmServer.Close() // Create extension config and start the server - config := extensionConfig{ - apmServerUrl: apmServer.URL, - apmServerSecretToken: "foo", - apmServerApiKey: "bar", - dataReceiverServerPort: ":1234", - dataReceiverTimeoutSeconds: 15, + config := Config{ + ApmServerUrl: apmServer.URL, + ApmServerSecretToken: "foo", + ApmServerApiKey: "bar", + DataReceiverServerPort: ":1234", + DataReceiverTimeoutSeconds: 15, } transport := InitApmServerTransport(&config) @@ -145,11 +145,11 @@ func Test_handleInfoRequest(t *testing.T) { ` // Create extension config - config := extensionConfig{ - apmServerSecretToken: "foo", - apmServerApiKey: "bar", - dataReceiverServerPort: ":1234", - dataReceiverTimeoutSeconds: 15, + config := Config{ + ApmServerSecretToken: "foo", + ApmServerApiKey: "bar", + DataReceiverServerPort: ":1234", + DataReceiverTimeoutSeconds: 15, } transport := InitApmServerTransport(&config) @@ -191,7 +191,7 @@ func (errReader) Read(_ []byte) (int, error) { } func Test_handleInfoRequestInvalidBody(t *testing.T) { - transport := InitApmServerTransport(&extensionConfig{}) + transport := InitApmServerTransport(&Config{}) mux := http.NewServeMux() urlPath := "/intake/v2/events" mux.HandleFunc(urlPath, handleIntakeV2Events(transport)) @@ -211,10 +211,10 @@ func Test_handleIntakeV2EventsQueryParam(t *testing.T) { defer apmServer.Close() // Create extension config and start the server - config := extensionConfig{ - apmServerUrl: apmServer.URL, - dataReceiverServerPort: ":1234", - dataReceiverTimeoutSeconds: 15, + config := Config{ + ApmServerUrl: apmServer.URL, + DataReceiverServerPort: ":1234", + DataReceiverTimeoutSeconds: 15, } transport := InitApmServerTransport(&config) transport.AgentDoneSignal = make(chan struct{}, 1) @@ -264,10 +264,10 @@ func Test_handleIntakeV2EventsNoQueryParam(t *testing.T) { defer apmServer.Close() // Create extension config and start the server - config := extensionConfig{ - apmServerUrl: apmServer.URL, - dataReceiverServerPort: ":1234", - dataReceiverTimeoutSeconds: 15, + config := Config{ + ApmServerUrl: apmServer.URL, + DataReceiverServerPort: ":1234", + DataReceiverTimeoutSeconds: 15, } transport := InitApmServerTransport(&config) transport.AgentDoneSignal = make(chan struct{}, 1) @@ -307,10 +307,10 @@ func Test_handleIntakeV2EventsQueryParamEmptyData(t *testing.T) { defer apmServer.Close() // Create extension config and start the server - config := extensionConfig{ - apmServerUrl: apmServer.URL, - dataReceiverServerPort: ":1234", - dataReceiverTimeoutSeconds: 15, + config := Config{ + ApmServerUrl: apmServer.URL, + DataReceiverServerPort: ":1234", + DataReceiverTimeoutSeconds: 15, } transport := InitApmServerTransport(&config) transport.AgentDoneSignal = make(chan struct{}, 1) diff --git a/apm-lambda-extension/extension/logger_test.go b/apm-lambda-extension/extension/logger_test.go index d869c808..05d5cedd 100644 --- a/apm-lambda-extension/extension/logger_test.go +++ b/apm-lambda-extension/extension/logger_test.go @@ -18,12 +18,13 @@ package extension import ( - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap/zapcore" "io/ioutil" "os" "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" ) func init() { diff --git a/apm-lambda-extension/extension/process_env.go b/apm-lambda-extension/extension/process_env.go index c7e5485e..c8c1b3e2 100644 --- a/apm-lambda-extension/extension/process_env.go +++ b/apm-lambda-extension/extension/process_env.go @@ -25,13 +25,13 @@ import ( "strings" ) -type extensionConfig struct { - apmServerUrl string - apmServerSecretToken string - apmServerApiKey string - dataReceiverServerPort string +type Config struct { + ApmServerUrl string + ApmServerSecretToken string + ApmServerApiKey string + DataReceiverServerPort string SendStrategy SendStrategy - dataReceiverTimeoutSeconds int + DataReceiverTimeoutSeconds int DataForwarderTimeoutSeconds int LogLevel zapcore.Level } @@ -63,7 +63,7 @@ func getIntFromEnv(name string) (int, error) { } // ProcessEnv extracts ENV variables into globals -func ProcessEnv() *extensionConfig { +func ProcessEnv() *Config { dataReceiverTimeoutSeconds, err := getIntFromEnv("ELASTIC_APM_DATA_RECEIVER_TIMEOUT_SECONDS") if err != nil { dataReceiverTimeoutSeconds = defaultDataReceiverTimeoutSeconds @@ -95,24 +95,24 @@ func ProcessEnv() *extensionConfig { normalizedSendStrategy = Background } - config := &extensionConfig{ - apmServerUrl: normalizedApmLambdaServer, - apmServerSecretToken: os.Getenv("ELASTIC_APM_SECRET_TOKEN"), - apmServerApiKey: os.Getenv("ELASTIC_APM_API_KEY"), - dataReceiverServerPort: fmt.Sprintf(":%s", os.Getenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT")), + config := &Config{ + ApmServerUrl: normalizedApmLambdaServer, + ApmServerSecretToken: os.Getenv("ELASTIC_APM_SECRET_TOKEN"), + ApmServerApiKey: os.Getenv("ELASTIC_APM_API_KEY"), + DataReceiverServerPort: fmt.Sprintf(":%s", os.Getenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT")), SendStrategy: normalizedSendStrategy, - dataReceiverTimeoutSeconds: dataReceiverTimeoutSeconds, + DataReceiverTimeoutSeconds: dataReceiverTimeoutSeconds, DataForwarderTimeoutSeconds: dataForwarderTimeoutSeconds, LogLevel: logLevel, } - if config.dataReceiverServerPort == ":" { - config.dataReceiverServerPort = ":8200" + if config.DataReceiverServerPort == ":" { + config.DataReceiverServerPort = ":8200" } - if config.apmServerUrl == "" { + if config.ApmServerUrl == "" { Log.Fatal("please set ELASTIC_APM_LAMBDA_APM_SERVER, exiting") } - if config.apmServerSecretToken == "" && config.apmServerApiKey == "" { + if config.ApmServerSecretToken == "" && config.ApmServerApiKey == "" { Log.Warn("ELASTIC_APM_SECRET_TOKEN or ELASTIC_APM_API_KEY not specified") } diff --git a/apm-lambda-extension/extension/process_env_test.go b/apm-lambda-extension/extension/process_env_test.go index 74fec439..8b6d9939 100644 --- a/apm-lambda-extension/extension/process_env_test.go +++ b/apm-lambda-extension/extension/process_env_test.go @@ -35,8 +35,8 @@ func TestProcessEnv(t *testing.T) { config := ProcessEnv() t.Logf("%v", config) - if config.apmServerUrl != "bar.example.com/" { - t.Logf("Endpoint not set correctly: %s", config.apmServerUrl) + if config.ApmServerUrl != "bar.example.com/" { + t.Logf("Endpoint not set correctly: %s", config.ApmServerUrl) t.Fail() } @@ -53,22 +53,22 @@ func TestProcessEnv(t *testing.T) { t.Logf("%v", config) // config normalizes string to ensure it ends in a `/` - if config.apmServerUrl != "foo.example.com/" { - t.Logf("Endpoint not set correctly: %s", config.apmServerUrl) + if config.ApmServerUrl != "foo.example.com/" { + t.Logf("Endpoint not set correctly: %s", config.ApmServerUrl) t.Fail() } - if config.apmServerSecretToken != "bar" { + if config.ApmServerSecretToken != "bar" { t.Log("Secret Token not set correctly") t.Fail() } - if config.dataReceiverServerPort != ":8200" { + if config.DataReceiverServerPort != ":8200" { t.Log("Default port not set correctly") t.Fail() } - if config.dataReceiverTimeoutSeconds != 15 { + if config.DataReceiverTimeoutSeconds != 15 { t.Log("Default timeout not set correctly") t.Fail() } @@ -83,7 +83,7 @@ func TestProcessEnv(t *testing.T) { return } config = ProcessEnv() - if config.dataReceiverServerPort != ":8201" { + if config.DataReceiverServerPort != ":8201" { t.Log("Env port not set correctly") t.Fail() } @@ -93,7 +93,7 @@ func TestProcessEnv(t *testing.T) { return } config = ProcessEnv() - if config.dataReceiverTimeoutSeconds != 10 { + if config.DataReceiverTimeoutSeconds != 10 { t.Log("APM data receiver timeout not set correctly") t.Fail() } @@ -103,7 +103,7 @@ func TestProcessEnv(t *testing.T) { return } config = ProcessEnv() - if config.dataReceiverTimeoutSeconds != 15 { + if config.DataReceiverTimeoutSeconds != 15 { t.Log("APM data receiver timeout not set correctly") t.Fail() } @@ -133,7 +133,7 @@ func TestProcessEnv(t *testing.T) { return } config = ProcessEnv() - if config.apmServerApiKey != "foo" { + if config.ApmServerApiKey != "foo" { t.Log("API Key not set correctly") t.Fail() } diff --git a/apm-lambda-extension/extension/process_metadata.go b/apm-lambda-extension/extension/process_metadata.go new file mode 100644 index 00000000..27f757a4 --- /dev/null +++ b/apm-lambda-extension/extension/process_metadata.go @@ -0,0 +1,88 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package extension + +import ( + "bytes" + "compress/gzip" + "compress/zlib" + "encoding/json" + "fmt" + "io" + "io/ioutil" + + "elastic/apm-lambda-extension/model" + + "github.com/pkg/errors" +) + +type MetadataContainer struct { + Metadata *model.Metadata `json:"metadata"` +} + +func ProcessMetadata(data AgentData, container *MetadataContainer) error { + uncompressedData, err := GetUncompressedBytes(data.Data, data.ContentEncoding) + if err != nil { + return errors.New(fmt.Sprintf("Error uncompressing agent data for metadata extraction : %v", err)) + } + decoder := json.NewDecoder(bytes.NewReader(uncompressedData)) + for { + err = decoder.Decode(container) + if container.Metadata != nil { + Log.Debug("Metadata decoded") + break + } + if err != nil { + if err == io.EOF { + return errors.New("No metadata in current agent transaction") + } else { + return errors.New(fmt.Sprintf("Error uncompressing agent data for metadata extraction : %v", err)) + } + } + } + return nil +} + +func GetUncompressedBytes(rawBytes []byte, encodingType string) ([]byte, error) { + switch encodingType { + case "deflate": + reader := bytes.NewReader([]byte(rawBytes)) + zlibreader, err := zlib.NewReader(reader) + if err != nil { + return nil, fmt.Errorf("could not create zlib.NewReader: %v", err) + } + bodyBytes, err := ioutil.ReadAll(zlibreader) + if err != nil { + return nil, fmt.Errorf("could not read from zlib reader using ioutil.ReadAll: %v", err) + } + return bodyBytes, nil + case "gzip": + reader := bytes.NewReader([]byte(rawBytes)) + zlibreader, err := gzip.NewReader(reader) + if err != nil { + return nil, fmt.Errorf("could not create gzip.NewReader: %v", err) + } + bodyBytes, err := ioutil.ReadAll(zlibreader) + if err != nil { + return nil, fmt.Errorf("could not read from gzip reader using ioutil.ReadAll: %v", err) + } + return bodyBytes, nil + default: + return rawBytes, nil + } +} diff --git a/apm-lambda-extension/extension/process_metadata_test.go b/apm-lambda-extension/extension/process_metadata_test.go new file mode 100644 index 00000000..75188c0c --- /dev/null +++ b/apm-lambda-extension/extension/process_metadata_test.go @@ -0,0 +1,70 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package extension + +import ( + "encoding/json" + "testing" + + "gotest.tools/assert" +) + +func Test_processMetadata(t *testing.T) { + + // Copied from https://github.com/elastic/apm-server/blob/master/testdata/intake-v2/transactions.ndjson. + benchBody := []byte(`{"metadata": {"service": {"name": "1234_service-12a3","node": {"configured_name": "node-123"},"version": "5.1.3","environment": "staging","language": {"name": "ecmascript","version": "8"},"runtime": {"name": "node","version": "8.0.0"},"framework": {"name": "Express","version": "1.2.3"},"agent": {"name": "elastic-node","version": "3.14.0"}},"user": {"id": "123user", "username": "bar", "email": "bar@user.com"}, "labels": {"tag0": null, "tag1": "one", "tag2": 2}, "process": {"pid": 1234,"ppid": 6789,"title": "node","argv": ["node","server.js"]},"system": {"hostname": "prod1.example.com","architecture": "x64","platform": "darwin", "container": {"id": "container-id"}, "kubernetes": {"namespace": "namespace1", "pod": {"uid": "pod-uid", "name": "pod-name"}, "node": {"name": "node-name"}}},"cloud":{"account":{"id":"account_id","name":"account_name"},"availability_zone":"cloud_availability_zone","instance":{"id":"instance_id","name":"instance_name"},"machine":{"type":"machine_type"},"project":{"id":"project_id","name":"project_name"},"provider":"cloud_provider","region":"cloud_region","service":{"name":"lambda"}}}} +{"transaction": { "id": "945254c567a5417e", "trace_id": "0123456789abcdef0123456789abcdef", "parent_id": "abcdefabcdef01234567", "type": "request", "duration": 32.592981, "span_count": { "started": 43 }}} +{"transaction": {"id": "4340a8e0df1906ecbfa9", "trace_id": "0acd456789abcdef0123456789abcdef", "name": "GET /api/types","type": "request","duration": 32.592981,"outcome":"success", "result": "success", "timestamp": 1496170407154000, "sampled": true, "span_count": {"started": 17},"context": {"service": {"runtime": {"version": "7.0"}},"page":{"referer":"http://localhost:8000/test/e2e/","url":"http://localhost:8000/test/e2e/general-usecase/"}, "request": {"socket": {"remote_address": "12.53.12.1","encrypted": true},"http_version": "1.1","method": "POST","url": {"protocol": "https:","full": "https://www.example.com/p/a/t/h?query=string#hash","hostname": "www.example.com","port": "8080","pathname": "/p/a/t/h","search": "?query=string","hash": "#hash","raw": "/p/a/t/h?query=string#hash"},"headers": {"user-agent":["Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36","Mozilla Chrome Edge"],"content-type": "text/html","cookie": "c1=v1, c2=v2","some-other-header": "foo","array": ["foo","bar","baz"]},"cookies": {"c1": "v1","c2": "v2"},"env": {"SERVER_SOFTWARE": "nginx","GATEWAY_INTERFACE": "CGI/1.1"},"body": {"str": "hello world","additional": { "foo": {},"bar": 123,"req": "additional information"}}},"response": {"status_code": 200,"headers": {"content-type": "application/json"},"headers_sent": true,"finished": true,"transfer_size":25.8,"encoded_body_size":26.90,"decoded_body_size":29.90}, "user": {"domain": "ldap://abc","id": "99","username": "foo"},"tags": {"organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8", "tag2": 12, "tag3": 12.45, "tag4": false, "tag5": null },"custom": {"my_key": 1,"some_other_value": "foo bar","and_objects": {"foo": ["bar","baz"]},"(": "not a valid regex and that is fine"}}}} +{"transaction": { "id": "cdef4340a8e0df19", "trace_id": "0acd456789abcdef0123456789abcdef", "type": "request", "duration": 13.980558, "timestamp": 1532976822281000, "sampled": null, "span_count": { "dropped": 55, "started": 436 }, "marks": {"navigationTiming": {"appBeforeBootstrap": 608.9300000000001,"navigationStart": -21},"another_mark": {"some_long": 10,"some_float": 10.0}, "performance": {}}, "context": { "request": { "socket": { "remote_address": "192.0.1", "encrypted": null }, "method": "POST", "headers": { "user-agent": null, "content-type": null, "cookie": null }, "url": { "protocol": null, "full": null, "hostname": null, "port": null, "pathname": null, "search": null, "hash": null, "raw": null } }, "response": { "headers": { "content-type": null } }, "service": {"environment":"testing","name": "service1","node": {"configured_name": "node-ABC"}, "language": {"version": "2.5", "name": "ruby"}, "agent": {"version": "2.2", "name": "elastic-ruby", "ephemeral_id": "justanid"}, "framework": {"version": "5.0", "name": "Rails"}, "version": "2", "runtime": {"version": "2.5", "name": "cruby"}}},"experience":{"cls":1,"fid":2.0,"tbt":3.4,"longtask":{"count":3,"sum":2.5,"max":1}}}} +{"transaction": { "id": "00xxxxFFaaaa1234", "trace_id": "0123456789abcdef0123456789abcdef", "name": "amqp receive", "parent_id": "abcdefabcdef01234567", "type": "messaging", "duration": 3, "span_count": { "started": 1 }, "context": {"message": {"queue": { "name": "new_users"}, "age":{ "ms": 1577958057123}, "headers": {"user_id": "1ax3", "involved_services": ["user", "auth"]}, "body": "user created", "routing_key": "user-created-transaction"}},"session":{"id":"sunday","sequence":123}}} +{"transaction": { "name": "july-2021-delete-after-july-31", "type": "lambda", "result": "success", "id": "142e61450efb8574", "trace_id": "eb56529a1f461c5e7e2f66ecb075e983", "subtype": null, "action": null, "duration": 38.853, "timestamp": 1631736666365048, "sampled": true, "context": { "cloud": { "origin": { "account": { "id": "abc123" }, "provider": "aws", "region": "us-east-1", "service": { "name": "serviceName" } } }, "service": { "origin": { "id": "abc123", "name": "service-name", "version": "1.0" } }, "user": {}, "tags": {}, "custom": { } }, "sync": true, "span_count": { "started": 0 }, "outcome": "unknown", "faas": { "coldstart": false, "execution": "2e13b309-23e1-417f-8bf7-074fc96bc683", "trigger": { "request_id": "FuH2Cir_vHcEMUA=", "type": "http" } }, "sample_rate": 1 } } +`) + + badBenchBody := []byte(`{"metadata": {"service": {"name": "BAD","node": {"configured_name": "node-123"},"version": "5.1.3","environment": "staging","language": {"name": "ecmascript","version": "8"},"runtime": {"name": "node","version": "8.0.0"},"framework": {"name": "Express","version": "1.2.3"},"agent": {"name": "elastic-node","version": "3.14.0"}},"user": {"id": "123user", "username": "bar", "email": "bar@user.com"}, "labels": {"tag0": null, "tag1": "one", "tag2": 2}, "process": {"pid": 1234,"ppid": 6789,"title": "node","argv": ["node","server.js"]},"system": {"hostname": "prod1.example.com","architecture": "x64","platform": "darwin", "container": {"id": "container-id"}, "kubernetes": {"namespace": "namespace1", "pod": {"uid": "pod-uid", "name": "pod-name"}, "node": {"name": "node-name"}}},"cloud":{"account":{"id":"account_id","name":"account_name"},"availability_zone":"cloud_availability_zone","instance":{"id":"instance_id","name":"instance_name"},"machine":{"type":"machine_type"},"project":{"id":"project_id","name":"project_name"},"provider":"cloud_provider","region":"cloud_region","service":{"name":"lambda"}}}} +{"transaction": { "id": "945254c567a5417e", "trace_id": "0123456789abcdef0123456789abcdef", "parent_id": "abcdefabcdef01234567", "type": "request", "duration": 32.592981, "span_count": { "started": 43 }}} +{"transaction": {"id": "4340a8e0df1906ecbfa9", "trace_id": "0acd456789abcdef0123456789abcdef", "name": "GET /api/types","type": "request","duration": 32.592981,"outcome":"success", "result": "success", "timestamp": 1496170407154000, "sampled": true, "span_count": {"started": 17},"context": {"service": {"runtime": {"version": "7.0"}},"page":{"referer":"http://localhost:8000/test/e2e/","url":"http://localhost:8000/test/e2e/general-usecase/"}, "request": {"socket": {"remote_address": "12.53.12.1","encrypted": true},"http_version": "1.1","method": "POST","url": {"protocol": "https:","full": "https://www.example.com/p/a/t/h?query=string#hash","hostname": "www.example.com","port": "8080","pathname": "/p/a/t/h","search": "?query=string","hash": "#hash","raw": "/p/a/t/h?query=string#hash"},"headers": {"user-agent":["Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36","Mozilla Chrome Edge"],"content-type": "text/html","cookie": "c1=v1, c2=v2","some-other-header": "foo","array": ["foo","bar","baz"]},"cookies": {"c1": "v1","c2": "v2"},"env": {"SERVER_SOFTWARE": "nginx","GATEWAY_INTERFACE": "CGI/1.1"},"body": {"str": "hello world","additional": { "foo": {},"bar": 123,"req": "additional information"}}},"response": {"status_code": 200,"headers": {"content-type": "application/json"},"headers_sent": true,"finished": true,"transfer_size":25.8,"encoded_body_size":26.90,"decoded_body_size":29.90}, "user": {"domain": "ldap://abc","id": "99","username": "foo"},"tags": {"organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8", "tag2": 12, "tag3": 12.45, "tag4": false, "tag5": null },"custom": {"my_key": 1,"some_other_value": "foo bar","and_objects": {"foo": ["bar","baz"]},"(": "not a valid regex and that is fine"}}}} +{"transaction": { "id": "cdef4340a8e0df19", "trace_id": "0acd456789abcdef0123456789abcdef", "type": "request", "duration": 13.980558, "timestamp": 1532976822281000, "sampled": null, "span_count": { "dropped": 55, "started": 436 }, "marks": {"navigationTiming": {"appBeforeBootstrap": 608.9300000000001,"navigationStart": -21},"another_mark": {"some_long": 10,"some_float": 10.0}, "performance": {}}, "context": { "request": { "socket": { "remote_address": "192.0.1", "encrypted": null }, "method": "POST", "headers": { "user-agent": null, "content-type": null, "cookie": null }, "url": { "protocol": null, "full": null, "hostname": null, "port": null, "pathname": null, "search": null, "hash": null, "raw": null } }, "response": { "headers": { "content-type": null } }, "service": {"environment":"testing","name": "service1","node": {"configured_name": "node-ABC"}, "language": {"version": "2.5", "name": "ruby"}, "agent": {"version": "2.2", "name": "elastic-ruby", "ephemeral_id": "justanid"}, "framework": {"version": "5.0", "name": "Rails"}, "version": "2", "runtime": {"version": "2.5", "name": "cruby"}}},"experience":{"cls":1,"fid":2.0,"tbt":3.4,"longtask":{"count":3,"sum":2.5,"max":1}}}} +{"transaction": { "id": "00xxxxFFaaaa1234", "trace_id": "0123456789abcdef0123456789abcdef", "name": "amqp receive", "parent_id": "abcdefabcdef01234567", "type": "messaging", "duration": 3, "span_count": { "started": 1 }, "context": {"message": {"queue": { "name": "new_users"}, "age":{ "ms": 1577958057123}, "headers": {"user_id": "1ax3", "involved_services": ["user", "auth"]}, "body": "user created", "routing_key": "user-created-transaction"}},"session":{"id":"sunday","sequence":123}}} +{"transaction": { "name": "july-2021-delete-after-july-31", "type": "lambda", "result": "success", "id": "142e61450efb8574", "trace_id": "eb56529a1f461c5e7e2f66ecb075e983", "subtype": null, "action": null, "duration": 38.853, "timestamp": 1631736666365048, "sampled": true, "context": { "cloud": { "origin": { "account": { "id": "abc123" }, "provider": "aws", "region": "us-east-1", "service": { "name": "serviceName" } } }, "service": { "origin": { "id": "abc123", "name": "service-name", "version": "1.0" } }, "user": {}, "tags": {}, "custom": { } }, "sync": true, "span_count": { "started": 0 }, "outcome": "unknown", "faas": { "coldstart": false, "execution": "2e13b309-23e1-417f-8bf7-074fc96bc683", "trigger": { "request_id": "FuH2Cir_vHcEMUA=", "type": "http" } }, "sample_rate": 1 } } +`) + + agentData := AgentData{Data: benchBody, ContentEncoding: ""} + badAgentData := AgentData{Data: badBenchBody, ContentEncoding: ""} + var mc MetadataContainer + + if mc.Metadata == nil { + if err := ProcessMetadata(agentData, &mc); err != nil { + t.Fail() + } + } + + if mc.Metadata == nil { + if err := ProcessMetadata(badAgentData, &mc); err != nil { + t.Fail() + } + } + + // Metadata is extracted as is. The agent name and version are replaced by values related to the extension in + // logsapi.ProcessMetrics + desiredMetadata := []byte(`{"metadata":{"service":{"name":"1234_service-12a3","version":"5.1.3","environment":"staging","agent":{"name":"elastic-node","version":"3.14.0"},"framework":{"name":"Express","version":"1.2.3"},"language":{"name":"ecmascript","version":"8"},"runtime":{"name":"node","version":"8.0.0"},"node":{"configured_name":"node-123"}},"user":{"username":"bar","id":"123user","email":"bar@user.com"},"labels":{"tag0":null,"tag1":"one","tag2":2},"process":{"pid":1234,"ppid":6789,"title":"node","argv":["node","server.js"]},"system":{"architecture":"x64","hostname":"prod1.example.com","platform":"darwin","container":{"id":"container-id"},"kubernetes":{"namespace":"namespace1","node":{"name":"node-name"},"pod":{"name":"pod-name","uid":"pod-uid"}}},"cloud":{"provider":"cloud_provider","region":"cloud_region","availability_zone":"cloud_availability_zone","instance":{"id":"instance_id","name":"instance_name"},"machine":{"type":"machine_type"},"account":{"id":"account_id","name":"account_name"},"project":{"id":"project_id","name":"project_name"},"service":{"name":"lambda"}}}}`) + extractedMetadata, err := json.Marshal(mc) + if err != nil { + Log.Errorf("Could not marshal extracted metadata : %v", err) + } + assert.DeepEqual(t, desiredMetadata, extractedMetadata) +} diff --git a/apm-lambda-extension/extension/route_handlers.go b/apm-lambda-extension/extension/route_handlers.go index ab7d9f0f..f71eb66c 100644 --- a/apm-lambda-extension/extension/route_handlers.go +++ b/apm-lambda-extension/extension/route_handlers.go @@ -38,7 +38,7 @@ func handleInfoRequest(ctx context.Context, apmServerTransport *ApmServerTranspo Log.Debug("Handling APM server Info Request") // Init reverse proxy - parsedApmServerUrl, err := url.Parse(apmServerTransport.config.apmServerUrl) + parsedApmServerUrl, err := url.Parse(apmServerTransport.config.ApmServerUrl) if err != nil { Log.Errorf("could not parse APM server URL: %v", err) return diff --git a/apm-lambda-extension/extension/version.go b/apm-lambda-extension/extension/version.go new file mode 100644 index 00000000..805955fb --- /dev/null +++ b/apm-lambda-extension/extension/version.go @@ -0,0 +1,22 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package extension + +const ( + Version = "0.0.3" +) diff --git a/apm-lambda-extension/logsapi/client.go b/apm-lambda-extension/logsapi/client.go index f8dd490c..093872c0 100644 --- a/apm-lambda-extension/logsapi/client.go +++ b/apm-lambda-extension/logsapi/client.go @@ -62,6 +62,8 @@ const ( // RuntimeDone event is sent when lambda function is finished it's execution RuntimeDone SubEventType = "platform.runtimeDone" Fault SubEventType = "platform.fault" + Report SubEventType = "platform.report" + Start SubEventType = "platform.start" ) // BufferingCfg is the configuration set for receiving logs from Logs API. Whichever of the conditions below is met first, the logs will be sent diff --git a/apm-lambda-extension/logsapi/process_metrics.go b/apm-lambda-extension/logsapi/process_metrics.go new file mode 100644 index 00000000..fcaae746 --- /dev/null +++ b/apm-lambda-extension/logsapi/process_metrics.go @@ -0,0 +1,108 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package logsapi + +import ( + "context" + "encoding/json" + "math" + + "elastic/apm-lambda-extension/extension" + "elastic/apm-lambda-extension/model" +) + +type PlatformMetrics struct { + DurationMs float32 `json:"durationMs"` + BilledDurationMs int32 `json:"billedDurationMs"` + MemorySizeMB int32 `json:"memorySizeMB"` + MaxMemoryUsedMB int32 `json:"maxMemoryUsedMB"` + InitDurationMs float32 `json:"initDurationMs"` +} + +type MetricsContainer struct { + Metrics *model.Metrics `json:"metricset"` +} + +// Add adds a metric with the given name, labels, and value, +// The labels are expected to be sorted lexicographically. +func (mc MetricsContainer) Add(name string, labels []model.MetricLabel, value float64) { + mc.addMetric(name, model.Metric{Value: value}) +} + +// Simplified version of https://github.com/elastic/apm-agent-go/blob/675e8398c7fe546f9fd169bef971b9ccfbcdc71f/metrics.go#L89 +func (mc MetricsContainer) addMetric(name string, metric model.Metric) { + + if mc.Metrics.Samples == nil { + mc.Metrics.Samples = make(map[string]model.Metric) + } + mc.Metrics.Samples[name] = metric +} + +func ProcessPlatformReport(ctx context.Context, apmServerTransport *extension.ApmServerTransport, metadataContainer *extension.MetadataContainer, functionData *extension.NextEventResponse, platformReport LogEvent) error { + var metricsData []byte + metricsContainer := MetricsContainer{ + Metrics: &model.Metrics{}, + } + convMB2Bytes := float64(1024 * 1024) + platformReportMetrics := platformReport.Record.Metrics + + // APM Spec Fields + // Timestamp + metricsContainer.Metrics.Timestamp = platformReport.Time.UnixMicro() + // FaaS Fields + metricsContainer.Metrics.Labels = make(map[string]interface{}) + metricsContainer.Metrics.Labels["faas.execution"] = platformReport.Record.RequestId + metricsContainer.Metrics.Labels["faas.id"] = functionData.InvokedFunctionArn + if platformReportMetrics.InitDurationMs > 0 { + metricsContainer.Metrics.Labels["faas.coldstart"] = true + } else { + metricsContainer.Metrics.Labels["faas.coldstart"] = false + } + // System + metricsContainer.Add("system.memory.total", nil, float64(platformReportMetrics.MemorySizeMB)*convMB2Bytes) // Unit : Bytes + metricsContainer.Add("system.memory.actual.free", nil, float64(platformReportMetrics.MemorySizeMB-platformReportMetrics.MaxMemoryUsedMB)*convMB2Bytes) // Unit : Bytes + + // Raw Metrics + // AWS uses binary multiples to compute memory : https://aws.amazon.com/about-aws/whats-new/2020/12/aws-lambda-supports-10gb-memory-6-vcpu-cores-lambda-functions/ + metricsContainer.Add("aws.lambda.metrics.TotalMemory", nil, float64(platformReportMetrics.MemorySizeMB)*convMB2Bytes) // Unit : Bytes + metricsContainer.Add("aws.lambda.metrics.UsedMemory", nil, float64(platformReportMetrics.MaxMemoryUsedMB)*convMB2Bytes) // Unit : Bytes + metricsContainer.Add("aws.lambda.metrics.Duration", nil, float64(platformReportMetrics.DurationMs)) // Unit : Milliseconds + metricsContainer.Add("aws.lambda.metrics.BilledDuration", nil, float64(platformReportMetrics.BilledDurationMs)) // Unit : Milliseconds + metricsContainer.Add("aws.lambda.metrics.ColdStartDuration", nil, float64(platformReportMetrics.InitDurationMs)) // Unit : Milliseconds + metricsContainer.Add("aws.lambda.metrics.Timeout", nil, math.Ceil(float64(functionData.DeadlineMs-functionData.Timestamp.UnixMilli())/1e3)*1e3) // Unit : Milliseconds + + metricsJson, err := json.Marshal(metricsContainer) + if err != nil { + return err + } + + if metadataContainer.Metadata != nil { + //TODO : Discuss relevance of displaying extension name + metadataContainer.Metadata.Service.Agent.Name = "apm-lambda-extension" + metadataContainer.Metadata.Service.Agent.Version = extension.Version + metadataJson, err := json.Marshal(metadataContainer) + if err != nil { + return err + } + metricsData = append(metadataJson, []byte("\n")...) + } + + metricsData = append(metricsData, metricsJson...) + apmServerTransport.EnqueueAPMData(extension.AgentData{Data: metricsData}) + return nil +} diff --git a/apm-lambda-extension/logsapi/process_metrics_test.go b/apm-lambda-extension/logsapi/process_metrics_test.go new file mode 100644 index 00000000..64767702 --- /dev/null +++ b/apm-lambda-extension/logsapi/process_metrics_test.go @@ -0,0 +1,119 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package logsapi + +import ( + "context" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "os" + "testing" + "time" + + "elastic/apm-lambda-extension/extension" + "elastic/apm-lambda-extension/model" + + "gotest.tools/assert" +) + +func Test_processPlatformReport(t *testing.T) { + + timestamp := time.Now() + + pm := PlatformMetrics{ + DurationMs: 182.43, + BilledDurationMs: 183, + MemorySizeMB: 128, + MaxMemoryUsedMB: 76, + InitDurationMs: 422.97, + } + + logEventRecord := LogEventRecord{ + RequestId: "6f7f0961f83442118a7af6fe80b88d56", + Status: "Available", + Metrics: pm, + } + + logEvent := LogEvent{ + Time: timestamp, + Type: "platform.report", + StringRecord: "", + Record: logEventRecord, + } + + event := extension.NextEventResponse{ + Timestamp: timestamp, + EventType: extension.Invoke, + DeadlineMs: timestamp.UnixNano()/1e6 + 4584, // Milliseconds + RequestID: "8476a536-e9f4-11e8-9739-2dfe598c3fcd", + InvokedFunctionArn: "arn:aws:lambda:us-east-2:123456789012:function:custom-runtime", + Tracing: extension.Tracing{ + Type: "None", + Value: "None", + }, + } + + desiredOutput := fmt.Sprintf(`{"metadata":{"service":{"agent":{"name":"apm-lambda-extension","version":"%s"},"framework":{"name":"AWS Lambda","version":""},"language":{"name":"python","version":"3.9.8"},"runtime":{"name":"","version":""},"node":{}},"user":{},"process":{"pid":0},"system":{"container":{"id":""},"kubernetes":{"node":{},"pod":{}}},"cloud":{"provider":"","instance":{},"machine":{},"account":{},"project":{},"service":{}}}} +{"metricset":{"timestamp":%d,"transaction":{},"span":{},"tags":{"faas.coldstart":true,"faas.execution":"6f7f0961f83442118a7af6fe80b88d56","faas.id":"arn:aws:lambda:us-east-2:123456789012:function:custom-runtime"},"samples":{"aws.lambda.metrics.BilledDuration":{"value":183},"aws.lambda.metrics.ColdStartDuration":{"value":422.9700012207031},"aws.lambda.metrics.Duration":{"value":182.42999267578125},"aws.lambda.metrics.Timeout":{"value":5000},"aws.lambda.metrics.TotalMemory":{"value":134217728},"aws.lambda.metrics.UsedMemory":{"value":79691776},"system.memory.actual.free":{"value":54525952},"system.memory.total":{"value":134217728}}}}`, extension.Version, timestamp.UnixNano()/1e3) + + apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var rawBytes []byte + if r.Body != nil { + rawBytes, _ = ioutil.ReadAll(r.Body) + } + requestBytes, err := extension.GetUncompressedBytes(rawBytes, r.Header.Get("Content-Encoding")) + if err != nil { + extension.Log.Error(err) + t.Fail() + } + assert.Equal(t, string(requestBytes), desiredOutput) + assert.Equal(t, "gzip", r.Header.Get("Content-Encoding")) + _, err = w.Write([]byte(`{"foo": "bar"}`)) + if err != nil { + extension.Log.Error(err) + t.Fail() + } + })) + defer apmServer.Close() + + config := extension.Config{ + ApmServerUrl: apmServer.URL + "/", + } + + apmServerTransport := extension.InitApmServerTransport(&config) + + mc := extension.MetadataContainer{ + Metadata: &model.Metadata{}, + } + mc.Metadata.Service = model.Service{ + Name: os.Getenv("AWS_LAMBDA_FUNCTION_NAME"), + Agent: model.Agent{Name: "python", Version: "6.7.2"}, + Language: model.Language{Name: "python", Version: "3.9.8"}, + Runtime: model.Runtime{Name: os.Getenv("AWS_EXECUTION_ENV")}, + Framework: model.Framework{Name: "AWS Lambda"}, + } + mc.Metadata.Process = model.Process{} + mc.Metadata.System = model.System{} + + if err := ProcessPlatformReport(context.Background(), apmServerTransport, &mc, &event, logEvent); err != nil { + t.Fail() + } + apmServerTransport.FlushAPMData(context.Background()) +} diff --git a/apm-lambda-extension/logsapi/route_handlers.go b/apm-lambda-extension/logsapi/route_handlers.go index 3da4c316..7606f654 100644 --- a/apm-lambda-extension/logsapi/route_handlers.go +++ b/apm-lambda-extension/logsapi/route_handlers.go @@ -18,10 +18,11 @@ package logsapi import ( - "elastic/apm-lambda-extension/extension" "encoding/json" "net/http" "time" + + "elastic/apm-lambda-extension/extension" ) func handleLogEventsRequest(transport *LogsTransport) func(w http.ResponseWriter, r *http.Request) { diff --git a/apm-lambda-extension/logsapi/route_handlers_test.go b/apm-lambda-extension/logsapi/route_handlers_test.go index 9bb2f6e3..79abd17e 100644 --- a/apm-lambda-extension/logsapi/route_handlers_test.go +++ b/apm-lambda-extension/logsapi/route_handlers_test.go @@ -32,11 +32,11 @@ func TestLogEventUnmarshalReport(t *testing.T) { "time": "2020-08-20T12:31:32.123Z", "type": "platform.report", "record": {"requestId": "6f7f0961f83442118a7af6fe80b88d56", - "metrics": {"durationMs": 101.51, - "billedDurationMs": 300, - "memorySizeMB": 512, - "maxMemoryUsedMB": 33, - "initDurationMs": 116.67 + "metrics": {"durationMs": 182.43, + "billedDurationMs": 183, + "memorySizeMB": 128, + "maxMemoryUsedMB": 76, + "initDurationMs": 422.97 } } }`) @@ -48,6 +48,13 @@ func TestLogEventUnmarshalReport(t *testing.T) { rec := LogEventRecord{ RequestId: "6f7f0961f83442118a7af6fe80b88d56", Status: "", // no status was given in sample json + Metrics: PlatformMetrics{ + DurationMs: 182.43, + BilledDurationMs: 183, + MemorySizeMB: 128, + MaxMemoryUsedMB: 76, + InitDurationMs: 422.97, + }, } assert.Equal(t, rec, le.Record) diff --git a/apm-lambda-extension/logsapi/subscribe.go b/apm-lambda-extension/logsapi/subscribe.go index 3d844bd3..00d144c8 100644 --- a/apm-lambda-extension/logsapi/subscribe.go +++ b/apm-lambda-extension/logsapi/subscribe.go @@ -19,13 +19,14 @@ package logsapi import ( "context" - "elastic/apm-lambda-extension/extension" "fmt" "net" "net/http" "os" "time" + "elastic/apm-lambda-extension/extension" + "github.com/pkg/errors" ) @@ -57,8 +58,9 @@ type LogEvent struct { // LogEventRecord is a sub-object in a Logs API event type LogEventRecord struct { - RequestId string `json:"requestId"` - Status string `json:"status"` + RequestId string `json:"requestId"` + Status string `json:"status"` + Metrics PlatformMetrics `json:"metrics"` } // Subscribes to the Logs API @@ -156,16 +158,25 @@ func checkLambdaFunction() bool { return false } -// WaitRuntimeDone consumes events until a RuntimeDone event corresponding +// ProcessLogs consumes events until a RuntimeDone event corresponding // to requestID is received, or ctx is cancelled, and then returns. -func WaitRuntimeDone(ctx context.Context, requestID string, transport *LogsTransport, runtimeDoneSignal chan struct{}) error { +func ProcessLogs( + ctx context.Context, + requestID string, + apmServerTransport *extension.ApmServerTransport, + logsTransport *LogsTransport, + metadataContainer *extension.MetadataContainer, + runtimeDoneSignal chan struct{}, + prevEvent *extension.NextEventResponse, +) error { for { select { - case logEvent := <-transport.logsChannel: + case logEvent := <-logsTransport.logsChannel: extension.Log.Debugf("Received log event %v", logEvent.Type) + switch logEvent.Type { // Check the logEvent for runtimeDone and compare the RequestID // to the id that came in via the Next API - if logEvent.Type == RuntimeDone { + case RuntimeDone: if logEvent.Record.RequestId == requestID { extension.Log.Info("Received runtimeDone event for this function invocation") runtimeDoneSignal <- struct{}{} @@ -173,6 +184,18 @@ func WaitRuntimeDone(ctx context.Context, requestID string, transport *LogsTrans } else { extension.Log.Debug("Log API runtimeDone event request id didn't match") } + // Check if the logEvent contains metrics and verify that they can be linked to the previous invocation + case Report: + if prevEvent != nil && logEvent.Record.RequestId == prevEvent.RequestID { + extension.Log.Debug("Received platform report for the previous function invocation") + err := ProcessPlatformReport(ctx, apmServerTransport, metadataContainer, prevEvent, logEvent) + if err != nil { + extension.Log.Errorf("Error processing Lambda platform metrics : %v", err) + } + } else { + extension.Log.Warn("report event request id didn't match the previous event id") + extension.Log.Debug("Log API runtimeDone event request id didn't match") + } } case <-ctx.Done(): extension.Log.Debug("Current invocation over. Interrupting logs processing goroutine") diff --git a/apm-lambda-extension/logsapi/subscribe_test.go b/apm-lambda-extension/logsapi/subscribe_test.go index df1494fe..1ab8b82c 100644 --- a/apm-lambda-extension/logsapi/subscribe_test.go +++ b/apm-lambda-extension/logsapi/subscribe_test.go @@ -59,7 +59,6 @@ func TestSubscribeWithLambdaFunction(t *testing.T) { } func TestSubscribeAWSRequest(t *testing.T) { - // For subscription request expectedTypes := []EventType{Platform} expectedBufferingCfg := BufferingCfg{ diff --git a/apm-lambda-extension/main.go b/apm-lambda-extension/main.go index 5e7d9f58..bbf82bc1 100644 --- a/apm-lambda-extension/main.go +++ b/apm-lambda-extension/main.go @@ -85,24 +85,40 @@ func main() { extension.Log.Warnf("Error while subscribing to the Logs API: %v", err) } + // The previous event id is used to validate the received Lambda metrics + var prevEvent *extension.NextEventResponse + // This data structure contains metadata tied to the current Lambda instance. If empty, it is populated once for each + // active Lambda environment + metadataContainer := extension.MetadataContainer{} + for { select { case <-ctx.Done(): return default: var backgroundDataSendWg sync.WaitGroup - processEvent(ctx, cancel, apmServerTransport, logsTransport, &backgroundDataSendWg) + event := processEvent(ctx, cancel, apmServerTransport, logsTransport, &backgroundDataSendWg, prevEvent, &metadataContainer) extension.Log.Debug("Waiting for background data send to end") backgroundDataSendWg.Wait() if config.SendStrategy == extension.SyncFlush { // Flush APM data now that the function invocation has completed apmServerTransport.FlushAPMData(ctx) } + prevEvent = event } } } -func processEvent(ctx context.Context, cancel context.CancelFunc, apmServerTransport *extension.ApmServerTransport, logsTransport *logsapi.LogsTransport, backgroundDataSendWg *sync.WaitGroup) { +func processEvent( + ctx context.Context, + cancel context.CancelFunc, + apmServerTransport *extension.ApmServerTransport, + logsTransport *logsapi.LogsTransport, + backgroundDataSendWg *sync.WaitGroup, + prevEvent *extension.NextEventResponse, + metadataContainer *extension.MetadataContainer, +) *extension.NextEventResponse { + // Invocation context invocationCtx, invocationCancel := context.WithCancel(ctx) defer invocationCancel() @@ -119,7 +135,7 @@ func processEvent(ctx context.Context, cancel context.CancelFunc, apmServerTrans extension.Log.Errorf("Error: %s", err) extension.Log.Infof("Exit signal sent to runtime : %s", status) extension.Log.Infof("Exiting") - return + return nil } extension.Log.Debug("Received event.") @@ -127,7 +143,7 @@ func processEvent(ctx context.Context, cancel context.CancelFunc, apmServerTrans if event.EventType == extension.Shutdown { cancel() - return + return event } // APM Data Processing @@ -136,17 +152,17 @@ func processEvent(ctx context.Context, cancel context.CancelFunc, apmServerTrans backgroundDataSendWg.Add(1) go func() { defer backgroundDataSendWg.Done() - if err := apmServerTransport.ForwardApmData(invocationCtx); err != nil { + if err := apmServerTransport.ForwardApmData(invocationCtx, metadataContainer); err != nil { extension.Log.Error(err) } }() - // Lambda Service Logs Processing + // Lambda Service Logs Processing, also used to extract metrics from APM logs // This goroutine should not be started if subscription failed runtimeDone := make(chan struct{}) if logsTransport != nil { go func() { - if err := logsapi.WaitRuntimeDone(invocationCtx, event.RequestID, logsTransport, runtimeDone); err != nil { + if err := logsapi.ProcessLogs(invocationCtx, event.RequestID, apmServerTransport, logsTransport, metadataContainer, runtimeDone, prevEvent); err != nil { extension.Log.Errorf("Error while processing Lambda Logs ; %v", err) } else { close(runtimeDone) @@ -179,4 +195,6 @@ func processEvent(ctx context.Context, cancel context.CancelFunc, apmServerTrans case <-timer.C: extension.Log.Info("Time expired waiting for agent signal or runtimeDone event") } + + return event } diff --git a/apm-lambda-extension/main_test.go b/apm-lambda-extension/main_test.go index 8355f9f3..934b2d54 100644 --- a/apm-lambda-extension/main_test.go +++ b/apm-lambda-extension/main_test.go @@ -20,12 +20,8 @@ package main import ( "bytes" "context" - e2eTesting "elastic/apm-lambda-extension/e2e-testing" - "elastic/apm-lambda-extension/extension" - "elastic/apm-lambda-extension/logsapi" "encoding/json" "fmt" - "github.com/stretchr/testify/suite" "io/ioutil" "net" "net/http" @@ -36,6 +32,12 @@ import ( "testing" "time" + e2eTesting "elastic/apm-lambda-extension/e2e-testing" + "elastic/apm-lambda-extension/extension" + "elastic/apm-lambda-extension/logsapi" + + "github.com/stretchr/testify/suite" + "github.com/google/uuid" "github.com/stretchr/testify/assert" ) @@ -46,7 +48,8 @@ const ( InvokeHang MockEventType = "Hang" InvokeStandard MockEventType = "Standard" InvokeStandardInfo MockEventType = "StandardInfo" - InvokeStandardFlush MockEventType = "Flush" + InvokeStandardFlush MockEventType = "StandardFlush" + InvokeStandardMetadata MockEventType = "StandardMetadata" InvokeLateFlush MockEventType = "LateFlush" InvokeWaitgroupsRace MockEventType = "InvokeWaitgroupsRace" InvokeMultipleTransactionsOverload MockEventType = "MultipleTransactionsOverload" @@ -84,7 +87,6 @@ type ApmInfo struct { } func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest.Server, *MockServerInternals, *MockServerInternals) { - // Mock APM Server var apmServerInternals MockServerInternals apmServerInternals.WaitForUnlockSignal = true @@ -113,8 +115,6 @@ func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest. case Crashes: panic("Server crashed") default: - w.WriteHeader(http.StatusInternalServerError) - return } if r.RequestURI == "/intake/v2/events" { w.WriteHeader(http.StatusAccepted) @@ -206,9 +206,10 @@ func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest. } func processMockEvent(currId string, event MockEvent, extensionPort string, internals *MockServerInternals) { - sendLogEvent(currId, "platform.start") + sendLogEvent(currId, logsapi.Start) client := http.Client{} sendRuntimeDone := true + sendMetrics := true switch event.Type { case InvokeHang: time.Sleep(time.Duration(event.Timeout) * time.Second) @@ -217,6 +218,12 @@ func processMockEvent(currId string, event MockEvent, extensionPort string, inte req, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), bytes.NewBuffer([]byte(event.APMServerBehavior))) res, _ := client.Do(req) extension.Log.Debugf("Response seen by the agent : %d", res.StatusCode) + case InvokeStandardMetadata: + time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) + metadata := `{"metadata":{"service":{"name":"1234_service-12a3","version":"5.1.3","environment":"staging","agent":{"name":"elastic-node","version":"3.14.0"},"framework":{"name":"Express","version":"1.2.3"},"language":{"name":"ecmascript","version":"8"},"runtime":{"name":"node","version":"8.0.0"},"node":{"configured_name":"node-123"}},"user":{"username":"bar","id":"123user","email":"bar@user.com"},"labels":{"tag0":null,"tag1":"one","tag2":2},"process":{"pid":1234,"ppid":6789,"title":"node","argv":["node","server.js"]},"system":{"architecture":"x64","hostname":"prod1.example.com","platform":"darwin","container":{"id":"container-id"},"kubernetes":{"namespace":"namespace1","node":{"name":"node-name"},"pod":{"name":"pod-name","uid":"pod-uid"}}},"cloud":{"provider":"cloud_provider","region":"cloud_region","availability_zone":"cloud_availability_zone","instance":{"id":"instance_id","name":"instance_name"},"machine":{"type":"machine_type"},"account":{"id":"account_id","name":"account_name"},"project":{"id":"project_id","name":"project_name"},"service":{"name":"lambda"}}}}` + req, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), bytes.NewBuffer([]byte(metadata))) + res, _ := client.Do(req) + extension.Log.Debugf("Response seen by the agent : %d", res.StatusCode) case InvokeStandardFlush: time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) reqData, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events?flushed=true", extensionPort), bytes.NewBuffer([]byte(event.APMServerBehavior))) @@ -233,6 +240,7 @@ func processMockEvent(currId string, event MockEvent, extensionPort string, inte } internals.WaitGroup.Done() }() + sendMetrics = false case InvokeWaitgroupsRace: time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) reqData0, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), bytes.NewBuffer([]byte(event.APMServerBehavior))) @@ -275,7 +283,10 @@ func processMockEvent(currId string, event MockEvent, extensionPort string, inte case Shutdown: } if sendRuntimeDone { - sendLogEvent(currId, "platform.runtimeDone") + sendLogEvent(currId, logsapi.RuntimeDone) + } + if sendMetrics { + sendLogEvent(currId, logsapi.Report) } } @@ -290,7 +301,6 @@ func sendNextEventInfo(w http.ResponseWriter, id string, event MockEvent) { if event.Type == Shutdown { nextEventInfo.EventType = "SHUTDOWN" } - if err := json.NewEncoder(w).Encode(nextEventInfo); err != nil { extension.Log.Errorf("Could not encode event : %v", err) } @@ -300,6 +310,16 @@ func sendLogEvent(requestId string, logEventType logsapi.SubEventType) { record := logsapi.LogEventRecord{ RequestId: requestId, } + if logEventType == logsapi.Report { + record.Metrics = logsapi.PlatformMetrics{ + BilledDurationMs: 60, + DurationMs: 59.9, + MemorySizeMB: 128, + MaxMemoryUsedMB: 60, + InitDurationMs: 500, + } + } + logEvent := logsapi.LogEvent{ Time: time.Now(), Type: logEventType, @@ -376,7 +396,7 @@ func (suite *MainUnitTestsSuite) TestStandardEventsChain() { } eventQueueGenerator(eventsChain, suite.eventsChannel) assert.NotPanics(suite.T(), main) - assert.True(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(TimelyResponse))) + assert.Contains(suite.T(), suite.apmServerInternals.Data, TimelyResponse) } // TestFlush checks if the flushed param does not cause a panic or an unexpected behavior @@ -386,7 +406,7 @@ func (suite *MainUnitTestsSuite) TestFlush() { } eventQueueGenerator(eventsChain, suite.eventsChannel) assert.NotPanics(suite.T(), main) - assert.True(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(TimelyResponse))) + assert.Contains(suite.T(), suite.apmServerInternals.Data, TimelyResponse) } // TestLateFlush checks if there is no race condition between RuntimeDone and AgentDone @@ -398,7 +418,7 @@ func (suite *MainUnitTestsSuite) TestLateFlush() { } eventQueueGenerator(eventsChain, suite.eventsChannel) assert.NotPanics(suite.T(), main) - assert.True(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(TimelyResponse+TimelyResponse))) + assert.Contains(suite.T(), suite.apmServerInternals.Data, TimelyResponse) } // TestWaitGroup checks if there is no race condition between the main waitgroups (issue #128) @@ -408,7 +428,7 @@ func (suite *MainUnitTestsSuite) TestWaitGroup() { } eventQueueGenerator(eventsChain, suite.eventsChannel) assert.NotPanics(suite.T(), main) - assert.True(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(TimelyResponse))) + assert.Contains(suite.T(), suite.apmServerInternals.Data, TimelyResponse) } // TestAPMServerDown tests that main does not panic nor runs indefinitely when the APM server is inactive. @@ -419,7 +439,7 @@ func (suite *MainUnitTestsSuite) TestAPMServerDown() { } eventQueueGenerator(eventsChain, suite.eventsChannel) assert.NotPanics(suite.T(), main) - assert.False(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(TimelyResponse))) + assert.NotContains(suite.T(), suite.apmServerInternals.Data, TimelyResponse) } // TestAPMServerHangs tests that main does not panic nor runs indefinitely when the APM server does not respond. @@ -429,7 +449,7 @@ func (suite *MainUnitTestsSuite) TestAPMServerHangs() { } eventQueueGenerator(eventsChain, suite.eventsChannel) assert.NotPanics(suite.T(), main) - assert.False(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(Hangs))) + assert.NotContains(suite.T(), suite.apmServerInternals.Data, string(Hangs)) suite.apmServerInternals.UnlockSignalChannel <- struct{}{} } @@ -440,7 +460,6 @@ func (suite *MainUnitTestsSuite) TestAPMServerRecovery() { if err := os.Setenv("ELASTIC_APM_DATA_FORWARDER_TIMEOUT_SECONDS", "1"); err != nil { suite.T().Fail() } - eventsChain := []MockEvent{ {Type: InvokeStandard, APMServerBehavior: Hangs, ExecutionDuration: 1, Timeout: 5}, {Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, @@ -451,8 +470,8 @@ func (suite *MainUnitTestsSuite) TestAPMServerRecovery() { suite.apmServerInternals.UnlockSignalChannel <- struct{}{} }() assert.NotPanics(suite.T(), main) - assert.True(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(Hangs))) - assert.True(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(TimelyResponse))) + assert.Contains(suite.T(), suite.apmServerInternals.Data, Hangs) + assert.Contains(suite.T(), suite.apmServerInternals.Data, TimelyResponse) if err := os.Setenv("ELASTIC_APM_DATA_FORWARDER_TIMEOUT_SECONDS", ""); err != nil { suite.T().Fail() } @@ -466,7 +485,6 @@ func (suite *MainUnitTestsSuite) TestGracePeriodHangs() { } eventQueueGenerator(eventsChain, suite.eventsChannel) assert.NotPanics(suite.T(), main) - time.Sleep(100 * time.Millisecond) suite.apmServerInternals.UnlockSignalChannel <- struct{}{} } @@ -479,7 +497,7 @@ func (suite *MainUnitTestsSuite) TestAPMServerCrashesDuringExecution() { } eventQueueGenerator(eventsChain, suite.eventsChannel) assert.NotPanics(suite.T(), main) - assert.False(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(Crashes))) + assert.NotContains(suite.T(), suite.apmServerInternals.Data, Crashes) } // TestFullChannel checks that an overload of APM data chunks is handled correctly, events dropped beyond the 100th one @@ -490,7 +508,7 @@ func (suite *MainUnitTestsSuite) TestFullChannel() { } eventQueueGenerator(eventsChain, suite.eventsChannel) assert.NotPanics(suite.T(), main) - assert.True(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(TimelyResponse))) + assert.Contains(suite.T(), suite.apmServerInternals.Data, TimelyResponse) } // TestFullChannelSlowAPMServer tests what happens when the APM Data channel is full and the APM server is slow @@ -499,7 +517,6 @@ func (suite *MainUnitTestsSuite) TestFullChannelSlowAPMServer() { if err := os.Setenv("ELASTIC_APM_SEND_STRATEGY", "background"); err != nil { suite.T().Fail() } - eventsChain := []MockEvent{ {Type: InvokeMultipleTransactionsOverload, APMServerBehavior: SlowResponse, ExecutionDuration: 0.01, Timeout: 5}, } @@ -518,7 +535,7 @@ func (suite *MainUnitTestsSuite) TestInfoRequest() { } eventQueueGenerator(eventsChain, suite.eventsChannel) assert.NotPanics(suite.T(), main) - assert.True(suite.T(), strings.Contains(suite.lambdaServerInternals.Data, "7814d524d3602e70b703539c57568cba6964fc20")) + assert.Contains(suite.T(), suite.lambdaServerInternals.Data, "7814d524d3602e70b703539c57568cba6964fc20") } // TestInfoRequest checks if the extension times out when unable to retrieve APM server info (/ endpoint) @@ -528,6 +545,39 @@ func (suite *MainUnitTestsSuite) TestInfoRequestHangs() { } eventQueueGenerator(eventsChain, suite.eventsChannel) assert.NotPanics(suite.T(), main) - assert.False(suite.T(), strings.Contains(suite.lambdaServerInternals.Data, "7814d524d3602e70b703539c57568cba6964fc20")) + assert.NotContains(suite.T(), suite.lambdaServerInternals.Data, "7814d524d3602e70b703539c57568cba6964fc20") suite.apmServerInternals.UnlockSignalChannel <- struct{}{} } + +// TestMetricsWithoutMetadata checks if the extension sends metrics corresponding to invocation n during invocation +// n+1, even if the metadata container was not populated +func (suite *MainUnitTestsSuite) TestMetricsWithoutMetadata() { + eventsChain := []MockEvent{ + {Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, + {Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, + } + eventQueueGenerator(eventsChain, suite.eventsChannel) + assert.NotPanics(suite.T(), main) + assert.Contains(suite.T(), suite.apmServerInternals.Data, `aws.lambda.metrics.BilledDuration":{"value":60`) + assert.Contains(suite.T(), suite.apmServerInternals.Data, `aws.lambda.metrics.Duration":{"value":59.9`) + assert.Contains(suite.T(), suite.apmServerInternals.Data, `aws.lambda.metrics.TotalMemory":{"value":134217728`) + assert.Contains(suite.T(), suite.apmServerInternals.Data, `aws.lambda.metrics.UsedMemory":{"value":62914560`) + assert.Contains(suite.T(), suite.apmServerInternals.Data, `aws.lambda.metrics.ColdStartDuration":{"value":500`) +} + +// TestMetricsWithMetadata checks if the extension sends metrics corresponding to invocation n during invocation +// n+1, even if the metadata container was not populated +func (suite *MainUnitTestsSuite) TestMetricsWithMetadata() { + eventsChain := []MockEvent{ + {Type: InvokeStandardMetadata, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, + {Type: InvokeStandardMetadata, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, + } + eventQueueGenerator(eventsChain, suite.eventsChannel) + assert.NotPanics(suite.T(), main) + assert.Contains(suite.T(), suite.apmServerInternals.Data, fmt.Sprintf(`{"metadata":{"service":{"name":"1234_service-12a3","version":"5.1.3","environment":"staging","agent":{"name":"apm-lambda-extension","version":"%s"},"framework":{"name":"Express","version":"1.2.3"},"language":{"name":"ecmascript","version":"8"},"runtime":{"name":"node","version":"8.0.0"},"node":{"configured_name":"node-123"}},"user":{"username":"bar","id":"123user","email":"bar@user.com"},"labels":{"tag0":null,"tag1":"one","tag2":2},"process":{"pid":1234,"ppid":6789,"title":"node","argv":["node","server.js"]},"system":{"architecture":"x64","hostname":"prod1.example.com","platform":"darwin","container":{"id":"container-id"},"kubernetes":{"namespace":"namespace1","node":{"name":"node-name"},"pod":{"name":"pod-name","uid":"pod-uid"}}},"cloud":{"provider":"cloud_provider","region":"cloud_region","availability_zone":"cloud_availability_zone","instance":{"id":"instance_id","name":"instance_name"},"machine":{"type":"machine_type"},"account":{"id":"account_id","name":"account_name"},"project":{"id":"project_id","name":"project_name"},"service":{"name":"lambda"}}}}`, extension.Version)) + assert.Contains(suite.T(), suite.apmServerInternals.Data, `aws.lambda.metrics.BilledDuration":{"value":60`) + assert.Contains(suite.T(), suite.apmServerInternals.Data, `aws.lambda.metrics.Duration":{"value":59.9`) + assert.Contains(suite.T(), suite.apmServerInternals.Data, `aws.lambda.metrics.TotalMemory":{"value":134217728`) + assert.Contains(suite.T(), suite.apmServerInternals.Data, `aws.lambda.metrics.UsedMemory":{"value":62914560`) + assert.Contains(suite.T(), suite.apmServerInternals.Data, `aws.lambda.metrics.ColdStartDuration":{"value":500`) +} diff --git a/apm-lambda-extension/model/apm_common.go b/apm-lambda-extension/model/apm_common.go new file mode 100644 index 00000000..35f3cc46 --- /dev/null +++ b/apm-lambda-extension/model/apm_common.go @@ -0,0 +1,20 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package model + +type StringMap map[string]interface{} diff --git a/apm-lambda-extension/model/apm_metadata.go b/apm-lambda-extension/model/apm_metadata.go new file mode 100644 index 00000000..2f12f5ef --- /dev/null +++ b/apm-lambda-extension/model/apm_metadata.go @@ -0,0 +1,245 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package model + +// Service represents the service handling transactions being traced. +type Service struct { + // Name is the immutable name of the service. + Name string `json:"name,omitempty"` + + // Version is the version of the service, if it has one. + Version string `json:"version,omitempty"` + + // Environment is the name of the service's environment, if it has + // one, e.g. "production" or "staging". + Environment string `json:"environment,omitempty"` + + // Agent holds information about the Elastic APM agent tracing this + // service's transactions. + Agent Agent `json:"agent,omitempty"` + + // Framework holds information about the service's framework, if any. + Framework Framework `json:"framework,omitempty"` + + // Language holds information about the programming language in which + // the service is written. + Language Language `json:"language,omitempty"` + + // Runtime holds information about the programming language runtime + // running this service. + Runtime Runtime `json:"runtime,omitempty"` + + // Node holds unique information about each service node + Node ServiceNode `json:"node,omitempty"` +} + +// Agent holds information about the Elastic APM agent. +type Agent struct { + // Name is the name of the Elastic APM agent, e.g. "Go". + Name string `json:"name"` + + // Version is the version of the Elastic APM agent, e.g. "1.0.0". + Version string `json:"version"` +} + +// Framework holds information about the framework (typically web) +// used by the service. +type Framework struct { + // Name is the name of the framework. + Name string `json:"name"` + + // Version is the version of the framework. + Version string `json:"version"` +} + +// Language holds information about the programming language used. +type Language struct { + // Name is the name of the programming language. + Name string `json:"name"` + + // Version is the version of the programming language. + Version string `json:"version,omitempty"` +} + +// Runtime holds information about the programming language runtime. +type Runtime struct { + // Name is the name of the programming language runtime. + Name string `json:"name"` + + // Version is the version of the programming language runtime. + Version string `json:"version"` +} + +// ServiceNode holds unique information about each service node +type ServiceNode struct { + // ConfiguredName holds the name of the service node + ConfiguredName string `json:"configured_name,omitempty"` +} + +// System represents the system (operating system and machine) running the +// service. +type System struct { + // Architecture is the system's hardware architecture. + Architecture string `json:"architecture,omitempty"` + + // Hostname is the system's hostname. + Hostname string `json:"hostname,omitempty"` + + // Platform is the system's platform, or operating system name. + Platform string `json:"platform,omitempty"` + + // Container describes the container running the service. + Container Container `json:"container,omitempty"` + + // Kubernetes describes the kubernetes node and pod running the service. + Kubernetes Kubernetes `json:"kubernetes,omitempty"` +} + +// Process represents an operating system process. +type Process struct { + // Pid is the process ID. + Pid int `json:"pid"` + + // Ppid is the parent process ID, if known. + Ppid *int `json:"ppid,omitempty"` + + // Title is the title of the process. + Title string `json:"title,omitempty"` + + // Argv holds the command line arguments used to start the process. + Argv []string `json:"argv,omitempty"` +} + +// Container represents the container (e.g. Docker) running the service. +type Container struct { + // ID is the unique container ID. + ID string `json:"id"` +} + +// Kubernetes describes properties of the Kubernetes node and pod in which +// the service is running. +type Kubernetes struct { + // Namespace names the Kubernetes namespace in which the pod exists. + Namespace string `json:"namespace,omitempty"` + + // Node describes the Kubernetes node running the service's pod. + Node KubernetesNode `json:"node,omitempty"` + + // Pod describes the Kubernetes pod running the service. + Pod KubernetesPod `json:"pod,omitempty"` +} + +// KubernetesNode describes a Kubernetes node. +type KubernetesNode struct { + // Name holds the node name. + Name string `json:"name,omitempty"` +} + +// KubernetesPod describes a Kubernetes pod. +type KubernetesPod struct { + // Name holds the pod name. + Name string `json:"name,omitempty"` + + // UID holds the pod UID. + UID string `json:"uid,omitempty"` +} + +// Cloud represents the cloud in which the service is running. +type Cloud struct { + // Provider is the cloud provider name, e.g. aws, azure, gcp. + Provider string `json:"provider"` + + // Region is the cloud region name, e.g. us-east-1. + Region string `json:"region,omitempty"` + + // AvailabilityZone is the cloud availability zone name, e.g. us-east-1a. + AvailabilityZone string `json:"availability_zone,omitempty"` + + // Instance holds information about the cloud instance (virtual machine). + Instance CloudInstance `json:"instance,omitempty"` + + // Machine also holds information about the cloud instance (virtual machine). + Machine CloudMachine `json:"machine,omitempty"` + + // Account holds information about the cloud account. + Account CloudAccount `json:"account,omitempty"` + + // Project holds information about the cloud project. + Project CloudProject `json:"project,omitempty"` + + Service CloudService `json:"service,omitempty"` +} + +// CloudInstance holds information about a cloud instance (virtual machine). +type CloudInstance struct { + // ID holds the cloud instance identifier. + ID string `json:"id,omitempty"` + + // ID holds the cloud instance name. + Name string `json:"name,omitempty"` +} + +// CloudMachine holds information about a cloud instance (virtual machine). +type CloudMachine struct { + // Type holds the cloud instance type, e.g. t2.medium. + Type string `json:"type,omitempty"` +} + +// CloudAccount holds information about a cloud account. +type CloudAccount struct { + // ID holds the cloud account identifier. + ID string `json:"id,omitempty"` + + // ID holds the cloud account name. + Name string `json:"name,omitempty"` +} + +// CloudProject holds information about a cloud project. +type CloudProject struct { + // ID holds the cloud project identifier. + ID string `json:"id,omitempty"` + + // Name holds the cloud project name. + Name string `json:"name,omitempty"` +} + +type CloudService struct { + Name string `json:"name,omitempty"` +} + +// User holds information about an authenticated user. +type User struct { + // Username holds the username of the user. + Username string `json:"username,omitempty"` + + // ID identifies the user, e.g. a primary key. This may be + // a string or number. + ID string `json:"id,omitempty"` + + // Email holds the email address of the user. + Email string `json:"email,omitempty"` +} + +type Metadata struct { + Service Service `json:"service,omitempty"` + User User `json:"user,omitempty"` + Labels map[string]interface{} `json:"labels,omitempty"` + Process Process `json:"process,omitempty"` + System System `json:"system,omitempty"` + Cloud Cloud `json:"cloud,omitempty"` +} diff --git a/apm-lambda-extension/model/apm_metrics.go b/apm-lambda-extension/model/apm_metrics.go new file mode 100644 index 00000000..8b8b7a62 --- /dev/null +++ b/apm-lambda-extension/model/apm_metrics.go @@ -0,0 +1,75 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package model + +// MetricLabel is a name/value pair for labeling metrics. +type MetricLabel struct { + // Name is the label name. + Name string + + // Value is the label value. + Value string +} + +// MetricsTransaction holds transaction identifiers for metrics. +type MetricsTransaction struct { + Type string `json:"type,omitempty"` + Name string `json:"name,omitempty"` +} + +// MetricsSpan holds span identifiers for metrics. +type MetricsSpan struct { + Type string `json:"type,omitempty"` + Subtype string `json:"subtype,omitempty"` +} + +// Metric holds metric values. +type Metric struct { + Type string `json:"type,omitempty"` + // Value holds the metric value. + Value float64 `json:"value"` + // Buckets holds the metric bucket values. + Values []float64 `json:"values,omitempty"` + // Count holds the metric observation count for the bucket. + Counts []uint64 `json:"counts,omitempty"` +} + +// Metrics holds a set of metric samples, with an optional set of labels. +type Metrics struct { + // Timestamp holds the time at which the metric samples were taken. + Timestamp int64 `json:"timestamp"` + + // Transaction optionally holds the name and type of transactions + // with which these metrics are associated. + Transaction MetricsTransaction `json:"transaction,omitempty"` + + // Span optionally holds the type and subtype of the spans with + // which these metrics are associated. + Span MetricsSpan `json:"span,omitempty"` + + // Labels holds a set of labels associated with the metrics. + // The labels apply uniformly to all metric samples in the set. + // + // NOTE(axw) the schema calls the field "tags", but we use + // "labels" for agent-internal consistency. Labels aligns better + // with the common schema, anyway. + Labels StringMap `json:"tags,omitempty"` + + // Samples holds a map of metric samples, keyed by metric name. + Samples map[string]Metric `json:"samples"` +}