From 41f933eff9f59152fa75997fbd1c59339c158405 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Tue, 16 Nov 2021 15:30:12 +0100 Subject: [PATCH 01/11] Refactor route handling and server --- apm-lambda-extension/extension/http_server.go | 61 +++------- .../extension/http_server_test.go | 3 +- .../extension/route_handlers.go | 105 +++++++++--------- apm-lambda-extension/main.go | 2 +- 4 files changed, 76 insertions(+), 95 deletions(-) diff --git a/apm-lambda-extension/extension/http_server.go b/apm-lambda-extension/extension/http_server.go index 30bd45a8..349541c4 100644 --- a/apm-lambda-extension/extension/http_server.go +++ b/apm-lambda-extension/extension/http_server.go @@ -18,50 +18,25 @@ package extension import ( - "net" + "log" "net/http" - "time" ) -type serverHandler struct { - data chan AgentData - config *extensionConfig -} - -func (handler *serverHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if r.URL.Path == "/intake/v2/events" { - handleIntakeV2Events(handler, w, r) - return - } - - if r.URL.Path == "/" { - handleInfoRequest(handler, w, r) - return - } - - // if we have not yet returned, 404 - w.WriteHeader(http.StatusNotFound) - w.Write([]byte("404")) - -} - -func NewHttpServer(dataChannel chan AgentData, config *extensionConfig) *http.Server { - var handler = serverHandler{data: dataChannel, config: config} - timeout := time.Duration(config.dataReceiverTimeoutSeconds) * time.Second - s := &http.Server{ - Addr: config.dataReceiverServerPort, - Handler: &handler, - ReadTimeout: timeout, - WriteTimeout: timeout, - MaxHeaderBytes: 1 << 20, - } - - addr := s.Addr - ln, err := net.Listen("tcp", addr) - if err != nil { - return s - } - go s.Serve(ln) - - return s +var extensionServer *http.Server + +func StartHttpServer(agentDataChan chan AgentData, config *extensionConfig) { + mux := http.NewServeMux() + mux.HandleFunc("/", handleInfoRequest(config.apmServerUrl)) + mux.HandleFunc("/intake/v2/events", handleIntakeV2Events(agentDataChan)) + extensionServer = &http.Server{Addr: config.dataReceiverServerPort, Handler: mux} + + go func() { + log.Printf("Extension liistening for apm data on %s", extensionServer.Addr) + err := extensionServer.ListenAndServe() + if err != http.ErrServerClosed { + log.Printf("Unexpected stop on Extension Server: %v", err) + } else { + log.Printf("Extension Server closed %v", err) + } + }() } diff --git a/apm-lambda-extension/extension/http_server_test.go b/apm-lambda-extension/extension/http_server_test.go index f765d875..2968911f 100644 --- a/apm-lambda-extension/extension/http_server_test.go +++ b/apm-lambda-extension/extension/http_server_test.go @@ -49,7 +49,8 @@ func TestInfoProxy(t *testing.T) { dataReceiverServerPort: "127.0.0.1:1234", dataReceiverTimeoutSeconds: 15, } - extensionServer := NewHttpServer(dataChannel, &config) + + StartHttpServer(dataChannel, &config) defer extensionServer.Close() // Create a request to send to the extension diff --git a/apm-lambda-extension/extension/route_handlers.go b/apm-lambda-extension/extension/route_handlers.go index 60d4373c..865fe1d2 100644 --- a/apm-lambda-extension/extension/route_handlers.go +++ b/apm-lambda-extension/extension/route_handlers.go @@ -29,68 +29,73 @@ type AgentData struct { } // URL: http://server/ -func handleInfoRequest(handler *serverHandler, w http.ResponseWriter, r *http.Request) { - client := &http.Client{} +func handleInfoRequest(apmServerUrl string) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + client := &http.Client{} - req, err := http.NewRequest(r.Method, handler.config.apmServerUrl, nil) - //forward every header received - for name, values := range r.Header { - // Loop over all values for the name. - for _, value := range values { - req.Header.Set(name, value) + req, err := http.NewRequest(r.Method, apmServerUrl, nil) + //forward every header received + for name, values := range r.Header { + // Loop over all values for the name. + for _, value := range values { + req.Header.Set(name, value) + } + } + if err != nil { + log.Printf("could not create request object for %s:%s: %v", r.Method, apmServerUrl, err) + return } - } - if err != nil { - log.Printf("could not create request object for %s:%s: %v", r.Method, handler.config.apmServerUrl, err) - return - } - resp, err := client.Do(req) - if err != nil { - log.Printf("error forwarding info request (`/`) to APM Server: %v", err) - return - } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - log.Printf("could not read info request response to APM Server: %v", err) - return - } + resp, err := client.Do(req) + if err != nil { + log.Printf("error forwarding info request (`/`) to APM Server: %v", err) + return + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + log.Printf("could not read info request response to APM Server: %v", err) + return + } - // send status code - w.WriteHeader(resp.StatusCode) + // send status code + w.WriteHeader(resp.StatusCode) - // send every header received - for name, values := range resp.Header { - // Loop over all values for the name. - for _, value := range values { - w.Header().Add(name, value) + // send every header received + for name, values := range resp.Header { + // Loop over all values for the name. + for _, value := range values { + w.Header().Add(name, value) + } } + // send body + w.Write([]byte(body)) } - // send body - w.Write([]byte(body)) } // URL: http://server/intake/v2/events -func handleIntakeV2Events(handler *serverHandler, w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusAccepted) - w.Write([]byte("ok")) +func handleIntakeV2Events(agentDataChan chan AgentData) func(w http.ResponseWriter, r *http.Request) { - if r.Body == nil { - log.Println("Could not get bytes from agent request body") - return - } + return func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusAccepted) + w.Write([]byte("ok")) - rawBytes, err := ioutil.ReadAll(r.Body) - if err != nil { - log.Println("Could not read bytes from agent request body") - return - } + if r.Body == nil { + log.Println("No body in agent request") + return + } - agentData := AgentData{ - Data: rawBytes, - ContentEncoding: r.Header.Get("Content-Encoding"), + rawBytes, err := ioutil.ReadAll(r.Body) + if err != nil { + log.Println("Could not read bytes from agent request body") + return + } + + agentData := AgentData{ + Data: rawBytes, + ContentEncoding: r.Header.Get("Content-Encoding"), + } + log.Println("Adding agent data to buffer to be sent to apm server") + agentDataChan <- agentData } - log.Println("Adding agent data to buffer to be sent to apm server") - handler.data <- agentData } diff --git a/apm-lambda-extension/main.go b/apm-lambda-extension/main.go index 421d3231..42d1cc45 100644 --- a/apm-lambda-extension/main.go +++ b/apm-lambda-extension/main.go @@ -64,7 +64,7 @@ func main() { // and get a channel to listen for that data agentDataChannel := make(chan extension.AgentData, 100) - extension.NewHttpServer(agentDataChannel, config) + extension.StartHttpServer(agentDataChannel, config) // Make channel for collecting logs and create a HTTP server to listen for them logsChannel := make(chan logsapi.LogEvent) From 10520b1fd02ba8720ac6ea3ca9b4593a732be5c8 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Tue, 16 Nov 2021 16:53:50 +0100 Subject: [PATCH 02/11] Improve variable names --- apm-lambda-extension/extension/http_server.go | 8 ++++---- apm-lambda-extension/extension/http_server_test.go | 6 +++--- apm-lambda-extension/extension/process_events.go | 1 + 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/apm-lambda-extension/extension/http_server.go b/apm-lambda-extension/extension/http_server.go index 349541c4..1e10551a 100644 --- a/apm-lambda-extension/extension/http_server.go +++ b/apm-lambda-extension/extension/http_server.go @@ -22,17 +22,17 @@ import ( "net/http" ) -var extensionServer *http.Server +var agentDataServer *http.Server func StartHttpServer(agentDataChan chan AgentData, config *extensionConfig) { mux := http.NewServeMux() mux.HandleFunc("/", handleInfoRequest(config.apmServerUrl)) mux.HandleFunc("/intake/v2/events", handleIntakeV2Events(agentDataChan)) - extensionServer = &http.Server{Addr: config.dataReceiverServerPort, Handler: mux} + agentDataServer = &http.Server{Addr: config.dataReceiverServerPort, Handler: mux} go func() { - log.Printf("Extension liistening for apm data on %s", extensionServer.Addr) - err := extensionServer.ListenAndServe() + log.Printf("Extension listening for apm data on %s", agentDataServer.Addr) + err := agentDataServer.ListenAndServe() if err != http.ErrServerClosed { log.Printf("Unexpected stop on Extension Server: %v", err) } else { diff --git a/apm-lambda-extension/extension/http_server_test.go b/apm-lambda-extension/extension/http_server_test.go index 2968911f..47eb1103 100644 --- a/apm-lambda-extension/extension/http_server_test.go +++ b/apm-lambda-extension/extension/http_server_test.go @@ -51,11 +51,11 @@ func TestInfoProxy(t *testing.T) { } StartHttpServer(dataChannel, &config) - defer extensionServer.Close() + defer agentDataServer.Close() // Create a request to send to the extension client := &http.Client{} - url := "http://" + extensionServer.Addr + url := "http://" + agentDataServer.Addr req, err := http.NewRequest("GET", url, nil) if err != nil { t.Logf("Could not create request") @@ -67,7 +67,7 @@ func TestInfoProxy(t *testing.T) { // Send the request to the extension resp, err := client.Do(req) if err != nil { - t.Logf("Error fetching %s, [%v]", extensionServer.Addr, err) + t.Logf("Error fetching %s, [%v]", agentDataServer.Addr, err) t.Fail() } else { body, _ := ioutil.ReadAll(resp.Body) diff --git a/apm-lambda-extension/extension/process_events.go b/apm-lambda-extension/extension/process_events.go index 1debb836..e1585bf1 100644 --- a/apm-lambda-extension/extension/process_events.go +++ b/apm-lambda-extension/extension/process_events.go @@ -26,6 +26,7 @@ import ( func ProcessShutdown() { log.Println("Received SHUTDOWN event") log.Println("Exiting") + agentDataServer.Close() } func FlushAPMData(client *http.Client, dataChannel chan AgentData, config *extensionConfig) { From e0d7ac122568b8b3efae1d8d6d143c048f0ddea7 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Tue, 16 Nov 2021 17:22:03 +0100 Subject: [PATCH 03/11] Use localhost instead of ip --- apm-lambda-extension/extension/http_server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-lambda-extension/extension/http_server_test.go b/apm-lambda-extension/extension/http_server_test.go index 47eb1103..fc58cd19 100644 --- a/apm-lambda-extension/extension/http_server_test.go +++ b/apm-lambda-extension/extension/http_server_test.go @@ -46,7 +46,7 @@ func TestInfoProxy(t *testing.T) { apmServerUrl: apmServer.URL, apmServerSecretToken: "foo", apmServerApiKey: "bar", - dataReceiverServerPort: "127.0.0.1:1234", + dataReceiverServerPort: "localhost:1234", dataReceiverTimeoutSeconds: 15, } From 26d301a400d57720d4b86b8b5e5a11e4e57bd474 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Wed, 17 Nov 2021 14:45:32 +0100 Subject: [PATCH 04/11] http.Server Addr is the TCP address for the server to listen on --- apm-lambda-extension/extension/http_server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-lambda-extension/extension/http_server_test.go b/apm-lambda-extension/extension/http_server_test.go index fc58cd19..f2be5795 100644 --- a/apm-lambda-extension/extension/http_server_test.go +++ b/apm-lambda-extension/extension/http_server_test.go @@ -46,7 +46,7 @@ func TestInfoProxy(t *testing.T) { apmServerUrl: apmServer.URL, apmServerSecretToken: "foo", apmServerApiKey: "bar", - dataReceiverServerPort: "localhost:1234", + dataReceiverServerPort: "127.0.0.1:4567", dataReceiverTimeoutSeconds: 15, } From f33cdf5b64552f0971f591ea99b1344e7b13d67c Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Wed, 17 Nov 2021 15:22:52 +0100 Subject: [PATCH 05/11] Just provide the port for the http server --- apm-lambda-extension/extension/http_server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-lambda-extension/extension/http_server_test.go b/apm-lambda-extension/extension/http_server_test.go index f2be5795..3ca4c8c6 100644 --- a/apm-lambda-extension/extension/http_server_test.go +++ b/apm-lambda-extension/extension/http_server_test.go @@ -46,7 +46,7 @@ func TestInfoProxy(t *testing.T) { apmServerUrl: apmServer.URL, apmServerSecretToken: "foo", apmServerApiKey: "bar", - dataReceiverServerPort: "127.0.0.1:4567", + dataReceiverServerPort: ":4567", dataReceiverTimeoutSeconds: 15, } From e44b857ebeb2166b46675f5cebdb7a22358ce132 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Wed, 17 Nov 2021 15:41:56 +0100 Subject: [PATCH 06/11] Add localhost to url in test --- apm-lambda-extension/extension/http_server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-lambda-extension/extension/http_server_test.go b/apm-lambda-extension/extension/http_server_test.go index 3ca4c8c6..69724720 100644 --- a/apm-lambda-extension/extension/http_server_test.go +++ b/apm-lambda-extension/extension/http_server_test.go @@ -55,7 +55,7 @@ func TestInfoProxy(t *testing.T) { // Create a request to send to the extension client := &http.Client{} - url := "http://" + agentDataServer.Addr + url := "http://localhost" + agentDataServer.Addr req, err := http.NewRequest("GET", url, nil) if err != nil { t.Logf("Could not create request") From 1f22ae56989c037d45e4d93c3d3dbde2cbb1e9d4 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Wed, 17 Nov 2021 16:01:59 +0100 Subject: [PATCH 07/11] Don't reuse the server Addr when making request --- apm-lambda-extension/extension/http_server_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/apm-lambda-extension/extension/http_server_test.go b/apm-lambda-extension/extension/http_server_test.go index 69724720..f6228523 100644 --- a/apm-lambda-extension/extension/http_server_test.go +++ b/apm-lambda-extension/extension/http_server_test.go @@ -46,7 +46,7 @@ func TestInfoProxy(t *testing.T) { apmServerUrl: apmServer.URL, apmServerSecretToken: "foo", apmServerApiKey: "bar", - dataReceiverServerPort: ":4567", + dataReceiverServerPort: ":1234", dataReceiverTimeoutSeconds: 15, } @@ -55,8 +55,7 @@ func TestInfoProxy(t *testing.T) { // Create a request to send to the extension client := &http.Client{} - url := "http://localhost" + agentDataServer.Addr - req, err := http.NewRequest("GET", url, nil) + req, err := http.NewRequest("GET", "http://localhost:1234", nil) if err != nil { t.Logf("Could not create request") } From 053344082bdcd8929cb8c9e0c3607a44296e2667 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Wed, 17 Nov 2021 16:30:53 +0100 Subject: [PATCH 08/11] Use LookupHost to get localhost ip --- apm-lambda-extension/extension/http_server_test.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/apm-lambda-extension/extension/http_server_test.go b/apm-lambda-extension/extension/http_server_test.go index f6228523..6c9c0954 100644 --- a/apm-lambda-extension/extension/http_server_test.go +++ b/apm-lambda-extension/extension/http_server_test.go @@ -19,6 +19,7 @@ package extension import ( "io/ioutil" + "net" "net/http" "net/http/httptest" "testing" @@ -29,6 +30,7 @@ import ( func TestInfoProxy(t *testing.T) { headers := map[string]string{"Authorization": "test-value"} wantResp := "{\"foo\": \"bar\"}" + var url string // Create apm server and handler apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -53,9 +55,15 @@ func TestInfoProxy(t *testing.T) { StartHttpServer(dataChannel, &config) defer agentDataServer.Close() + hosts, _ := net.LookupHost("localhost") + if hosts[0] == "::1" { + url = "http://" + "[::1]" + ":1234" + } else { + url = "http://" + hosts[0] + ":1234" + } + // Create a request to send to the extension - client := &http.Client{} - req, err := http.NewRequest("GET", "http://localhost:1234", nil) + req, err := http.NewRequest("GET", url, nil) if err != nil { t.Logf("Could not create request") } @@ -64,6 +72,7 @@ func TestInfoProxy(t *testing.T) { } // Send the request to the extension + client := &http.Client{} resp, err := client.Do(req) if err != nil { t.Logf("Error fetching %s, [%v]", agentDataServer.Addr, err) From 07d092862ad9dffe3840d43ea615bb3cbcecd2ce Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Wed, 17 Nov 2021 16:50:24 +0100 Subject: [PATCH 09/11] Try with a sleep --- apm-lambda-extension/extension/http_server_test.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/apm-lambda-extension/extension/http_server_test.go b/apm-lambda-extension/extension/http_server_test.go index 6c9c0954..28aae1f4 100644 --- a/apm-lambda-extension/extension/http_server_test.go +++ b/apm-lambda-extension/extension/http_server_test.go @@ -23,6 +23,7 @@ import ( "net/http" "net/http/httptest" "testing" + "time" "gotest.tools/assert" ) @@ -30,7 +31,6 @@ import ( func TestInfoProxy(t *testing.T) { headers := map[string]string{"Authorization": "test-value"} wantResp := "{\"foo\": \"bar\"}" - var url string // Create apm server and handler apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -56,11 +56,7 @@ func TestInfoProxy(t *testing.T) { defer agentDataServer.Close() hosts, _ := net.LookupHost("localhost") - if hosts[0] == "::1" { - url = "http://" + "[::1]" + ":1234" - } else { - url = "http://" + hosts[0] + ":1234" - } + url := "http://" + hosts[0] + ":1234" // Create a request to send to the extension req, err := http.NewRequest("GET", url, nil) @@ -71,6 +67,7 @@ func TestInfoProxy(t *testing.T) { req.Header.Add(name, value) } + time.Sleep(2 * time.Second) // Send the request to the extension client := &http.Client{} resp, err := client.Do(req) From f20f82ba7e8be4236dabccf24873cf899e8c20d0 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Thu, 18 Nov 2021 13:05:57 +0100 Subject: [PATCH 10/11] Use listen and server separately --- apm-lambda-extension/extension/http_server.go | 16 +++++++++------- .../extension/http_server_test.go | 2 -- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/apm-lambda-extension/extension/http_server.go b/apm-lambda-extension/extension/http_server.go index 1e10551a..89a3e2df 100644 --- a/apm-lambda-extension/extension/http_server.go +++ b/apm-lambda-extension/extension/http_server.go @@ -19,24 +19,26 @@ package extension import ( "log" + "net" "net/http" ) var agentDataServer *http.Server -func StartHttpServer(agentDataChan chan AgentData, config *extensionConfig) { +func StartHttpServer(agentDataChan chan AgentData, config *extensionConfig) (err error) { mux := http.NewServeMux() mux.HandleFunc("/", handleInfoRequest(config.apmServerUrl)) mux.HandleFunc("/intake/v2/events", handleIntakeV2Events(agentDataChan)) agentDataServer = &http.Server{Addr: config.dataReceiverServerPort, Handler: mux} + ln, err := net.Listen("tcp", agentDataServer.Addr) + if err != nil { + return + } + go func() { + agentDataServer.Serve(ln) log.Printf("Extension listening for apm data on %s", agentDataServer.Addr) - err := agentDataServer.ListenAndServe() - if err != http.ErrServerClosed { - log.Printf("Unexpected stop on Extension Server: %v", err) - } else { - log.Printf("Extension Server closed %v", err) - } }() + return nil } diff --git a/apm-lambda-extension/extension/http_server_test.go b/apm-lambda-extension/extension/http_server_test.go index 28aae1f4..3a914cd8 100644 --- a/apm-lambda-extension/extension/http_server_test.go +++ b/apm-lambda-extension/extension/http_server_test.go @@ -23,7 +23,6 @@ import ( "net/http" "net/http/httptest" "testing" - "time" "gotest.tools/assert" ) @@ -67,7 +66,6 @@ func TestInfoProxy(t *testing.T) { req.Header.Add(name, value) } - time.Sleep(2 * time.Second) // Send the request to the extension client := &http.Client{} resp, err := client.Do(req) From 1d49a367fceced53384a79d351b8a728fe3f8476 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Thu, 18 Nov 2021 13:34:49 +0100 Subject: [PATCH 11/11] Move log line, as Serve blocks --- apm-lambda-extension/extension/http_server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-lambda-extension/extension/http_server.go b/apm-lambda-extension/extension/http_server.go index 89a3e2df..3df5d9e3 100644 --- a/apm-lambda-extension/extension/http_server.go +++ b/apm-lambda-extension/extension/http_server.go @@ -37,8 +37,8 @@ func StartHttpServer(agentDataChan chan AgentData, config *extensionConfig) (err } go func() { - agentDataServer.Serve(ln) log.Printf("Extension listening for apm data on %s", agentDataServer.Addr) + agentDataServer.Serve(ln) }() return nil }