Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions apm-lambda-extension/extension/http_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
package extension

import (
"bytes"
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"gotest.tools/assert"
)
Expand Down Expand Up @@ -165,3 +167,94 @@ func Test_handleInfoRequest(t *testing.T) {
assert.Equal(t, 202, resp.StatusCode)
}
}

func Test_handleIntakeV2EventsQueryParam(t *testing.T) {
body := []byte(`{"metadata": {}`)

AgentDoneSignal = make(chan struct{})

// Create apm server and handler
apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
}))
defer apmServer.Close()

// Create extension config and start the server
dataChannel := make(chan AgentData, 100)
config := extensionConfig{
apmServerUrl: apmServer.URL,
dataReceiverServerPort: ":1234",
dataReceiverTimeoutSeconds: 15,
}

StartHttpServer(dataChannel, &config)
defer agentDataServer.Close()

hosts, _ := net.LookupHost("localhost")
url := "http://" + hosts[0] + ":1234/intake/v2/events?flushed=true"

// Create a request to send to the extension
req, err := http.NewRequest("POST", url, bytes.NewReader(body))
if err != nil {
t.Logf("Could not create request")
}

// Send the request to the extension
client := &http.Client{}
go func() {
_, err := client.Do(req)
if err != nil {
t.Logf("Error fetching %s, [%v]", agentDataServer.Addr, err)
t.Fail()
}
}()

timer := time.NewTimer(1 * time.Second)
defer timer.Stop()

select {
case <-AgentDoneSignal:
<-dataChannel
case <-timer.C:
t.Log("Timed out waiting for server to send FuncDone signal")
t.Fail()
}
}

func Test_handleIntakeV2EventsNoQueryParam(t *testing.T) {
body := []byte(`{"metadata": {}`)

// Create apm server and handler
apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
}))
defer apmServer.Close()

// Create extension config and start the server
dataChannel := make(chan AgentData, 100)
config := extensionConfig{
apmServerUrl: apmServer.URL,
dataReceiverServerPort: ":1234",
dataReceiverTimeoutSeconds: 15,
}

StartHttpServer(dataChannel, &config)
defer agentDataServer.Close()

hosts, _ := net.LookupHost("localhost")
url := "http://" + hosts[0] + ":1234/intake/v2/events"

// Create a request to send to the extension
req, err := http.NewRequest("POST", url, bytes.NewReader(body))
if err != nil {
t.Logf("Could not create request")
}

// Send the request to the extension
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
t.Logf("Error fetching %s, [%v]", agentDataServer.Addr, err)
t.Fail()
}
<-dataChannel
assert.Equal(t, 202, resp.StatusCode)
}
6 changes: 6 additions & 0 deletions apm-lambda-extension/extension/route_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type AgentData struct {
ContentEncoding string
}

var AgentDoneSignal chan struct{}

// URL: http://server/
func handleInfoRequest(apmServerUrl string) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -97,5 +99,9 @@ func handleIntakeV2Events(agentDataChan chan AgentData) func(w http.ResponseWrit
}
log.Println("Adding agent data to buffer to be sent to apm server")
agentDataChan <- agentData

if len(r.URL.Query()["flushed"]) > 0 && r.URL.Query()["flushed"][0] == "true" {
AgentDoneSignal <- struct{}{}
}
}
}
64 changes: 35 additions & 29 deletions apm-lambda-extension/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,17 @@ func main() {
// pulls ELASTIC_ env variable into globals for easy access
config := extension.ProcessEnv()

// setup http server to receive data from agent
// and get a channel to listen for that data
// Create a channel to buffer apm agent data
agentDataChannel := make(chan extension.AgentData, 100)

// Start http server to receive data from agent
extension.StartHttpServer(agentDataChannel, config)

// Create a client to use for sending data to the apm server
client := &http.Client{
Transport: http.DefaultTransport.(*http.Transport).Clone(),
}

// Make channel for collecting logs and create a HTTP server to listen for them
logsChannel := make(chan logsapi.LogEvent)

Expand All @@ -74,7 +79,7 @@ func main() {
extensionClient.ExtensionID,
[]logsapi.EventType{logsapi.Platform})
if err != nil {
log.Printf("Could not subscribe to the logs API. Will instead flush APM data 100ms before the function deadline.")
log.Printf("Could not subscribe to the logs API.")
} else {
logsAPIListener, err := logsapi.NewLogsAPIHttpListener(logsChannel)
if err != nil {
Expand All @@ -88,9 +93,6 @@ func main() {
}
}

client := &http.Client{
Transport: http.DefaultTransport.(*http.Transport).Clone(),
}
for {
select {
case <-ctx.Done():
Expand All @@ -107,31 +109,32 @@ func main() {
}
log.Printf("Received event: %v\n", extension.PrettyPrint(event))

// Make a channel for signaling that we received the agent flushed signal
extension.AgentDoneSignal = make(chan struct{})
// Make a channel for signaling that we received the runtimeDone logs API event
runtimeDoneSignal := make(chan struct{})
// Make a channel for signaling that the function invocation is complete
funcDone := make(chan struct{})

// Flush any APM data, in case waiting for the agentDone or runtimeDone signals
// timed out, the agent data wasn't available yet, and we got to the next event
extension.FlushAPMData(client, agentDataChannel, config)

// A shutdown event indicates the execution environment is shutting down.
// This is usually due to inactivity.
if event.EventType == extension.Shutdown {
extension.ProcessShutdown()
return
}

// Flush any APM data, in case waiting for the runtimeDone event timed out,
// the agent data wasn't available yet, and we got to the next event
extension.FlushAPMData(client, agentDataChannel, config)

// Make a channel for signaling that a runtimeDone event has been received
runtimeDone := make(chan struct{})

// Make a channel for signaling that that function invocation has completed
funcInvocDone := make(chan struct{})

// Receive agent data as it comes in and post it to the APM server.
// Stop checking for, and sending agent data when the function invocation
// has completed, signaled via a channel.
go func() {
for {
select {
case <-funcInvocDone:
log.Println("Function invocation is complete, not receiving any more agent data")
case <-funcDone:
log.Println("funcDone signal received, not processing any more agent data")
return
case agentData := <-agentDataChannel:
err := extension.PostToApmServer(client, agentData, config)
Expand All @@ -143,21 +146,21 @@ func main() {
}()

// Receive Logs API events
// Send to the runtimeDone channel to signal when a runtimeDone event is received
// Send to the runtimeDoneSignal channel to signal when a runtimeDone event is received
go func() {
for {
select {
case <-funcInvocDone:
log.Println("Function invocation is complete, not receiving any more log events")
case <-funcDone:
log.Println("funcDone signal received, not processing any more log events")
return
case logEvent := <-logsChannel:
log.Printf("Received log event %v\n", logEvent.Type)
// Check the logEvent for runtimeDone and compare the RequestID
// to the id that came in via the Next API
if logsapi.SubEventType(logEvent.Type) == logsapi.RuntimeDone {
if logEvent.Record.RequestId == event.RequestID {
log.Printf("Received runtimeDone event %v", logEvent)
runtimeDone <- struct{}{}
log.Println("Received runtimeDone event for this function invocation")
runtimeDoneSignal <- struct{}{}
return
} else {
log.Println("Log API runtimeDone event request id didn't match")
Expand All @@ -167,7 +170,7 @@ func main() {
}
}()

// Calculate how long to wait for a runtimeDone event
// Calculate how long to wait for a runtimeDoneSignal or AgentDoneSignal signal
flushDeadlineMs := event.DeadlineMs - 100
durationUntilFlushDeadline := time.Until(time.Unix(flushDeadlineMs/1000, 0))

Expand All @@ -176,17 +179,20 @@ func main() {
defer timer.Stop()

select {
case <-runtimeDone:
log.Println("Received runtimeDone event signal")
case <-extension.AgentDoneSignal:
log.Println("Received agent done signal")
case <-runtimeDoneSignal:
log.Println("Received runtimeDone signal")
case <-timer.C:
log.Println("Time expired waiting for runtimeDone event")
log.Println("Time expired waiting for agent signal or runtimeDone event")
}

// Flush APM data now that the function invocation has completed
extension.FlushAPMData(client, agentDataChannel, config)

// Signal that the function invocation has completed
close(funcInvocDone)
close(funcDone)
close(runtimeDoneSignal)
close(extension.AgentDoneSignal)
}
}
}