diff --git a/apm-lambda-extension/extension/apm_server.go b/apm-lambda-extension/extension/apm_server.go index 1ca2822c..c4c64044 100644 --- a/apm-lambda-extension/extension/apm_server.go +++ b/apm-lambda-extension/extension/apm_server.go @@ -21,45 +21,48 @@ import ( "bytes" "compress/gzip" "fmt" - "io" "io/ioutil" "log" "net/http" + "sync" ) +var bufferPool = sync.Pool{New: func() interface{} { + return &bytes.Buffer{} +}} + // todo: can this be a streaming or streaming style call that keeps the // connection open across invocations? -func PostToApmServer(agentData AgentData, config *extensionConfig) error { - endpointUri := "intake/v2/events" - var req *http.Request - var err error +func PostToApmServer(client *http.Client, agentData AgentData, config *extensionConfig) error { + endpointURI := "intake/v2/events" + encoding := agentData.ContentEncoding + buf := bufferPool.Get().(*bytes.Buffer) + defer func() { + buf.Reset() + bufferPool.Put(buf) + }() if agentData.ContentEncoding == "" { - pr, pw := io.Pipe() - gw, _ := gzip.NewWriterLevel(pw, gzip.BestSpeed) - - go func() { - _, err = io.Copy(gw, bytes.NewReader(agentData.Data)) - gw.Close() - pw.Close() - if err != nil { - log.Printf("Failed to compress data: %v", err) - } - }() - - req, err = http.NewRequest("POST", config.apmServerUrl+endpointUri, pr) + encoding = "gzip" + gw, err := gzip.NewWriterLevel(buf, gzip.BestSpeed) if err != nil { - return fmt.Errorf("failed to create a new request when posting to APM server: %v", err) + return err } - req.Header.Add("Content-Encoding", "gzip") - } else { - req, err = http.NewRequest("POST", config.apmServerUrl+endpointUri, bytes.NewReader(agentData.Data)) - if err != nil { - return fmt.Errorf("failed to create a new request when posting to APM server: %v", err) + if _, err := gw.Write(agentData.Data); err != nil { + log.Printf("Failed to compress data: %v", err) } - req.Header.Add("Content-Encoding", agentData.ContentEncoding) + if err := gw.Close(); err != nil { + log.Printf("Failed write compressed data to buffer: %v", err) + } + } else { + buf.Write(agentData.Data) } + req, err := http.NewRequest("POST", config.apmServerUrl+endpointURI, buf) + if err != nil { + return fmt.Errorf("failed to create a new request when posting to APM server: %v", err) + } + req.Header.Add("Content-Encoding", encoding) req.Header.Add("Content-Type", "application/x-ndjson") if config.apmServerApiKey != "" { req.Header.Add("Authorization", "ApiKey "+config.apmServerApiKey) @@ -67,7 +70,6 @@ func PostToApmServer(agentData AgentData, config *extensionConfig) error { req.Header.Add("Authorization", "Bearer "+config.apmServerSecretToken) } - client := &http.Client{} resp, err := client.Do(req) if err != nil { return fmt.Errorf("failed to post to APM server: %v", err) diff --git a/apm-lambda-extension/extension/apm_server_test.go b/apm-lambda-extension/extension/apm_server_test.go index 4cfd19d5..0668f539 100644 --- a/apm-lambda-extension/extension/apm_server_test.go +++ b/apm-lambda-extension/extension/apm_server_test.go @@ -57,7 +57,7 @@ func TestPostToApmServerDataCompressed(t *testing.T) { apmServerUrl: apmServer.URL + "/", } - err := PostToApmServer(agentData, &config) + err := PostToApmServer(apmServer.Client(), agentData, &config) assert.Equal(t, nil, err) } @@ -90,6 +90,40 @@ func TestPostToApmServerDataNotCompressed(t *testing.T) { apmServerUrl: apmServer.URL + "/", } - err := PostToApmServer(agentData, &config) + err := PostToApmServer(apmServer.Client(), agentData, &config) assert.Equal(t, nil, err) } + +func BenchmarkPostToAPM(b *testing.B) { + // Copied from https://github.com/elastic/apm-server/blob/master/testdata/intake-v2/transactions.ndjson. + benchBody := []byte(`{"metadata": {"service": {"name": "1234_service-12a3","node": {"configured_name": "node-123"},"version": "5.1.3","environment": "staging","language": {"name": "ecmascript","version": "8"},"runtime": {"name": "node","version": "8.0.0"},"framework": {"name": "Express","version": "1.2.3"},"agent": {"name": "elastic-node","version": "3.14.0"}},"user": {"id": "123user", "username": "bar", "email": "bar@user.com"}, "labels": {"tag0": null, "tag1": "one", "tag2": 2}, "process": {"pid": 1234,"ppid": 6789,"title": "node","argv": ["node","server.js"]},"system": {"hostname": "prod1.example.com","architecture": "x64","platform": "darwin", "container": {"id": "container-id"}, "kubernetes": {"namespace": "namespace1", "pod": {"uid": "pod-uid", "name": "pod-name"}, "node": {"name": "node-name"}}},"cloud":{"account":{"id":"account_id","name":"account_name"},"availability_zone":"cloud_availability_zone","instance":{"id":"instance_id","name":"instance_name"},"machine":{"type":"machine_type"},"project":{"id":"project_id","name":"project_name"},"provider":"cloud_provider","region":"cloud_region","service":{"name":"lambda"}}}} +{"transaction": { "id": "945254c567a5417e", "trace_id": "0123456789abcdef0123456789abcdef", "parent_id": "abcdefabcdef01234567", "type": "request", "duration": 32.592981, "span_count": { "started": 43 }}} +{"transaction": {"id": "4340a8e0df1906ecbfa9", "trace_id": "0acd456789abcdef0123456789abcdef", "name": "GET /api/types","type": "request","duration": 32.592981,"outcome":"success", "result": "success", "timestamp": 1496170407154000, "sampled": true, "span_count": {"started": 17},"context": {"service": {"runtime": {"version": "7.0"}},"page":{"referer":"http://localhost:8000/test/e2e/","url":"http://localhost:8000/test/e2e/general-usecase/"}, "request": {"socket": {"remote_address": "12.53.12.1","encrypted": true},"http_version": "1.1","method": "POST","url": {"protocol": "https:","full": "https://www.example.com/p/a/t/h?query=string#hash","hostname": "www.example.com","port": "8080","pathname": "/p/a/t/h","search": "?query=string","hash": "#hash","raw": "/p/a/t/h?query=string#hash"},"headers": {"user-agent":["Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36","Mozilla Chrome Edge"],"content-type": "text/html","cookie": "c1=v1, c2=v2","some-other-header": "foo","array": ["foo","bar","baz"]},"cookies": {"c1": "v1","c2": "v2"},"env": {"SERVER_SOFTWARE": "nginx","GATEWAY_INTERFACE": "CGI/1.1"},"body": {"str": "hello world","additional": { "foo": {},"bar": 123,"req": "additional information"}}},"response": {"status_code": 200,"headers": {"content-type": "application/json"},"headers_sent": true,"finished": true,"transfer_size":25.8,"encoded_body_size":26.90,"decoded_body_size":29.90}, "user": {"domain": "ldap://abc","id": "99","username": "foo"},"tags": {"organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8", "tag2": 12, "tag3": 12.45, "tag4": false, "tag5": null },"custom": {"my_key": 1,"some_other_value": "foo bar","and_objects": {"foo": ["bar","baz"]},"(": "not a valid regex and that is fine"}}}} +{"transaction": { "id": "cdef4340a8e0df19", "trace_id": "0acd456789abcdef0123456789abcdef", "type": "request", "duration": 13.980558, "timestamp": 1532976822281000, "sampled": null, "span_count": { "dropped": 55, "started": 436 }, "marks": {"navigationTiming": {"appBeforeBootstrap": 608.9300000000001,"navigationStart": -21},"another_mark": {"some_long": 10,"some_float": 10.0}, "performance": {}}, "context": { "request": { "socket": { "remote_address": "192.0.1", "encrypted": null }, "method": "POST", "headers": { "user-agent": null, "content-type": null, "cookie": null }, "url": { "protocol": null, "full": null, "hostname": null, "port": null, "pathname": null, "search": null, "hash": null, "raw": null } }, "response": { "headers": { "content-type": null } }, "service": {"environment":"testing","name": "service1","node": {"configured_name": "node-ABC"}, "language": {"version": "2.5", "name": "ruby"}, "agent": {"version": "2.2", "name": "elastic-ruby", "ephemeral_id": "justanid"}, "framework": {"version": "5.0", "name": "Rails"}, "version": "2", "runtime": {"version": "2.5", "name": "cruby"}}},"experience":{"cls":1,"fid":2.0,"tbt":3.4,"longtask":{"count":3,"sum":2.5,"max":1}}}} +{"transaction": { "id": "00xxxxFFaaaa1234", "trace_id": "0123456789abcdef0123456789abcdef", "name": "amqp receive", "parent_id": "abcdefabcdef01234567", "type": "messaging", "duration": 3, "span_count": { "started": 1 }, "context": {"message": {"queue": { "name": "new_users"}, "age":{ "ms": 1577958057123}, "headers": {"user_id": "1ax3", "involved_services": ["user", "auth"]}, "body": "user created", "routing_key": "user-created-transaction"}},"session":{"id":"sunday","sequence":123}}} +{"transaction": { "name": "july-2021-delete-after-july-31", "type": "lambda", "result": "success", "id": "142e61450efb8574", "trace_id": "eb56529a1f461c5e7e2f66ecb075e983", "subtype": null, "action": null, "duration": 38.853, "timestamp": 1631736666365048, "sampled": true, "context": { "cloud": { "origin": { "account": { "id": "abc123" }, "provider": "aws", "region": "us-east-1", "service": { "name": "serviceName" } } }, "service": { "origin": { "id": "abc123", "name": "service-name", "version": "1.0" } }, "user": {}, "tags": {}, "custom": { } }, "sync": true, "span_count": { "started": 0 }, "outcome": "unknown", "faas": { "coldstart": false, "execution": "2e13b309-23e1-417f-8bf7-074fc96bc683", "trigger": { "request_id": "FuH2Cir_vHcEMUA=", "type": "http" } }, "sample_rate": 1 } } +`) + agentData := AgentData{Data: benchBody, ContentEncoding: ""} + + // Create apm server and handler + apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + io.Copy(ioutil.Discard, r.Body) + r.Body.Close() + w.WriteHeader(202) + w.Write([]byte(`{}`)) + })) + config := extensionConfig{ + apmServerUrl: apmServer.URL + "/", + } + + client := &http.Client{ + Transport: http.DefaultTransport.(*http.Transport).Clone(), + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := PostToApmServer(client, agentData, &config) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/apm-lambda-extension/extension/process_events.go b/apm-lambda-extension/extension/process_events.go index 54f479e3..1debb836 100644 --- a/apm-lambda-extension/extension/process_events.go +++ b/apm-lambda-extension/extension/process_events.go @@ -20,6 +20,7 @@ package extension import ( "encoding/json" "log" + "net/http" ) func ProcessShutdown() { @@ -27,13 +28,13 @@ func ProcessShutdown() { log.Println("Exiting") } -func FlushAPMData(dataChannel chan AgentData, config *extensionConfig) { +func FlushAPMData(client *http.Client, dataChannel chan AgentData, config *extensionConfig) { log.Println("Checking for agent data") for { select { case agentData := <-dataChannel: log.Println("Processing agent data") - err := PostToApmServer(agentData, config) + err := PostToApmServer(client, agentData, config) if err != nil { log.Printf("Error sending to APM server, skipping: %v", err) } diff --git a/apm-lambda-extension/main.go b/apm-lambda-extension/main.go index c0bbf5f5..421d3231 100644 --- a/apm-lambda-extension/main.go +++ b/apm-lambda-extension/main.go @@ -20,6 +20,7 @@ package main import ( "context" "log" + "net/http" "os" "os/signal" "path/filepath" @@ -87,6 +88,9 @@ func main() { } } + client := &http.Client{ + Transport: http.DefaultTransport.(*http.Transport).Clone(), + } for { select { case <-ctx.Done(): @@ -112,7 +116,7 @@ func main() { // Flush any APM data, in case waiting for the runtimeDone event timed out, // the agent data wasn't available yet, and we got to the next event - extension.FlushAPMData(agentDataChannel, config) + extension.FlushAPMData(client, agentDataChannel, config) // Make a channel for signaling that a runtimeDone event has been received runtimeDone := make(chan struct{}) @@ -130,7 +134,7 @@ func main() { log.Println("Function invocation is complete, not receiving any more agent data") return case agentData := <-agentDataChannel: - err := extension.PostToApmServer(agentData, config) + err := extension.PostToApmServer(client, agentData, config) if err != nil { log.Printf("Error sending to APM server, skipping: %v", err) } @@ -179,7 +183,7 @@ func main() { } // Flush APM data now that the function invocation has completed - extension.FlushAPMData(agentDataChannel, config) + extension.FlushAPMData(client, agentDataChannel, config) // Signal that the function invocation has completed close(funcInvocDone)