Skip to content

Commit 77d6862

Browse files
committed
Signal that function is complete if query param flushed=true is received
1 parent 8e47191 commit 77d6862

File tree

3 files changed

+81
-23
lines changed

3 files changed

+81
-23
lines changed

apm-lambda-extension/extension/http_server_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
package extension
1919

2020
import (
21+
"bytes"
2122
"io/ioutil"
2223
"net"
2324
"net/http"
2425
"net/http/httptest"
2526
"testing"
27+
"time"
2628

2729
"gotest.tools/assert"
2830
)
@@ -78,3 +80,54 @@ func TestInfoProxy(t *testing.T) {
7880
resp.Body.Close()
7981
}
8082
}
83+
84+
func Test_handleIntakeV2EventsFlushed(t *testing.T) {
85+
body := []byte(`{"metadata": {}`)
86+
87+
FuncDone = make(chan struct{})
88+
89+
// Create apm server and handler
90+
apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
91+
}))
92+
defer apmServer.Close()
93+
94+
// Create extension config and start the server
95+
dataChannel := make(chan AgentData, 100)
96+
config := extensionConfig{
97+
apmServerUrl: apmServer.URL,
98+
dataReceiverServerPort: ":1234",
99+
dataReceiverTimeoutSeconds: 15,
100+
}
101+
102+
StartHttpServer(dataChannel, &config)
103+
defer agentDataServer.Close()
104+
105+
hosts, _ := net.LookupHost("localhost")
106+
url := "http://" + hosts[0] + ":1234/intake/v2/events?flushed=true"
107+
108+
// Create a request to send to the extension
109+
req, err := http.NewRequest("POST", url, bytes.NewReader(body))
110+
if err != nil {
111+
t.Logf("Could not create request")
112+
}
113+
114+
// Send the request to the extension
115+
client := &http.Client{}
116+
go func() {
117+
_, err := client.Do(req)
118+
if err != nil {
119+
t.Logf("Error fetching %s, [%v]", agentDataServer.Addr, err)
120+
t.Fail()
121+
}
122+
}()
123+
124+
timer := time.NewTimer(1 * time.Second)
125+
defer timer.Stop()
126+
127+
select {
128+
case <-FuncDone:
129+
case <-timer.C:
130+
t.Log("Timed out waiting for server to send FuncDone signal")
131+
t.Fail()
132+
}
133+
}

apm-lambda-extension/extension/route_handlers.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ type AgentData struct {
2828
ContentEncoding string
2929
}
3030

31+
var FuncDone chan struct{}
32+
3133
// URL: http://server/
3234
func handleInfoRequest(apmServerUrl string) func(w http.ResponseWriter, r *http.Request) {
3335
return func(w http.ResponseWriter, r *http.Request) {
@@ -97,5 +99,9 @@ func handleIntakeV2Events(agentDataChan chan AgentData) func(w http.ResponseWrit
9799
}
98100
log.Println("Adding agent data to buffer to be sent to apm server")
99101
agentDataChan <- agentData
102+
103+
if r.URL.Query()["flushed"][0] == "true" {
104+
FuncDone <- struct{}{}
105+
}
100106
}
101107
}

apm-lambda-extension/main.go

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,17 @@ func main() {
6060
// pulls ELASTIC_ env variable into globals for easy access
6161
config := extension.ProcessEnv()
6262

63-
// setup http server to receive data from agent
64-
// and get a channel to listen for that data
63+
// Create a channel to buffer apm agent data
6564
agentDataChannel := make(chan extension.AgentData, 100)
6665

66+
// Start http server to receive data from agent
6767
extension.StartHttpServer(agentDataChannel, config)
6868

69+
// Create a client to use for sending data to the apm server
70+
client := &http.Client{
71+
Transport: http.DefaultTransport.(*http.Transport).Clone(),
72+
}
73+
6974
// Make channel for collecting logs and create a HTTP server to listen for them
7075
logsChannel := make(chan logsapi.LogEvent)
7176

@@ -74,7 +79,7 @@ func main() {
7479
extensionClient.ExtensionID,
7580
[]logsapi.EventType{logsapi.Platform})
7681
if err != nil {
77-
log.Printf("Could not subscribe to the logs API. Will instead flush APM data 100ms before the function deadline.")
82+
log.Printf("Could not subscribe to the logs API.")
7883
} else {
7984
logsAPIListener, err := logsapi.NewLogsAPIHttpListener(logsChannel)
8085
if err != nil {
@@ -88,9 +93,6 @@ func main() {
8893
}
8994
}
9095

91-
client := &http.Client{
92-
Transport: http.DefaultTransport.(*http.Transport).Clone(),
93-
}
9496
for {
9597
select {
9698
case <-ctx.Done():
@@ -107,30 +109,27 @@ func main() {
107109
}
108110
log.Printf("Received event: %v\n", extension.PrettyPrint(event))
109111

112+
// Make a channel for signaling that all apm data has been received
113+
extension.FuncDone = make(chan struct{})
114+
115+
// Flush any APM data, in case waiting for the FuncDone signal timed out,
116+
// the agent data wasn't available yet, and we got to the next event
117+
extension.FlushAPMData(client, agentDataChannel, config)
118+
110119
// A shutdown event indicates the execution environment is shutting down.
111120
// This is usually due to inactivity.
112121
if event.EventType == extension.Shutdown {
113122
extension.ProcessShutdown()
114123
return
115124
}
116125

117-
// Flush any APM data, in case waiting for the runtimeDone event timed out,
118-
// the agent data wasn't available yet, and we got to the next event
119-
extension.FlushAPMData(client, agentDataChannel, config)
120-
121-
// Make a channel for signaling that a runtimeDone event has been received
122-
runtimeDone := make(chan struct{})
123-
124-
// Make a channel for signaling that that function invocation has completed
125-
funcInvocDone := make(chan struct{})
126-
127126
// Receive agent data as it comes in and post it to the APM server.
128127
// Stop checking for, and sending agent data when the function invocation
129128
// has completed, signaled via a channel.
130129
go func() {
131130
for {
132131
select {
133-
case <-funcInvocDone:
132+
case <-extension.FuncDone:
134133
log.Println("Function invocation is complete, not receiving any more agent data")
135134
return
136135
case agentData := <-agentDataChannel:
@@ -147,7 +146,7 @@ func main() {
147146
go func() {
148147
for {
149148
select {
150-
case <-funcInvocDone:
149+
case <-extension.FuncDone:
151150
log.Println("Function invocation is complete, not receiving any more log events")
152151
return
153152
case logEvent := <-logsChannel:
@@ -157,7 +156,7 @@ func main() {
157156
if logsapi.SubEventType(logEvent.Type) == logsapi.RuntimeDone {
158157
if logEvent.Record.RequestId == event.RequestID {
159158
log.Printf("Received runtimeDone event %v", logEvent)
160-
runtimeDone <- struct{}{}
159+
extension.FuncDone <- struct{}{}
161160
return
162161
} else {
163162
log.Println("Log API runtimeDone event request id didn't match")
@@ -167,7 +166,7 @@ func main() {
167166
}
168167
}()
169168

170-
// Calculate how long to wait for a runtimeDone event
169+
// Calculate how long to wait for a FuncDone signal
171170
flushDeadlineMs := event.DeadlineMs - 100
172171
durationUntilFlushDeadline := time.Until(time.Unix(flushDeadlineMs/1000, 0))
173172

@@ -176,8 +175,8 @@ func main() {
176175
defer timer.Stop()
177176

178177
select {
179-
case <-runtimeDone:
180-
log.Println("Received runtimeDone event signal")
178+
case <-extension.FuncDone:
179+
log.Println("Received signal that function is complete")
181180
case <-timer.C:
182181
log.Println("Time expired waiting for runtimeDone event")
183182
}
@@ -186,7 +185,7 @@ func main() {
186185
extension.FlushAPMData(client, agentDataChannel, config)
187186

188187
// Signal that the function invocation has completed
189-
close(funcInvocDone)
188+
close(extension.FuncDone)
190189
}
191190
}
192191
}

0 commit comments

Comments
 (0)