diff --git a/apm-lambda-extension/extension/http_server_test.go b/apm-lambda-extension/extension/http_server_test.go index 2a90ff45..61942fce 100644 --- a/apm-lambda-extension/extension/http_server_test.go +++ b/apm-lambda-extension/extension/http_server_test.go @@ -18,12 +18,14 @@ package extension import ( + "bytes" "io/ioutil" "net" "net/http" "net/http/httptest" "strings" "testing" + "time" "gotest.tools/assert" ) @@ -165,3 +167,94 @@ func Test_handleInfoRequest(t *testing.T) { assert.Equal(t, 202, resp.StatusCode) } } + +func Test_handleIntakeV2EventsQueryParam(t *testing.T) { + body := []byte(`{"metadata": {}`) + + AgentDoneSignal = make(chan struct{}) + + // Create apm server and handler + apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + })) + defer apmServer.Close() + + // Create extension config and start the server + dataChannel := make(chan AgentData, 100) + config := extensionConfig{ + apmServerUrl: apmServer.URL, + dataReceiverServerPort: ":1234", + dataReceiverTimeoutSeconds: 15, + } + + StartHttpServer(dataChannel, &config) + defer agentDataServer.Close() + + hosts, _ := net.LookupHost("localhost") + url := "http://" + hosts[0] + ":1234/intake/v2/events?flushed=true" + + // Create a request to send to the extension + req, err := http.NewRequest("POST", url, bytes.NewReader(body)) + if err != nil { + t.Logf("Could not create request") + } + + // Send the request to the extension + client := &http.Client{} + go func() { + _, err := client.Do(req) + if err != nil { + t.Logf("Error fetching %s, [%v]", agentDataServer.Addr, err) + t.Fail() + } + }() + + timer := time.NewTimer(1 * time.Second) + defer timer.Stop() + + select { + case <-AgentDoneSignal: + <-dataChannel + case <-timer.C: + t.Log("Timed out waiting for server to send FuncDone signal") + t.Fail() + } +} + +func Test_handleIntakeV2EventsNoQueryParam(t *testing.T) { + body := []byte(`{"metadata": {}`) + + // Create apm server and handler + apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + })) + defer apmServer.Close() + + // Create extension config and start the server + dataChannel := make(chan AgentData, 100) + config := extensionConfig{ + apmServerUrl: apmServer.URL, + dataReceiverServerPort: ":1234", + dataReceiverTimeoutSeconds: 15, + } + + StartHttpServer(dataChannel, &config) + defer agentDataServer.Close() + + hosts, _ := net.LookupHost("localhost") + url := "http://" + hosts[0] + ":1234/intake/v2/events" + + // Create a request to send to the extension + req, err := http.NewRequest("POST", url, bytes.NewReader(body)) + if err != nil { + t.Logf("Could not create request") + } + + // Send the request to the extension + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + t.Logf("Error fetching %s, [%v]", agentDataServer.Addr, err) + t.Fail() + } + <-dataChannel + assert.Equal(t, 202, resp.StatusCode) +} diff --git a/apm-lambda-extension/extension/route_handlers.go b/apm-lambda-extension/extension/route_handlers.go index 5c9ebbe8..0df6cd8d 100644 --- a/apm-lambda-extension/extension/route_handlers.go +++ b/apm-lambda-extension/extension/route_handlers.go @@ -29,6 +29,8 @@ type AgentData struct { ContentEncoding string } +var AgentDoneSignal chan struct{} + // URL: http://server/ func handleInfoRequest(apmServerUrl string) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { @@ -97,5 +99,9 @@ func handleIntakeV2Events(agentDataChan chan AgentData) func(w http.ResponseWrit } log.Println("Adding agent data to buffer to be sent to apm server") agentDataChan <- agentData + + if len(r.URL.Query()["flushed"]) > 0 && r.URL.Query()["flushed"][0] == "true" { + AgentDoneSignal <- struct{}{} + } } } diff --git a/apm-lambda-extension/main.go b/apm-lambda-extension/main.go index 42d1cc45..bf77d39f 100644 --- a/apm-lambda-extension/main.go +++ b/apm-lambda-extension/main.go @@ -60,12 +60,17 @@ func main() { // pulls ELASTIC_ env variable into globals for easy access config := extension.ProcessEnv() - // setup http server to receive data from agent - // and get a channel to listen for that data + // Create a channel to buffer apm agent data agentDataChannel := make(chan extension.AgentData, 100) + // Start http server to receive data from agent extension.StartHttpServer(agentDataChannel, config) + // Create a client to use for sending data to the apm server + client := &http.Client{ + Transport: http.DefaultTransport.(*http.Transport).Clone(), + } + // Make channel for collecting logs and create a HTTP server to listen for them logsChannel := make(chan logsapi.LogEvent) @@ -74,7 +79,7 @@ func main() { extensionClient.ExtensionID, []logsapi.EventType{logsapi.Platform}) if err != nil { - log.Printf("Could not subscribe to the logs API. Will instead flush APM data 100ms before the function deadline.") + log.Printf("Could not subscribe to the logs API.") } else { logsAPIListener, err := logsapi.NewLogsAPIHttpListener(logsChannel) if err != nil { @@ -88,9 +93,6 @@ func main() { } } - client := &http.Client{ - Transport: http.DefaultTransport.(*http.Transport).Clone(), - } for { select { case <-ctx.Done(): @@ -107,6 +109,17 @@ func main() { } log.Printf("Received event: %v\n", extension.PrettyPrint(event)) + // Make a channel for signaling that we received the agent flushed signal + extension.AgentDoneSignal = make(chan struct{}) + // Make a channel for signaling that we received the runtimeDone logs API event + runtimeDoneSignal := make(chan struct{}) + // Make a channel for signaling that the function invocation is complete + funcDone := make(chan struct{}) + + // Flush any APM data, in case waiting for the agentDone or runtimeDone signals + // timed out, the agent data wasn't available yet, and we got to the next event + extension.FlushAPMData(client, agentDataChannel, config) + // A shutdown event indicates the execution environment is shutting down. // This is usually due to inactivity. if event.EventType == extension.Shutdown { @@ -114,24 +127,14 @@ func main() { return } - // Flush any APM data, in case waiting for the runtimeDone event timed out, - // the agent data wasn't available yet, and we got to the next event - extension.FlushAPMData(client, agentDataChannel, config) - - // Make a channel for signaling that a runtimeDone event has been received - runtimeDone := make(chan struct{}) - - // Make a channel for signaling that that function invocation has completed - funcInvocDone := make(chan struct{}) - // Receive agent data as it comes in and post it to the APM server. // Stop checking for, and sending agent data when the function invocation // has completed, signaled via a channel. go func() { for { select { - case <-funcInvocDone: - log.Println("Function invocation is complete, not receiving any more agent data") + case <-funcDone: + log.Println("funcDone signal received, not processing any more agent data") return case agentData := <-agentDataChannel: err := extension.PostToApmServer(client, agentData, config) @@ -143,12 +146,12 @@ func main() { }() // Receive Logs API events - // Send to the runtimeDone channel to signal when a runtimeDone event is received + // Send to the runtimeDoneSignal channel to signal when a runtimeDone event is received go func() { for { select { - case <-funcInvocDone: - log.Println("Function invocation is complete, not receiving any more log events") + case <-funcDone: + log.Println("funcDone signal received, not processing any more log events") return case logEvent := <-logsChannel: log.Printf("Received log event %v\n", logEvent.Type) @@ -156,8 +159,8 @@ func main() { // to the id that came in via the Next API if logsapi.SubEventType(logEvent.Type) == logsapi.RuntimeDone { if logEvent.Record.RequestId == event.RequestID { - log.Printf("Received runtimeDone event %v", logEvent) - runtimeDone <- struct{}{} + log.Println("Received runtimeDone event for this function invocation") + runtimeDoneSignal <- struct{}{} return } else { log.Println("Log API runtimeDone event request id didn't match") @@ -167,7 +170,7 @@ func main() { } }() - // Calculate how long to wait for a runtimeDone event + // Calculate how long to wait for a runtimeDoneSignal or AgentDoneSignal signal flushDeadlineMs := event.DeadlineMs - 100 durationUntilFlushDeadline := time.Until(time.Unix(flushDeadlineMs/1000, 0)) @@ -176,17 +179,20 @@ func main() { defer timer.Stop() select { - case <-runtimeDone: - log.Println("Received runtimeDone event signal") + case <-extension.AgentDoneSignal: + log.Println("Received agent done signal") + case <-runtimeDoneSignal: + log.Println("Received runtimeDone signal") case <-timer.C: - log.Println("Time expired waiting for runtimeDone event") + log.Println("Time expired waiting for agent signal or runtimeDone event") } // Flush APM data now that the function invocation has completed extension.FlushAPMData(client, agentDataChannel, config) - // Signal that the function invocation has completed - close(funcInvocDone) + close(funcDone) + close(runtimeDoneSignal) + close(extension.AgentDoneSignal) } } }