Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 28 additions & 26 deletions apm-lambda-extension/extension/apm_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,53 +21,55 @@ import (
"bytes"
"compress/gzip"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"sync"
)

var bufferPool = sync.Pool{New: func() interface{} {
return &bytes.Buffer{}
}}

// todo: can this be a streaming or streaming style call that keeps the
// connection open across invocations?
func PostToApmServer(agentData AgentData, config *extensionConfig) error {
endpointUri := "intake/v2/events"
var req *http.Request
var err error
func PostToApmServer(client *http.Client, agentData AgentData, config *extensionConfig) error {
endpointURI := "intake/v2/events"
encoding := agentData.ContentEncoding
buf := bufferPool.Get().(*bytes.Buffer)
defer func() {
buf.Reset()
bufferPool.Put(buf)
}()

if agentData.ContentEncoding == "" {
pr, pw := io.Pipe()
gw, _ := gzip.NewWriterLevel(pw, gzip.BestSpeed)

go func() {
_, err = io.Copy(gw, bytes.NewReader(agentData.Data))
gw.Close()
pw.Close()
if err != nil {
log.Printf("Failed to compress data: %v", err)
}
}()

req, err = http.NewRequest("POST", config.apmServerUrl+endpointUri, pr)
encoding = "gzip"
gw, err := gzip.NewWriterLevel(buf, gzip.BestSpeed)
if err != nil {
return fmt.Errorf("failed to create a new request when posting to APM server: %v", err)
return err
}
req.Header.Add("Content-Encoding", "gzip")
} else {
req, err = http.NewRequest("POST", config.apmServerUrl+endpointUri, bytes.NewReader(agentData.Data))
if err != nil {
return fmt.Errorf("failed to create a new request when posting to APM server: %v", err)
if _, err := gw.Write(agentData.Data); err != nil {
log.Printf("Failed to compress data: %v", err)
}
req.Header.Add("Content-Encoding", agentData.ContentEncoding)
if err := gw.Close(); err != nil {
log.Printf("Failed write compressed data to buffer: %v", err)
}
} else {
buf.Write(agentData.Data)
}

req, err := http.NewRequest("POST", config.apmServerUrl+endpointURI, buf)
if err != nil {
return fmt.Errorf("failed to create a new request when posting to APM server: %v", err)
}
req.Header.Add("Content-Encoding", encoding)
req.Header.Add("Content-Type", "application/x-ndjson")
if config.apmServerApiKey != "" {
req.Header.Add("Authorization", "ApiKey "+config.apmServerApiKey)
} else if config.apmServerSecretToken != "" {
req.Header.Add("Authorization", "Bearer "+config.apmServerSecretToken)
}

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to post to APM server: %v", err)
Expand Down
38 changes: 36 additions & 2 deletions apm-lambda-extension/extension/apm_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestPostToApmServerDataCompressed(t *testing.T) {
apmServerUrl: apmServer.URL + "/",
}

err := PostToApmServer(agentData, &config)
err := PostToApmServer(apmServer.Client(), agentData, &config)
assert.Equal(t, nil, err)
}

Expand Down Expand Up @@ -90,6 +90,40 @@ func TestPostToApmServerDataNotCompressed(t *testing.T) {
apmServerUrl: apmServer.URL + "/",
}

err := PostToApmServer(agentData, &config)
err := PostToApmServer(apmServer.Client(), agentData, &config)
assert.Equal(t, nil, err)
}

func BenchmarkPostToAPM(b *testing.B) {
// Copied from https://github.com/elastic/apm-server/blob/master/testdata/intake-v2/transactions.ndjson.
benchBody := []byte(`{"metadata": {"service": {"name": "1234_service-12a3","node": {"configured_name": "node-123"},"version": "5.1.3","environment": "staging","language": {"name": "ecmascript","version": "8"},"runtime": {"name": "node","version": "8.0.0"},"framework": {"name": "Express","version": "1.2.3"},"agent": {"name": "elastic-node","version": "3.14.0"}},"user": {"id": "123user", "username": "bar", "email": "[email protected]"}, "labels": {"tag0": null, "tag1": "one", "tag2": 2}, "process": {"pid": 1234,"ppid": 6789,"title": "node","argv": ["node","server.js"]},"system": {"hostname": "prod1.example.com","architecture": "x64","platform": "darwin", "container": {"id": "container-id"}, "kubernetes": {"namespace": "namespace1", "pod": {"uid": "pod-uid", "name": "pod-name"}, "node": {"name": "node-name"}}},"cloud":{"account":{"id":"account_id","name":"account_name"},"availability_zone":"cloud_availability_zone","instance":{"id":"instance_id","name":"instance_name"},"machine":{"type":"machine_type"},"project":{"id":"project_id","name":"project_name"},"provider":"cloud_provider","region":"cloud_region","service":{"name":"lambda"}}}}
{"transaction": { "id": "945254c567a5417e", "trace_id": "0123456789abcdef0123456789abcdef", "parent_id": "abcdefabcdef01234567", "type": "request", "duration": 32.592981, "span_count": { "started": 43 }}}
{"transaction": {"id": "4340a8e0df1906ecbfa9", "trace_id": "0acd456789abcdef0123456789abcdef", "name": "GET /api/types","type": "request","duration": 32.592981,"outcome":"success", "result": "success", "timestamp": 1496170407154000, "sampled": true, "span_count": {"started": 17},"context": {"service": {"runtime": {"version": "7.0"}},"page":{"referer":"http://localhost:8000/test/e2e/","url":"http://localhost:8000/test/e2e/general-usecase/"}, "request": {"socket": {"remote_address": "12.53.12.1","encrypted": true},"http_version": "1.1","method": "POST","url": {"protocol": "https:","full": "https://www.example.com/p/a/t/h?query=string#hash","hostname": "www.example.com","port": "8080","pathname": "/p/a/t/h","search": "?query=string","hash": "#hash","raw": "/p/a/t/h?query=string#hash"},"headers": {"user-agent":["Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36","Mozilla Chrome Edge"],"content-type": "text/html","cookie": "c1=v1, c2=v2","some-other-header": "foo","array": ["foo","bar","baz"]},"cookies": {"c1": "v1","c2": "v2"},"env": {"SERVER_SOFTWARE": "nginx","GATEWAY_INTERFACE": "CGI/1.1"},"body": {"str": "hello world","additional": { "foo": {},"bar": 123,"req": "additional information"}}},"response": {"status_code": 200,"headers": {"content-type": "application/json"},"headers_sent": true,"finished": true,"transfer_size":25.8,"encoded_body_size":26.90,"decoded_body_size":29.90}, "user": {"domain": "ldap://abc","id": "99","username": "foo"},"tags": {"organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8", "tag2": 12, "tag3": 12.45, "tag4": false, "tag5": null },"custom": {"my_key": 1,"some_other_value": "foo bar","and_objects": {"foo": ["bar","baz"]},"(": "not a valid regex and that is fine"}}}}
{"transaction": { "id": "cdef4340a8e0df19", "trace_id": "0acd456789abcdef0123456789abcdef", "type": "request", "duration": 13.980558, "timestamp": 1532976822281000, "sampled": null, "span_count": { "dropped": 55, "started": 436 }, "marks": {"navigationTiming": {"appBeforeBootstrap": 608.9300000000001,"navigationStart": -21},"another_mark": {"some_long": 10,"some_float": 10.0}, "performance": {}}, "context": { "request": { "socket": { "remote_address": "192.0.1", "encrypted": null }, "method": "POST", "headers": { "user-agent": null, "content-type": null, "cookie": null }, "url": { "protocol": null, "full": null, "hostname": null, "port": null, "pathname": null, "search": null, "hash": null, "raw": null } }, "response": { "headers": { "content-type": null } }, "service": {"environment":"testing","name": "service1","node": {"configured_name": "node-ABC"}, "language": {"version": "2.5", "name": "ruby"}, "agent": {"version": "2.2", "name": "elastic-ruby", "ephemeral_id": "justanid"}, "framework": {"version": "5.0", "name": "Rails"}, "version": "2", "runtime": {"version": "2.5", "name": "cruby"}}},"experience":{"cls":1,"fid":2.0,"tbt":3.4,"longtask":{"count":3,"sum":2.5,"max":1}}}}
{"transaction": { "id": "00xxxxFFaaaa1234", "trace_id": "0123456789abcdef0123456789abcdef", "name": "amqp receive", "parent_id": "abcdefabcdef01234567", "type": "messaging", "duration": 3, "span_count": { "started": 1 }, "context": {"message": {"queue": { "name": "new_users"}, "age":{ "ms": 1577958057123}, "headers": {"user_id": "1ax3", "involved_services": ["user", "auth"]}, "body": "user created", "routing_key": "user-created-transaction"}},"session":{"id":"sunday","sequence":123}}}
{"transaction": { "name": "july-2021-delete-after-july-31", "type": "lambda", "result": "success", "id": "142e61450efb8574", "trace_id": "eb56529a1f461c5e7e2f66ecb075e983", "subtype": null, "action": null, "duration": 38.853, "timestamp": 1631736666365048, "sampled": true, "context": { "cloud": { "origin": { "account": { "id": "abc123" }, "provider": "aws", "region": "us-east-1", "service": { "name": "serviceName" } } }, "service": { "origin": { "id": "abc123", "name": "service-name", "version": "1.0" } }, "user": {}, "tags": {}, "custom": { } }, "sync": true, "span_count": { "started": 0 }, "outcome": "unknown", "faas": { "coldstart": false, "execution": "2e13b309-23e1-417f-8bf7-074fc96bc683", "trigger": { "request_id": "FuH2Cir_vHcEMUA=", "type": "http" } }, "sample_rate": 1 } }
`)
agentData := AgentData{Data: benchBody, ContentEncoding: ""}

// Create apm server and handler
apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
io.Copy(ioutil.Discard, r.Body)
r.Body.Close()
w.WriteHeader(202)
w.Write([]byte(`{}`))
}))
config := extensionConfig{
apmServerUrl: apmServer.URL + "/",
}

client := &http.Client{
Transport: http.DefaultTransport.(*http.Transport).Clone(),
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := PostToApmServer(client, agentData, &config)
if err != nil {
b.Fatal(err)
}
}
}
5 changes: 3 additions & 2 deletions apm-lambda-extension/extension/process_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,21 @@ package extension
import (
"encoding/json"
"log"
"net/http"
)

func ProcessShutdown() {
log.Println("Received SHUTDOWN event")
log.Println("Exiting")
}

func FlushAPMData(dataChannel chan AgentData, config *extensionConfig) {
func FlushAPMData(client *http.Client, dataChannel chan AgentData, config *extensionConfig) {
log.Println("Checking for agent data")
for {
select {
case agentData := <-dataChannel:
log.Println("Processing agent data")
err := PostToApmServer(agentData, config)
err := PostToApmServer(client, agentData, config)
if err != nil {
log.Printf("Error sending to APM server, skipping: %v", err)
}
Expand Down
10 changes: 7 additions & 3 deletions apm-lambda-extension/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"path/filepath"
Expand Down Expand Up @@ -87,6 +88,9 @@ func main() {
}
}

client := &http.Client{
Transport: http.DefaultTransport.(*http.Transport).Clone(),
}
for {
select {
case <-ctx.Done():
Expand All @@ -112,7 +116,7 @@ func main() {

// Flush any APM data, in case waiting for the runtimeDone event timed out,
// the agent data wasn't available yet, and we got to the next event
extension.FlushAPMData(agentDataChannel, config)
extension.FlushAPMData(client, agentDataChannel, config)

// Make a channel for signaling that a runtimeDone event has been received
runtimeDone := make(chan struct{})
Expand All @@ -130,7 +134,7 @@ func main() {
log.Println("Function invocation is complete, not receiving any more agent data")
return
case agentData := <-agentDataChannel:
err := extension.PostToApmServer(agentData, config)
err := extension.PostToApmServer(client, agentData, config)
if err != nil {
log.Printf("Error sending to APM server, skipping: %v", err)
}
Expand Down Expand Up @@ -179,7 +183,7 @@ func main() {
}

// Flush APM data now that the function invocation has completed
extension.FlushAPMData(agentDataChannel, config)
extension.FlushAPMData(client, agentDataChannel, config)

// Signal that the function invocation has completed
close(funcInvocDone)
Expand Down