From 8fd1f213a3fb67e99def2de0b21fcc9f7327345e Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Thu, 25 Nov 2021 14:13:50 +0100 Subject: [PATCH 1/5] Signal that function is complete if query param flushed=true is received --- .../extension/route_handlers.go | 6 +++ apm-lambda-extension/main.go | 45 +++++++++---------- 2 files changed, 28 insertions(+), 23 deletions(-) diff --git a/apm-lambda-extension/extension/route_handlers.go b/apm-lambda-extension/extension/route_handlers.go index 5c9ebbe8..30f99bab 100644 --- a/apm-lambda-extension/extension/route_handlers.go +++ b/apm-lambda-extension/extension/route_handlers.go @@ -29,6 +29,8 @@ type AgentData struct { ContentEncoding string } +var FuncDone chan struct{} + // URL: http://server/ func handleInfoRequest(apmServerUrl string) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { @@ -97,5 +99,9 @@ func handleIntakeV2Events(agentDataChan chan AgentData) func(w http.ResponseWrit } log.Println("Adding agent data to buffer to be sent to apm server") agentDataChan <- agentData + + if r.URL.Query()["flushed"][0] == "true" { + FuncDone <- struct{}{} + } } } diff --git a/apm-lambda-extension/main.go b/apm-lambda-extension/main.go index 42d1cc45..a64c9f1d 100644 --- a/apm-lambda-extension/main.go +++ b/apm-lambda-extension/main.go @@ -60,12 +60,17 @@ func main() { // pulls ELASTIC_ env variable into globals for easy access config := extension.ProcessEnv() - // setup http server to receive data from agent - // and get a channel to listen for that data + // Create a channel to buffer apm agent data agentDataChannel := make(chan extension.AgentData, 100) + // Start http server to receive data from agent extension.StartHttpServer(agentDataChannel, config) + // Create a client to use for sending data to the apm server + client := &http.Client{ + Transport: http.DefaultTransport.(*http.Transport).Clone(), + } + // Make channel for collecting logs and create a HTTP server to listen for them logsChannel := make(chan logsapi.LogEvent) @@ -74,7 +79,7 @@ func main() { extensionClient.ExtensionID, []logsapi.EventType{logsapi.Platform}) if err != nil { - log.Printf("Could not subscribe to the logs API. Will instead flush APM data 100ms before the function deadline.") + log.Printf("Could not subscribe to the logs API.") } else { logsAPIListener, err := logsapi.NewLogsAPIHttpListener(logsChannel) if err != nil { @@ -88,9 +93,6 @@ func main() { } } - client := &http.Client{ - Transport: http.DefaultTransport.(*http.Transport).Clone(), - } for { select { case <-ctx.Done(): @@ -107,6 +109,13 @@ func main() { } log.Printf("Received event: %v\n", extension.PrettyPrint(event)) + // Make a channel for signaling that all apm data has been received + extension.FuncDone = make(chan struct{}) + + // Flush any APM data, in case waiting for the FuncDone signal timed out, + // the agent data wasn't available yet, and we got to the next event + extension.FlushAPMData(client, agentDataChannel, config) + // A shutdown event indicates the execution environment is shutting down. // This is usually due to inactivity. if event.EventType == extension.Shutdown { @@ -114,23 +123,13 @@ func main() { return } - // Flush any APM data, in case waiting for the runtimeDone event timed out, - // the agent data wasn't available yet, and we got to the next event - extension.FlushAPMData(client, agentDataChannel, config) - - // Make a channel for signaling that a runtimeDone event has been received - runtimeDone := make(chan struct{}) - - // Make a channel for signaling that that function invocation has completed - funcInvocDone := make(chan struct{}) - // Receive agent data as it comes in and post it to the APM server. // Stop checking for, and sending agent data when the function invocation // has completed, signaled via a channel. go func() { for { select { - case <-funcInvocDone: + case <-extension.FuncDone: log.Println("Function invocation is complete, not receiving any more agent data") return case agentData := <-agentDataChannel: @@ -147,7 +146,7 @@ func main() { go func() { for { select { - case <-funcInvocDone: + case <-extension.FuncDone: log.Println("Function invocation is complete, not receiving any more log events") return case logEvent := <-logsChannel: @@ -157,7 +156,7 @@ func main() { if logsapi.SubEventType(logEvent.Type) == logsapi.RuntimeDone { if logEvent.Record.RequestId == event.RequestID { log.Printf("Received runtimeDone event %v", logEvent) - runtimeDone <- struct{}{} + extension.FuncDone <- struct{}{} return } else { log.Println("Log API runtimeDone event request id didn't match") @@ -167,7 +166,7 @@ func main() { } }() - // Calculate how long to wait for a runtimeDone event + // Calculate how long to wait for a FuncDone signal flushDeadlineMs := event.DeadlineMs - 100 durationUntilFlushDeadline := time.Until(time.Unix(flushDeadlineMs/1000, 0)) @@ -176,8 +175,8 @@ func main() { defer timer.Stop() select { - case <-runtimeDone: - log.Println("Received runtimeDone event signal") + case <-extension.FuncDone: + log.Println("Received signal that function is complete") case <-timer.C: log.Println("Time expired waiting for runtimeDone event") } @@ -186,7 +185,7 @@ func main() { extension.FlushAPMData(client, agentDataChannel, config) // Signal that the function invocation has completed - close(funcInvocDone) + close(extension.FuncDone) } } } From 24246694b4526a1dde24055f585c4f5896b48c56 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Thu, 25 Nov 2021 14:19:10 +0100 Subject: [PATCH 2/5] Update comment --- apm-lambda-extension/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-lambda-extension/main.go b/apm-lambda-extension/main.go index a64c9f1d..81294d79 100644 --- a/apm-lambda-extension/main.go +++ b/apm-lambda-extension/main.go @@ -142,7 +142,7 @@ func main() { }() // Receive Logs API events - // Send to the runtimeDone channel to signal when a runtimeDone event is received + // Send to the FuncDone channel to signal when a runtimeDone event is received go func() { for { select { From 2f4db174fcd2050f1dac13b96fac2d2fac8bdbec Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Tue, 30 Nov 2021 11:07:37 +0100 Subject: [PATCH 3/5] Check length of query param slice before accessing index --- .../extension/http_server_test.go | 48 +++++++++++++++++++ .../extension/route_handlers.go | 2 +- 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/apm-lambda-extension/extension/http_server_test.go b/apm-lambda-extension/extension/http_server_test.go index 2a90ff45..0847b676 100644 --- a/apm-lambda-extension/extension/http_server_test.go +++ b/apm-lambda-extension/extension/http_server_test.go @@ -82,7 +82,15 @@ func TestInfoProxy(t *testing.T) { } } +<<<<<<< HEAD func TestInfoProxyErrorStatusCode(t *testing.T) { +======= +func Test_handleIntakeV2EventsQueryParam(t *testing.T) { + body := []byte(`{"metadata": {}`) + + FuncDone = make(chan struct{}) + +>>>>>>> Check length of query param slice before accessing index // Create apm server and handler apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(401) @@ -165,3 +173,43 @@ func Test_handleInfoRequest(t *testing.T) { assert.Equal(t, 202, resp.StatusCode) } } + +func Test_handleIntakeV2EventsNoQueryParam(t *testing.T) { + body := []byte(`{"metadata": {}`) + + FuncDone = make(chan struct{}) + + // Create apm server and handler + apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + })) + defer apmServer.Close() + + // Create extension config and start the server + dataChannel := make(chan AgentData, 100) + config := extensionConfig{ + apmServerUrl: apmServer.URL, + dataReceiverServerPort: ":1234", + dataReceiverTimeoutSeconds: 15, + } + + StartHttpServer(dataChannel, &config) + defer agentDataServer.Close() + + hosts, _ := net.LookupHost("localhost") + url := "http://" + hosts[0] + ":1234/intake/v2/events" + + // Create a request to send to the extension + req, err := http.NewRequest("POST", url, bytes.NewReader(body)) + if err != nil { + t.Logf("Could not create request") + } + + // Send the request to the extension + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + t.Logf("Error fetching %s, [%v]", agentDataServer.Addr, err) + t.Fail() + } + assert.Equal(t, 202, resp.StatusCode) +} diff --git a/apm-lambda-extension/extension/route_handlers.go b/apm-lambda-extension/extension/route_handlers.go index 30f99bab..e23c2912 100644 --- a/apm-lambda-extension/extension/route_handlers.go +++ b/apm-lambda-extension/extension/route_handlers.go @@ -100,7 +100,7 @@ func handleIntakeV2Events(agentDataChan chan AgentData) func(w http.ResponseWrit log.Println("Adding agent data to buffer to be sent to apm server") agentDataChan <- agentData - if r.URL.Query()["flushed"][0] == "true" { + if len(r.URL.Query()["flushed"]) > 0 && r.URL.Query()["flushed"][0] == "true" { FuncDone <- struct{}{} } } From 3a0ce93a332f8e9e2327ceefe55e9f9037ccbed2 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Tue, 30 Nov 2021 22:33:24 +0100 Subject: [PATCH 4/5] Use three channels for coordination --- .../extension/http_server_test.go | 65 ++++++++++++++++--- .../extension/route_handlers.go | 4 +- apm-lambda-extension/main.go | 35 ++++++---- 3 files changed, 78 insertions(+), 26 deletions(-) diff --git a/apm-lambda-extension/extension/http_server_test.go b/apm-lambda-extension/extension/http_server_test.go index 0847b676..61942fce 100644 --- a/apm-lambda-extension/extension/http_server_test.go +++ b/apm-lambda-extension/extension/http_server_test.go @@ -18,12 +18,14 @@ package extension import ( + "bytes" "io/ioutil" "net" "net/http" "net/http/httptest" "strings" "testing" + "time" "gotest.tools/assert" ) @@ -82,15 +84,7 @@ func TestInfoProxy(t *testing.T) { } } -<<<<<<< HEAD func TestInfoProxyErrorStatusCode(t *testing.T) { -======= -func Test_handleIntakeV2EventsQueryParam(t *testing.T) { - body := []byte(`{"metadata": {}`) - - FuncDone = make(chan struct{}) - ->>>>>>> Check length of query param slice before accessing index // Create apm server and handler apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(401) @@ -174,10 +168,60 @@ func Test_handleInfoRequest(t *testing.T) { } } -func Test_handleIntakeV2EventsNoQueryParam(t *testing.T) { +func Test_handleIntakeV2EventsQueryParam(t *testing.T) { body := []byte(`{"metadata": {}`) - FuncDone = make(chan struct{}) + AgentDoneSignal = make(chan struct{}) + + // Create apm server and handler + apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + })) + defer apmServer.Close() + + // Create extension config and start the server + dataChannel := make(chan AgentData, 100) + config := extensionConfig{ + apmServerUrl: apmServer.URL, + dataReceiverServerPort: ":1234", + dataReceiverTimeoutSeconds: 15, + } + + StartHttpServer(dataChannel, &config) + defer agentDataServer.Close() + + hosts, _ := net.LookupHost("localhost") + url := "http://" + hosts[0] + ":1234/intake/v2/events?flushed=true" + + // Create a request to send to the extension + req, err := http.NewRequest("POST", url, bytes.NewReader(body)) + if err != nil { + t.Logf("Could not create request") + } + + // Send the request to the extension + client := &http.Client{} + go func() { + _, err := client.Do(req) + if err != nil { + t.Logf("Error fetching %s, [%v]", agentDataServer.Addr, err) + t.Fail() + } + }() + + timer := time.NewTimer(1 * time.Second) + defer timer.Stop() + + select { + case <-AgentDoneSignal: + <-dataChannel + case <-timer.C: + t.Log("Timed out waiting for server to send FuncDone signal") + t.Fail() + } +} + +func Test_handleIntakeV2EventsNoQueryParam(t *testing.T) { + body := []byte(`{"metadata": {}`) // Create apm server and handler apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -211,5 +255,6 @@ func Test_handleIntakeV2EventsNoQueryParam(t *testing.T) { t.Logf("Error fetching %s, [%v]", agentDataServer.Addr, err) t.Fail() } + <-dataChannel assert.Equal(t, 202, resp.StatusCode) } diff --git a/apm-lambda-extension/extension/route_handlers.go b/apm-lambda-extension/extension/route_handlers.go index e23c2912..0df6cd8d 100644 --- a/apm-lambda-extension/extension/route_handlers.go +++ b/apm-lambda-extension/extension/route_handlers.go @@ -29,7 +29,7 @@ type AgentData struct { ContentEncoding string } -var FuncDone chan struct{} +var AgentDoneSignal chan struct{} // URL: http://server/ func handleInfoRequest(apmServerUrl string) func(w http.ResponseWriter, r *http.Request) { @@ -101,7 +101,7 @@ func handleIntakeV2Events(agentDataChan chan AgentData) func(w http.ResponseWrit agentDataChan <- agentData if len(r.URL.Query()["flushed"]) > 0 && r.URL.Query()["flushed"][0] == "true" { - FuncDone <- struct{}{} + AgentDoneSignal <- struct{}{} } } } diff --git a/apm-lambda-extension/main.go b/apm-lambda-extension/main.go index 81294d79..7c78abbf 100644 --- a/apm-lambda-extension/main.go +++ b/apm-lambda-extension/main.go @@ -109,8 +109,12 @@ func main() { } log.Printf("Received event: %v\n", extension.PrettyPrint(event)) - // Make a channel for signaling that all apm data has been received - extension.FuncDone = make(chan struct{}) + // Make a channel for signaling that we received the agent flushed signal + extension.AgentDoneSignal = make(chan struct{}) + // Make a channel for signaling that we received the runtimeDone logs API event + runtimeDoneSignal := make(chan struct{}) + // Make a channel for signaling that the function invocation is complete + funcDone := make(chan struct{}) // Flush any APM data, in case waiting for the FuncDone signal timed out, // the agent data wasn't available yet, and we got to the next event @@ -129,8 +133,8 @@ func main() { go func() { for { select { - case <-extension.FuncDone: - log.Println("Function invocation is complete, not receiving any more agent data") + case <-funcDone: + log.Println("funcDone signal received, not processing any more agent data") return case agentData := <-agentDataChannel: err := extension.PostToApmServer(client, agentData, config) @@ -142,12 +146,12 @@ func main() { }() // Receive Logs API events - // Send to the FuncDone channel to signal when a runtimeDone event is received + // Send to the runtimeDoneSignal channel to signal when a runtimeDone event is received go func() { for { select { - case <-extension.FuncDone: - log.Println("Function invocation is complete, not receiving any more log events") + case <-funcDone: + log.Println("funcDone signal received, not processing any more log events") return case logEvent := <-logsChannel: log.Printf("Received log event %v\n", logEvent.Type) @@ -155,8 +159,8 @@ func main() { // to the id that came in via the Next API if logsapi.SubEventType(logEvent.Type) == logsapi.RuntimeDone { if logEvent.Record.RequestId == event.RequestID { - log.Printf("Received runtimeDone event %v", logEvent) - extension.FuncDone <- struct{}{} + log.Println("Received runtimeDone event for this function invocation") + runtimeDoneSignal <- struct{}{} return } else { log.Println("Log API runtimeDone event request id didn't match") @@ -175,17 +179,20 @@ func main() { defer timer.Stop() select { - case <-extension.FuncDone: - log.Println("Received signal that function is complete") + case <-extension.AgentDoneSignal: + log.Println("Received agent done signal") + case <-runtimeDoneSignal: + log.Println("Received runtimeDone signal") case <-timer.C: - log.Println("Time expired waiting for runtimeDone event") + log.Println("Time expired waiting for agent signal or runtimeDone event") } // Flush APM data now that the function invocation has completed extension.FlushAPMData(client, agentDataChannel, config) - // Signal that the function invocation has completed - close(extension.FuncDone) + close(funcDone) + close(runtimeDoneSignal) + close(extension.AgentDoneSignal) } } } From 5b2295b78d163d86488036ea2cf1f52a8a8e76ed Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Wed, 1 Dec 2021 15:07:34 +0100 Subject: [PATCH 5/5] Minor updates to comments --- apm-lambda-extension/main.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apm-lambda-extension/main.go b/apm-lambda-extension/main.go index 7c78abbf..bf77d39f 100644 --- a/apm-lambda-extension/main.go +++ b/apm-lambda-extension/main.go @@ -116,8 +116,8 @@ func main() { // Make a channel for signaling that the function invocation is complete funcDone := make(chan struct{}) - // Flush any APM data, in case waiting for the FuncDone signal timed out, - // the agent data wasn't available yet, and we got to the next event + // Flush any APM data, in case waiting for the agentDone or runtimeDone signals + // timed out, the agent data wasn't available yet, and we got to the next event extension.FlushAPMData(client, agentDataChannel, config) // A shutdown event indicates the execution environment is shutting down. @@ -170,7 +170,7 @@ func main() { } }() - // Calculate how long to wait for a FuncDone signal + // Calculate how long to wait for a runtimeDoneSignal or AgentDoneSignal signal flushDeadlineMs := event.DeadlineMs - 100 durationUntilFlushDeadline := time.Until(time.Unix(flushDeadlineMs/1000, 0))