From 2fe172da4ed2e53989b3b95f7560c2e165105e9d Mon Sep 17 00:00:00 2001 From: Jean-Louis Voiseux <48380853+jlvoiseux@users.noreply.github.com> Date: Tue, 1 Mar 2022 21:13:56 +0100 Subject: [PATCH 01/24] Add unit tests for main.go --- apm-lambda-extension/main_test.go | 206 ++++++++++++++++++++++++++++++ 1 file changed, 206 insertions(+) create mode 100644 apm-lambda-extension/main_test.go diff --git a/apm-lambda-extension/main_test.go b/apm-lambda-extension/main_test.go new file mode 100644 index 00000000..2b6b86e8 --- /dev/null +++ b/apm-lambda-extension/main_test.go @@ -0,0 +1,206 @@ +package main + +import ( + "bytes" + "elastic/apm-lambda-extension/extension" + "elastic/apm-lambda-extension/logsapi" + json "encoding/json" + "github.com/google/uuid" + "log" + "net/http" + "net/http/httptest" + "os" + "strings" + "testing" + "time" +) + +type RegistrationResponse struct { + FunctionName string `json:"functionName"` + FunctionVersion string `json:"functionVersion"` + Handler string `json:"handler"` +} + +func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest.Server) { + + // Mock APM Server + APMServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.RequestURI == "/intake/v2/events" { + w.WriteHeader(http.StatusAccepted) + } + })) + os.Setenv("ELASTIC_APM_LAMBDA_APM_SERVER", APMServer.URL) + os.Setenv("ELASTIC_APM_SECRET_TOKEN", "none") + + // Mock Lambda Server + lambdaServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.RequestURI { + // Extension registration request + case "/2020-01-01/extension/register": + w.Header().Set("Lambda-Extension-Identifier", "b03a29ec-ee63-44cd-8e53-3987a8e8aa8e") + body, _ := json.Marshal(RegistrationResponse{ + FunctionName: "UnitTestingMockLambda", + FunctionVersion: "$LATEST", + Handler: "main_test.mock_lambda", + }) + w.Write(body) + case "/2020-01-01/extension/event/next": + currId := uuid.New().String() + select { + case nextEvent := <-eventsChannel: + w.Write(sendNextEventInfo(currId, nextEvent)) + go processMockEvent(currId, nextEvent, APMServer) + default: + finalShutDown := MockEvent{ + Type: Shutdown, + Name: "finalShutdown", + ExecutionDuration: 0, + Timeout: 0, + } + w.Write(sendNextEventInfo(currId, finalShutDown)) + } + // Logs API subscription request + case "/2020-08-15/logs": + w.WriteHeader(http.StatusOK) + os.Setenv("ELASTIC_APM_LAMBDA_LOGS_LISTENER_ADDRESS", "localhost:8205") + } + })) + + slicedLambdaURL := strings.Split(lambdaServer.URL, "//") + strippedLambdaURL := slicedLambdaURL[1] + os.Setenv("AWS_LAMBDA_RUNTIME_API", strippedLambdaURL) + extensionClient = extension.NewClient(os.Getenv("AWS_LAMBDA_RUNTIME_API")) + + return lambdaServer, APMServer +} + +type MockEventType string + +const ( + InvokeHang MockEventType = "Hang" + InvokeStandard MockEventType = "Standard" + InvokeStandardFlush MockEventType = "Flush" + InvokeMultipleTransactions MockEventType = "MultipleTransactions" + Shutdown MockEventType = "Shutdown" +) + +type MockEvent struct { + Type MockEventType + Name string + ExecutionDuration float64 + Timeout float64 +} + +func processMockEvent(currId string, event MockEvent, APMServer *httptest.Server) { + sendLogEvent(currId, "platform.start") + client := http.Client{} + switch event.Type { + case InvokeHang: + time.Sleep(time.Duration(event.Timeout) * time.Second) + case InvokeStandard: + time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) + req, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.Name))) + client.Do(req) + case InvokeStandardFlush: + time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) + reqData, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events?flushed=true", bytes.NewBuffer([]byte(event.Name))) + client.Do(reqData) + case InvokeMultipleTransactions: + time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) + reqData0, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.Name))) + reqData1, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.Name+"2"))) + go client.Do(reqData0) + go client.Do(reqData1) + time.Sleep(650 * time.Microsecond) + } + sendLogEvent(currId, "platform.runtimeDone") +} + +func sendNextEventInfo(id string, event MockEvent) []byte { + nextEventInfo := extension.NextEventResponse{ + EventType: "INVOKE", + DeadlineMs: time.Now().UnixMilli() + int64(event.Timeout*1000), + RequestID: id, + InvokedFunctionArn: "arn:aws:lambda:eu-central-1:627286350134:function:main_unit_test", + Tracing: extension.Tracing{}, + } + if event.Type == Shutdown { + nextEventInfo.EventType = "SHUTDOWN" + } + + out, _ := json.Marshal(nextEventInfo) + return out +} + +func sendLogEvent(requestId string, logEventType string) { + record := logsapi.LogEventRecord{ + RequestId: requestId, + } + logEvent := logsapi.LogEvent{ + Time: time.Now(), + Type: logEventType, + Record: record, + } + logEvent.RawRecord, _ = json.Marshal(logEvent.Record) + body, _ := json.Marshal([]logsapi.LogEvent{logEvent}) + req, _ := http.NewRequest("POST", "http://localhost:8205", bytes.NewBuffer(body)) + client := http.Client{} + client.Do(req) +} + +func eventQueueGenerator(inputQueue []MockEvent, eventsChannel chan MockEvent) { + for _, event := range inputQueue { + eventsChannel <- event + } +} + +func TestStandardEventsChain(t *testing.T) { + http.DefaultServeMux = new(http.ServeMux) + log.Println("Standard Test") + + eventsChannel := make(chan MockEvent, 100) + defer close(eventsChannel) + lambdaServer, APMServer := initMockServers(eventsChannel) + defer lambdaServer.Close() + defer APMServer.Close() + + eventsChain := []MockEvent{ + {Type: InvokeStandard, Name: "First", ExecutionDuration: 1, Timeout: 5}, + } + eventQueueGenerator(eventsChain, eventsChannel) + main() +} + +func TestFlush(t *testing.T) { + http.DefaultServeMux = new(http.ServeMux) + log.Println("Flush Test") + + eventsChannel := make(chan MockEvent, 100) + //defer close(eventsChannel) + initMockServers(eventsChannel) + //defer lambdaServer.Close() + //defer APMServer.Close() + + eventsChain := []MockEvent{ + {Type: InvokeStandardFlush, Name: "First", ExecutionDuration: 1, Timeout: 5}, + } + eventQueueGenerator(eventsChain, eventsChannel) + main() +} + +func TestSeveralTransactions(t *testing.T) { + http.DefaultServeMux = new(http.ServeMux) + log.Println("Multiple transactions") + + eventsChannel := make(chan MockEvent, 100) + defer close(eventsChannel) + lambdaServer, APMServer := initMockServers(eventsChannel) + defer lambdaServer.Close() + defer APMServer.Close() + + eventsChain := []MockEvent{ + {Type: InvokeMultipleTransactions, Name: "First", ExecutionDuration: 0.1, Timeout: 500}, + } + eventQueueGenerator(eventsChain, eventsChannel) + main() +} From 53a4e6b57446ff6e3a73627e67f5ef58cc85879f Mon Sep 17 00:00:00 2001 From: Jean-Louis Voiseux <48380853+jlvoiseux@users.noreply.github.com> Date: Thu, 3 Mar 2022 15:13:51 +0100 Subject: [PATCH 02/24] Adding Test Scenarios --- apm-lambda-extension/e2e-testing/e2e_test.go | 175 ++----------------- apm-lambda-extension/e2e-testing/e2e_util.go | 157 +++++++++++++++++ apm-lambda-extension/main_test.go | 124 +++++++++++-- 3 files changed, 282 insertions(+), 174 deletions(-) create mode 100644 apm-lambda-extension/e2e-testing/e2e_util.go diff --git a/apm-lambda-extension/e2e-testing/e2e_test.go b/apm-lambda-extension/e2e-testing/e2e_test.go index 0b066d5e..4ce3779e 100644 --- a/apm-lambda-extension/e2e-testing/e2e_test.go +++ b/apm-lambda-extension/e2e-testing/e2e_test.go @@ -1,11 +1,6 @@ package e2e_testing import ( - "archive/zip" - "bufio" - "bytes" - "compress/gzip" - "compress/zlib" "errors" "flag" "fmt" @@ -18,7 +13,6 @@ import ( "net/http" "net/http/httptest" "os" - "os/exec" "path/filepath" "strings" "testing" @@ -36,14 +30,14 @@ func TestEndToEnd(t *testing.T) { if err := godotenv.Load(".e2e_test_config"); err != nil { log.Println("No additional .e2e_test_config file found") } - if getEnvVarValueOrSetDefault("RUN_E2E_TESTS", "false") != "true" { + if GetEnvVarValueOrSetDefault("RUN_E2E_TESTS", "false") != "true" { t.Skip("Skipping E2E tests. Please set the env. variable RUN_E2E_TESTS=true if you want to run them.") } languageName := strings.ToLower(*langPtr) supportedLanguages := []string{"nodejs", "python", "java"} - if !isStringInSlice(languageName, supportedLanguages) { - processError(errors.New(fmt.Sprintf("Unsupported language %s ! Supported languages are %v", languageName, supportedLanguages))) + if !IsStringInSlice(languageName, supportedLanguages) { + ProcessError(errors.New(fmt.Sprintf("Unsupported language %s ! Supported languages are %v", languageName, supportedLanguages))) } samPath := "sam-" + languageName @@ -54,7 +48,7 @@ func TestEndToEnd(t *testing.T) { // Java agent processing if languageName == "java" { - if !folderExists(filepath.Join(samPath, "agent")) { + if !FolderExists(filepath.Join(samPath, "agent")) { log.Println("Java agent not found ! Collecting archive from Github...") retrieveJavaAgent(samPath, *javaAgentVerPtr) } @@ -65,7 +59,7 @@ func TestEndToEnd(t *testing.T) { mockAPMServerLog := "" ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.RequestURI == "/intake/v2/events" { - bytesRes, _ := getDecompressedBytesFromRequest(r) + bytesRes, _ := GetDecompressedBytesFromRequest(r) mockAPMServerLog += fmt.Sprintf("%s\n", bytesRes) } })) @@ -95,26 +89,26 @@ func runTestWithTimer(path string, serviceName string, serverURL string, buildFl } func buildExtensionBinaries() { - runCommandInDir("make", []string{}, "..", getEnvVarValueOrSetDefault("DEBUG_OUTPUT", "false") == "true") + RunCommandInDir("make", []string{}, "..", GetEnvVarValueOrSetDefault("DEBUG_OUTPUT", "false") == "true") } func runTest(path string, serviceName string, serverURL string, buildFlag bool, lambdaFuncTimeout int, resultsChan chan string) { log.Printf("Starting to test %s", serviceName) - if !folderExists(filepath.Join(path, ".aws-sam")) || buildFlag { + if !FolderExists(filepath.Join(path, ".aws-sam")) || buildFlag { log.Printf("Building the Lambda function %s", serviceName) - runCommandInDir("sam", []string{"build"}, path, getEnvVarValueOrSetDefault("DEBUG_OUTPUT", "false") == "true") + RunCommandInDir("sam", []string{"build"}, path, GetEnvVarValueOrSetDefault("DEBUG_OUTPUT", "false") == "true") } log.Printf("Invoking the Lambda function %s", serviceName) uuidWithHyphen := uuid.New().String() urlSlice := strings.Split(serverURL, ":") port := urlSlice[len(urlSlice)-1] - runCommandInDir("sam", []string{"local", "invoke", "--parameter-overrides", + RunCommandInDir("sam", []string{"local", "invoke", "--parameter-overrides", fmt.Sprintf("ParameterKey=ApmServerURL,ParameterValue=http://host.docker.internal:%s", port), fmt.Sprintf("ParameterKey=TestUUID,ParameterValue=%s", uuidWithHyphen), fmt.Sprintf("ParameterKey=TimeoutParam,ParameterValue=%d", lambdaFuncTimeout)}, - path, getEnvVarValueOrSetDefault("DEBUG_OUTPUT", "false") == "true") + path, GetEnvVarValueOrSetDefault("DEBUG_OUTPUT", "false") == "true") log.Printf("%s execution complete", serviceName) resultsChan <- uuidWithHyphen @@ -127,165 +121,26 @@ func retrieveJavaAgent(samJavaPath string, version string) { // Download archive out, err := os.Create(agentArchivePath) - processError(err) + ProcessError(err) defer out.Close() resp, err := http.Get(fmt.Sprintf("https://github.com/elastic/apm-agent-java/releases/download/v%[1]s/elastic-apm-java-aws-lambda-layer-%[1]s.zip", version)) - processError(err) + ProcessError(err) defer resp.Body.Close() io.Copy(out, resp.Body) // Unzip archive and delete it log.Println("Unzipping Java Agent archive...") - unzip(agentArchivePath, agentFolderPath) + Unzip(agentArchivePath, agentFolderPath) err = os.Remove(agentArchivePath) - processError(err) + ProcessError(err) } func changeJavaAgentPermissions(samJavaPath string) { agentFolderPath := filepath.Join(samJavaPath, "agent") log.Println("Setting appropriate permissions for Java agent files...") agentFiles, err := ioutil.ReadDir(agentFolderPath) - processError(err) + ProcessError(err) for _, f := range agentFiles { os.Chmod(filepath.Join(agentFolderPath, f.Name()), 0755) } } - -func getEnvVarValueOrSetDefault(envVarName string, defaultVal string) string { - val := os.Getenv(envVarName) - if val == "" { - return defaultVal - } - return val -} - -func runCommandInDir(command string, args []string, dir string, printOutput bool) { - e := exec.Command(command, args...) - if printOutput { - log.Println(e.String()) - } - e.Dir = dir - stdout, _ := e.StdoutPipe() - stderr, _ := e.StderrPipe() - e.Start() - scannerOut := bufio.NewScanner(stdout) - for scannerOut.Scan() { - m := scannerOut.Text() - if printOutput { - log.Println(m) - } - } - scannerErr := bufio.NewScanner(stderr) - for scannerErr.Scan() { - m := scannerErr.Text() - if printOutput { - log.Println(m) - } - } - e.Wait() - -} - -func folderExists(path string) bool { - _, err := os.Stat(path) - if err == nil { - return true - } - return false -} - -func processError(err error) { - if err != nil { - log.Panic(err) - } -} - -func unzip(archivePath string, destinationFolderPath string) { - - openedArchive, err := zip.OpenReader(archivePath) - processError(err) - defer openedArchive.Close() - - // Permissions setup - os.MkdirAll(destinationFolderPath, 0755) - - // Closure required, so that Close() calls do not pile up when unzipping archives with a lot of files - extractAndWriteFile := func(f *zip.File) error { - rc, err := f.Open() - if err != nil { - return err - } - defer func() { - if err := rc.Close(); err != nil { - panic(err) - } - }() - - path := filepath.Join(destinationFolderPath, f.Name) - - // Check for ZipSlip (Directory traversal) - if !strings.HasPrefix(path, filepath.Clean(destinationFolderPath)+string(os.PathSeparator)) { - return fmt.Errorf("illegal file path: %s", path) - } - - if f.FileInfo().IsDir() { - os.MkdirAll(path, f.Mode()) - } else { - os.MkdirAll(filepath.Dir(path), f.Mode()) - f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode()) - processError(err) - defer f.Close() - _, err = io.Copy(f, rc) - processError(err) - } - return nil - } - - for _, f := range openedArchive.File { - err := extractAndWriteFile(f) - processError(err) - } -} - -func getDecompressedBytesFromRequest(req *http.Request) ([]byte, error) { - var rawBytes []byte - if req.Body != nil { - rawBytes, _ = ioutil.ReadAll(req.Body) - } - - switch req.Header.Get("Content-Encoding") { - case "deflate": - reader := bytes.NewReader([]byte(rawBytes)) - zlibreader, err := zlib.NewReader(reader) - if err != nil { - return nil, fmt.Errorf("could not create zlib.NewReader: %v", err) - } - bodyBytes, err := ioutil.ReadAll(zlibreader) - if err != nil { - return nil, fmt.Errorf("could not read from zlib reader using ioutil.ReadAll: %v", err) - } - return bodyBytes, nil - case "gzip": - reader := bytes.NewReader([]byte(rawBytes)) - zlibreader, err := gzip.NewReader(reader) - if err != nil { - return nil, fmt.Errorf("could not create gzip.NewReader: %v", err) - } - bodyBytes, err := ioutil.ReadAll(zlibreader) - if err != nil { - return nil, fmt.Errorf("could not read from gzip reader using ioutil.ReadAll: %v", err) - } - return bodyBytes, nil - default: - return rawBytes, nil - } -} - -func isStringInSlice(a string, list []string) bool { - for _, b := range list { - if b == a { - return true - } - } - return false -} diff --git a/apm-lambda-extension/e2e-testing/e2e_util.go b/apm-lambda-extension/e2e-testing/e2e_util.go new file mode 100644 index 00000000..6ca423a3 --- /dev/null +++ b/apm-lambda-extension/e2e-testing/e2e_util.go @@ -0,0 +1,157 @@ +package e2e_testing + +import ( + "archive/zip" + "bufio" + "bytes" + "compress/gzip" + "compress/zlib" + "fmt" + "io" + "io/ioutil" + "log" + "net/http" + "os" + "os/exec" + "path/filepath" + "strings" +) + +func GetEnvVarValueOrSetDefault(envVarName string, defaultVal string) string { + val := os.Getenv(envVarName) + if val == "" { + return defaultVal + } + return val +} + +func RunCommandInDir(command string, args []string, dir string, printOutput bool) { + e := exec.Command(command, args...) + if printOutput { + log.Println(e.String()) + } + e.Dir = dir + stdout, _ := e.StdoutPipe() + stderr, _ := e.StderrPipe() + e.Start() + scannerOut := bufio.NewScanner(stdout) + for scannerOut.Scan() { + m := scannerOut.Text() + if printOutput { + log.Println(m) + } + } + scannerErr := bufio.NewScanner(stderr) + for scannerErr.Scan() { + m := scannerErr.Text() + if printOutput { + log.Println(m) + } + } + e.Wait() + +} + +func FolderExists(path string) bool { + _, err := os.Stat(path) + if err == nil { + return true + } + return false +} + +func ProcessError(err error) { + if err != nil { + log.Panic(err) + } +} + +func Unzip(archivePath string, destinationFolderPath string) { + + openedArchive, err := zip.OpenReader(archivePath) + ProcessError(err) + defer openedArchive.Close() + + // Permissions setup + os.MkdirAll(destinationFolderPath, 0755) + + // Closure required, so that Close() calls do not pile up when unzipping archives with a lot of files + extractAndWriteFile := func(f *zip.File) error { + rc, err := f.Open() + if err != nil { + return err + } + defer func() { + if err := rc.Close(); err != nil { + panic(err) + } + }() + + path := filepath.Join(destinationFolderPath, f.Name) + + // Check for ZipSlip (Directory traversal) + if !strings.HasPrefix(path, filepath.Clean(destinationFolderPath)+string(os.PathSeparator)) { + return fmt.Errorf("illegal file path: %s", path) + } + + if f.FileInfo().IsDir() { + os.MkdirAll(path, f.Mode()) + } else { + os.MkdirAll(filepath.Dir(path), f.Mode()) + f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode()) + ProcessError(err) + defer f.Close() + _, err = io.Copy(f, rc) + ProcessError(err) + } + return nil + } + + for _, f := range openedArchive.File { + err := extractAndWriteFile(f) + ProcessError(err) + } +} + +func IsStringInSlice(a string, list []string) bool { + for _, b := range list { + if b == a { + return true + } + } + return false +} + +func GetDecompressedBytesFromRequest(req *http.Request) ([]byte, error) { + var rawBytes []byte + if req.Body != nil { + rawBytes, _ = ioutil.ReadAll(req.Body) + } + + switch req.Header.Get("Content-Encoding") { + case "deflate": + reader := bytes.NewReader([]byte(rawBytes)) + zlibreader, err := zlib.NewReader(reader) + if err != nil { + return nil, fmt.Errorf("could not create zlib.NewReader: %v", err) + } + bodyBytes, err := ioutil.ReadAll(zlibreader) + if err != nil { + return nil, fmt.Errorf("could not read from zlib reader using ioutil.ReadAll: %v", err) + } + return bodyBytes, nil + case "gzip": + reader := bytes.NewReader([]byte(rawBytes)) + zlibreader, err := gzip.NewReader(reader) + if err != nil { + return nil, fmt.Errorf("could not create gzip.NewReader: %v", err) + } + bodyBytes, err := ioutil.ReadAll(zlibreader) + if err != nil { + return nil, fmt.Errorf("could not read from gzip reader using ioutil.ReadAll: %v", err) + } + return bodyBytes, nil + default: + return rawBytes, nil + } +} diff --git a/apm-lambda-extension/main_test.go b/apm-lambda-extension/main_test.go index 2b6b86e8..9953a5b1 100644 --- a/apm-lambda-extension/main_test.go +++ b/apm-lambda-extension/main_test.go @@ -2,6 +2,7 @@ package main import ( "bytes" + e2e_testing "elastic/apm-lambda-extension/e2e-testing" "elastic/apm-lambda-extension/extension" "elastic/apm-lambda-extension/logsapi" json "encoding/json" @@ -26,7 +27,20 @@ func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest. // Mock APM Server APMServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.RequestURI == "/intake/v2/events" { - w.WriteHeader(http.StatusAccepted) + decompressedBytes, err := e2e_testing.GetDecompressedBytesFromRequest(r) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + } + switch APMServerBehavior(decompressedBytes) { + case TimelyResponse: + w.WriteHeader(http.StatusAccepted) + case Hangs: + select {} + case Crashes: + panic("Server crashed") + default: + w.WriteHeader(http.StatusInternalServerError) + } } })) os.Setenv("ELASTIC_APM_LAMBDA_APM_SERVER", APMServer.URL) @@ -53,7 +67,6 @@ func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest. default: finalShutDown := MockEvent{ Type: Shutdown, - Name: "finalShutdown", ExecutionDuration: 0, Timeout: 0, } @@ -84,9 +97,17 @@ const ( Shutdown MockEventType = "Shutdown" ) +type APMServerBehavior string + +const ( + TimelyResponse APMServerBehavior = "TimelyResponse" + Hangs APMServerBehavior = "Hangs" + Crashes APMServerBehavior = "Crashes" +) + type MockEvent struct { Type MockEventType - Name string + APMServerBehavior APMServerBehavior ExecutionDuration float64 Timeout float64 } @@ -99,16 +120,16 @@ func processMockEvent(currId string, event MockEvent, APMServer *httptest.Server time.Sleep(time.Duration(event.Timeout) * time.Second) case InvokeStandard: time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) - req, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.Name))) + req, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.APMServerBehavior))) client.Do(req) case InvokeStandardFlush: time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) - reqData, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events?flushed=true", bytes.NewBuffer([]byte(event.Name))) + reqData, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events?flushed=true", bytes.NewBuffer([]byte(event.APMServerBehavior))) client.Do(reqData) case InvokeMultipleTransactions: time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) - reqData0, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.Name))) - reqData1, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.Name+"2"))) + reqData0, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.APMServerBehavior))) + reqData1, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.APMServerBehavior))) go client.Do(reqData0) go client.Do(reqData1) time.Sleep(650 * time.Microsecond) @@ -154,6 +175,8 @@ func eventQueueGenerator(inputQueue []MockEvent, eventsChannel chan MockEvent) { } } +// TESTS + func TestStandardEventsChain(t *testing.T) { http.DefaultServeMux = new(http.ServeMux) log.Println("Standard Test") @@ -165,30 +188,103 @@ func TestStandardEventsChain(t *testing.T) { defer APMServer.Close() eventsChain := []MockEvent{ - {Type: InvokeStandard, Name: "First", ExecutionDuration: 1, Timeout: 5}, + {Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, + } + eventQueueGenerator(eventsChain, eventsChannel) + main() +} + +func TestAPMServerDown(t *testing.T) { + http.DefaultServeMux = new(http.ServeMux) + log.Println("APM Server Down") + + eventsChannel := make(chan MockEvent, 100) + defer close(eventsChannel) + lambdaServer, APMServer := initMockServers(eventsChannel) + defer lambdaServer.Close() + APMServer.Close() + + eventsChain := []MockEvent{ + {Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, + } + eventQueueGenerator(eventsChain, eventsChannel) + main() +} + +func TestAPMServerHangs(t *testing.T) { + http.DefaultServeMux = new(http.ServeMux) + log.Println("APM Server Hangs") + + eventsChannel := make(chan MockEvent, 100) + defer close(eventsChannel) + lambdaServer, APMServer := initMockServers(eventsChannel) + defer lambdaServer.Close() + defer APMServer.Close() + + eventsChain := []MockEvent{ + {Type: InvokeStandard, APMServerBehavior: Hangs, ExecutionDuration: 1, Timeout: 5}, } eventQueueGenerator(eventsChain, eventsChannel) main() } +func TestAPMServerCrashesDuringExecution(t *testing.T) { + http.DefaultServeMux = new(http.ServeMux) + log.Println("APM Server Crashes during execution") + + eventsChannel := make(chan MockEvent, 100) + defer close(eventsChannel) + lambdaServer, APMServer := initMockServers(eventsChannel) + defer lambdaServer.Close() + defer APMServer.Close() + + eventsChain := []MockEvent{ + {Type: InvokeStandard, APMServerBehavior: Crashes, ExecutionDuration: 1, Timeout: 5}, + } + eventQueueGenerator(eventsChain, eventsChannel) + main() +} + +func TestFullChannel(t *testing.T) { + http.DefaultServeMux = new(http.ServeMux) + log.Println("AgentData channel is full") + + eventsChannel := make(chan MockEvent, 1000) + defer close(eventsChannel) + lambdaServer, APMServer := initMockServers(eventsChannel) + defer lambdaServer.Close() + defer APMServer.Close() + + var eventsChain []MockEvent + for i := 0; i < 200; i++ { + eventsChain = append(eventsChain, MockEvent{Type: InvokeStandard, APMServerBehavior: Crashes, ExecutionDuration: 0.1, Timeout: 5}) + } + eventQueueGenerator(eventsChain, eventsChannel) + main() +} + +// Error parsing the data + +// Test full channel + func TestFlush(t *testing.T) { http.DefaultServeMux = new(http.ServeMux) log.Println("Flush Test") eventsChannel := make(chan MockEvent, 100) //defer close(eventsChannel) - initMockServers(eventsChannel) - //defer lambdaServer.Close() - //defer APMServer.Close() + lambdaServer, APMServer := initMockServers(eventsChannel) + defer lambdaServer.Close() + defer APMServer.Close() eventsChain := []MockEvent{ - {Type: InvokeStandardFlush, Name: "First", ExecutionDuration: 1, Timeout: 5}, + {Type: InvokeStandardFlush, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, } eventQueueGenerator(eventsChain, eventsChannel) main() } -func TestSeveralTransactions(t *testing.T) { +func TestWaitGroup(t *testing.T) { http.DefaultServeMux = new(http.ServeMux) log.Println("Multiple transactions") @@ -199,7 +295,7 @@ func TestSeveralTransactions(t *testing.T) { defer APMServer.Close() eventsChain := []MockEvent{ - {Type: InvokeMultipleTransactions, Name: "First", ExecutionDuration: 0.1, Timeout: 500}, + {Type: InvokeMultipleTransactions, APMServerBehavior: TimelyResponse, ExecutionDuration: 0.1, Timeout: 500}, } eventQueueGenerator(eventsChain, eventsChannel) main() From 2e3e83deb8d886c733c35f75e55890c804875489 Mon Sep 17 00:00:00 2001 From: Jean-Louis Voiseux <48380853+jlvoiseux@users.noreply.github.com> Date: Thu, 3 Mar 2022 18:09:35 +0100 Subject: [PATCH 03/24] Adding Test scenario with multiple transactions --- apm-lambda-extension/main_test.go | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/apm-lambda-extension/main_test.go b/apm-lambda-extension/main_test.go index 9953a5b1..1db5f48c 100644 --- a/apm-lambda-extension/main_test.go +++ b/apm-lambda-extension/main_test.go @@ -90,11 +90,12 @@ func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest. type MockEventType string const ( - InvokeHang MockEventType = "Hang" - InvokeStandard MockEventType = "Standard" - InvokeStandardFlush MockEventType = "Flush" - InvokeMultipleTransactions MockEventType = "MultipleTransactions" - Shutdown MockEventType = "Shutdown" + InvokeHang MockEventType = "Hang" + InvokeStandard MockEventType = "Standard" + InvokeStandardFlush MockEventType = "Flush" + InvokeWaitgroupsRace MockEventType = "InvokeWaitgroupsRace" + InvokeMultipleTransactionsOverload MockEventType = "MultipleTransactionsOverload" + Shutdown MockEventType = "Shutdown" ) type APMServerBehavior string @@ -126,13 +127,19 @@ func processMockEvent(currId string, event MockEvent, APMServer *httptest.Server time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) reqData, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events?flushed=true", bytes.NewBuffer([]byte(event.APMServerBehavior))) client.Do(reqData) - case InvokeMultipleTransactions: + case InvokeWaitgroupsRace: time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) reqData0, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.APMServerBehavior))) reqData1, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.APMServerBehavior))) go client.Do(reqData0) go client.Do(reqData1) time.Sleep(650 * time.Microsecond) + case InvokeMultipleTransactionsOverload: + for i := 0; i < 200; i++ { + time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) + reqData, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.APMServerBehavior))) + client.Do(reqData) + } } sendLogEvent(currId, "platform.runtimeDone") } @@ -257,7 +264,7 @@ func TestFullChannel(t *testing.T) { var eventsChain []MockEvent for i := 0; i < 200; i++ { - eventsChain = append(eventsChain, MockEvent{Type: InvokeStandard, APMServerBehavior: Crashes, ExecutionDuration: 0.1, Timeout: 5}) + eventsChain = append(eventsChain, MockEvent{Type: InvokeMultipleTransactionsOverload, APMServerBehavior: Hangs, ExecutionDuration: 0.01, Timeout: 5}) } eventQueueGenerator(eventsChain, eventsChannel) main() @@ -265,8 +272,6 @@ func TestFullChannel(t *testing.T) { // Error parsing the data -// Test full channel - func TestFlush(t *testing.T) { http.DefaultServeMux = new(http.ServeMux) log.Println("Flush Test") @@ -295,7 +300,7 @@ func TestWaitGroup(t *testing.T) { defer APMServer.Close() eventsChain := []MockEvent{ - {Type: InvokeMultipleTransactions, APMServerBehavior: TimelyResponse, ExecutionDuration: 0.1, Timeout: 500}, + {Type: InvokeWaitgroupsRace, APMServerBehavior: TimelyResponse, ExecutionDuration: 0.1, Timeout: 500}, } eventQueueGenerator(eventsChain, eventsChannel) main() From c7384e08f52c3aa0c8612c2bc98b9c5f6ebc1c00 Mon Sep 17 00:00:00 2001 From: Jean-Louis Voiseux <48380853+jlvoiseux@users.noreply.github.com> Date: Mon, 7 Mar 2022 17:38:31 +0100 Subject: [PATCH 04/24] Manage scenarios where APM Server hangs/is slow --- apm-lambda-extension/extension/apm_server.go | 1 + apm-lambda-extension/extension/process_env.go | 34 +++++---- .../extension/route_handlers.go | 9 ++- apm-lambda-extension/main.go | 10 +-- apm-lambda-extension/main_test.go | 70 ++++++++++++++----- 5 files changed, 88 insertions(+), 36 deletions(-) diff --git a/apm-lambda-extension/extension/apm_server.go b/apm-lambda-extension/extension/apm_server.go index c4c64044..31c6c96d 100644 --- a/apm-lambda-extension/extension/apm_server.go +++ b/apm-lambda-extension/extension/apm_server.go @@ -59,6 +59,7 @@ func PostToApmServer(client *http.Client, agentData AgentData, config *extension } req, err := http.NewRequest("POST", config.apmServerUrl+endpointURI, buf) + req.Close = true if err != nil { return fmt.Errorf("failed to create a new request when posting to APM server: %v", err) } diff --git a/apm-lambda-extension/extension/process_env.go b/apm-lambda-extension/extension/process_env.go index be560bec..df5aa865 100644 --- a/apm-lambda-extension/extension/process_env.go +++ b/apm-lambda-extension/extension/process_env.go @@ -25,12 +25,13 @@ import ( ) type extensionConfig struct { - apmServerUrl string - apmServerSecretToken string - apmServerApiKey string - dataReceiverServerPort string - SendStrategy SendStrategy - dataReceiverTimeoutSeconds int + apmServerUrl string + apmServerSecretToken string + apmServerApiKey string + dataReceiverServerPort string + SendStrategy SendStrategy + dataReceiverTimeoutSeconds int + DataForwarderTimeoutSeconds int } // SendStrategy represents the type of sending strategy the extension uses @@ -60,8 +61,14 @@ func getIntFromEnv(name string) (int, error) { func ProcessEnv() *extensionConfig { dataReceiverTimeoutSeconds, err := getIntFromEnv("ELASTIC_APM_DATA_RECEIVER_TIMEOUT_SECONDS") if err != nil { - log.Printf("Could not read ELASTIC_APM_DATA_RECEIVER_TIMEOUT_SECONDS, defaulting to 15: %v\n", err) dataReceiverTimeoutSeconds = 15 + log.Printf("Could not read ELASTIC_APM_DATA_RECEIVER_TIMEOUT_SECONDS, defaulting to %d: %v\n", dataReceiverTimeoutSeconds, err) + } + + dataForwarderTimeoutSeconds, err := getIntFromEnv("ELASTIC_APM_DATA_FORWARDER_TIMEOUT_SECONDS") + if err != nil { + dataForwarderTimeoutSeconds = 3 + log.Printf("Could not read ELASTIC_APM_DATA_FORWARDER_TIMEOUT_SECONDS, defaulting to %d: %v\n", dataForwarderTimeoutSeconds, err) } // add trailing slash to server name if missing @@ -78,12 +85,13 @@ func ProcessEnv() *extensionConfig { } config := &extensionConfig{ - apmServerUrl: normalizedApmLambdaServer, - apmServerSecretToken: os.Getenv("ELASTIC_APM_SECRET_TOKEN"), - apmServerApiKey: os.Getenv("ELASTIC_APM_API_KEY"), - dataReceiverServerPort: os.Getenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT"), - SendStrategy: normalizedSendStrategy, - dataReceiverTimeoutSeconds: dataReceiverTimeoutSeconds, + apmServerUrl: normalizedApmLambdaServer, + apmServerSecretToken: os.Getenv("ELASTIC_APM_SECRET_TOKEN"), + apmServerApiKey: os.Getenv("ELASTIC_APM_API_KEY"), + dataReceiverServerPort: os.Getenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT"), + SendStrategy: normalizedSendStrategy, + dataReceiverTimeoutSeconds: dataReceiverTimeoutSeconds, + DataForwarderTimeoutSeconds: dataForwarderTimeoutSeconds, } if config.dataReceiverServerPort == "" { diff --git a/apm-lambda-extension/extension/route_handlers.go b/apm-lambda-extension/extension/route_handlers.go index 72671230..f1f6a040 100644 --- a/apm-lambda-extension/extension/route_handlers.go +++ b/apm-lambda-extension/extension/route_handlers.go @@ -98,8 +98,13 @@ func handleIntakeV2Events(agentDataChan chan AgentData) func(w http.ResponseWrit Data: rawBytes, ContentEncoding: r.Header.Get("Content-Encoding"), } - log.Println("Adding agent data to buffer to be sent to apm server") - agentDataChan <- agentData + + select { + case agentDataChan <- agentData: + log.Println("Adding agent data to buffer to be sent to apm server") + default: + log.Println("Channel full : dropping event") + } } if len(r.URL.Query()["flushed"]) > 0 && r.URL.Query()["flushed"][0] == "true" { diff --git a/apm-lambda-extension/main.go b/apm-lambda-extension/main.go index 5e2b4020..004aa8b4 100644 --- a/apm-lambda-extension/main.go +++ b/apm-lambda-extension/main.go @@ -69,6 +69,7 @@ func main() { // Create a client to use for sending data to the apm server client := &http.Client{ + Timeout: time.Duration(config.DataForwarderTimeoutSeconds) * time.Second, Transport: http.DefaultTransport.(*http.Transport).Clone(), } @@ -121,10 +122,6 @@ func main() { // Make a channel for signaling that the function invocation is complete funcDone := make(chan struct{}) - // Flush any APM data, in case waiting for the agentDone or runtimeDone signals - // timed out, the agent data wasn't available yet, and we got to the next event - extension.FlushAPMData(client, agentDataChannel, config) - // A shutdown event indicates the execution environment is shutting down. // This is usually due to inactivity. if event.EventType == extension.Shutdown { @@ -132,6 +129,10 @@ func main() { return } + // Flush any APM data, in case waiting for the agentDone or runtimeDone signals + // timed out, the agent data wasn't available yet, and we got to the next non-shutdown event + extension.FlushAPMData(client, agentDataChannel, config) + // Receive agent data as it comes in and post it to the APM server. // Stop checking for, and sending agent data when the function invocation // has completed, signaled via a channel. @@ -144,6 +145,7 @@ func main() { log.Println("funcDone signal received, not processing any more agent data") return case agentData := <-agentDataChannel: + log.Println("DATA TO SEND TO APM RECEIVED") err := extension.PostToApmServer(client, agentData, config) if err != nil { log.Printf("Error sending to APM server, skipping: %v", err) diff --git a/apm-lambda-extension/main_test.go b/apm-lambda-extension/main_test.go index 1db5f48c..80220022 100644 --- a/apm-lambda-extension/main_test.go +++ b/apm-lambda-extension/main_test.go @@ -12,6 +12,7 @@ import ( "net/http/httptest" "os" "strings" + "sync" "testing" "time" ) @@ -22,9 +23,10 @@ type RegistrationResponse struct { Handler string `json:"handler"` } -func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest.Server) { +func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest.Server, chan struct{}) { // Mock APM Server + hangChan := make(chan struct{}) APMServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.RequestURI == "/intake/v2/events" { decompressedBytes, err := e2e_testing.GetDecompressedBytesFromRequest(r) @@ -34,8 +36,11 @@ func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest. switch APMServerBehavior(decompressedBytes) { case TimelyResponse: w.WriteHeader(http.StatusAccepted) + case SlowResponse: + time.Sleep(2 * time.Second) + w.WriteHeader(http.StatusAccepted) case Hangs: - select {} + <-hangChan case Crashes: panic("Server crashed") default: @@ -71,6 +76,7 @@ func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest. Timeout: 0, } w.Write(sendNextEventInfo(currId, finalShutDown)) + go processMockEvent(currId, finalShutDown, APMServer) } // Logs API subscription request case "/2020-08-15/logs": @@ -84,7 +90,7 @@ func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest. os.Setenv("AWS_LAMBDA_RUNTIME_API", strippedLambdaURL) extensionClient = extension.NewClient(os.Getenv("AWS_LAMBDA_RUNTIME_API")) - return lambdaServer, APMServer + return lambdaServer, APMServer, hangChan } type MockEventType string @@ -102,6 +108,7 @@ type APMServerBehavior string const ( TimelyResponse APMServerBehavior = "TimelyResponse" + SlowResponse APMServerBehavior = "SlowResponse" Hangs APMServerBehavior = "Hangs" Crashes APMServerBehavior = "Crashes" ) @@ -122,7 +129,8 @@ func processMockEvent(currId string, event MockEvent, APMServer *httptest.Server case InvokeStandard: time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) req, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.APMServerBehavior))) - client.Do(req) + res, _ := client.Do(req) + log.Println(res.StatusCode) case InvokeStandardFlush: time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) reqData, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events?flushed=true", bytes.NewBuffer([]byte(event.APMServerBehavior))) @@ -135,11 +143,20 @@ func processMockEvent(currId string, event MockEvent, APMServer *httptest.Server go client.Do(reqData1) time.Sleep(650 * time.Microsecond) case InvokeMultipleTransactionsOverload: + wg := sync.WaitGroup{} for i := 0; i < 200; i++ { - time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) - reqData, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.APMServerBehavior))) - client.Do(reqData) + go func() { + wg.Add(1) + time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) + reqData, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.APMServerBehavior))) + client.Do(reqData) + wg.Done() + }() } + wg.Wait() + case Shutdown: + reqData, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.APMServerBehavior))) + client.Do(reqData) } sendLogEvent(currId, "platform.runtimeDone") } @@ -190,7 +207,7 @@ func TestStandardEventsChain(t *testing.T) { eventsChannel := make(chan MockEvent, 100) defer close(eventsChannel) - lambdaServer, APMServer := initMockServers(eventsChannel) + lambdaServer, APMServer, _ := initMockServers(eventsChannel) defer lambdaServer.Close() defer APMServer.Close() @@ -207,7 +224,7 @@ func TestAPMServerDown(t *testing.T) { eventsChannel := make(chan MockEvent, 100) defer close(eventsChannel) - lambdaServer, APMServer := initMockServers(eventsChannel) + lambdaServer, APMServer, _ := initMockServers(eventsChannel) defer lambdaServer.Close() APMServer.Close() @@ -224,7 +241,7 @@ func TestAPMServerHangs(t *testing.T) { eventsChannel := make(chan MockEvent, 100) defer close(eventsChannel) - lambdaServer, APMServer := initMockServers(eventsChannel) + lambdaServer, APMServer, hangChan := initMockServers(eventsChannel) defer lambdaServer.Close() defer APMServer.Close() @@ -232,7 +249,10 @@ func TestAPMServerHangs(t *testing.T) { {Type: InvokeStandard, APMServerBehavior: Hangs, ExecutionDuration: 1, Timeout: 5}, } eventQueueGenerator(eventsChain, eventsChannel) + start := time.Now() main() + log.Printf("Success : test took %s", time.Since(start)) + hangChan <- struct{}{} } func TestAPMServerCrashesDuringExecution(t *testing.T) { @@ -241,7 +261,7 @@ func TestAPMServerCrashesDuringExecution(t *testing.T) { eventsChannel := make(chan MockEvent, 100) defer close(eventsChannel) - lambdaServer, APMServer := initMockServers(eventsChannel) + lambdaServer, APMServer, _ := initMockServers(eventsChannel) defer lambdaServer.Close() defer APMServer.Close() @@ -258,13 +278,29 @@ func TestFullChannel(t *testing.T) { eventsChannel := make(chan MockEvent, 1000) defer close(eventsChannel) - lambdaServer, APMServer := initMockServers(eventsChannel) + lambdaServer, APMServer, _ := initMockServers(eventsChannel) defer lambdaServer.Close() defer APMServer.Close() - var eventsChain []MockEvent - for i := 0; i < 200; i++ { - eventsChain = append(eventsChain, MockEvent{Type: InvokeMultipleTransactionsOverload, APMServerBehavior: Hangs, ExecutionDuration: 0.01, Timeout: 5}) + eventsChain := []MockEvent{ + {Type: InvokeMultipleTransactionsOverload, APMServerBehavior: TimelyResponse, ExecutionDuration: 0.01, Timeout: 5}, + } + eventQueueGenerator(eventsChain, eventsChannel) + main() +} + +func TestFullChannelSlowAPMServer(t *testing.T) { + http.DefaultServeMux = new(http.ServeMux) + log.Println("AgentData channel is full, and APM server is slow") + os.Setenv("ELASTIC_APM_SEND_STRATEGY", "background") + eventsChannel := make(chan MockEvent, 1000) + defer close(eventsChannel) + lambdaServer, APMServer, _ := initMockServers(eventsChannel) + defer lambdaServer.Close() + defer APMServer.Close() + + eventsChain := []MockEvent{ + {Type: InvokeMultipleTransactionsOverload, APMServerBehavior: SlowResponse, ExecutionDuration: 0.01, Timeout: 5}, } eventQueueGenerator(eventsChain, eventsChannel) main() @@ -278,7 +314,7 @@ func TestFlush(t *testing.T) { eventsChannel := make(chan MockEvent, 100) //defer close(eventsChannel) - lambdaServer, APMServer := initMockServers(eventsChannel) + lambdaServer, APMServer, _ := initMockServers(eventsChannel) defer lambdaServer.Close() defer APMServer.Close() @@ -295,7 +331,7 @@ func TestWaitGroup(t *testing.T) { eventsChannel := make(chan MockEvent, 100) defer close(eventsChannel) - lambdaServer, APMServer := initMockServers(eventsChannel) + lambdaServer, APMServer, _ := initMockServers(eventsChannel) defer lambdaServer.Close() defer APMServer.Close() From 4b840520f0eac7aa8efc4ff6894c76455dc9f2c2 Mon Sep 17 00:00:00 2001 From: Jean-Louis Voiseux <48380853+jlvoiseux@users.noreply.github.com> Date: Mon, 7 Mar 2022 17:51:15 +0100 Subject: [PATCH 05/24] Remove UnixMills() to secure compatibility with Go < 1.17 --- apm-lambda-extension/main_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-lambda-extension/main_test.go b/apm-lambda-extension/main_test.go index 80220022..0ddd6e72 100644 --- a/apm-lambda-extension/main_test.go +++ b/apm-lambda-extension/main_test.go @@ -164,7 +164,7 @@ func processMockEvent(currId string, event MockEvent, APMServer *httptest.Server func sendNextEventInfo(id string, event MockEvent) []byte { nextEventInfo := extension.NextEventResponse{ EventType: "INVOKE", - DeadlineMs: time.Now().UnixMilli() + int64(event.Timeout*1000), + DeadlineMs: time.Now().UnixNano()/int64(time.Millisecond) + int64(event.Timeout*1000), RequestID: id, InvokedFunctionArn: "arn:aws:lambda:eu-central-1:627286350134:function:main_unit_test", Tracing: extension.Tracing{}, From fb8fea385e4216652777647ed57bf9aa865aed23 Mon Sep 17 00:00:00 2001 From: Jean-Louis Voiseux <48380853+jlvoiseux@users.noreply.github.com> Date: Wed, 9 Mar 2022 13:01:52 +0100 Subject: [PATCH 06/24] Add timeout unit tests --- apm-lambda-extension/extension/process_env_test.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/apm-lambda-extension/extension/process_env_test.go b/apm-lambda-extension/extension/process_env_test.go index 5ad2e9cb..965f6e90 100644 --- a/apm-lambda-extension/extension/process_env_test.go +++ b/apm-lambda-extension/extension/process_env_test.go @@ -86,6 +86,20 @@ func TestProcessEnv(t *testing.T) { t.Fail() } + os.Setenv("ELASTIC_APM_DATA_FORWARDER_TIMEOUT_SECONDS", "10") + config = ProcessEnv() + if config.DataForwarderTimeoutSeconds != 10 { + t.Log("Timeout not set correctly") + t.Fail() + } + + os.Setenv("ELASTIC_APM_DATA_FORWARDER_TIMEOUT_SECONDS", "foo") + config = ProcessEnv() + if config.DataForwarderTimeoutSeconds != 3 { + t.Log("Timeout not set correctly") + t.Fail() + } + os.Setenv("ELASTIC_APM_API_KEY", "foo") config = ProcessEnv() if config.apmServerApiKey != "foo" { From 66695e378a069de68d7d3cea43dae1b11ce4e2b4 Mon Sep 17 00:00:00 2001 From: Jean-Louis Voiseux <48380853+jlvoiseux@users.noreply.github.com> Date: Wed, 9 Mar 2022 13:04:53 +0100 Subject: [PATCH 07/24] Quick fix : remove unnecessary print --- apm-lambda-extension/main.go | 1 - 1 file changed, 1 deletion(-) diff --git a/apm-lambda-extension/main.go b/apm-lambda-extension/main.go index 20a165f8..64c29672 100644 --- a/apm-lambda-extension/main.go +++ b/apm-lambda-extension/main.go @@ -145,7 +145,6 @@ func main() { log.Println("funcDone signal received, not processing any more agent data") return case agentData := <-agentDataChannel: - log.Println("DATA TO SEND TO APM RECEIVED") err := extension.PostToApmServer(client, agentData, config) if err != nil { log.Printf("Error sending to APM server, skipping: %v", err) From 8f35c5292081a0503c73418b9f1e9b11cea0dfda Mon Sep 17 00:00:00 2001 From: Jean-Louis Voiseux <48380853+jlvoiseux@users.noreply.github.com> Date: Wed, 9 Mar 2022 15:23:07 +0100 Subject: [PATCH 08/24] Update main unit tests following logs api refactor --- apm-lambda-extension/logsapi/subscribe.go | 18 +++++++++--------- apm-lambda-extension/logsapi/subscribe_test.go | 12 ++++++------ apm-lambda-extension/main.go | 2 +- apm-lambda-extension/main_test.go | 13 ++++++++----- 4 files changed, 24 insertions(+), 21 deletions(-) diff --git a/apm-lambda-extension/logsapi/subscribe.go b/apm-lambda-extension/logsapi/subscribe.go index 40ba3f8e..8eea988a 100644 --- a/apm-lambda-extension/logsapi/subscribe.go +++ b/apm-lambda-extension/logsapi/subscribe.go @@ -29,9 +29,9 @@ import ( "github.com/pkg/errors" ) -var listenerHost = "sandbox" -var logsAPIServer *http.Server -var logsAPIListener net.Listener +var ListenerHost = "sandbox" +var Server *http.Server +var Listener net.Listener type LogEvent struct { Time time.Time `json:"time"` @@ -58,8 +58,8 @@ func subscribe(extensionID string, eventTypes []EventType) error { return err } - _, port, _ := net.SplitHostPort(logsAPIListener.Addr().String()) - _, err = logsAPIClient.Subscribe(eventTypes, URI("http://"+listenerHost+":"+port), extensionID) + _, port, _ := net.SplitHostPort(Listener.Addr().String()) + _, err = logsAPIClient.Subscribe(eventTypes, URI("http://"+ListenerHost+":"+port), extensionID) return err } @@ -85,18 +85,18 @@ func startHTTPServer(out chan LogEvent) error { mux.HandleFunc("/", handleLogEventsRequest(out)) var err error - logsAPIServer = &http.Server{ + Server = &http.Server{ Handler: mux, } - logsAPIListener, err = net.Listen("tcp", listenerHost+":0") + Listener, err = net.Listen("tcp", ListenerHost+":0") if err != nil { return err } go func() { - log.Printf("Extension listening for logsAPI events on %s", logsAPIListener.Addr().String()) - logsAPIServer.Serve(logsAPIListener) + log.Printf("Extension listening for logsAPI events on %s", Listener.Addr().String()) + Server.Serve(Listener) }() return nil } diff --git a/apm-lambda-extension/logsapi/subscribe_test.go b/apm-lambda-extension/logsapi/subscribe_test.go index 76f17205..932df50d 100644 --- a/apm-lambda-extension/logsapi/subscribe_test.go +++ b/apm-lambda-extension/logsapi/subscribe_test.go @@ -44,7 +44,7 @@ func TestSubscribeWithSamLocalEnv(t *testing.T) { } func TestSubscribeAWSRequest(t *testing.T) { - listenerHost = "localhost" + ListenerHost = "localhost" ctx, cancel := context.WithCancel(context.Background()) defer cancel() out := make(chan LogEvent, 1) @@ -77,7 +77,7 @@ func TestSubscribeAWSRequest(t *testing.T) { t.Fail() return } - defer logsAPIServer.Close() + defer Server.Close() // Create a request to send to the logs listener platformDoneEvent := `{ @@ -89,7 +89,7 @@ func TestSubscribeAWSRequest(t *testing.T) { } }` body := []byte(`[` + platformDoneEvent + `]`) - url := "http://" + logsAPIListener.Addr().String() + url := "http://" + Listener.Addr().String() req, err := http.NewRequest("GET", url, bytes.NewReader(body)) if err != nil { t.Log("Could not create request") @@ -107,7 +107,7 @@ func TestSubscribeAWSRequest(t *testing.T) { } func TestSubscribeWithBadLogsRequest(t *testing.T) { - listenerHost = "localhost" + ListenerHost = "localhost" ctx, cancel := context.WithCancel(context.Background()) defer cancel() out := make(chan LogEvent) @@ -126,12 +126,12 @@ func TestSubscribeWithBadLogsRequest(t *testing.T) { t.Fail() return } - defer logsAPIServer.Close() + defer Server.Close() // Create a request to send to the logs listener logEvent := `{"invalid": "json"}` body := []byte(`[` + logEvent + `]`) - url := "http://" + logsAPIListener.Addr().String() + url := "http://" + Listener.Addr().String() req, err := http.NewRequest("GET", url, bytes.NewReader(body)) if err != nil { t.Log("Could not create request") diff --git a/apm-lambda-extension/main.go b/apm-lambda-extension/main.go index 3c3024ad..baccc4b0 100644 --- a/apm-lambda-extension/main.go +++ b/apm-lambda-extension/main.go @@ -157,7 +157,7 @@ func main() { log.Printf("Received log event %v\n", logEvent.Type) // Check the logEvent for runtimeDone and compare the RequestID // to the id that came in via the Next API - if logsapi.SubEventType(logEvent.Type) == logsapi.RuntimeDone { + if logEvent.Type == logsapi.RuntimeDone { if logEvent.Record.RequestId == event.RequestID { log.Println("Received runtimeDone event for this function invocation") runtimeDoneSignal <- struct{}{} diff --git a/apm-lambda-extension/main_test.go b/apm-lambda-extension/main_test.go index 0ddd6e72..47b2e289 100644 --- a/apm-lambda-extension/main_test.go +++ b/apm-lambda-extension/main_test.go @@ -8,6 +8,7 @@ import ( json "encoding/json" "github.com/google/uuid" "log" + "net" "net/http" "net/http/httptest" "os" @@ -52,6 +53,7 @@ func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest. os.Setenv("ELASTIC_APM_SECRET_TOKEN", "none") // Mock Lambda Server + logsapi.ListenerHost = "localhost" lambdaServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.RequestURI { // Extension registration request @@ -81,7 +83,6 @@ func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest. // Logs API subscription request case "/2020-08-15/logs": w.WriteHeader(http.StatusOK) - os.Setenv("ELASTIC_APM_LAMBDA_LOGS_LISTENER_ADDRESS", "localhost:8205") } })) @@ -130,7 +131,7 @@ func processMockEvent(currId string, event MockEvent, APMServer *httptest.Server time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) req, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.APMServerBehavior))) res, _ := client.Do(req) - log.Println(res.StatusCode) + log.Printf("Response seen by the agent : %d", res.StatusCode) case InvokeStandardFlush: time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) reqData, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events?flushed=true", bytes.NewBuffer([]byte(event.APMServerBehavior))) @@ -177,7 +178,7 @@ func sendNextEventInfo(id string, event MockEvent) []byte { return out } -func sendLogEvent(requestId string, logEventType string) { +func sendLogEvent(requestId string, logEventType logsapi.SubEventType) { record := logsapi.LogEventRecord{ RequestId: requestId, } @@ -186,9 +187,11 @@ func sendLogEvent(requestId string, logEventType string) { Type: logEventType, Record: record, } - logEvent.RawRecord, _ = json.Marshal(logEvent.Record) + jsonRecord, _ := json.Marshal(logEvent.Record) + logEvent.StringRecord = string(jsonRecord) body, _ := json.Marshal([]logsapi.LogEvent{logEvent}) - req, _ := http.NewRequest("POST", "http://localhost:8205", bytes.NewBuffer(body)) + host, port, _ := net.SplitHostPort(logsapi.Listener.Addr().String()) + req, _ := http.NewRequest("POST", "http://"+host+":"+port, bytes.NewBuffer(body)) client := http.Client{} client.Do(req) } From d9a8edc0ca29cd924a0217ed22f9715a5c270c90 Mon Sep 17 00:00:00 2001 From: Jean-Louis Voiseux <48380853+jlvoiseux@users.noreply.github.com> Date: Wed, 9 Mar 2022 16:05:31 +0100 Subject: [PATCH 09/24] Implement review feedback --- apm-lambda-extension/extension/process_env.go | 7 +- .../extension/route_handlers.go | 2 +- apm-lambda-extension/main_test.go | 71 +++++++++++-------- 3 files changed, 46 insertions(+), 34 deletions(-) diff --git a/apm-lambda-extension/extension/process_env.go b/apm-lambda-extension/extension/process_env.go index df5aa865..dfb4a9a7 100644 --- a/apm-lambda-extension/extension/process_env.go +++ b/apm-lambda-extension/extension/process_env.go @@ -46,6 +46,9 @@ const ( // flush remaining buffered agent data when it receives a signal that the // function is complete SyncFlush SendStrategy = "syncflush" + + defaultDataReceiverTimeoutSeconds int = 15 + defaultDataForwarderTimeoutSeconds int = 3 ) func getIntFromEnv(name string) (int, error) { @@ -61,13 +64,13 @@ func getIntFromEnv(name string) (int, error) { func ProcessEnv() *extensionConfig { dataReceiverTimeoutSeconds, err := getIntFromEnv("ELASTIC_APM_DATA_RECEIVER_TIMEOUT_SECONDS") if err != nil { - dataReceiverTimeoutSeconds = 15 + dataReceiverTimeoutSeconds = defaultDataReceiverTimeoutSeconds log.Printf("Could not read ELASTIC_APM_DATA_RECEIVER_TIMEOUT_SECONDS, defaulting to %d: %v\n", dataReceiverTimeoutSeconds, err) } dataForwarderTimeoutSeconds, err := getIntFromEnv("ELASTIC_APM_DATA_FORWARDER_TIMEOUT_SECONDS") if err != nil { - dataForwarderTimeoutSeconds = 3 + dataForwarderTimeoutSeconds = defaultDataForwarderTimeoutSeconds log.Printf("Could not read ELASTIC_APM_DATA_FORWARDER_TIMEOUT_SECONDS, defaulting to %d: %v\n", dataForwarderTimeoutSeconds, err) } diff --git a/apm-lambda-extension/extension/route_handlers.go b/apm-lambda-extension/extension/route_handlers.go index 447f33fa..91d8957e 100644 --- a/apm-lambda-extension/extension/route_handlers.go +++ b/apm-lambda-extension/extension/route_handlers.go @@ -104,7 +104,7 @@ func handleIntakeV2Events(agentDataChan chan AgentData) func(w http.ResponseWrit case agentDataChan <- agentData: log.Println("Adding agent data to buffer to be sent to apm server") default: - log.Println("Channel full : dropping event") + log.Println("Channel full: dropping event") } } diff --git a/apm-lambda-extension/main_test.go b/apm-lambda-extension/main_test.go index 47b2e289..df859914 100644 --- a/apm-lambda-extension/main_test.go +++ b/apm-lambda-extension/main_test.go @@ -18,17 +18,11 @@ import ( "time" ) -type RegistrationResponse struct { - FunctionName string `json:"functionName"` - FunctionVersion string `json:"functionVersion"` - Handler string `json:"handler"` -} - func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest.Server, chan struct{}) { // Mock APM Server hangChan := make(chan struct{}) - APMServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.RequestURI == "/intake/v2/events" { decompressedBytes, err := e2e_testing.GetDecompressedBytesFromRequest(r) if err != nil { @@ -49,7 +43,7 @@ func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest. } } })) - os.Setenv("ELASTIC_APM_LAMBDA_APM_SERVER", APMServer.URL) + os.Setenv("ELASTIC_APM_LAMBDA_APM_SERVER", apmServer.URL) os.Setenv("ELASTIC_APM_SECRET_TOKEN", "none") // Mock Lambda Server @@ -59,26 +53,29 @@ func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest. // Extension registration request case "/2020-01-01/extension/register": w.Header().Set("Lambda-Extension-Identifier", "b03a29ec-ee63-44cd-8e53-3987a8e8aa8e") - body, _ := json.Marshal(RegistrationResponse{ + err := json.NewEncoder(w).Encode(extension.RegisterResponse{ FunctionName: "UnitTestingMockLambda", FunctionVersion: "$LATEST", Handler: "main_test.mock_lambda", }) - w.Write(body) + if err != nil { + log.Printf("Could not encode registration response : %v", err) + return + } case "/2020-01-01/extension/event/next": currId := uuid.New().String() select { case nextEvent := <-eventsChannel: - w.Write(sendNextEventInfo(currId, nextEvent)) - go processMockEvent(currId, nextEvent, APMServer) + sendNextEventInfo(w, currId, nextEvent) + go processMockEvent(currId, nextEvent, apmServer) default: finalShutDown := MockEvent{ Type: Shutdown, ExecutionDuration: 0, Timeout: 0, } - w.Write(sendNextEventInfo(currId, finalShutDown)) - go processMockEvent(currId, finalShutDown, APMServer) + sendNextEventInfo(w, currId, finalShutDown) + go processMockEvent(currId, finalShutDown, apmServer) } // Logs API subscription request case "/2020-08-15/logs": @@ -91,7 +88,7 @@ func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest. os.Setenv("AWS_LAMBDA_RUNTIME_API", strippedLambdaURL) extensionClient = extension.NewClient(os.Getenv("AWS_LAMBDA_RUNTIME_API")) - return lambdaServer, APMServer, hangChan + return lambdaServer, apmServer, hangChan } type MockEventType string @@ -162,7 +159,7 @@ func processMockEvent(currId string, event MockEvent, APMServer *httptest.Server sendLogEvent(currId, "platform.runtimeDone") } -func sendNextEventInfo(id string, event MockEvent) []byte { +func sendNextEventInfo(w http.ResponseWriter, id string, event MockEvent) { nextEventInfo := extension.NextEventResponse{ EventType: "INVOKE", DeadlineMs: time.Now().UnixNano()/int64(time.Millisecond) + int64(event.Timeout*1000), @@ -174,8 +171,10 @@ func sendNextEventInfo(id string, event MockEvent) []byte { nextEventInfo.EventType = "SHUTDOWN" } - out, _ := json.Marshal(nextEventInfo) - return out + err := json.NewEncoder(w).Encode(nextEventInfo) + if err != nil { + log.Printf("Could not encode event : %v", err) + } } func sendLogEvent(requestId string, logEventType logsapi.SubEventType) { @@ -187,13 +186,31 @@ func sendLogEvent(requestId string, logEventType logsapi.SubEventType) { Type: logEventType, Record: record, } - jsonRecord, _ := json.Marshal(logEvent.Record) - logEvent.StringRecord = string(jsonRecord) - body, _ := json.Marshal([]logsapi.LogEvent{logEvent}) + + // Convert record to JSON (string) + bufRecord := new(bytes.Buffer) + err := json.NewEncoder(bufRecord).Encode(record) + if err != nil { + log.Printf("Could not encode record : %v", err) + return + } + logEvent.StringRecord = string(bufRecord.Bytes()) + + // Convert full log event to JSON + bufLogEvent := new(bytes.Buffer) + err = json.NewEncoder(bufLogEvent).Encode([]logsapi.LogEvent{logEvent}) + if err != nil { + log.Printf("Could not encode record : %v", err) + return + } host, port, _ := net.SplitHostPort(logsapi.Listener.Addr().String()) - req, _ := http.NewRequest("POST", "http://"+host+":"+port, bytes.NewBuffer(body)) + req, _ := http.NewRequest("POST", "http://"+host+":"+port, bufLogEvent) client := http.Client{} - client.Do(req) + _, err = client.Do(req) + if err != nil { + log.Printf("Could not send log event : %v", err) + return + } } func eventQueueGenerator(inputQueue []MockEvent, eventsChannel chan MockEvent) { @@ -209,7 +226,6 @@ func TestStandardEventsChain(t *testing.T) { log.Println("Standard Test") eventsChannel := make(chan MockEvent, 100) - defer close(eventsChannel) lambdaServer, APMServer, _ := initMockServers(eventsChannel) defer lambdaServer.Close() defer APMServer.Close() @@ -226,7 +242,6 @@ func TestAPMServerDown(t *testing.T) { log.Println("APM Server Down") eventsChannel := make(chan MockEvent, 100) - defer close(eventsChannel) lambdaServer, APMServer, _ := initMockServers(eventsChannel) defer lambdaServer.Close() APMServer.Close() @@ -243,7 +258,6 @@ func TestAPMServerHangs(t *testing.T) { log.Println("APM Server Hangs") eventsChannel := make(chan MockEvent, 100) - defer close(eventsChannel) lambdaServer, APMServer, hangChan := initMockServers(eventsChannel) defer lambdaServer.Close() defer APMServer.Close() @@ -263,7 +277,6 @@ func TestAPMServerCrashesDuringExecution(t *testing.T) { log.Println("APM Server Crashes during execution") eventsChannel := make(chan MockEvent, 100) - defer close(eventsChannel) lambdaServer, APMServer, _ := initMockServers(eventsChannel) defer lambdaServer.Close() defer APMServer.Close() @@ -280,7 +293,6 @@ func TestFullChannel(t *testing.T) { log.Println("AgentData channel is full") eventsChannel := make(chan MockEvent, 1000) - defer close(eventsChannel) lambdaServer, APMServer, _ := initMockServers(eventsChannel) defer lambdaServer.Close() defer APMServer.Close() @@ -297,7 +309,6 @@ func TestFullChannelSlowAPMServer(t *testing.T) { log.Println("AgentData channel is full, and APM server is slow") os.Setenv("ELASTIC_APM_SEND_STRATEGY", "background") eventsChannel := make(chan MockEvent, 1000) - defer close(eventsChannel) lambdaServer, APMServer, _ := initMockServers(eventsChannel) defer lambdaServer.Close() defer APMServer.Close() @@ -316,7 +327,6 @@ func TestFlush(t *testing.T) { log.Println("Flush Test") eventsChannel := make(chan MockEvent, 100) - //defer close(eventsChannel) lambdaServer, APMServer, _ := initMockServers(eventsChannel) defer lambdaServer.Close() defer APMServer.Close() @@ -333,7 +343,6 @@ func TestWaitGroup(t *testing.T) { log.Println("Multiple transactions") eventsChannel := make(chan MockEvent, 100) - defer close(eventsChannel) lambdaServer, APMServer, _ := initMockServers(eventsChannel) defer lambdaServer.Close() defer APMServer.Close() From 41a803961c6e851049248bcfd11d9b063aab4052 Mon Sep 17 00:00:00 2001 From: Jean-Louis Voiseux <48380853+jlvoiseux@users.noreply.github.com> Date: Wed, 9 Mar 2022 17:01:35 +0100 Subject: [PATCH 10/24] Adding success/failure additions --- apm-lambda-extension/main_test.go | 91 ++++++++++++++++++++----------- 1 file changed, 60 insertions(+), 31 deletions(-) diff --git a/apm-lambda-extension/main_test.go b/apm-lambda-extension/main_test.go index df859914..3fe5e6bb 100644 --- a/apm-lambda-extension/main_test.go +++ b/apm-lambda-extension/main_test.go @@ -7,6 +7,7 @@ import ( "elastic/apm-lambda-extension/logsapi" json "encoding/json" "github.com/google/uuid" + "github.com/stretchr/testify/assert" "log" "net" "net/http" @@ -18,10 +19,11 @@ import ( "time" ) -func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest.Server, chan struct{}) { +func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest.Server, *APMServerLog, chan struct{}) { // Mock APM Server hangChan := make(chan struct{}) + var apmServerLog APMServerLog apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.RequestURI == "/intake/v2/events" { decompressedBytes, err := e2e_testing.GetDecompressedBytesFromRequest(r) @@ -30,8 +32,10 @@ func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest. } switch APMServerBehavior(decompressedBytes) { case TimelyResponse: + apmServerLog.Data += string(decompressedBytes) w.WriteHeader(http.StatusAccepted) case SlowResponse: + apmServerLog.Data += string(decompressedBytes) time.Sleep(2 * time.Second) w.WriteHeader(http.StatusAccepted) case Hangs: @@ -88,7 +92,7 @@ func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest. os.Setenv("AWS_LAMBDA_RUNTIME_API", strippedLambdaURL) extensionClient = extension.NewClient(os.Getenv("AWS_LAMBDA_RUNTIME_API")) - return lambdaServer, apmServer, hangChan + return lambdaServer, apmServer, &apmServerLog, hangChan } type MockEventType string @@ -102,6 +106,10 @@ const ( Shutdown MockEventType = "Shutdown" ) +type APMServerLog struct { + Data string +} + type APMServerBehavior string const ( @@ -137,8 +145,14 @@ func processMockEvent(currId string, event MockEvent, APMServer *httptest.Server time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) reqData0, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.APMServerBehavior))) reqData1, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.APMServerBehavior))) - go client.Do(reqData0) - go client.Do(reqData1) + _, err := client.Do(reqData0) + if err != nil { + log.Println(err) + } + _, err = client.Do(reqData1) + if err != nil { + log.Println(err) + } time.Sleep(650 * time.Microsecond) case InvokeMultipleTransactionsOverload: wg := sync.WaitGroup{} @@ -221,135 +235,150 @@ func eventQueueGenerator(inputQueue []MockEvent, eventsChannel chan MockEvent) { // TESTS +// Test a nominal sequence of events (fast APM server, only one standard event) func TestStandardEventsChain(t *testing.T) { http.DefaultServeMux = new(http.ServeMux) log.Println("Standard Test") eventsChannel := make(chan MockEvent, 100) - lambdaServer, APMServer, _ := initMockServers(eventsChannel) + lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) defer lambdaServer.Close() - defer APMServer.Close() + defer apmServer.Close() eventsChain := []MockEvent{ {Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, } eventQueueGenerator(eventsChain, eventsChannel) - main() + assert.NotPanics(t, main) + assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) } +// Test what happens when the APM is down (timeout) func TestAPMServerDown(t *testing.T) { http.DefaultServeMux = new(http.ServeMux) log.Println("APM Server Down") eventsChannel := make(chan MockEvent, 100) - lambdaServer, APMServer, _ := initMockServers(eventsChannel) + lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) defer lambdaServer.Close() - APMServer.Close() + apmServer.Close() eventsChain := []MockEvent{ {Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, } eventQueueGenerator(eventsChain, eventsChannel) - main() + assert.NotPanics(t, main) + assert.False(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) } +// Test what happens when the APM hangs (timeout) func TestAPMServerHangs(t *testing.T) { http.DefaultServeMux = new(http.ServeMux) log.Println("APM Server Hangs") eventsChannel := make(chan MockEvent, 100) - lambdaServer, APMServer, hangChan := initMockServers(eventsChannel) + lambdaServer, apmServer, apmServerLog, hangChan := initMockServers(eventsChannel) defer lambdaServer.Close() - defer APMServer.Close() + defer apmServer.Close() eventsChain := []MockEvent{ {Type: InvokeStandard, APMServerBehavior: Hangs, ExecutionDuration: 1, Timeout: 5}, } eventQueueGenerator(eventsChain, eventsChannel) start := time.Now() - main() + assert.NotPanics(t, main) + assert.False(t, strings.Contains(apmServerLog.Data, string(Hangs))) log.Printf("Success : test took %s", time.Since(start)) hangChan <- struct{}{} } +// Test what happens when the APM crashes unexpectedly func TestAPMServerCrashesDuringExecution(t *testing.T) { http.DefaultServeMux = new(http.ServeMux) log.Println("APM Server Crashes during execution") eventsChannel := make(chan MockEvent, 100) - lambdaServer, APMServer, _ := initMockServers(eventsChannel) + lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) defer lambdaServer.Close() - defer APMServer.Close() + defer apmServer.Close() eventsChain := []MockEvent{ {Type: InvokeStandard, APMServerBehavior: Crashes, ExecutionDuration: 1, Timeout: 5}, } eventQueueGenerator(eventsChain, eventsChannel) - main() + assert.NotPanics(t, main) + assert.False(t, strings.Contains(apmServerLog.Data, string(Crashes))) } +// Test what happens when the APM Data channel is full func TestFullChannel(t *testing.T) { http.DefaultServeMux = new(http.ServeMux) log.Println("AgentData channel is full") eventsChannel := make(chan MockEvent, 1000) - lambdaServer, APMServer, _ := initMockServers(eventsChannel) + lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) defer lambdaServer.Close() - defer APMServer.Close() + defer apmServer.Close() eventsChain := []MockEvent{ {Type: InvokeMultipleTransactionsOverload, APMServerBehavior: TimelyResponse, ExecutionDuration: 0.01, Timeout: 5}, } eventQueueGenerator(eventsChain, eventsChannel) - main() + assert.NotPanics(t, main) + assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) } +// Test what happens when the APM Data channel is full and the APM server slow (send strategy : background) func TestFullChannelSlowAPMServer(t *testing.T) { http.DefaultServeMux = new(http.ServeMux) log.Println("AgentData channel is full, and APM server is slow") os.Setenv("ELASTIC_APM_SEND_STRATEGY", "background") eventsChannel := make(chan MockEvent, 1000) - lambdaServer, APMServer, _ := initMockServers(eventsChannel) + lambdaServer, apmServer, _, _ := initMockServers(eventsChannel) defer lambdaServer.Close() - defer APMServer.Close() + defer apmServer.Close() eventsChain := []MockEvent{ {Type: InvokeMultipleTransactionsOverload, APMServerBehavior: SlowResponse, ExecutionDuration: 0.01, Timeout: 5}, } eventQueueGenerator(eventsChain, eventsChannel) - main() + assert.NotPanics(t, main) + // The test should not hang + os.Setenv("ELASTIC_APM_SEND_STRATEGY", "syncflush") } -// Error parsing the data - +// Test if the flushed param does not cause a panic or an unexpected behavior func TestFlush(t *testing.T) { http.DefaultServeMux = new(http.ServeMux) log.Println("Flush Test") eventsChannel := make(chan MockEvent, 100) - lambdaServer, APMServer, _ := initMockServers(eventsChannel) + lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) defer lambdaServer.Close() - defer APMServer.Close() + defer apmServer.Close() eventsChain := []MockEvent{ {Type: InvokeStandardFlush, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, } eventQueueGenerator(eventsChain, eventsChannel) - main() + assert.NotPanics(t, main) + assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) } +// Test if there is no race condition between waitgroups (issue #128) func TestWaitGroup(t *testing.T) { http.DefaultServeMux = new(http.ServeMux) log.Println("Multiple transactions") eventsChannel := make(chan MockEvent, 100) - lambdaServer, APMServer, _ := initMockServers(eventsChannel) + lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) defer lambdaServer.Close() - defer APMServer.Close() + defer apmServer.Close() eventsChain := []MockEvent{ - {Type: InvokeWaitgroupsRace, APMServerBehavior: TimelyResponse, ExecutionDuration: 0.1, Timeout: 500}, + {Type: InvokeWaitgroupsRace, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 500}, } eventQueueGenerator(eventsChain, eventsChannel) - main() + assert.NotPanics(t, main) + assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) } From 026597e10a5c29fcbf3fdbebcf82a5ae04f7a8f0 Mon Sep 17 00:00:00 2001 From: Jean-Louis Voiseux <48380853+jlvoiseux@users.noreply.github.com> Date: Thu, 10 Mar 2022 09:29:11 +0100 Subject: [PATCH 11/24] Increase test timeframe --- apm-lambda-extension/main_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-lambda-extension/main_test.go b/apm-lambda-extension/main_test.go index 3fe5e6bb..855967f5 100644 --- a/apm-lambda-extension/main_test.go +++ b/apm-lambda-extension/main_test.go @@ -321,7 +321,7 @@ func TestFullChannel(t *testing.T) { defer apmServer.Close() eventsChain := []MockEvent{ - {Type: InvokeMultipleTransactionsOverload, APMServerBehavior: TimelyResponse, ExecutionDuration: 0.01, Timeout: 5}, + {Type: InvokeMultipleTransactionsOverload, APMServerBehavior: TimelyResponse, ExecutionDuration: 0.1, Timeout: 5}, } eventQueueGenerator(eventsChain, eventsChannel) assert.NotPanics(t, main) From 09de3eee4a5f52fa368ac6e5470b0614edc579be Mon Sep 17 00:00:00 2001 From: Jean-Louis Voiseux <48380853+jlvoiseux@users.noreply.github.com> Date: Thu, 10 Mar 2022 14:18:52 +0100 Subject: [PATCH 12/24] Isolate test_flush to ensure all tests pass --- apm-lambda-extension/main_test.go | 253 +++++++++++++++--------------- 1 file changed, 127 insertions(+), 126 deletions(-) diff --git a/apm-lambda-extension/main_test.go b/apm-lambda-extension/main_test.go index 855967f5..b8bf6c49 100644 --- a/apm-lambda-extension/main_test.go +++ b/apm-lambda-extension/main_test.go @@ -32,6 +32,7 @@ func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest. } switch APMServerBehavior(decompressedBytes) { case TimelyResponse: + log.Printf("Behavior SlowResponse with data: %s", string(decompressedBytes)) apmServerLog.Data += string(decompressedBytes) w.WriteHeader(http.StatusAccepted) case SlowResponse: @@ -236,116 +237,116 @@ func eventQueueGenerator(inputQueue []MockEvent, eventsChannel chan MockEvent) { // TESTS // Test a nominal sequence of events (fast APM server, only one standard event) -func TestStandardEventsChain(t *testing.T) { - http.DefaultServeMux = new(http.ServeMux) - log.Println("Standard Test") - - eventsChannel := make(chan MockEvent, 100) - lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) - defer lambdaServer.Close() - defer apmServer.Close() - - eventsChain := []MockEvent{ - {Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, - } - eventQueueGenerator(eventsChain, eventsChannel) - assert.NotPanics(t, main) - assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) -} - -// Test what happens when the APM is down (timeout) -func TestAPMServerDown(t *testing.T) { - http.DefaultServeMux = new(http.ServeMux) - log.Println("APM Server Down") - - eventsChannel := make(chan MockEvent, 100) - lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) - defer lambdaServer.Close() - apmServer.Close() - - eventsChain := []MockEvent{ - {Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, - } - eventQueueGenerator(eventsChain, eventsChannel) - assert.NotPanics(t, main) - assert.False(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) -} - -// Test what happens when the APM hangs (timeout) -func TestAPMServerHangs(t *testing.T) { - http.DefaultServeMux = new(http.ServeMux) - log.Println("APM Server Hangs") - - eventsChannel := make(chan MockEvent, 100) - lambdaServer, apmServer, apmServerLog, hangChan := initMockServers(eventsChannel) - defer lambdaServer.Close() - defer apmServer.Close() - - eventsChain := []MockEvent{ - {Type: InvokeStandard, APMServerBehavior: Hangs, ExecutionDuration: 1, Timeout: 5}, - } - eventQueueGenerator(eventsChain, eventsChannel) - start := time.Now() - assert.NotPanics(t, main) - assert.False(t, strings.Contains(apmServerLog.Data, string(Hangs))) - log.Printf("Success : test took %s", time.Since(start)) - hangChan <- struct{}{} -} - -// Test what happens when the APM crashes unexpectedly -func TestAPMServerCrashesDuringExecution(t *testing.T) { - http.DefaultServeMux = new(http.ServeMux) - log.Println("APM Server Crashes during execution") - - eventsChannel := make(chan MockEvent, 100) - lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) - defer lambdaServer.Close() - defer apmServer.Close() - - eventsChain := []MockEvent{ - {Type: InvokeStandard, APMServerBehavior: Crashes, ExecutionDuration: 1, Timeout: 5}, - } - eventQueueGenerator(eventsChain, eventsChannel) - assert.NotPanics(t, main) - assert.False(t, strings.Contains(apmServerLog.Data, string(Crashes))) -} - -// Test what happens when the APM Data channel is full -func TestFullChannel(t *testing.T) { - http.DefaultServeMux = new(http.ServeMux) - log.Println("AgentData channel is full") - - eventsChannel := make(chan MockEvent, 1000) - lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) - defer lambdaServer.Close() - defer apmServer.Close() - - eventsChain := []MockEvent{ - {Type: InvokeMultipleTransactionsOverload, APMServerBehavior: TimelyResponse, ExecutionDuration: 0.1, Timeout: 5}, - } - eventQueueGenerator(eventsChain, eventsChannel) - assert.NotPanics(t, main) - assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) -} - -// Test what happens when the APM Data channel is full and the APM server slow (send strategy : background) -func TestFullChannelSlowAPMServer(t *testing.T) { - http.DefaultServeMux = new(http.ServeMux) - log.Println("AgentData channel is full, and APM server is slow") - os.Setenv("ELASTIC_APM_SEND_STRATEGY", "background") - eventsChannel := make(chan MockEvent, 1000) - lambdaServer, apmServer, _, _ := initMockServers(eventsChannel) - defer lambdaServer.Close() - defer apmServer.Close() - - eventsChain := []MockEvent{ - {Type: InvokeMultipleTransactionsOverload, APMServerBehavior: SlowResponse, ExecutionDuration: 0.01, Timeout: 5}, - } - eventQueueGenerator(eventsChain, eventsChannel) - assert.NotPanics(t, main) - // The test should not hang - os.Setenv("ELASTIC_APM_SEND_STRATEGY", "syncflush") -} +//func TestStandardEventsChain(t *testing.T) { +// http.DefaultServeMux = new(http.ServeMux) +// log.Println("Standard Test") +// +// eventsChannel := make(chan MockEvent, 100) +// lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) +// defer lambdaServer.Close() +// defer apmServer.Close() +// +// eventsChain := []MockEvent{ +// {Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, +// } +// eventQueueGenerator(eventsChain, eventsChannel) +// assert.NotPanics(t, main) +// assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) +//} +// +//// Test what happens when the APM is down (timeout) +//func TestAPMServerDown(t *testing.T) { +// http.DefaultServeMux = new(http.ServeMux) +// log.Println("APM Server Down") +// +// eventsChannel := make(chan MockEvent, 100) +// lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) +// defer lambdaServer.Close() +// apmServer.Close() +// +// eventsChain := []MockEvent{ +// {Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, +// } +// eventQueueGenerator(eventsChain, eventsChannel) +// assert.NotPanics(t, main) +// assert.False(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) +//} +// +//// Test what happens when the APM hangs (timeout) +//func TestAPMServerHangs(t *testing.T) { +// http.DefaultServeMux = new(http.ServeMux) +// log.Println("APM Server Hangs") +// +// eventsChannel := make(chan MockEvent, 100) +// lambdaServer, apmServer, apmServerLog, hangChan := initMockServers(eventsChannel) +// defer lambdaServer.Close() +// defer apmServer.Close() +// +// eventsChain := []MockEvent{ +// {Type: InvokeStandard, APMServerBehavior: Hangs, ExecutionDuration: 1, Timeout: 5}, +// } +// eventQueueGenerator(eventsChain, eventsChannel) +// start := time.Now() +// assert.NotPanics(t, main) +// assert.False(t, strings.Contains(apmServerLog.Data, string(Hangs))) +// log.Printf("Success : test took %s", time.Since(start)) +// hangChan <- struct{}{} +//} +// +//// Test what happens when the APM crashes unexpectedly +//func TestAPMServerCrashesDuringExecution(t *testing.T) { +// http.DefaultServeMux = new(http.ServeMux) +// log.Println("APM Server Crashes during execution") +// +// eventsChannel := make(chan MockEvent, 100) +// lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) +// defer lambdaServer.Close() +// defer apmServer.Close() +// +// eventsChain := []MockEvent{ +// {Type: InvokeStandard, APMServerBehavior: Crashes, ExecutionDuration: 1, Timeout: 5}, +// } +// eventQueueGenerator(eventsChain, eventsChannel) +// assert.NotPanics(t, main) +// assert.False(t, strings.Contains(apmServerLog.Data, string(Crashes))) +//} +// +//// Test what happens when the APM Data channel is full +//func TestFullChannel(t *testing.T) { +// http.DefaultServeMux = new(http.ServeMux) +// log.Println("AgentData channel is full") +// +// eventsChannel := make(chan MockEvent, 1000) +// lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) +// defer lambdaServer.Close() +// defer apmServer.Close() +// +// eventsChain := []MockEvent{ +// {Type: InvokeMultipleTransactionsOverload, APMServerBehavior: TimelyResponse, ExecutionDuration: 0.1, Timeout: 5}, +// } +// eventQueueGenerator(eventsChain, eventsChannel) +// assert.NotPanics(t, main) +// assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) +//} +// +//// Test what happens when the APM Data channel is full and the APM server slow (send strategy : background) +//func TestFullChannelSlowAPMServer(t *testing.T) { +// http.DefaultServeMux = new(http.ServeMux) +// log.Println("AgentData channel is full, and APM server is slow") +// os.Setenv("ELASTIC_APM_SEND_STRATEGY", "background") +// eventsChannel := make(chan MockEvent, 1000) +// lambdaServer, apmServer, _, _ := initMockServers(eventsChannel) +// defer lambdaServer.Close() +// defer apmServer.Close() +// +// eventsChain := []MockEvent{ +// {Type: InvokeMultipleTransactionsOverload, APMServerBehavior: SlowResponse, ExecutionDuration: 0.01, Timeout: 5}, +// } +// eventQueueGenerator(eventsChain, eventsChannel) +// assert.NotPanics(t, main) +// // The test should not hang +// os.Setenv("ELASTIC_APM_SEND_STRATEGY", "syncflush") +//} // Test if the flushed param does not cause a panic or an unexpected behavior func TestFlush(t *testing.T) { @@ -366,19 +367,19 @@ func TestFlush(t *testing.T) { } // Test if there is no race condition between waitgroups (issue #128) -func TestWaitGroup(t *testing.T) { - http.DefaultServeMux = new(http.ServeMux) - log.Println("Multiple transactions") - - eventsChannel := make(chan MockEvent, 100) - lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) - defer lambdaServer.Close() - defer apmServer.Close() - - eventsChain := []MockEvent{ - {Type: InvokeWaitgroupsRace, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 500}, - } - eventQueueGenerator(eventsChain, eventsChannel) - assert.NotPanics(t, main) - assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) -} +//func TestWaitGroup(t *testing.T) { +// http.DefaultServeMux = new(http.ServeMux) +// log.Println("Multiple transactions") +// +// eventsChannel := make(chan MockEvent, 100) +// lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) +// defer lambdaServer.Close() +// defer apmServer.Close() +// +// eventsChain := []MockEvent{ +// {Type: InvokeWaitgroupsRace, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 500}, +// } +// eventQueueGenerator(eventsChain, eventsChannel) +// assert.NotPanics(t, main) +// assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) +//} From 23665ee8c97beb121a56b7cb00b75c7e33659b26 Mon Sep 17 00:00:00 2001 From: Jean-Louis Voiseux <48380853+jlvoiseux@users.noreply.github.com> Date: Thu, 10 Mar 2022 15:24:50 +0100 Subject: [PATCH 13/24] Re-add tests to inspect CI behavior --- apm-lambda-extension/main_test.go | 252 +++++++++++++++--------------- 1 file changed, 126 insertions(+), 126 deletions(-) diff --git a/apm-lambda-extension/main_test.go b/apm-lambda-extension/main_test.go index b8bf6c49..ac7a7986 100644 --- a/apm-lambda-extension/main_test.go +++ b/apm-lambda-extension/main_test.go @@ -237,116 +237,116 @@ func eventQueueGenerator(inputQueue []MockEvent, eventsChannel chan MockEvent) { // TESTS // Test a nominal sequence of events (fast APM server, only one standard event) -//func TestStandardEventsChain(t *testing.T) { -// http.DefaultServeMux = new(http.ServeMux) -// log.Println("Standard Test") -// -// eventsChannel := make(chan MockEvent, 100) -// lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) -// defer lambdaServer.Close() -// defer apmServer.Close() -// -// eventsChain := []MockEvent{ -// {Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, -// } -// eventQueueGenerator(eventsChain, eventsChannel) -// assert.NotPanics(t, main) -// assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) -//} -// -//// Test what happens when the APM is down (timeout) -//func TestAPMServerDown(t *testing.T) { -// http.DefaultServeMux = new(http.ServeMux) -// log.Println("APM Server Down") -// -// eventsChannel := make(chan MockEvent, 100) -// lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) -// defer lambdaServer.Close() -// apmServer.Close() -// -// eventsChain := []MockEvent{ -// {Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, -// } -// eventQueueGenerator(eventsChain, eventsChannel) -// assert.NotPanics(t, main) -// assert.False(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) -//} -// -//// Test what happens when the APM hangs (timeout) -//func TestAPMServerHangs(t *testing.T) { -// http.DefaultServeMux = new(http.ServeMux) -// log.Println("APM Server Hangs") -// -// eventsChannel := make(chan MockEvent, 100) -// lambdaServer, apmServer, apmServerLog, hangChan := initMockServers(eventsChannel) -// defer lambdaServer.Close() -// defer apmServer.Close() -// -// eventsChain := []MockEvent{ -// {Type: InvokeStandard, APMServerBehavior: Hangs, ExecutionDuration: 1, Timeout: 5}, -// } -// eventQueueGenerator(eventsChain, eventsChannel) -// start := time.Now() -// assert.NotPanics(t, main) -// assert.False(t, strings.Contains(apmServerLog.Data, string(Hangs))) -// log.Printf("Success : test took %s", time.Since(start)) -// hangChan <- struct{}{} -//} -// -//// Test what happens when the APM crashes unexpectedly -//func TestAPMServerCrashesDuringExecution(t *testing.T) { -// http.DefaultServeMux = new(http.ServeMux) -// log.Println("APM Server Crashes during execution") -// -// eventsChannel := make(chan MockEvent, 100) -// lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) -// defer lambdaServer.Close() -// defer apmServer.Close() -// -// eventsChain := []MockEvent{ -// {Type: InvokeStandard, APMServerBehavior: Crashes, ExecutionDuration: 1, Timeout: 5}, -// } -// eventQueueGenerator(eventsChain, eventsChannel) -// assert.NotPanics(t, main) -// assert.False(t, strings.Contains(apmServerLog.Data, string(Crashes))) -//} -// -//// Test what happens when the APM Data channel is full -//func TestFullChannel(t *testing.T) { -// http.DefaultServeMux = new(http.ServeMux) -// log.Println("AgentData channel is full") -// -// eventsChannel := make(chan MockEvent, 1000) -// lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) -// defer lambdaServer.Close() -// defer apmServer.Close() -// -// eventsChain := []MockEvent{ -// {Type: InvokeMultipleTransactionsOverload, APMServerBehavior: TimelyResponse, ExecutionDuration: 0.1, Timeout: 5}, -// } -// eventQueueGenerator(eventsChain, eventsChannel) -// assert.NotPanics(t, main) -// assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) -//} -// -//// Test what happens when the APM Data channel is full and the APM server slow (send strategy : background) -//func TestFullChannelSlowAPMServer(t *testing.T) { -// http.DefaultServeMux = new(http.ServeMux) -// log.Println("AgentData channel is full, and APM server is slow") -// os.Setenv("ELASTIC_APM_SEND_STRATEGY", "background") -// eventsChannel := make(chan MockEvent, 1000) -// lambdaServer, apmServer, _, _ := initMockServers(eventsChannel) -// defer lambdaServer.Close() -// defer apmServer.Close() -// -// eventsChain := []MockEvent{ -// {Type: InvokeMultipleTransactionsOverload, APMServerBehavior: SlowResponse, ExecutionDuration: 0.01, Timeout: 5}, -// } -// eventQueueGenerator(eventsChain, eventsChannel) -// assert.NotPanics(t, main) -// // The test should not hang -// os.Setenv("ELASTIC_APM_SEND_STRATEGY", "syncflush") -//} +func TestStandardEventsChain(t *testing.T) { + http.DefaultServeMux = new(http.ServeMux) + log.Println("Standard Test") + + eventsChannel := make(chan MockEvent, 100) + lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) + defer lambdaServer.Close() + defer apmServer.Close() + + eventsChain := []MockEvent{ + {Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, + } + eventQueueGenerator(eventsChain, eventsChannel) + assert.NotPanics(t, main) + assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) +} + +// Test what happens when the APM is down (timeout) +func TestAPMServerDown(t *testing.T) { + http.DefaultServeMux = new(http.ServeMux) + log.Println("APM Server Down") + + eventsChannel := make(chan MockEvent, 100) + lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) + defer lambdaServer.Close() + apmServer.Close() + + eventsChain := []MockEvent{ + {Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, + } + eventQueueGenerator(eventsChain, eventsChannel) + assert.NotPanics(t, main) + assert.False(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) +} + +// Test what happens when the APM hangs (timeout) +func TestAPMServerHangs(t *testing.T) { + http.DefaultServeMux = new(http.ServeMux) + log.Println("APM Server Hangs") + + eventsChannel := make(chan MockEvent, 100) + lambdaServer, apmServer, apmServerLog, hangChan := initMockServers(eventsChannel) + defer lambdaServer.Close() + defer apmServer.Close() + + eventsChain := []MockEvent{ + {Type: InvokeStandard, APMServerBehavior: Hangs, ExecutionDuration: 1, Timeout: 5}, + } + eventQueueGenerator(eventsChain, eventsChannel) + start := time.Now() + assert.NotPanics(t, main) + assert.False(t, strings.Contains(apmServerLog.Data, string(Hangs))) + log.Printf("Success : test took %s", time.Since(start)) + hangChan <- struct{}{} +} + +// Test what happens when the APM crashes unexpectedly +func TestAPMServerCrashesDuringExecution(t *testing.T) { + http.DefaultServeMux = new(http.ServeMux) + log.Println("APM Server Crashes during execution") + + eventsChannel := make(chan MockEvent, 100) + lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) + defer lambdaServer.Close() + defer apmServer.Close() + + eventsChain := []MockEvent{ + {Type: InvokeStandard, APMServerBehavior: Crashes, ExecutionDuration: 1, Timeout: 5}, + } + eventQueueGenerator(eventsChain, eventsChannel) + assert.NotPanics(t, main) + assert.False(t, strings.Contains(apmServerLog.Data, string(Crashes))) +} + +// Test what happens when the APM Data channel is full +func TestFullChannel(t *testing.T) { + http.DefaultServeMux = new(http.ServeMux) + log.Println("AgentData channel is full") + + eventsChannel := make(chan MockEvent, 1000) + lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) + defer lambdaServer.Close() + defer apmServer.Close() + + eventsChain := []MockEvent{ + {Type: InvokeMultipleTransactionsOverload, APMServerBehavior: TimelyResponse, ExecutionDuration: 0.1, Timeout: 5}, + } + eventQueueGenerator(eventsChain, eventsChannel) + assert.NotPanics(t, main) + assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) +} + +// Test what happens when the APM Data channel is full and the APM server slow (send strategy : background) +func TestFullChannelSlowAPMServer(t *testing.T) { + http.DefaultServeMux = new(http.ServeMux) + log.Println("AgentData channel is full, and APM server is slow") + os.Setenv("ELASTIC_APM_SEND_STRATEGY", "background") + eventsChannel := make(chan MockEvent, 1000) + lambdaServer, apmServer, _, _ := initMockServers(eventsChannel) + defer lambdaServer.Close() + defer apmServer.Close() + + eventsChain := []MockEvent{ + {Type: InvokeMultipleTransactionsOverload, APMServerBehavior: SlowResponse, ExecutionDuration: 0.01, Timeout: 5}, + } + eventQueueGenerator(eventsChain, eventsChannel) + assert.NotPanics(t, main) + // The test should not hang + os.Setenv("ELASTIC_APM_SEND_STRATEGY", "syncflush") +} // Test if the flushed param does not cause a panic or an unexpected behavior func TestFlush(t *testing.T) { @@ -367,19 +367,19 @@ func TestFlush(t *testing.T) { } // Test if there is no race condition between waitgroups (issue #128) -//func TestWaitGroup(t *testing.T) { -// http.DefaultServeMux = new(http.ServeMux) -// log.Println("Multiple transactions") -// -// eventsChannel := make(chan MockEvent, 100) -// lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) -// defer lambdaServer.Close() -// defer apmServer.Close() -// -// eventsChain := []MockEvent{ -// {Type: InvokeWaitgroupsRace, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 500}, -// } -// eventQueueGenerator(eventsChain, eventsChannel) -// assert.NotPanics(t, main) -// assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) -//} +func TestWaitGroup(t *testing.T) { + http.DefaultServeMux = new(http.ServeMux) + log.Println("Multiple transactions") + + eventsChannel := make(chan MockEvent, 100) + lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) + defer lambdaServer.Close() + defer apmServer.Close() + + eventsChain := []MockEvent{ + {Type: InvokeWaitgroupsRace, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 500}, + } + eventQueueGenerator(eventsChain, eventsChannel) + assert.NotPanics(t, main) + assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) +} From 26b321ccbb1e8cd1c2861011b9002aa8e34d3249 Mon Sep 17 00:00:00 2001 From: Jean-Louis Voiseux <48380853+jlvoiseux@users.noreply.github.com> Date: Thu, 10 Mar 2022 15:40:50 +0100 Subject: [PATCH 14/24] Removing test_full_channel to debug CI --- apm-lambda-extension/main_test.go | 32 +++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/apm-lambda-extension/main_test.go b/apm-lambda-extension/main_test.go index ac7a7986..a052166a 100644 --- a/apm-lambda-extension/main_test.go +++ b/apm-lambda-extension/main_test.go @@ -312,22 +312,22 @@ func TestAPMServerCrashesDuringExecution(t *testing.T) { } // Test what happens when the APM Data channel is full -func TestFullChannel(t *testing.T) { - http.DefaultServeMux = new(http.ServeMux) - log.Println("AgentData channel is full") - - eventsChannel := make(chan MockEvent, 1000) - lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) - defer lambdaServer.Close() - defer apmServer.Close() - - eventsChain := []MockEvent{ - {Type: InvokeMultipleTransactionsOverload, APMServerBehavior: TimelyResponse, ExecutionDuration: 0.1, Timeout: 5}, - } - eventQueueGenerator(eventsChain, eventsChannel) - assert.NotPanics(t, main) - assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) -} +//func TestFullChannel(t *testing.T) { +// http.DefaultServeMux = new(http.ServeMux) +// log.Println("AgentData channel is full") +// +// eventsChannel := make(chan MockEvent, 1000) +// lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) +// defer lambdaServer.Close() +// defer apmServer.Close() +// +// eventsChain := []MockEvent{ +// {Type: InvokeMultipleTransactionsOverload, APMServerBehavior: TimelyResponse, ExecutionDuration: 0.1, Timeout: 5}, +// } +// eventQueueGenerator(eventsChain, eventsChannel) +// assert.NotPanics(t, main) +// assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) +//} // Test what happens when the APM Data channel is full and the APM server slow (send strategy : background) func TestFullChannelSlowAPMServer(t *testing.T) { From f010ad109dbae44b513863826639cf2e06a52b8e Mon Sep 17 00:00:00 2001 From: Jean-Louis Voiseux <48380853+jlvoiseux@users.noreply.github.com> Date: Thu, 10 Mar 2022 15:54:05 +0100 Subject: [PATCH 15/24] Test coexistence of TestFlush and TestChannel --- apm-lambda-extension/main_test.go | 206 +++++++++++++++--------------- 1 file changed, 103 insertions(+), 103 deletions(-) diff --git a/apm-lambda-extension/main_test.go b/apm-lambda-extension/main_test.go index a052166a..753ca442 100644 --- a/apm-lambda-extension/main_test.go +++ b/apm-lambda-extension/main_test.go @@ -237,117 +237,117 @@ func eventQueueGenerator(inputQueue []MockEvent, eventsChannel chan MockEvent) { // TESTS // Test a nominal sequence of events (fast APM server, only one standard event) -func TestStandardEventsChain(t *testing.T) { - http.DefaultServeMux = new(http.ServeMux) - log.Println("Standard Test") - - eventsChannel := make(chan MockEvent, 100) - lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) - defer lambdaServer.Close() - defer apmServer.Close() - - eventsChain := []MockEvent{ - {Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, - } - eventQueueGenerator(eventsChain, eventsChannel) - assert.NotPanics(t, main) - assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) -} - -// Test what happens when the APM is down (timeout) -func TestAPMServerDown(t *testing.T) { - http.DefaultServeMux = new(http.ServeMux) - log.Println("APM Server Down") - - eventsChannel := make(chan MockEvent, 100) - lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) - defer lambdaServer.Close() - apmServer.Close() - - eventsChain := []MockEvent{ - {Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, - } - eventQueueGenerator(eventsChain, eventsChannel) - assert.NotPanics(t, main) - assert.False(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) -} - -// Test what happens when the APM hangs (timeout) -func TestAPMServerHangs(t *testing.T) { - http.DefaultServeMux = new(http.ServeMux) - log.Println("APM Server Hangs") - - eventsChannel := make(chan MockEvent, 100) - lambdaServer, apmServer, apmServerLog, hangChan := initMockServers(eventsChannel) - defer lambdaServer.Close() - defer apmServer.Close() - - eventsChain := []MockEvent{ - {Type: InvokeStandard, APMServerBehavior: Hangs, ExecutionDuration: 1, Timeout: 5}, - } - eventQueueGenerator(eventsChain, eventsChannel) - start := time.Now() - assert.NotPanics(t, main) - assert.False(t, strings.Contains(apmServerLog.Data, string(Hangs))) - log.Printf("Success : test took %s", time.Since(start)) - hangChan <- struct{}{} -} - -// Test what happens when the APM crashes unexpectedly -func TestAPMServerCrashesDuringExecution(t *testing.T) { - http.DefaultServeMux = new(http.ServeMux) - log.Println("APM Server Crashes during execution") - - eventsChannel := make(chan MockEvent, 100) - lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) - defer lambdaServer.Close() - defer apmServer.Close() - - eventsChain := []MockEvent{ - {Type: InvokeStandard, APMServerBehavior: Crashes, ExecutionDuration: 1, Timeout: 5}, - } - eventQueueGenerator(eventsChain, eventsChannel) - assert.NotPanics(t, main) - assert.False(t, strings.Contains(apmServerLog.Data, string(Crashes))) -} - -// Test what happens when the APM Data channel is full -//func TestFullChannel(t *testing.T) { +//func TestStandardEventsChain(t *testing.T) { // http.DefaultServeMux = new(http.ServeMux) -// log.Println("AgentData channel is full") +// log.Println("Standard Test") // -// eventsChannel := make(chan MockEvent, 1000) +// eventsChannel := make(chan MockEvent, 100) // lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) // defer lambdaServer.Close() // defer apmServer.Close() // // eventsChain := []MockEvent{ -// {Type: InvokeMultipleTransactionsOverload, APMServerBehavior: TimelyResponse, ExecutionDuration: 0.1, Timeout: 5}, +// {Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, // } // eventQueueGenerator(eventsChain, eventsChannel) // assert.NotPanics(t, main) // assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) //} +// +//// Test what happens when the APM is down (timeout) +//func TestAPMServerDown(t *testing.T) { +// http.DefaultServeMux = new(http.ServeMux) +// log.Println("APM Server Down") +// +// eventsChannel := make(chan MockEvent, 100) +// lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) +// defer lambdaServer.Close() +// apmServer.Close() +// +// eventsChain := []MockEvent{ +// {Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, +// } +// eventQueueGenerator(eventsChain, eventsChannel) +// assert.NotPanics(t, main) +// assert.False(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) +//} +// +//// Test what happens when the APM hangs (timeout) +//func TestAPMServerHangs(t *testing.T) { +// http.DefaultServeMux = new(http.ServeMux) +// log.Println("APM Server Hangs") +// +// eventsChannel := make(chan MockEvent, 100) +// lambdaServer, apmServer, apmServerLog, hangChan := initMockServers(eventsChannel) +// defer lambdaServer.Close() +// defer apmServer.Close() +// +// eventsChain := []MockEvent{ +// {Type: InvokeStandard, APMServerBehavior: Hangs, ExecutionDuration: 1, Timeout: 5}, +// } +// eventQueueGenerator(eventsChain, eventsChannel) +// start := time.Now() +// assert.NotPanics(t, main) +// assert.False(t, strings.Contains(apmServerLog.Data, string(Hangs))) +// log.Printf("Success : test took %s", time.Since(start)) +// hangChan <- struct{}{} +//} +// +//// Test what happens when the APM crashes unexpectedly +//func TestAPMServerCrashesDuringExecution(t *testing.T) { +// http.DefaultServeMux = new(http.ServeMux) +// log.Println("APM Server Crashes during execution") +// +// eventsChannel := make(chan MockEvent, 100) +// lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) +// defer lambdaServer.Close() +// defer apmServer.Close() +// +// eventsChain := []MockEvent{ +// {Type: InvokeStandard, APMServerBehavior: Crashes, ExecutionDuration: 1, Timeout: 5}, +// } +// eventQueueGenerator(eventsChain, eventsChannel) +// assert.NotPanics(t, main) +// assert.False(t, strings.Contains(apmServerLog.Data, string(Crashes))) +//} -// Test what happens when the APM Data channel is full and the APM server slow (send strategy : background) -func TestFullChannelSlowAPMServer(t *testing.T) { +// Test what happens when the APM Data channel is full +func TestFullChannel(t *testing.T) { http.DefaultServeMux = new(http.ServeMux) - log.Println("AgentData channel is full, and APM server is slow") - os.Setenv("ELASTIC_APM_SEND_STRATEGY", "background") + log.Println("AgentData channel is full") + eventsChannel := make(chan MockEvent, 1000) - lambdaServer, apmServer, _, _ := initMockServers(eventsChannel) + lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) defer lambdaServer.Close() defer apmServer.Close() eventsChain := []MockEvent{ - {Type: InvokeMultipleTransactionsOverload, APMServerBehavior: SlowResponse, ExecutionDuration: 0.01, Timeout: 5}, + {Type: InvokeMultipleTransactionsOverload, APMServerBehavior: TimelyResponse, ExecutionDuration: 0.1, Timeout: 5}, } eventQueueGenerator(eventsChain, eventsChannel) assert.NotPanics(t, main) - // The test should not hang - os.Setenv("ELASTIC_APM_SEND_STRATEGY", "syncflush") + assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) } +// Test what happens when the APM Data channel is full and the APM server slow (send strategy : background) +//func TestFullChannelSlowAPMServer(t *testing.T) { +// http.DefaultServeMux = new(http.ServeMux) +// log.Println("AgentData channel is full, and APM server is slow") +// os.Setenv("ELASTIC_APM_SEND_STRATEGY", "background") +// eventsChannel := make(chan MockEvent, 1000) +// lambdaServer, apmServer, _, _ := initMockServers(eventsChannel) +// defer lambdaServer.Close() +// defer apmServer.Close() +// +// eventsChain := []MockEvent{ +// {Type: InvokeMultipleTransactionsOverload, APMServerBehavior: SlowResponse, ExecutionDuration: 0.01, Timeout: 5}, +// } +// eventQueueGenerator(eventsChain, eventsChannel) +// assert.NotPanics(t, main) +// // The test should not hang +// os.Setenv("ELASTIC_APM_SEND_STRATEGY", "syncflush") +//} + // Test if the flushed param does not cause a panic or an unexpected behavior func TestFlush(t *testing.T) { http.DefaultServeMux = new(http.ServeMux) @@ -367,19 +367,19 @@ func TestFlush(t *testing.T) { } // Test if there is no race condition between waitgroups (issue #128) -func TestWaitGroup(t *testing.T) { - http.DefaultServeMux = new(http.ServeMux) - log.Println("Multiple transactions") - - eventsChannel := make(chan MockEvent, 100) - lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) - defer lambdaServer.Close() - defer apmServer.Close() - - eventsChain := []MockEvent{ - {Type: InvokeWaitgroupsRace, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 500}, - } - eventQueueGenerator(eventsChain, eventsChannel) - assert.NotPanics(t, main) - assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) -} +//func TestWaitGroup(t *testing.T) { +// http.DefaultServeMux = new(http.ServeMux) +// log.Println("Multiple transactions") +// +// eventsChannel := make(chan MockEvent, 100) +// lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) +// defer lambdaServer.Close() +// defer apmServer.Close() +// +// eventsChain := []MockEvent{ +// {Type: InvokeWaitgroupsRace, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 500}, +// } +// eventQueueGenerator(eventsChain, eventsChannel) +// assert.NotPanics(t, main) +// assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) +//} From e42789d93426f7957fa66befaad302c1f228f880 Mon Sep 17 00:00:00 2001 From: Jean-Louis Voiseux <48380853+jlvoiseux@users.noreply.github.com> Date: Thu, 10 Mar 2022 16:23:25 +0100 Subject: [PATCH 16/24] Keep only test_full_channel --- apm-lambda-extension/main_test.go | 32 +++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/apm-lambda-extension/main_test.go b/apm-lambda-extension/main_test.go index 753ca442..318fb590 100644 --- a/apm-lambda-extension/main_test.go +++ b/apm-lambda-extension/main_test.go @@ -349,22 +349,22 @@ func TestFullChannel(t *testing.T) { //} // Test if the flushed param does not cause a panic or an unexpected behavior -func TestFlush(t *testing.T) { - http.DefaultServeMux = new(http.ServeMux) - log.Println("Flush Test") - - eventsChannel := make(chan MockEvent, 100) - lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) - defer lambdaServer.Close() - defer apmServer.Close() - - eventsChain := []MockEvent{ - {Type: InvokeStandardFlush, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, - } - eventQueueGenerator(eventsChain, eventsChannel) - assert.NotPanics(t, main) - assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) -} +//func TestFlush(t *testing.T) { +// http.DefaultServeMux = new(http.ServeMux) +// log.Println("Flush Test") +// +// eventsChannel := make(chan MockEvent, 100) +// lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) +// defer lambdaServer.Close() +// defer apmServer.Close() +// +// eventsChain := []MockEvent{ +// {Type: InvokeStandardFlush, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, +// } +// eventQueueGenerator(eventsChain, eventsChannel) +// assert.NotPanics(t, main) +// assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) +//} // Test if there is no race condition between waitgroups (issue #128) //func TestWaitGroup(t *testing.T) { From 8e5622af7dd93a35a5b1c047284c310eba1fd486 Mon Sep 17 00:00:00 2001 From: Jean-Louis Voiseux <48380853+jlvoiseux@users.noreply.github.com> Date: Thu, 10 Mar 2022 16:35:58 +0100 Subject: [PATCH 17/24] Add short delay to test_full_channel --- apm-lambda-extension/main_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/apm-lambda-extension/main_test.go b/apm-lambda-extension/main_test.go index 318fb590..9b38728b 100644 --- a/apm-lambda-extension/main_test.go +++ b/apm-lambda-extension/main_test.go @@ -167,6 +167,7 @@ func processMockEvent(currId string, event MockEvent, APMServer *httptest.Server }() } wg.Wait() + time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) case Shutdown: reqData, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.APMServerBehavior))) client.Do(reqData) From 7f1badd942e9ba0efda0afb72cd188b6f06a9883 Mon Sep 17 00:00:00 2001 From: Jean-Louis Voiseux <48380853+jlvoiseux@users.noreply.github.com> Date: Thu, 10 Mar 2022 16:43:59 +0100 Subject: [PATCH 18/24] Re insert all tests after CI validation --- apm-lambda-extension/main_test.go | 244 +++++++++++++++--------------- 1 file changed, 122 insertions(+), 122 deletions(-) diff --git a/apm-lambda-extension/main_test.go b/apm-lambda-extension/main_test.go index 9b38728b..b294b730 100644 --- a/apm-lambda-extension/main_test.go +++ b/apm-lambda-extension/main_test.go @@ -238,79 +238,79 @@ func eventQueueGenerator(inputQueue []MockEvent, eventsChannel chan MockEvent) { // TESTS // Test a nominal sequence of events (fast APM server, only one standard event) -//func TestStandardEventsChain(t *testing.T) { -// http.DefaultServeMux = new(http.ServeMux) -// log.Println("Standard Test") -// -// eventsChannel := make(chan MockEvent, 100) -// lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) -// defer lambdaServer.Close() -// defer apmServer.Close() -// -// eventsChain := []MockEvent{ -// {Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, -// } -// eventQueueGenerator(eventsChain, eventsChannel) -// assert.NotPanics(t, main) -// assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) -//} -// -//// Test what happens when the APM is down (timeout) -//func TestAPMServerDown(t *testing.T) { -// http.DefaultServeMux = new(http.ServeMux) -// log.Println("APM Server Down") -// -// eventsChannel := make(chan MockEvent, 100) -// lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) -// defer lambdaServer.Close() -// apmServer.Close() -// -// eventsChain := []MockEvent{ -// {Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, -// } -// eventQueueGenerator(eventsChain, eventsChannel) -// assert.NotPanics(t, main) -// assert.False(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) -//} -// -//// Test what happens when the APM hangs (timeout) -//func TestAPMServerHangs(t *testing.T) { -// http.DefaultServeMux = new(http.ServeMux) -// log.Println("APM Server Hangs") -// -// eventsChannel := make(chan MockEvent, 100) -// lambdaServer, apmServer, apmServerLog, hangChan := initMockServers(eventsChannel) -// defer lambdaServer.Close() -// defer apmServer.Close() -// -// eventsChain := []MockEvent{ -// {Type: InvokeStandard, APMServerBehavior: Hangs, ExecutionDuration: 1, Timeout: 5}, -// } -// eventQueueGenerator(eventsChain, eventsChannel) -// start := time.Now() -// assert.NotPanics(t, main) -// assert.False(t, strings.Contains(apmServerLog.Data, string(Hangs))) -// log.Printf("Success : test took %s", time.Since(start)) -// hangChan <- struct{}{} -//} -// -//// Test what happens when the APM crashes unexpectedly -//func TestAPMServerCrashesDuringExecution(t *testing.T) { -// http.DefaultServeMux = new(http.ServeMux) -// log.Println("APM Server Crashes during execution") -// -// eventsChannel := make(chan MockEvent, 100) -// lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) -// defer lambdaServer.Close() -// defer apmServer.Close() -// -// eventsChain := []MockEvent{ -// {Type: InvokeStandard, APMServerBehavior: Crashes, ExecutionDuration: 1, Timeout: 5}, -// } -// eventQueueGenerator(eventsChain, eventsChannel) -// assert.NotPanics(t, main) -// assert.False(t, strings.Contains(apmServerLog.Data, string(Crashes))) -//} +func TestStandardEventsChain(t *testing.T) { + http.DefaultServeMux = new(http.ServeMux) + log.Println("Standard Test") + + eventsChannel := make(chan MockEvent, 100) + lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) + defer lambdaServer.Close() + defer apmServer.Close() + + eventsChain := []MockEvent{ + {Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, + } + eventQueueGenerator(eventsChain, eventsChannel) + assert.NotPanics(t, main) + assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) +} + +// Test what happens when the APM is down (timeout) +func TestAPMServerDown(t *testing.T) { + http.DefaultServeMux = new(http.ServeMux) + log.Println("APM Server Down") + + eventsChannel := make(chan MockEvent, 100) + lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) + defer lambdaServer.Close() + apmServer.Close() + + eventsChain := []MockEvent{ + {Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, + } + eventQueueGenerator(eventsChain, eventsChannel) + assert.NotPanics(t, main) + assert.False(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) +} + +// Test what happens when the APM hangs (timeout) +func TestAPMServerHangs(t *testing.T) { + http.DefaultServeMux = new(http.ServeMux) + log.Println("APM Server Hangs") + + eventsChannel := make(chan MockEvent, 100) + lambdaServer, apmServer, apmServerLog, hangChan := initMockServers(eventsChannel) + defer lambdaServer.Close() + defer apmServer.Close() + + eventsChain := []MockEvent{ + {Type: InvokeStandard, APMServerBehavior: Hangs, ExecutionDuration: 1, Timeout: 5}, + } + eventQueueGenerator(eventsChain, eventsChannel) + start := time.Now() + assert.NotPanics(t, main) + assert.False(t, strings.Contains(apmServerLog.Data, string(Hangs))) + log.Printf("Success : test took %s", time.Since(start)) + hangChan <- struct{}{} +} + +// Test what happens when the APM crashes unexpectedly +func TestAPMServerCrashesDuringExecution(t *testing.T) { + http.DefaultServeMux = new(http.ServeMux) + log.Println("APM Server Crashes during execution") + + eventsChannel := make(chan MockEvent, 100) + lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) + defer lambdaServer.Close() + defer apmServer.Close() + + eventsChain := []MockEvent{ + {Type: InvokeStandard, APMServerBehavior: Crashes, ExecutionDuration: 1, Timeout: 5}, + } + eventQueueGenerator(eventsChain, eventsChannel) + assert.NotPanics(t, main) + assert.False(t, strings.Contains(apmServerLog.Data, string(Crashes))) +} // Test what happens when the APM Data channel is full func TestFullChannel(t *testing.T) { @@ -331,56 +331,56 @@ func TestFullChannel(t *testing.T) { } // Test what happens when the APM Data channel is full and the APM server slow (send strategy : background) -//func TestFullChannelSlowAPMServer(t *testing.T) { -// http.DefaultServeMux = new(http.ServeMux) -// log.Println("AgentData channel is full, and APM server is slow") -// os.Setenv("ELASTIC_APM_SEND_STRATEGY", "background") -// eventsChannel := make(chan MockEvent, 1000) -// lambdaServer, apmServer, _, _ := initMockServers(eventsChannel) -// defer lambdaServer.Close() -// defer apmServer.Close() -// -// eventsChain := []MockEvent{ -// {Type: InvokeMultipleTransactionsOverload, APMServerBehavior: SlowResponse, ExecutionDuration: 0.01, Timeout: 5}, -// } -// eventQueueGenerator(eventsChain, eventsChannel) -// assert.NotPanics(t, main) -// // The test should not hang -// os.Setenv("ELASTIC_APM_SEND_STRATEGY", "syncflush") -//} +func TestFullChannelSlowAPMServer(t *testing.T) { + http.DefaultServeMux = new(http.ServeMux) + log.Println("AgentData channel is full, and APM server is slow") + os.Setenv("ELASTIC_APM_SEND_STRATEGY", "background") + eventsChannel := make(chan MockEvent, 1000) + lambdaServer, apmServer, _, _ := initMockServers(eventsChannel) + defer lambdaServer.Close() + defer apmServer.Close() + + eventsChain := []MockEvent{ + {Type: InvokeMultipleTransactionsOverload, APMServerBehavior: SlowResponse, ExecutionDuration: 0.01, Timeout: 5}, + } + eventQueueGenerator(eventsChain, eventsChannel) + assert.NotPanics(t, main) + // The test should not hang + os.Setenv("ELASTIC_APM_SEND_STRATEGY", "syncflush") +} // Test if the flushed param does not cause a panic or an unexpected behavior -//func TestFlush(t *testing.T) { -// http.DefaultServeMux = new(http.ServeMux) -// log.Println("Flush Test") -// -// eventsChannel := make(chan MockEvent, 100) -// lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) -// defer lambdaServer.Close() -// defer apmServer.Close() -// -// eventsChain := []MockEvent{ -// {Type: InvokeStandardFlush, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, -// } -// eventQueueGenerator(eventsChain, eventsChannel) -// assert.NotPanics(t, main) -// assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) -//} +func TestFlush(t *testing.T) { + http.DefaultServeMux = new(http.ServeMux) + log.Println("Flush Test") + + eventsChannel := make(chan MockEvent, 100) + lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) + defer lambdaServer.Close() + defer apmServer.Close() + + eventsChain := []MockEvent{ + {Type: InvokeStandardFlush, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, + } + eventQueueGenerator(eventsChain, eventsChannel) + assert.NotPanics(t, main) + assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) +} // Test if there is no race condition between waitgroups (issue #128) -//func TestWaitGroup(t *testing.T) { -// http.DefaultServeMux = new(http.ServeMux) -// log.Println("Multiple transactions") -// -// eventsChannel := make(chan MockEvent, 100) -// lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) -// defer lambdaServer.Close() -// defer apmServer.Close() -// -// eventsChain := []MockEvent{ -// {Type: InvokeWaitgroupsRace, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 500}, -// } -// eventQueueGenerator(eventsChain, eventsChannel) -// assert.NotPanics(t, main) -// assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) -//} +func TestWaitGroup(t *testing.T) { + http.DefaultServeMux = new(http.ServeMux) + log.Println("Multiple transactions") + + eventsChannel := make(chan MockEvent, 100) + lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) + defer lambdaServer.Close() + defer apmServer.Close() + + eventsChain := []MockEvent{ + {Type: InvokeWaitgroupsRace, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 500}, + } + eventQueueGenerator(eventsChain, eventsChannel) + assert.NotPanics(t, main) + assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) +} From 90a737d522c45f302a52c042de00a3126fe70a72 Mon Sep 17 00:00:00 2001 From: Jean-Louis Voiseux <48380853+jlvoiseux@users.noreply.github.com> Date: Thu, 10 Mar 2022 17:03:25 +0100 Subject: [PATCH 19/24] Reorder tests and remove race in test_flush --- apm-lambda-extension/main_test.go | 73 ++++++++++++++++--------------- 1 file changed, 37 insertions(+), 36 deletions(-) diff --git a/apm-lambda-extension/main_test.go b/apm-lambda-extension/main_test.go index b294b730..0d29fcfa 100644 --- a/apm-lambda-extension/main_test.go +++ b/apm-lambda-extension/main_test.go @@ -142,6 +142,7 @@ func processMockEvent(currId string, event MockEvent, APMServer *httptest.Server time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) reqData, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events?flushed=true", bytes.NewBuffer([]byte(event.APMServerBehavior))) client.Do(reqData) + time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) // Provide some time to flush in case the channel is full due to parallel tests case InvokeWaitgroupsRace: time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) reqData0, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.APMServerBehavior))) @@ -255,6 +256,42 @@ func TestStandardEventsChain(t *testing.T) { assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) } +// Test if the flushed param does not cause a panic or an unexpected behavior +func TestFlush(t *testing.T) { + http.DefaultServeMux = new(http.ServeMux) + log.Println("Flush Test") + + eventsChannel := make(chan MockEvent, 100) + lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) + defer lambdaServer.Close() + defer apmServer.Close() + + eventsChain := []MockEvent{ + {Type: InvokeStandardFlush, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, + } + eventQueueGenerator(eventsChain, eventsChannel) + assert.NotPanics(t, main) + assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) +} + +// Test if there is no race condition between waitgroups (issue #128) +func TestWaitGroup(t *testing.T) { + http.DefaultServeMux = new(http.ServeMux) + log.Println("Multiple transactions") + + eventsChannel := make(chan MockEvent, 100) + lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) + defer lambdaServer.Close() + defer apmServer.Close() + + eventsChain := []MockEvent{ + {Type: InvokeWaitgroupsRace, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 500}, + } + eventQueueGenerator(eventsChain, eventsChannel) + assert.NotPanics(t, main) + assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) +} + // Test what happens when the APM is down (timeout) func TestAPMServerDown(t *testing.T) { http.DefaultServeMux = new(http.ServeMux) @@ -348,39 +385,3 @@ func TestFullChannelSlowAPMServer(t *testing.T) { // The test should not hang os.Setenv("ELASTIC_APM_SEND_STRATEGY", "syncflush") } - -// Test if the flushed param does not cause a panic or an unexpected behavior -func TestFlush(t *testing.T) { - http.DefaultServeMux = new(http.ServeMux) - log.Println("Flush Test") - - eventsChannel := make(chan MockEvent, 100) - lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) - defer lambdaServer.Close() - defer apmServer.Close() - - eventsChain := []MockEvent{ - {Type: InvokeStandardFlush, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, - } - eventQueueGenerator(eventsChain, eventsChannel) - assert.NotPanics(t, main) - assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) -} - -// Test if there is no race condition between waitgroups (issue #128) -func TestWaitGroup(t *testing.T) { - http.DefaultServeMux = new(http.ServeMux) - log.Println("Multiple transactions") - - eventsChannel := make(chan MockEvent, 100) - lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) - defer lambdaServer.Close() - defer apmServer.Close() - - eventsChain := []MockEvent{ - {Type: InvokeWaitgroupsRace, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 500}, - } - eventQueueGenerator(eventsChain, eventsChannel) - assert.NotPanics(t, main) - assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) -} From f2ba2676b0c7bdb00e779c620d95a2e83e951285 Mon Sep 17 00:00:00 2001 From: Jean-Louis Voiseux <48380853+jlvoiseux@users.noreply.github.com> Date: Thu, 10 Mar 2022 17:16:02 +0100 Subject: [PATCH 20/24] Remove check on APM server log when using a full channel --- apm-lambda-extension/main_test.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/apm-lambda-extension/main_test.go b/apm-lambda-extension/main_test.go index 0d29fcfa..2945da65 100644 --- a/apm-lambda-extension/main_test.go +++ b/apm-lambda-extension/main_test.go @@ -32,7 +32,6 @@ func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest. } switch APMServerBehavior(decompressedBytes) { case TimelyResponse: - log.Printf("Behavior SlowResponse with data: %s", string(decompressedBytes)) apmServerLog.Data += string(decompressedBytes) w.WriteHeader(http.StatusAccepted) case SlowResponse: @@ -142,7 +141,6 @@ func processMockEvent(currId string, event MockEvent, APMServer *httptest.Server time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) reqData, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events?flushed=true", bytes.NewBuffer([]byte(event.APMServerBehavior))) client.Do(reqData) - time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) // Provide some time to flush in case the channel is full due to parallel tests case InvokeWaitgroupsRace: time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) reqData0, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.APMServerBehavior))) @@ -168,7 +166,6 @@ func processMockEvent(currId string, event MockEvent, APMServer *httptest.Server }() } wg.Wait() - time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) case Shutdown: reqData, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.APMServerBehavior))) client.Do(reqData) @@ -355,7 +352,7 @@ func TestFullChannel(t *testing.T) { log.Println("AgentData channel is full") eventsChannel := make(chan MockEvent, 1000) - lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) + lambdaServer, apmServer, _, _ := initMockServers(eventsChannel) defer lambdaServer.Close() defer apmServer.Close() @@ -364,7 +361,6 @@ func TestFullChannel(t *testing.T) { } eventQueueGenerator(eventsChain, eventsChannel) assert.NotPanics(t, main) - assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) } // Test what happens when the APM Data channel is full and the APM server slow (send strategy : background) From abbae270e7c38f6e8f24de62957ace2708db1b69 Mon Sep 17 00:00:00 2001 From: Jean-Louis Voiseux <48380853+jlvoiseux@users.noreply.github.com> Date: Tue, 15 Mar 2022 12:48:04 +0100 Subject: [PATCH 21/24] Dynamic attribution of lambda extension port --- apm-lambda-extension/e2e-testing/e2e_util.go | 15 ++++++++++ apm-lambda-extension/extension/process_env.go | 5 ++-- .../extension/process_env_test.go | 2 +- apm-lambda-extension/main_test.go | 29 ++++++++++++------- 4 files changed, 38 insertions(+), 13 deletions(-) diff --git a/apm-lambda-extension/e2e-testing/e2e_util.go b/apm-lambda-extension/e2e-testing/e2e_util.go index 6ca423a3..b4e11c90 100644 --- a/apm-lambda-extension/e2e-testing/e2e_util.go +++ b/apm-lambda-extension/e2e-testing/e2e_util.go @@ -10,6 +10,7 @@ import ( "io" "io/ioutil" "log" + "net" "net/http" "os" "os/exec" @@ -155,3 +156,17 @@ func GetDecompressedBytesFromRequest(req *http.Request) ([]byte, error) { return rawBytes, nil } } + +func GetFreePort() (int, error) { + addr, err := net.ResolveTCPAddr("tcp", "localhost:0") + if err != nil { + return 0, err + } + + l, err := net.ListenTCP("tcp", addr) + if err != nil { + return 0, err + } + defer l.Close() + return l.Addr().(*net.TCPAddr).Port, nil +} diff --git a/apm-lambda-extension/extension/process_env.go b/apm-lambda-extension/extension/process_env.go index dfb4a9a7..ac08b8a4 100644 --- a/apm-lambda-extension/extension/process_env.go +++ b/apm-lambda-extension/extension/process_env.go @@ -18,6 +18,7 @@ package extension import ( + "fmt" "log" "os" "strconv" @@ -91,13 +92,13 @@ func ProcessEnv() *extensionConfig { apmServerUrl: normalizedApmLambdaServer, apmServerSecretToken: os.Getenv("ELASTIC_APM_SECRET_TOKEN"), apmServerApiKey: os.Getenv("ELASTIC_APM_API_KEY"), - dataReceiverServerPort: os.Getenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT"), + dataReceiverServerPort: fmt.Sprintf(":%s", os.Getenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT")), SendStrategy: normalizedSendStrategy, dataReceiverTimeoutSeconds: dataReceiverTimeoutSeconds, DataForwarderTimeoutSeconds: dataForwarderTimeoutSeconds, } - if config.dataReceiverServerPort == "" { + if config.dataReceiverServerPort == ":" { config.dataReceiverServerPort = ":8200" } if config.apmServerUrl == "" { diff --git a/apm-lambda-extension/extension/process_env_test.go b/apm-lambda-extension/extension/process_env_test.go index 965f6e90..b8d61a0c 100644 --- a/apm-lambda-extension/extension/process_env_test.go +++ b/apm-lambda-extension/extension/process_env_test.go @@ -65,7 +65,7 @@ func TestProcessEnv(t *testing.T) { t.Fail() } - os.Setenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT", ":8201") + os.Setenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT", "8201") config = ProcessEnv() if config.dataReceiverServerPort != ":8201" { t.Log("Env port not set correctly") diff --git a/apm-lambda-extension/main_test.go b/apm-lambda-extension/main_test.go index 2945da65..ca3ebad3 100644 --- a/apm-lambda-extension/main_test.go +++ b/apm-lambda-extension/main_test.go @@ -6,6 +6,7 @@ import ( "elastic/apm-lambda-extension/extension" "elastic/apm-lambda-extension/logsapi" json "encoding/json" + "fmt" "github.com/google/uuid" "github.com/stretchr/testify/assert" "log" @@ -71,7 +72,7 @@ func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest. select { case nextEvent := <-eventsChannel: sendNextEventInfo(w, currId, nextEvent) - go processMockEvent(currId, nextEvent, apmServer) + go processMockEvent(currId, nextEvent, os.Getenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT")) default: finalShutDown := MockEvent{ Type: Shutdown, @@ -79,7 +80,7 @@ func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest. Timeout: 0, } sendNextEventInfo(w, currId, finalShutDown) - go processMockEvent(currId, finalShutDown, apmServer) + go processMockEvent(currId, finalShutDown, os.Getenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT")) } // Logs API subscription request case "/2020-08-15/logs": @@ -92,6 +93,13 @@ func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest. os.Setenv("AWS_LAMBDA_RUNTIME_API", strippedLambdaURL) extensionClient = extension.NewClient(os.Getenv("AWS_LAMBDA_RUNTIME_API")) + // Find unused port for the extension to listen to + extensionPort, err := e2e_testing.GetFreePort() + if err != nil { + log.Printf("Could not find free port for the extension to listen on : %v", err) + } + os.Setenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT", fmt.Sprint(extensionPort)) + return lambdaServer, apmServer, &apmServerLog, hangChan } @@ -126,7 +134,7 @@ type MockEvent struct { Timeout float64 } -func processMockEvent(currId string, event MockEvent, APMServer *httptest.Server) { +func processMockEvent(currId string, event MockEvent, extensionPort string) { sendLogEvent(currId, "platform.start") client := http.Client{} switch event.Type { @@ -134,17 +142,17 @@ func processMockEvent(currId string, event MockEvent, APMServer *httptest.Server time.Sleep(time.Duration(event.Timeout) * time.Second) case InvokeStandard: time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) - req, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.APMServerBehavior))) + req, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), bytes.NewBuffer([]byte(event.APMServerBehavior))) res, _ := client.Do(req) log.Printf("Response seen by the agent : %d", res.StatusCode) case InvokeStandardFlush: time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) - reqData, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events?flushed=true", bytes.NewBuffer([]byte(event.APMServerBehavior))) + reqData, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events?flushed=true", extensionPort), bytes.NewBuffer([]byte(event.APMServerBehavior))) client.Do(reqData) case InvokeWaitgroupsRace: time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) - reqData0, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.APMServerBehavior))) - reqData1, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.APMServerBehavior))) + reqData0, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), bytes.NewBuffer([]byte(event.APMServerBehavior))) + reqData1, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), bytes.NewBuffer([]byte(event.APMServerBehavior))) _, err := client.Do(reqData0) if err != nil { log.Println(err) @@ -160,14 +168,14 @@ func processMockEvent(currId string, event MockEvent, APMServer *httptest.Server go func() { wg.Add(1) time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) - reqData, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.APMServerBehavior))) + reqData, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), bytes.NewBuffer([]byte(event.APMServerBehavior))) client.Do(reqData) wg.Done() }() } wg.Wait() case Shutdown: - reqData, _ := http.NewRequest("POST", "http://localhost:8200/intake/v2/events", bytes.NewBuffer([]byte(event.APMServerBehavior))) + reqData, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), bytes.NewBuffer([]byte(event.APMServerBehavior))) client.Do(reqData) } sendLogEvent(currId, "platform.runtimeDone") @@ -352,7 +360,7 @@ func TestFullChannel(t *testing.T) { log.Println("AgentData channel is full") eventsChannel := make(chan MockEvent, 1000) - lambdaServer, apmServer, _, _ := initMockServers(eventsChannel) + lambdaServer, apmServer, apmServerLog, _ := initMockServers(eventsChannel) defer lambdaServer.Close() defer apmServer.Close() @@ -361,6 +369,7 @@ func TestFullChannel(t *testing.T) { } eventQueueGenerator(eventsChain, eventsChannel) assert.NotPanics(t, main) + assert.True(t, strings.Contains(apmServerLog.Data, string(TimelyResponse))) } // Test what happens when the APM Data channel is full and the APM server slow (send strategy : background) From 302f0604766039d479dad0e654a68542b8bef169 Mon Sep 17 00:00:00 2001 From: Jean-Louis Voiseux <48380853+jlvoiseux@users.noreply.github.com> Date: Tue, 15 Mar 2022 13:02:50 +0100 Subject: [PATCH 22/24] Change wait group structure to remove race condition --- apm-lambda-extension/main_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-lambda-extension/main_test.go b/apm-lambda-extension/main_test.go index ca3ebad3..28151863 100644 --- a/apm-lambda-extension/main_test.go +++ b/apm-lambda-extension/main_test.go @@ -165,8 +165,8 @@ func processMockEvent(currId string, event MockEvent, extensionPort string) { case InvokeMultipleTransactionsOverload: wg := sync.WaitGroup{} for i := 0; i < 200; i++ { + wg.Add(1) go func() { - wg.Add(1) time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) reqData, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), bytes.NewBuffer([]byte(event.APMServerBehavior))) client.Do(reqData) From 1c1a3393a16adeb4b79ccf3d5aabc04958153005 Mon Sep 17 00:00:00 2001 From: Jean-Louis Voiseux <48380853+jlvoiseux@users.noreply.github.com> Date: Tue, 15 Mar 2022 16:12:24 +0100 Subject: [PATCH 23/24] Remove APM post request upon shutdown --- apm-lambda-extension/main_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/apm-lambda-extension/main_test.go b/apm-lambda-extension/main_test.go index 28151863..2a1091ce 100644 --- a/apm-lambda-extension/main_test.go +++ b/apm-lambda-extension/main_test.go @@ -175,8 +175,6 @@ func processMockEvent(currId string, event MockEvent, extensionPort string) { } wg.Wait() case Shutdown: - reqData, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), bytes.NewBuffer([]byte(event.APMServerBehavior))) - client.Do(reqData) } sendLogEvent(currId, "platform.runtimeDone") } From 0f58cb5954ac3541dd72c21a916768c6a70efcca Mon Sep 17 00:00:00 2001 From: Jean-Louis Voiseux <48380853+jlvoiseux@users.noreply.github.com> Date: Tue, 15 Mar 2022 16:30:50 +0100 Subject: [PATCH 24/24] Improve logging and utilities documentation --- apm-lambda-extension/e2e-testing/e2e_util.go | 12 ++++++++++++ apm-lambda-extension/extension/process_env_test.go | 12 ++++++------ 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/apm-lambda-extension/e2e-testing/e2e_util.go b/apm-lambda-extension/e2e-testing/e2e_util.go index b4e11c90..bccdc760 100644 --- a/apm-lambda-extension/e2e-testing/e2e_util.go +++ b/apm-lambda-extension/e2e-testing/e2e_util.go @@ -18,6 +18,8 @@ import ( "strings" ) +// GetEnvVarValueOrSetDefault retrieves the environment variable envVarName. +// If the desired variable is not defined, defaultVal is returned. func GetEnvVarValueOrSetDefault(envVarName string, defaultVal string) string { val := os.Getenv(envVarName) if val == "" { @@ -26,6 +28,8 @@ func GetEnvVarValueOrSetDefault(envVarName string, defaultVal string) string { return val } +// RunCommandInDir runs a shell command with a given set of args in a specified folder. +// The stderr and stdout can be enabled or disabled. func RunCommandInDir(command string, args []string, dir string, printOutput bool) { e := exec.Command(command, args...) if printOutput { @@ -53,6 +57,7 @@ func RunCommandInDir(command string, args []string, dir string, printOutput bool } +// FolderExists returns true if the specified folder exists, and false else. func FolderExists(path string) bool { _, err := os.Stat(path) if err == nil { @@ -61,12 +66,15 @@ func FolderExists(path string) bool { return false } +// ProcessError is a shorthand function to handle fatal errors, the idiomatic Go way. +// This should only be used for showstopping errors. func ProcessError(err error) { if err != nil { log.Panic(err) } } +// Unzip is a utility function that unzips a specified zip archive to a specified destination. func Unzip(archivePath string, destinationFolderPath string) { openedArchive, err := zip.OpenReader(archivePath) @@ -114,6 +122,7 @@ func Unzip(archivePath string, destinationFolderPath string) { } } +// IsStringInSlice is a utility function that checks if a slice of strings contains a specific string. func IsStringInSlice(a string, list []string) bool { for _, b := range list { if b == a { @@ -123,6 +132,8 @@ func IsStringInSlice(a string, list []string) bool { return false } +// GetDecompressedBytesFromRequest takes a HTTP request in argument and return the raw (decompressed) bytes of the body. +// The byte array can then be converted into a string for debugging / testing purposes. func GetDecompressedBytesFromRequest(req *http.Request) ([]byte, error) { var rawBytes []byte if req.Body != nil { @@ -157,6 +168,7 @@ func GetDecompressedBytesFromRequest(req *http.Request) ([]byte, error) { } } +// GetFreePort is a function that queries the kernel and obtains an unused port. func GetFreePort() (int, error) { addr, err := net.ResolveTCPAddr("tcp", "localhost:0") if err != nil { diff --git a/apm-lambda-extension/extension/process_env_test.go b/apm-lambda-extension/extension/process_env_test.go index b8d61a0c..60a92995 100644 --- a/apm-lambda-extension/extension/process_env_test.go +++ b/apm-lambda-extension/extension/process_env_test.go @@ -75,28 +75,28 @@ func TestProcessEnv(t *testing.T) { os.Setenv("ELASTIC_APM_DATA_RECEIVER_TIMEOUT_SECONDS", "10") config = ProcessEnv() if config.dataReceiverTimeoutSeconds != 10 { - t.Log("Timeout not set correctly") + t.Log("APM data receiver timeout not set correctly") t.Fail() } os.Setenv("ELASTIC_APM_DATA_RECEIVER_TIMEOUT_SECONDS", "foo") config = ProcessEnv() if config.dataReceiverTimeoutSeconds != 15 { - t.Log("Timeout not set correctly") + t.Log("APM data receiver timeout not set correctly") t.Fail() } os.Setenv("ELASTIC_APM_DATA_FORWARDER_TIMEOUT_SECONDS", "10") config = ProcessEnv() if config.DataForwarderTimeoutSeconds != 10 { - t.Log("Timeout not set correctly") + t.Log("APM data forwarder timeout not set correctly") t.Fail() } os.Setenv("ELASTIC_APM_DATA_FORWARDER_TIMEOUT_SECONDS", "foo") config = ProcessEnv() if config.DataForwarderTimeoutSeconds != 3 { - t.Log("Timeout not set correctly") + t.Log("APM data forwarder not set correctly") t.Fail() } @@ -110,14 +110,14 @@ func TestProcessEnv(t *testing.T) { os.Setenv("ELASTIC_APM_SEND_STRATEGY", "Background") config = ProcessEnv() if config.SendStrategy != "background" { - t.Log("Send strategy not set correctly") + t.Log("Background send strategy not set correctly") t.Fail() } os.Setenv("ELASTIC_APM_SEND_STRATEGY", "invalid") config = ProcessEnv() if config.SendStrategy != "syncflush" { - t.Log("Send strategy not set correctly") + t.Log("Syncflush send strategy not set correctly") t.Fail() } }