diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 69e7ee54..41c18298 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -30,6 +30,7 @@ https://github.com/elastic/apm-aws-lambda/compare/v1.1.0...main[View commits] ===== Features - Disable CGO to prevent libc/ABI compatibility issues {lambda-pull}292[292] - Add support for collecting and shipping function logs to APM Server {lambda-pull}303[303] +- Batch data collected from lambda logs API before sending to APM Server {lambda-pull}314[314] [float] ===== Bug fixes diff --git a/apmproxy/apmserver.go b/apmproxy/apmserver.go index 35fb4396..634e6a2f 100644 --- a/apmproxy/apmserver.go +++ b/apmproxy/apmserver.go @@ -40,28 +40,30 @@ type jsonError struct { Document string `json:"document,omitempty"` } -// ForwardApmData receives agent data as it comes in and posts it to the APM server. -// Stop checking for, and sending agent data when the function invocation +// ForwardApmData receives apm data as it comes in and posts it to the APM server. +// Stop checking for, and sending apm data when the function invocation // has completed, signaled via a channel. -func (c *Client) ForwardApmData(ctx context.Context, metadataContainer *MetadataContainer) error { +func (c *Client) ForwardApmData(ctx context.Context) error { if c.IsUnhealthy() { return nil } + + var lambdaDataChan chan APMData for { select { case <-ctx.Done(): c.logger.Debug("Invocation context cancelled, not processing any more agent data") return nil - case agentData := <-c.DataChannel: - if metadataContainer.Metadata == nil { - metadata, err := ProcessMetadata(agentData) - if err != nil { - return fmt.Errorf("failed to extract metadata from agent payload %w", err) - } - metadataContainer.Metadata = metadata + case data := <-c.AgentDataChannel: + if err := c.forwardAgentData(ctx, data); err != nil { + return err } - if err := c.PostToApmServer(ctx, agentData); err != nil { - return fmt.Errorf("error sending to APM server, skipping: %v", err) + // Wait for metadata to be available, metadata will be available as soon as + // the first agent data is processed. 
+ lambdaDataChan = c.LambdaDataChannel + case data := <-lambdaDataChan: + if err := c.forwardLambdaData(ctx, data); err != nil { + return err } } } @@ -74,15 +76,38 @@ func (c *Client) FlushAPMData(ctx context.Context) { return } c.logger.Debug("Flush started - Checking for agent data") + + // Flush agent data first to make sure metadata is available if possible + for i := len(c.AgentDataChannel); i > 0; i-- { + data := <-c.AgentDataChannel + if err := c.forwardAgentData(ctx, data); err != nil { + c.logger.Errorf("Error sending to APM Server, skipping: %v", err) + } + } + + // If metadata still not available then fail fast + if c.batch == nil { + c.logger.Warnf("Metadata not available at flush, skipping sending lambda data to APM Server") + return + } + + // Flush lambda data for { select { - case agentData := <-c.DataChannel: - c.logger.Debug("Flush in progress - Processing agent data") - if err := c.PostToApmServer(ctx, agentData); err != nil { + case apmData := <-c.LambdaDataChannel: + c.logger.Debug("Flush in progress - Processing lambda data") + if err := c.forwardLambdaData(ctx, apmData); err != nil { c.logger.Errorf("Error sending to APM server, skipping: %v", err) } + case <-ctx.Done(): + c.logger.Debugf("Failed to flush completely, may result in data drop") + return default: - c.logger.Debug("Flush ended - No agent data on buffer") + // Flush any remaining data in batch + if err := c.sendBatch(ctx); err != nil { + c.logger.Errorf("Error sending to APM server, skipping: %v", err) + } + c.logger.Debug("Flush ended for lambda data - no data in buffer") return } } @@ -93,7 +118,7 @@ func (c *Client) FlushAPMData(ctx context.Context) { // The function compresses the APM agent data, if it's not already compressed. // It sets the APM transport status to failing upon errors, as part of the backoff // strategy. -func (c *Client) PostToApmServer(ctx context.Context, agentData AgentData) error { +func (c *Client) PostToApmServer(ctx context.Context, apmData APMData) error { // todo: can this be a streaming or streaming style call that keeps the // connection open across invocations? if c.IsUnhealthy() { @@ -101,11 +126,11 @@ func (c *Client) PostToApmServer(ctx context.Context, agentData AgentData) error } endpointURI := "intake/v2/events" - encoding := agentData.ContentEncoding + encoding := apmData.ContentEncoding var r io.Reader - if agentData.ContentEncoding != "" { - r = bytes.NewReader(agentData.Data) + if apmData.ContentEncoding != "" { + r = bytes.NewReader(apmData.Data) } else { encoding = "gzip" buf := c.bufferPool.Get().(*bytes.Buffer) @@ -117,7 +142,7 @@ func (c *Client) PostToApmServer(ctx context.Context, agentData AgentData) error if err != nil { return err } - if _, err := gw.Write(agentData.Data); err != nil { + if _, err := gw.Write(apmData.Data); err != nil { return fmt.Errorf("failed to compress data: %w", err) } if err := gw.Close(); err != nil { @@ -281,17 +306,6 @@ func (c *Client) ComputeGracePeriod() time.Duration { return time.Duration((gracePeriodWithoutJitter + jitter*gracePeriodWithoutJitter) * float64(time.Second)) } -// EnqueueAPMData adds a AgentData struct to the agent data channel, effectively queueing for a send -// to the APM server. 
-func (c *Client) EnqueueAPMData(agentData AgentData) { - select { - case c.DataChannel <- agentData: - c.logger.Debug("Adding agent data to buffer to be sent to apm server") - default: - c.logger.Warn("Channel full: dropping a subset of agent data") - } -} - // ShouldFlush returns true if the client should flush APM data after processing the event. func (c *Client) ShouldFlush() bool { return c.sendStrategy == SyncFlush @@ -313,3 +327,37 @@ func (c *Client) WaitForFlush() <-chan struct{} { defer c.flushMutex.Unlock() return c.flushCh } + +func (c *Client) forwardAgentData(ctx context.Context, apmData APMData) error { + if c.batch == nil { + metadata, err := ProcessMetadata(apmData) + if err != nil { + return fmt.Errorf("failed to extract metadata from agent payload %w", err) + } + c.batch = NewBatch(c.maxBatchSize, c.maxBatchAge, metadata) + } + return c.PostToApmServer(ctx, apmData) +} + +func (c *Client) forwardLambdaData(ctx context.Context, apmData APMData) error { + if c.batch == nil { + // This state is not possible since we start processing lambda + // logs only after metadata is available and batch is created. + return errors.New("unexpected state, metadata not yet set") + } + if err := c.batch.Add(apmData); err != nil { + c.logger.Warnf("Dropping data due to error: %v", err) + } + if c.batch.ShouldShip() { + return c.sendBatch(ctx) + } + return nil +} + +func (c *Client) sendBatch(ctx context.Context) error { + if c.batch == nil || c.batch.Count() == 0 { + return nil + } + defer c.batch.Reset() + return c.PostToApmServer(ctx, c.batch.ToAPMData()) +} diff --git a/apmproxy/apmserver_test.go b/apmproxy/apmserver_test.go index 3f028ff0..5bba30c3 100644 --- a/apmproxy/apmserver_test.go +++ b/apmproxy/apmserver_test.go @@ -18,11 +18,14 @@ package apmproxy_test import ( + "bytes" "compress/gzip" "context" + "fmt" "io" "net/http" "net/http/httptest" + "sync" "sync/atomic" "testing" "time" @@ -57,7 +60,7 @@ func TestPostToApmServerDataCompressed(t *testing.T) { // Create AgentData struct with compressed data data, _ := io.ReadAll(pr) - agentData := apmproxy.AgentData{Data: data, ContentEncoding: "gzip"} + agentData := apmproxy.APMData{Data: data, ContentEncoding: "gzip"} // Create apm server and handler apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -83,7 +86,7 @@ func TestPostToApmServerDataCompressed(t *testing.T) { func TestPostToApmServerDataNotCompressed(t *testing.T) { s := "A long time ago in a galaxy far, far away..." 
body := []byte(s) - agentData := apmproxy.AgentData{Data: body, ContentEncoding: ""} + agentData := apmproxy.APMData{Data: body, ContentEncoding: ""} // Compress the data, so it can be compared with what // the apm server receives @@ -251,7 +254,7 @@ func TestEnterBackoffFromHealthy(t *testing.T) { // Create AgentData struct with compressed data data, _ := io.ReadAll(pr) - agentData := apmproxy.AgentData{Data: data, ContentEncoding: "gzip"} + agentData := apmproxy.APMData{Data: data, ContentEncoding: "gzip"} // Create apm server and handler apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -302,7 +305,7 @@ func TestEnterBackoffFromFailing(t *testing.T) { // Create AgentData struct with compressed data data, _ := io.ReadAll(pr) - agentData := apmproxy.AgentData{Data: data, ContentEncoding: "gzip"} + agentData := apmproxy.APMData{Data: data, ContentEncoding: "gzip"} // Create apm server and handler apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -356,7 +359,7 @@ func TestAPMServerRecovery(t *testing.T) { // Create AgentData struct with compressed data data, _ := io.ReadAll(pr) - agentData := apmproxy.AgentData{Data: data, ContentEncoding: "gzip"} + agentData := apmproxy.APMData{Data: data, ContentEncoding: "gzip"} // Create apm server and handler apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -409,7 +412,7 @@ func TestAPMServerAuthFails(t *testing.T) { // Create AgentData struct with compressed data data, _ := io.ReadAll(pr) - agentData := apmproxy.AgentData{Data: data, ContentEncoding: "gzip"} + agentData := apmproxy.APMData{Data: data, ContentEncoding: "gzip"} // Create apm server and handler apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -453,7 +456,7 @@ func TestAPMServerRatelimit(t *testing.T) { // Create AgentData struct with compressed data data, _ := io.ReadAll(pr) - agentData := apmproxy.AgentData{Data: data, ContentEncoding: "gzip"} + agentData := apmproxy.APMData{Data: data, ContentEncoding: "gzip"} // Create apm server and handler var shouldSucceed atomic.Bool @@ -506,7 +509,7 @@ func TestAPMServerClientFail(t *testing.T) { // Create AgentData struct with compressed data data, _ := io.ReadAll(pr) - agentData := apmproxy.AgentData{Data: data, ContentEncoding: "gzip"} + agentData := apmproxy.APMData{Data: data, ContentEncoding: "gzip"} // Create apm server and handler var shouldSucceed atomic.Bool @@ -558,7 +561,7 @@ func TestContinuedAPMServerFailure(t *testing.T) { // Create AgentData struct with compressed data data, _ := io.ReadAll(pr) - agentData := apmproxy.AgentData{Data: data, ContentEncoding: "gzip"} + agentData := apmproxy.APMData{Data: data, ContentEncoding: "gzip"} // Create apm server and handler apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -586,6 +589,123 @@ func TestContinuedAPMServerFailure(t *testing.T) { assert.Equal(t, apmClient.Status, apmproxy.Failing) } +func TestForwardApmData(t *testing.T) { + receivedReqBodyChan := make(chan []byte) + apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + bytes, _ := io.ReadAll(r.Body) + receivedReqBodyChan <- bytes + w.WriteHeader(http.StatusAccepted) + })) + t.Cleanup(apmServer.Close) + metadata := `{"metadata":{"service":{"agent":{"name":"apm-lambda-extension","version":"1.1.0"},"framework":{"name":"AWS 
Lambda","version":""},"language":{"name":"python","version":"3.9.8"},"runtime":{"name":"","version":""},"node":{}},"user":{},"process":{"pid":0},"system":{"container":{"id":""},"kubernetes":{"node":{},"pod":{}}},"cloud":{"provider":"","instance":{},"machine":{},"account":{},"project":{},"service":{}}}}` + assertGzipBody := func(expected string) { + var body []byte + select { + case body = <-receivedReqBodyChan: + case <-time.After(1 * time.Second): + require.Fail(t, "mock APM-Server timed out waiting for request") + } + buf := bytes.NewReader(body) + r, err := gzip.NewReader(buf) + require.NoError(t, err) + out, err := io.ReadAll(r) + require.NoError(t, err) + assert.Equal(t, expected, string(out)) + } + agentData := fmt.Sprintf("%s\n%s", metadata, `{"log": {"message": "test"}}`) + lambdaData := `{"log": {"message": "test"}}` + maxBatchAge := 1 * time.Second + apmClient, err := apmproxy.NewClient( + apmproxy.WithURL(apmServer.URL), + apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()), + apmproxy.WithAgentDataBufferSize(10), + // Configure a small batch age for ease of testing + apmproxy.WithMaxBatchAge(maxBatchAge), + ) + require.NoError(t, err) + + // Start forwarding APM data + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + assert.NoError(t, apmClient.ForwardApmData(ctx)) + }() + + // Populate metadata by sending agent data + apmClient.AgentDataChannel <- apmproxy.APMData{ + Data: []byte(agentData), + } + assertGzipBody(agentData) + + // Send lambda logs API data + var expected bytes.Buffer + expected.WriteString(metadata) + // Send multiple lambda logs to batch data + for i := 0; i < 5; i++ { + if i == 4 { + // Wait for batch age to make sure the batch is mature to be sent + time.Sleep(maxBatchAge + time.Millisecond) + } + apmClient.LambdaDataChannel <- apmproxy.APMData{ + Data: []byte(lambdaData), + } + expected.WriteByte('\n') + expected.WriteString(lambdaData) + } + + assertGzipBody(expected.String()) + // Wait for ForwardApmData to exit + cancel() + wg.Wait() +} + +func BenchmarkFlushAPMData(b *testing.B) { + // Create apm server and handler + apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if _, err := io.Copy(io.Discard, r.Body); err != nil { + return + } + if err := r.Body.Close(); err != nil { + return + } + w.WriteHeader(202) + if _, err := w.Write([]byte(`{}`)); err != nil { + return + } + })) + b.Cleanup(apmServer.Close) + + apmClient, err := apmproxy.NewClient( + apmproxy.WithURL(apmServer.URL), + apmproxy.WithLogger(zaptest.NewLogger(b).Sugar()), + ) + require.NoError(b, err) + + // Copied from https://github.com/elastic/apm-server/blob/master/testdata/intake-v2/transactions.ndjson. 
+ agentData := []byte(`{"metadata": {"service": {"name": "1234_service-12a3","node": {"configured_name": "node-123"},"version": "5.1.3","environment": "staging","language": {"name": "ecmascript","version": "8"},"runtime": {"name": "node","version": "8.0.0"},"framework": {"name": "Express","version": "1.2.3"},"agent": {"name": "elastic-node","version": "3.14.0"}},"user": {"id": "123user", "username": "bar", "email": "bar@user.com"}, "labels": {"tag0": null, "tag1": "one", "tag2": 2}, "process": {"pid": 1234,"ppid": 6789,"title": "node","argv": ["node","server.js"]},"system": {"hostname": "prod1.example.com","architecture": "x64","platform": "darwin", "container": {"id": "container-id"}, "kubernetes": {"namespace": "namespace1", "pod": {"uid": "pod-uid", "name": "pod-name"}, "node": {"name": "node-name"}}},"cloud":{"account":{"id":"account_id","name":"account_name"},"availability_zone":"cloud_availability_zone","instance":{"id":"instance_id","name":"instance_name"},"machine":{"type":"machine_type"},"project":{"id":"project_id","name":"project_name"},"provider":"cloud_provider","region":"cloud_region","service":{"name":"lambda"}}}} +{"transaction": { "id": "945254c567a5417e", "trace_id": "0123456789abcdef0123456789abcdef", "parent_id": "abcdefabcdef01234567", "type": "request", "duration": 32.592981, "span_count": { "started": 43 }}} +{"transaction": {"id": "4340a8e0df1906ecbfa9", "trace_id": "0acd456789abcdef0123456789abcdef", "name": "GET /api/types","type": "request","duration": 32.592981,"outcome":"success", "result": "success", "timestamp": 1496170407154000, "sampled": true, "span_count": {"started": 17},"context": {"service": {"runtime": {"version": "7.0"}},"page":{"referer":"http://localhost:8000/test/e2e/","url":"http://localhost:8000/test/e2e/general-usecase/"}, "request": {"socket": {"remote_address": "12.53.12.1","encrypted": true},"http_version": "1.1","method": "POST","url": {"protocol": "https:","full": "https://www.example.com/p/a/t/h?query=string#hash","hostname": "www.example.com","port": "8080","pathname": "/p/a/t/h","search": "?query=string","hash": "#hash","raw": "/p/a/t/h?query=string#hash"},"headers": {"user-agent":["Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36","Mozilla Chrome Edge"],"content-type": "text/html","cookie": "c1=v1, c2=v2","some-other-header": "foo","array": ["foo","bar","baz"]},"cookies": {"c1": "v1","c2": "v2"},"env": {"SERVER_SOFTWARE": "nginx","GATEWAY_INTERFACE": "CGI/1.1"},"body": {"str": "hello world","additional": { "foo": {},"bar": 123,"req": "additional information"}}},"response": {"status_code": 200,"headers": {"content-type": "application/json"},"headers_sent": true,"finished": true,"transfer_size":25.8,"encoded_body_size":26.90,"decoded_body_size":29.90}, "user": {"domain": "ldap://abc","id": "99","username": "foo"},"tags": {"organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8", "tag2": 12, "tag3": 12.45, "tag4": false, "tag5": null },"custom": {"my_key": 1,"some_other_value": "foo bar","and_objects": {"foo": ["bar","baz"]},"(": "not a valid regex and that is fine"}}}} +{"transaction": { "id": "cdef4340a8e0df19", "trace_id": "0acd456789abcdef0123456789abcdef", "type": "request", "duration": 13.980558, "timestamp": 1532976822281000, "sampled": null, "span_count": { "dropped": 55, "started": 436 }, "marks": {"navigationTiming": {"appBeforeBootstrap": 608.9300000000001,"navigationStart": -21},"another_mark": {"some_long": 10,"some_float": 10.0}, "performance": {}}, 
"context": { "request": { "socket": { "remote_address": "192.0.1", "encrypted": null }, "method": "POST", "headers": { "user-agent": null, "content-type": null, "cookie": null }, "url": { "protocol": null, "full": null, "hostname": null, "port": null, "pathname": null, "search": null, "hash": null, "raw": null } }, "response": { "headers": { "content-type": null } }, "service": {"environment":"testing","name": "service1","node": {"configured_name": "node-ABC"}, "language": {"version": "2.5", "name": "ruby"}, "agent": {"version": "2.2", "name": "elastic-ruby", "ephemeral_id": "justanid"}, "framework": {"version": "5.0", "name": "Rails"}, "version": "2", "runtime": {"version": "2.5", "name": "cruby"}}},"experience":{"cls":1,"fid":2.0,"tbt":3.4,"longtask":{"count":3,"sum":2.5,"max":1}}}} +{"transaction": { "id": "00xxxxFFaaaa1234", "trace_id": "0123456789abcdef0123456789abcdef", "name": "amqp receive", "parent_id": "abcdefabcdef01234567", "type": "messaging", "duration": 3, "span_count": { "started": 1 }, "context": {"message": {"queue": { "name": "new_users"}, "age":{ "ms": 1577958057123}, "headers": {"user_id": "1ax3", "involved_services": ["user", "auth"]}, "body": "user created", "routing_key": "user-created-transaction"}},"session":{"id":"sunday","sequence":123}}} +{"transaction": { "name": "july-2021-delete-after-july-31", "type": "lambda", "result": "success", "id": "142e61450efb8574", "trace_id": "eb56529a1f461c5e7e2f66ecb075e983", "subtype": null, "action": null, "duration": 38.853, "timestamp": 1631736666365048, "sampled": true, "context": { "cloud": { "origin": { "account": { "id": "abc123" }, "provider": "aws", "region": "us-east-1", "service": { "name": "serviceName" } } }, "service": { "origin": { "id": "abc123", "name": "service-name", "version": "1.0" } }, "user": {}, "tags": {}, "custom": { } }, "sync": true, "span_count": { "started": 0 }, "outcome": "unknown", "faas": { "coldstart": false, "execution": "2e13b309-23e1-417f-8bf7-074fc96bc683", "trigger": { "request_id": "FuH2Cir_vHcEMUA=", "type": "http" } }, "sample_rate": 1 } } +`) + agentAPMData := apmproxy.APMData{Data: agentData} + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + apmClient.AgentDataChannel <- agentAPMData + for j := 0; j < 99; j++ { + apmClient.LambdaDataChannel <- apmproxy.APMData{ + Data: []byte("this is test log"), + } + } + apmClient.FlushAPMData(context.Background()) + } +} + func BenchmarkPostToAPM(b *testing.B) { // Create apm server and handler apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -600,6 +720,7 @@ func BenchmarkPostToAPM(b *testing.B) { return } })) + b.Cleanup(apmServer.Close) apmClient, err := apmproxy.NewClient( apmproxy.WithURL(apmServer.URL), @@ -615,8 +736,9 @@ func BenchmarkPostToAPM(b *testing.B) { {"transaction": { "id": "00xxxxFFaaaa1234", "trace_id": "0123456789abcdef0123456789abcdef", "name": "amqp receive", "parent_id": "abcdefabcdef01234567", "type": "messaging", "duration": 3, "span_count": { "started": 1 }, "context": {"message": {"queue": { "name": "new_users"}, "age":{ "ms": 1577958057123}, "headers": {"user_id": "1ax3", "involved_services": ["user", "auth"]}, "body": "user created", "routing_key": "user-created-transaction"}},"session":{"id":"sunday","sequence":123}}} {"transaction": { "name": "july-2021-delete-after-july-31", "type": "lambda", "result": "success", "id": "142e61450efb8574", "trace_id": "eb56529a1f461c5e7e2f66ecb075e983", "subtype": null, "action": null, "duration": 38.853, 
"timestamp": 1631736666365048, "sampled": true, "context": { "cloud": { "origin": { "account": { "id": "abc123" }, "provider": "aws", "region": "us-east-1", "service": { "name": "serviceName" } } }, "service": { "origin": { "id": "abc123", "name": "service-name", "version": "1.0" } }, "user": {}, "tags": {}, "custom": { } }, "sync": true, "span_count": { "started": 0 }, "outcome": "unknown", "faas": { "coldstart": false, "execution": "2e13b309-23e1-417f-8bf7-074fc96bc683", "trigger": { "request_id": "FuH2Cir_vHcEMUA=", "type": "http" } }, "sample_rate": 1 } } `) - agentData := apmproxy.AgentData{Data: benchBody, ContentEncoding: ""} + agentData := apmproxy.APMData{Data: benchBody, ContentEncoding: ""} + b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { if err := apmClient.PostToApmServer(context.Background(), agentData); err != nil { diff --git a/apmproxy/batch.go b/apmproxy/batch.go new file mode 100644 index 00000000..d139c259 --- /dev/null +++ b/apmproxy/batch.go @@ -0,0 +1,112 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package apmproxy + +import ( + "bytes" + "errors" + "time" +) + +var ( + // ErrBatchFull signfies that the batch has reached full capacity + // and cannot accept more entries. + ErrBatchFull = errors.New("batch is full") + // ErrInvalidEncoding is returned for any APMData that is encoded + // with any encoding format + ErrInvalidEncoding = errors.New("encoded data not supported") +) + +var ( + maxSizeThreshold = 0.9 + zeroTime = time.Time{} +) + +// BatchData represents a batch of data without metadata +// that will be sent to APMServer. BatchData is not safe +// concurrent access. +type BatchData struct { + metadataBytes int + buf bytes.Buffer + count int + age time.Time + maxSize int + maxAge time.Duration +} + +// NewBatch creates a new BatchData which can accept a +// maximum number of entries as specified by the argument +func NewBatch(maxSize int, maxAge time.Duration, metadata []byte) *BatchData { + b := &BatchData{ + maxSize: maxSize, + maxAge: maxAge, + } + b.metadataBytes, _ = b.buf.Write(metadata) + return b +} + +// Add adds a new entry to the batch. Returns ErrBatchFull +// if batch has reached its maximum size. +func (b *BatchData) Add(d APMData) error { + if b.count == b.maxSize { + return ErrBatchFull + } + if d.ContentEncoding != "" { + return ErrInvalidEncoding + } + if err := b.buf.WriteByte('\n'); err != nil { + return err + } + if _, err := b.buf.Write(d.Data); err != nil { + return err + } + if b.count == 0 { + // For first entry, set the age of the batch + b.age = time.Now() + } + b.count++ + return nil +} + +// Count return the number of APMData entries in batch. +func (b *BatchData) Count() int { + return b.count +} + +// ShouldShip indicates when a batch is ready for sending. 
+// A batch is marked as ready for flush when one of the +// below conditions is met: +// 1. the number of entries has reached the size threshold (90% of maxSize) +// 2. the batch is older than the configured max age +func (b *BatchData) ShouldShip() bool { + return (b.count >= int(float64(b.maxSize)*maxSizeThreshold)) || + (!b.age.IsZero() && time.Since(b.age) > b.maxAge) +} + +// Reset resets the batch to prepare for a new set of data +func (b *BatchData) Reset() { + b.count, b.age = 0, zeroTime + b.buf.Truncate(b.metadataBytes) +} + +// ToAPMData returns APMData with metadata and the accumulated batch +func (b *BatchData) ToAPMData() APMData { + return APMData{ + Data: b.buf.Bytes(), + } +} diff --git a/apmproxy/batch_test.go b/apmproxy/batch_test.go new file mode 100644 index 00000000..2d93d122 --- /dev/null +++ b/apmproxy/batch_test.go @@ -0,0 +1,78 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package apmproxy + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const metadata = `{"metadata":{"service":{"agent":{"name":"apm-lambda-extension","version":"1.1.0"},"framework":{"name":"AWS Lambda","version":""},"language":{"name":"python","version":"3.9.8"},"runtime":{"name":"","version":""},"node":{}},"user":{},"process":{"pid":0},"system":{"container":{"id":""},"kubernetes":{"node":{},"pod":{}}},"cloud":{"provider":"","instance":{},"machine":{},"account":{},"project":{},"service":{}}}}` + +func TestAdd(t *testing.T) { + t.Run("empty", func(t *testing.T) { + b := NewBatch(1, time.Hour, []byte(metadata)) + + assert.NoError(t, b.Add(APMData{})) + }) + t.Run("full", func(t *testing.T) { + b := NewBatch(1, time.Hour, []byte(metadata)) + require.NoError(t, b.Add(APMData{})) + + assert.ErrorIs(t, b.Add(APMData{}), ErrBatchFull) + }) +} + +func TestReset(t *testing.T) { + b := NewBatch(1, time.Hour, []byte(metadata)) + require.NoError(t, b.Add(APMData{})) + require.Equal(t, 1, b.Count()) + b.Reset() + + assert.Equal(t, 0, b.Count()) + assert.True(t, b.age.IsZero()) } + +func TestShouldShip_ReasonSize(t *testing.T) { + b := NewBatch(10, time.Hour, []byte(metadata)) + + // Should flush at 90% full + for i := 0; i < 9; i++ { + assert.False(t, b.ShouldShip()) + require.NoError(t, b.Add(APMData{})) + } + + require.Equal(t, 9, b.Count()) + assert.True(t, b.ShouldShip()) +} + +func TestShouldShip_ReasonAge(t *testing.T) { + b := NewBatch(10, time.Second, []byte(metadata)) + + assert.False(t, b.ShouldShip()) + require.NoError(t, b.Add(APMData{})) + + time.Sleep(time.Second + time.Millisecond) + + // Should be ready to send now + require.Equal(t, 1, b.Count()) + assert.True(t, b.ShouldShip()) +} diff --git a/apmproxy/client.go b/apmproxy/client.go index 7b161abe..feccd734 100644 --- a/apmproxy/client.go +++
b/apmproxy/client.go @@ -46,13 +46,17 @@ const ( defaultDataForwarderTimeout time.Duration = 3 * time.Second defaultReceiverAddr = ":8200" defaultAgentBufferSize int = 100 + defaultLambdaBufferSize int = 100 + defaultMaxBatchSize int = 50 + defaultMaxBatchAge time.Duration = 2 * time.Second ) // Client is the client used to communicate with the apm server. type Client struct { mu sync.RWMutex bufferPool sync.Pool - DataChannel chan AgentData + AgentDataChannel chan APMData + LambdaDataChannel chan APMData client *http.Client Status Status ReconnectionCount int @@ -65,6 +69,10 @@ type Client struct { flushMutex sync.Mutex flushCh chan struct{} + + batch *BatchData + maxBatchSize int + maxBatchAge time.Duration } func NewClient(opts ...Option) (*Client, error) { @@ -72,7 +80,8 @@ func NewClient(opts ...Option) (*Client, error) { bufferPool: sync.Pool{New: func() interface{} { return &bytes.Buffer{} }}, - DataChannel: make(chan AgentData, defaultAgentBufferSize), + AgentDataChannel: make(chan APMData, defaultAgentBufferSize), + LambdaDataChannel: make(chan APMData, defaultLambdaBufferSize), client: &http.Client{ Transport: http.DefaultTransport.(*http.Transport).Clone(), }, @@ -86,6 +95,8 @@ func NewClient(opts ...Option) (*Client, error) { }, sendStrategy: SyncFlush, flushCh: make(chan struct{}), + maxBatchSize: defaultMaxBatchSize, + maxBatchAge: defaultMaxBatchAge, } c.client.Timeout = defaultDataForwarderTimeout diff --git a/apmproxy/client_test.go b/apmproxy/client_test.go index 8dd75d36..3f57c73b 100644 --- a/apmproxy/client_test.go +++ b/apmproxy/client_test.go @@ -18,9 +18,10 @@ package apmproxy_test import ( - "github.com/elastic/apm-aws-lambda/apmproxy" "testing" + "github.com/elastic/apm-aws-lambda/apmproxy" + "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" ) @@ -34,10 +35,7 @@ func TestClient(t *testing.T) { expectedErr: true, }, "missing base url": { - opts: []apmproxy.Option{ - apmproxy.WithURL(""), - apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()), - }, + opts: []apmproxy.Option{}, expectedErr: true, }, "missing logger": { diff --git a/apmproxy/metadata.go b/apmproxy/metadata.go index d0becec0..6d63b292 100644 --- a/apmproxy/metadata.go +++ b/apmproxy/metadata.go @@ -25,13 +25,9 @@ import ( "io" ) -type MetadataContainer struct { - Metadata []byte -} - // ProcessMetadata return a byte array containing the Metadata marshaled in JSON // In case we want to update the Metadata values, usage of https://github.com/tidwall/sjson is advised -func ProcessMetadata(data AgentData) ([]byte, error) { +func ProcessMetadata(data APMData) ([]byte, error) { uncompressedData, err := GetUncompressedBytes(data.Data, data.ContentEncoding) if err != nil { return nil, fmt.Errorf("error uncompressing agent data for metadata extraction: %w", err) diff --git a/apmproxy/metadata_test.go b/apmproxy/metadata_test.go index be88a0f3..0bc3b3c3 100644 --- a/apmproxy/metadata_test.go +++ b/apmproxy/metadata_test.go @@ -21,10 +21,11 @@ import ( "bytes" "compress/gzip" "compress/zlib" - "github.com/elastic/apm-aws-lambda/apmproxy" "io" "testing" + "github.com/elastic/apm-aws-lambda/apmproxy" + "github.com/stretchr/testify/require" ) @@ -133,7 +134,7 @@ func Test_processMetadata(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { - agentData := apmproxy.AgentData{Data: tc.data(), ContentEncoding: tc.encodingType} + agentData := apmproxy.APMData{Data: tc.data(), ContentEncoding: tc.encodingType} extractedMetadata, err := apmproxy.ProcessMetadata(agentData) if 
tc.expectError != nil { @@ -167,7 +168,7 @@ func BenchmarkProcessMetadata(b *testing.B) { } for _, bench := range benchmarks { - agentData := apmproxy.AgentData{Data: bench.body, ContentEncoding: ""} + agentData := apmproxy.APMData{Data: bench.body, ContentEncoding: ""} b.Run(bench.name, func(b *testing.B) { b.ReportAllocs() diff --git a/apmproxy/option.go b/apmproxy/option.go index 60b353f5..f1862532 100644 --- a/apmproxy/option.go +++ b/apmproxy/option.go @@ -74,12 +74,36 @@ func WithSendStrategy(strategy SendStrategy) Option { // WithAgentDataBufferSize sets the agent data buffer size. func WithAgentDataBufferSize(size int) Option { return func(c *Client) { - c.DataChannel = make(chan AgentData, size) + c.AgentDataChannel = make(chan APMData, size) } } +// WithLogger configures a custom zap logger to be used by +// the client. func WithLogger(logger *zap.SugaredLogger) Option { return func(c *Client) { c.logger = logger } } + +// WithMaxBatchSize configures the maximum batch size for +// the payload sent to the APMServer +func WithMaxBatchSize(size int) Option { + return func(c *Client) { + c.maxBatchSize = size + } +} + +// WithMaxBatchAge configures the maximum age of the batch +// before it is sent to APMServer. Age is measured from the +// time the first entry is added in the batch. +// +// It is possible for batch age to be greater than the +// configured max batch age when sending since a send is +// triggered by a new log event and log events can be delayed +// due to various factors. +func WithMaxBatchAge(age time.Duration) Option { + return func(c *Client) { + c.maxBatchAge = age + } +} diff --git a/apmproxy/receiver.go b/apmproxy/receiver.go index aca911f4..d4c40951 100644 --- a/apmproxy/receiver.go +++ b/apmproxy/receiver.go @@ -29,12 +29,15 @@ import ( "time" ) -type AgentData struct { +// APMData represents data to be sent to APMServer. `Agent` type +// data will have `metadata` as ndjson whereas `lambda` type data +// will be without metadata. +type APMData struct { Data []byte ContentEncoding string } -// StartHttpServer starts the server listening for APM agent data. +// StartReceiver starts the server listening for APM agent data. 
func (c *Client) StartReceiver() error { mux := http.NewServeMux() @@ -123,13 +126,17 @@ func (c *Client) handleIntakeV2Events() func(w http.ResponseWriter, r *http.Requ agentFlushed := r.URL.Query().Get("flushed") == "true" - agentData := AgentData{ + agentData := APMData{ Data: rawBytes, ContentEncoding: r.Header.Get("Content-Encoding"), } if len(agentData.Data) != 0 { - c.EnqueueAPMData(agentData) + select { + case c.AgentDataChannel <- agentData: + default: + c.logger.Warnf("Channel full: dropping a subset of agent data") + } } if agentFlushed { diff --git a/apmproxy/receiver_test.go b/apmproxy/receiver_test.go index a66de72c..155ae79a 100644 --- a/apmproxy/receiver_test.go +++ b/apmproxy/receiver_test.go @@ -19,7 +19,6 @@ package apmproxy_test import ( "bytes" - "github.com/elastic/apm-aws-lambda/apmproxy" "io" "net" "net/http" @@ -28,6 +27,8 @@ import ( "testing" "time" + "github.com/elastic/apm-aws-lambda/apmproxy" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" @@ -243,7 +244,7 @@ func Test_handleIntakeV2EventsNoQueryParam(t *testing.T) { resp, err := client.Do(req) require.NoError(t, err) select { - case <-apmClient.DataChannel: + case <-apmClient.AgentDataChannel: case <-time.After(1 * time.Second): t.Log("Timed out waiting for server to send FuncDone signal") t.Fail() diff --git a/app/run.go b/app/run.go index 8a9a1438..f4035660 100644 --- a/app/run.go +++ b/app/run.go @@ -23,7 +23,6 @@ import ( "sync" "time" - "github.com/elastic/apm-aws-lambda/apmproxy" "github.com/elastic/apm-aws-lambda/extension" ) @@ -82,9 +81,6 @@ func (app *App) Run(ctx context.Context) error { // The previous event id is used to validate the received Lambda metrics var prevEvent *extension.NextEventResponse - // This data structure contains metadata tied to the current Lambda instance. If empty, it is populated once for each - // active Lambda environment - metadataContainer := apmproxy.MetadataContainer{} for { select { @@ -96,7 +92,7 @@ func (app *App) Run(ctx context.Context) error { // Use a wait group to ensure the background go routine sending to the APM server // completes before signaling that the extension is ready for the next invocation. var backgroundDataSendWg sync.WaitGroup - event, err := app.processEvent(ctx, &backgroundDataSendWg, prevEvent, &metadataContainer) + event, err := app.processEvent(ctx, &backgroundDataSendWg, prevEvent) if err != nil { return err } @@ -120,7 +116,6 @@ func (app *App) processEvent( ctx context.Context, backgroundDataSendWg *sync.WaitGroup, prevEvent *extension.NextEventResponse, - metadataContainer *apmproxy.MetadataContainer, ) (*extension.NextEventResponse, error) { // Reset flush state for future events. 
defer app.apmClient.ResetFlush() @@ -159,7 +154,7 @@ func (app *App) processEvent( backgroundDataSendWg.Add(1) go func() { defer backgroundDataSendWg.Done() - if err := app.apmClient.ForwardApmData(invocationCtx, metadataContainer); err != nil { + if err := app.apmClient.ForwardApmData(invocationCtx); err != nil { app.logger.Error(err) } }() @@ -174,7 +169,6 @@ func (app *App) processEvent( event.RequestID, event.InvokedFunctionArn, app.apmClient, - metadataContainer, runtimeDone, prevEvent, ); err != nil { diff --git a/logsapi/event.go b/logsapi/event.go index 0cefa6be..414bbff5 100644 --- a/logsapi/event.go +++ b/logsapi/event.go @@ -33,6 +33,7 @@ const ( PlatformRuntimeDone LogEventType = "platform.runtimeDone" PlatformFault LogEventType = "platform.fault" PlatformReport LogEventType = "platform.report" + PlatformLogsDropped LogEventType = "platform.logsDropped" PlatformStart LogEventType = "platform.start" PlatformEnd LogEventType = "platform.end" FunctionLog LogEventType = "function" @@ -60,7 +61,6 @@ func (lc *Client) ProcessLogs( requestID string, invokedFnArn string, apmClient *apmproxy.Client, - metadataContainer *apmproxy.MetadataContainer, runtimeDoneSignal chan struct{}, prevEvent *extension.NextEventResponse, ) error { @@ -89,22 +89,24 @@ func (lc *Client) ProcessLogs( case PlatformReport: if prevEvent != nil && logEvent.Record.RequestID == prevEvent.RequestID { lc.logger.Debug("Received platform report for the previous function invocation") - processedMetrics, err := ProcessPlatformReport(metadataContainer, prevEvent, logEvent) + processedMetrics, err := ProcessPlatformReport(prevEvent, logEvent) if err != nil { - lc.logger.Errorf("Error processing Lambda platform metrics : %v", err) + lc.logger.Errorf("Error processing Lambda platform metrics: %v", err) } else { - apmClient.EnqueueAPMData(processedMetrics) + select { + case apmClient.LambdaDataChannel <- processedMetrics: + case <-ctx.Done(): + } } } else { lc.logger.Warn("report event request id didn't match the previous event id") lc.logger.Debug("Log API runtimeDone event request id didn't match") } + case PlatformLogsDropped: + lc.logger.Warnf("Logs dropped due to extension falling behind: %v", logEvent.Record) case FunctionLog: - // TODO: @lahsivjar Buffer logs and send batches of data to APM-Server. - // Buffering should account for metadata being available before sending. lc.logger.Debug("Received function log") processedLog, err := ProcessFunctionLog( - metadataContainer, platformStartReqID, invokedFnArn, logEvent, @@ -112,7 +114,10 @@ func (lc *Client) ProcessLogs( if err != nil { lc.logger.Errorf("Error processing function log : %v", err) } else { - apmClient.EnqueueAPMData(processedLog) + select { + case apmClient.LambdaDataChannel <- processedLog: + case <-ctx.Done(): + } } } case <-ctx.Done(): diff --git a/logsapi/functionlogs.go b/logsapi/functionlogs.go index 77298890..19b67b72 100644 --- a/logsapi/functionlogs.go +++ b/logsapi/functionlogs.go @@ -18,8 +18,6 @@ package logsapi import ( - "errors" - "github.com/elastic/apm-aws-lambda/apmproxy" "go.elastic.co/apm/v2/model" "go.elastic.co/fastjson" @@ -88,15 +86,10 @@ func (lc logContainer) MarshalFastJSON(json *fastjson.Writer) error { // ProcessFunctionLog consumes agent metadata and log event from Lambda // logs API to create a payload for APM server. 
func ProcessFunctionLog( - metadataContainer *apmproxy.MetadataContainer, requestID string, invokedFnArn string, log LogEvent, -) (apmproxy.AgentData, error) { - if metadataContainer == nil || len(metadataContainer.Metadata) == 0 { - return apmproxy.AgentData{}, errors.New("metadata is required") - } - +) (apmproxy.APMData, error) { lc := logContainer{ Log: &logLine{ Timestamp: model.Time(log.Time), @@ -111,14 +104,10 @@ func ProcessFunctionLog( var jsonWriter fastjson.Writer if err := lc.MarshalFastJSON(&jsonWriter); err != nil { - return apmproxy.AgentData{}, err + return apmproxy.APMData{}, err } - capacity := len(metadataContainer.Metadata) + jsonWriter.Size() + 1 - logData := make([]byte, len(metadataContainer.Metadata), capacity) - copy(logData, metadataContainer.Metadata) - - logData = append(logData, '\n') - logData = append(logData, jsonWriter.Bytes()...) - return apmproxy.AgentData{Data: logData}, nil + return apmproxy.APMData{ + Data: jsonWriter.Bytes(), + }, nil } diff --git a/logsapi/functionlogs_test.go b/logsapi/functionlogs_test.go index 5aa4034b..7ebf6dee 100644 --- a/logsapi/functionlogs_test.go +++ b/logsapi/functionlogs_test.go @@ -22,15 +22,11 @@ import ( "testing" "time" - "github.com/elastic/apm-aws-lambda/apmproxy" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestProcessFunctionLog(t *testing.T) { - metadataContainer := &apmproxy.MetadataContainer{ - Metadata: []byte(`{"metadata":{"service":{"agent":{"name":"apm-lambda-extension","version":"1.1.0"},"framework":{"name":"AWS Lambda","version":""},"language":{"name":"python","version":"3.9.8"},"runtime":{"name":"","version":""},"node":{}},"user":{},"process":{"pid":0},"system":{"container":{"id":""},"kubernetes":{"node":{},"pod":{}}},"cloud":{"provider":"","instance":{},"machine":{},"account":{},"project":{},"service":{}}}}`), - } event := LogEvent{ Time: time.Date(2022, 11, 12, 0, 0, 0, 0, time.UTC), Type: FunctionLog, @@ -39,15 +35,14 @@ func TestProcessFunctionLog(t *testing.T) { reqID := "8476a536-e9f4-11e8-9739-2dfe598c3fcd" invokedFnArn := "arn:aws:lambda:us-east-2:123456789012:function:custom-runtime" expectedData := fmt.Sprintf( - "%s\n{\"log\":{\"message\":\"%s\",\"@timestamp\":%d,\"faas\":{\"id\":\"%s\",\"execution\":\"%s\"}}}", - metadataContainer.Metadata, + "{\"log\":{\"message\":\"%s\",\"@timestamp\":%d,\"faas\":{\"id\":\"%s\",\"execution\":\"%s\"}}}", event.StringRecord, event.Time.UnixNano()/int64(time.Microsecond), invokedFnArn, reqID, ) - apmData, err := ProcessFunctionLog(metadataContainer, reqID, invokedFnArn, event) + apmData, err := ProcessFunctionLog(reqID, invokedFnArn, event) require.NoError(t, err) assert.Equal(t, expectedData, string(apmData.Data)) diff --git a/logsapi/metrics.go b/logsapi/metrics.go index 6108b22c..c7ebf92a 100644 --- a/logsapi/metrics.go +++ b/logsapi/metrics.go @@ -18,7 +18,6 @@ package logsapi import ( - "errors" "math" "github.com/elastic/apm-aws-lambda/apmproxy" @@ -63,12 +62,7 @@ func (mc MetricsContainer) MarshalFastJSON(json *fastjson.Writer) error { return nil } -func ProcessPlatformReport(metadataContainer *apmproxy.MetadataContainer, functionData *extension.NextEventResponse, platformReport LogEvent) (apmproxy.AgentData, error) { - - if metadataContainer == nil || len(metadataContainer.Metadata) == 0 { - return apmproxy.AgentData{}, errors.New("metadata is not populated") - } - +func ProcessPlatformReport(functionData *extension.NextEventResponse, platformReport LogEvent) (apmproxy.APMData, error) { metricsContainer := 
MetricsContainer{ Metrics: &model.Metrics{}, } @@ -103,14 +97,10 @@ func ProcessPlatformReport(metadataContainer *apmproxy.MetadataContainer, functi var jsonWriter fastjson.Writer if err := metricsContainer.MarshalFastJSON(&jsonWriter); err != nil { - return apmproxy.AgentData{}, err + return apmproxy.APMData{}, err } - capacity := len(metadataContainer.Metadata) + jsonWriter.Size() + 1 // 1 for newline - metricsData := make([]byte, len(metadataContainer.Metadata), capacity) - copy(metricsData, metadataContainer.Metadata) - - metricsData = append(metricsData, []byte("\n")...) - metricsData = append(metricsData, jsonWriter.Bytes()...) - return apmproxy.AgentData{Data: metricsData}, nil + return apmproxy.APMData{ + Data: jsonWriter.Bytes(), + }, nil } diff --git a/logsapi/metrics_test.go b/logsapi/metrics_test.go index 7ca086a5..e1e53941 100644 --- a/logsapi/metrics_test.go +++ b/logsapi/metrics_test.go @@ -20,7 +20,6 @@ package logsapi import ( "fmt" "log" - "strings" "testing" "time" @@ -32,10 +31,6 @@ import ( ) func TestProcessPlatformReport_Coldstart(t *testing.T) { - mc := apmproxy.MetadataContainer{ - Metadata: []byte(fmt.Sprintf(`{"metadata":{"service":{"agent":{"name":"apm-lambda-extension","version":"%s"},"framework":{"name":"AWS Lambda","version":""},"language":{"name":"python","version":"3.9.8"},"runtime":{"name":"","version":""},"node":{}},"user":{},"process":{"pid":0},"system":{"container":{"id":""},"kubernetes":{"node":{},"pod":{}}},"cloud":{"provider":"","instance":{},"machine":{},"account":{},"project":{},"service":{}}}}`, extension.Version)), - } - timestamp := time.Now() pm := PlatformMetrics{ @@ -71,31 +66,21 @@ func TestProcessPlatformReport_Coldstart(t *testing.T) { }, } - desiredOutputMetadata := fmt.Sprintf(`{"metadata":{"service":{"agent":{"name":"apm-lambda-extension","version":"%s"},"framework":{"name":"AWS Lambda","version":""},"language":{"name":"python","version":"3.9.8"},"runtime":{"name":"","version":""},"node":{}},"user":{},"process":{"pid":0},"system":{"container":{"id":""},"kubernetes":{"node":{},"pod":{}}},"cloud":{"provider":"","instance":{},"machine":{},"account":{},"project":{},"service":{}}}}`, extension.Version) - desiredOutputMetrics := fmt.Sprintf(`{"metricset":{"samples":{"faas.coldstart_duration":{"value":422.9700012207031},"faas.timeout":{"value":5000},"system.memory.total":{"value":1.34217728e+08},"system.memory.actual.free":{"value":5.4525952e+07},"faas.duration":{"value":182.42999267578125},"faas.billed_duration":{"value":183}},"timestamp":%d,"faas":{"coldstart":true,"execution":"6f7f0961f83442118a7af6fe80b88d56","id":"arn:aws:lambda:us-east-2:123456789012:function:custom-runtime"}}}`, timestamp.UnixNano()/1e3) - rawBytes, err := ProcessPlatformReport(&mc, &event, logEvent) + apmData, err := ProcessPlatformReport(&event, logEvent) require.NoError(t, err) - requestBytes, err := apmproxy.GetUncompressedBytes(rawBytes.Data, "") + requestBytes, err := apmproxy.GetUncompressedBytes(apmData.Data, "") require.NoError(t, err) out := string(requestBytes) log.Println(out) - processingResult := strings.Split(string(requestBytes), "\n") - - assert.JSONEq(t, desiredOutputMetadata, processingResult[0]) - assert.JSONEq(t, desiredOutputMetrics, processingResult[1]) + assert.JSONEq(t, desiredOutputMetrics, string(requestBytes)) } func TestProcessPlatformReport_NoColdstart(t *testing.T) { - - mc := apmproxy.MetadataContainer{ - Metadata: []byte(fmt.Sprintf(`{"metadata":{"service":{"agent":{"name":"apm-lambda-extension","version":"%s"},"framework":{"name":"AWS 
Lambda","version":""},"language":{"name":"python","version":"3.9.8"},"runtime":{"name":"","version":""},"node":{}},"user":{},"process":{"pid":0},"system":{"container":{"id":""},"kubernetes":{"node":{},"pod":{}}},"cloud":{"provider":"","instance":{},"machine":{},"account":{},"project":{},"service":{}}}}`, extension.Version)), - } - timestamp := time.Now() pm := PlatformMetrics{ @@ -131,73 +116,18 @@ func TestProcessPlatformReport_NoColdstart(t *testing.T) { }, } - desiredOutputMetadata := fmt.Sprintf(`{"metadata":{"service":{"agent":{"name":"apm-lambda-extension","version":"%s"},"framework":{"name":"AWS Lambda","version":""},"language":{"name":"python","version":"3.9.8"},"runtime":{"name":"","version":""},"node":{}},"user":{},"process":{"pid":0},"system":{"container":{"id":""},"kubernetes":{"node":{},"pod":{}}},"cloud":{"provider":"","instance":{},"machine":{},"account":{},"project":{},"service":{}}}}`, extension.Version) - desiredOutputMetrics := fmt.Sprintf(`{"metricset":{"samples":{"faas.coldstart_duration":{"value":0},"faas.timeout":{"value":5000},"system.memory.total":{"value":1.34217728e+08},"system.memory.actual.free":{"value":5.4525952e+07},"faas.duration":{"value":182.42999267578125},"faas.billed_duration":{"value":183}},"timestamp":%d,"faas":{"coldstart":false,"execution":"6f7f0961f83442118a7af6fe80b88d56","id":"arn:aws:lambda:us-east-2:123456789012:function:custom-runtime"}}}`, timestamp.UnixNano()/1e3) - rawBytes, err := ProcessPlatformReport(&mc, &event, logEvent) - require.NoError(t, err) - - requestBytes, err := apmproxy.GetUncompressedBytes(rawBytes.Data, "") - require.NoError(t, err) - - out := string(requestBytes) - log.Println(out) - - processingResult := strings.Split(string(requestBytes), "\n") - - assert.JSONEq(t, desiredOutputMetadata, processingResult[0]) - assert.JSONEq(t, desiredOutputMetrics, processingResult[1]) -} - -func TestProcessPlatformReport_DataCorruption(t *testing.T) { - // Allocate big capacity metadata array to prevent reallocation - raw := make([]byte, 0, 7000) - mc := &apmproxy.MetadataContainer{ - Metadata: append(raw, []byte(fmt.Sprintf(`{"metadata":{"service":{"agent":{"name":"apm-lambda-extension","version":"%s"},"framework":{"name":"AWS Lambda","version":""},"language":{"name":"python","version":"3.9.8"},"runtime":{"name":"","version":""},"node":{}},"user":{},"process":{"pid":0},"system":{"container":{"id":""},"kubernetes":{"node":{},"pod":{}}},"cloud":{"provider":"","instance":{},"machine":{},"account":{},"project":{},"service":{}}}}`, extension.Version))...), - } - reqID := "8476a536-e9f4-11e8-9739-2dfe598c3fcd" - invokedFnArn := "arn:aws:lambda:us-east-2:123456789012:function:custom-runtime" - timestamp := time.Date(2022, 11, 12, 0, 0, 0, 0, time.UTC) - logEvent := LogEvent{ - Time: timestamp, - Type: PlatformReport, - Record: LogEventRecord{ - RequestID: reqID, - Metrics: PlatformMetrics{ - DurationMs: 1.0, - BilledDurationMs: 1, - MemorySizeMB: 1, - MaxMemoryUsedMB: 1, - InitDurationMs: 1.0, - }, - }, - } - nextEventResp := &extension.NextEventResponse{ - Timestamp: timestamp, - EventType: extension.Invoke, - RequestID: reqID, - InvokedFunctionArn: invokedFnArn, - } - expected := 
"{\"metricset\":{\"samples\":{\"system.memory.total\":{\"value\":1.048576e+06},\"system.memory.actual.free\":{\"value\":0},\"faas.duration\":{\"value\":1},\"faas.billed_duration\":{\"value\":1},\"faas.coldstart_duration\":{\"value\":1},\"faas.timeout\":{\"value\":-1.6682112e+12}},\"timestamp\":1668211200000000,\"faas\":{\"coldstart\":true,\"execution\":\"8476a536-e9f4-11e8-9739-2dfe598c3fcd\",\"id\":\"arn:aws:lambda:us-east-2:123456789012:function:custom-runtime\"}}}" - - agentData, err := ProcessPlatformReport(mc, nextEventResp, logEvent) + apmData, err := ProcessPlatformReport(&event, logEvent) require.NoError(t, err) - // process another platform report to ensure the previous payload is not corrupted - logEvent.Record.RequestID = "corrupt-req-id" - nextEventResp.RequestID = "corrupt-req-id" - _, err = ProcessPlatformReport(mc, nextEventResp, logEvent) + requestBytes, err := apmproxy.GetUncompressedBytes(apmData.Data, "") require.NoError(t, err) - data := strings.Split(string(agentData.Data), "\n") - assert.JSONEq(t, expected, data[1]) + assert.JSONEq(t, desiredOutputMetrics, string(requestBytes)) } func BenchmarkPlatformReport(b *testing.B) { - mc := &apmproxy.MetadataContainer{ - Metadata: []byte(`{"metadata":{"service":{"agent":{"name":"apm-lambda-extension","version":"1.1.0"},"framework":{"name":"AWS Lambda","version":""},"language":{"name":"python","version":"3.9.8"},"runtime":{"name":"","version":""},"node":{}},"user":{},"process":{"pid":0},"system":{"container":{"id":""},"kubernetes":{"node":{},"pod":{}}},"cloud":{"provider":"","instance":{},"machine":{},"account":{},"project":{},"service":{}}}}`), - } reqID := "8476a536-e9f4-11e8-9739-2dfe598c3fcd" invokedFnArn := "arn:aws:lambda:us-east-2:123456789012:function:custom-runtime" timestamp := time.Date(2022, 11, 12, 0, 0, 0, 0, time.UTC) @@ -223,7 +153,7 @@ func BenchmarkPlatformReport(b *testing.B) { } for n := 0; n < b.N; n++ { - _, err := ProcessPlatformReport(mc, nextEventResp, logEvent) + _, err := ProcessPlatformReport(nextEventResp, logEvent) require.NoError(b, err) } } diff --git a/logsapi/route_handlers.go b/logsapi/route_handlers.go index 0981d497..6dbca1b1 100644 --- a/logsapi/route_handlers.go +++ b/logsapi/route_handlers.go @@ -40,7 +40,13 @@ func handleLogEventsRequest(logger *zap.SugaredLogger, logsChannel chan LogEvent w.WriteHeader(http.StatusInternalServerError) continue } - logsChannel <- logEvents[idx] + select { + case logsChannel <- logEvents[idx]: + case <-r.Context().Done(): + logger.Warnf("Failed to enqueue event, signaling lambda to retry") + w.WriteHeader(http.StatusInternalServerError) + return + } } } } diff --git a/logsapi/subscribe.go b/logsapi/subscribe.go index 9e96a096..c13cadcb 100644 --- a/logsapi/subscribe.go +++ b/logsapi/subscribe.go @@ -86,8 +86,8 @@ func (lc *Client) subscribe(types []SubscriptionType, extensionID string, uri st LogTypes: types, BufferingCfg: BufferingCfg{ MaxItems: 10000, - MaxBytes: 262144, - TimeoutMS: 25, + MaxBytes: 1024 * 1024, + TimeoutMS: 100, }, Destination: Destination{ Protocol: "HTTP", diff --git a/main_test.go b/main_test.go index 72609e82..774d759a 100644 --- a/main_test.go +++ b/main_test.go @@ -156,6 +156,21 @@ func newMockApmServer(t *testing.T, l *zap.SugaredLogger) (*MockServerInternals, func newMockLambdaServer(t *testing.T, logsapiAddr string, eventsChannel chan MockEvent, l *zap.SugaredLogger) *MockServerInternals { var lambdaServerInternals MockServerInternals + // A big queue that can hold all the events required for a test + mockLogEventQ := make(chan 
logsapi.LogEvent, 100) + ctx, cancel := context.WithCancel(context.Background()) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + startLogSender(ctx, mockLogEventQ, logsapiAddr, l) + }() + t.Cleanup(func() { + cancel() + wg.Wait() + }) + lambdaServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.RequestURI { // Extension registration request @@ -171,19 +186,19 @@ func newMockLambdaServer(t *testing.T, logsapiAddr string, eventsChannel chan Mo } case "/2020-01-01/extension/event/next": lambdaServerInternals.WaitGroup.Wait() - currId := uuid.New().String() + currID := uuid.New().String() select { case nextEvent := <-eventsChannel: - sendNextEventInfo(w, currId, nextEvent, l) - go processMockEvent(currId, nextEvent, os.Getenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT"), logsapiAddr, &lambdaServerInternals, l) + sendNextEventInfo(w, currID, nextEvent.Timeout, nextEvent.Type == Shutdown, l) + go processMockEvent(mockLogEventQ, currID, nextEvent, os.Getenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT"), &lambdaServerInternals, l) default: finalShutDown := MockEvent{ Type: Shutdown, ExecutionDuration: 0, Timeout: 0, } - sendNextEventInfo(w, currId, finalShutDown, l) - go processMockEvent(currId, finalShutDown, os.Getenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT"), logsapiAddr, &lambdaServerInternals, l) + sendNextEventInfo(w, currID, finalShutDown.Timeout, true, l) + go processMockEvent(mockLogEventQ, currID, finalShutDown, os.Getenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT"), &lambdaServerInternals, l) } // Logs API subscription request case "/2020-08-15/logs": @@ -209,14 +224,12 @@ func newMockLambdaServer(t *testing.T, logsapiAddr string, eventsChannel chan Mo func newTestStructs(t *testing.T) chan MockEvent { http.DefaultServeMux = new(http.ServeMux) - _, cancel := context.WithCancel(context.Background()) - t.Cleanup(func() { cancel() }) eventsChannel := make(chan MockEvent, 100) return eventsChannel } -func processMockEvent(currId string, event MockEvent, extensionPort string, logsapiAddr string, internals *MockServerInternals, l *zap.SugaredLogger) { - sendLogEvent(logsapiAddr, currId, logsapi.PlatformStart, l) +func processMockEvent(q chan<- logsapi.LogEvent, currID string, event MockEvent, extensionPort string, internals *MockServerInternals, l *zap.SugaredLogger) { + queueLogEvent(q, currID, logsapi.PlatformStart, l) client := http.Client{} // Use a custom transport with a low timeout @@ -311,22 +324,22 @@ func processMockEvent(currId string, event MockEvent, extensionPort string, logs case Shutdown: } if sendRuntimeDone { - sendLogEvent(logsapiAddr, currId, logsapi.PlatformRuntimeDone, l) + queueLogEvent(q, currID, logsapi.PlatformRuntimeDone, l) } if sendMetrics { - sendLogEvent(logsapiAddr, currId, logsapi.PlatformReport, l) + queueLogEvent(q, currID, logsapi.PlatformReport, l) } } -func sendNextEventInfo(w http.ResponseWriter, id string, event MockEvent, l *zap.SugaredLogger) { +func sendNextEventInfo(w http.ResponseWriter, id string, timeoutSec float64, shutdown bool, l *zap.SugaredLogger) { nextEventInfo := extension.NextEventResponse{ EventType: "INVOKE", - DeadlineMs: time.Now().UnixNano()/int64(time.Millisecond) + int64(event.Timeout*1000), + DeadlineMs: time.Now().UnixNano()/int64(time.Millisecond) + int64(timeoutSec*1000), RequestID: id, InvokedFunctionArn: "arn:aws:lambda:eu-central-1:627286350134:function:main_unit_test", Tracing: extension.Tracing{}, } - if event.Type == Shutdown { + if shutdown { 
nextEventInfo.EventType = "SHUTDOWN" } @@ -335,9 +348,9 @@ func sendNextEventInfo(w http.ResponseWriter, id string, event MockEvent, l *zap } } -func sendLogEvent(logsapiAddr string, requestId string, logEventType logsapi.LogEventType, l *zap.SugaredLogger) { +func queueLogEvent(q chan<- logsapi.LogEvent, requestID string, logEventType logsapi.LogEventType, l *zap.SugaredLogger) { record := logsapi.LogEventRecord{ - RequestID: requestId, + RequestID: requestID, } if logEventType == logsapi.PlatformReport { record.Metrics = logsapi.PlatformMetrics{ @@ -359,27 +372,64 @@ func sendLogEvent(logsapiAddr string, requestId string, logEventType logsapi.Log bufRecord := new(bytes.Buffer) if err := json.NewEncoder(bufRecord).Encode(record); err != nil { l.Errorf("Could not encode record : %v", err) - return } logEvent.StringRecord = bufRecord.String() + q <- logEvent +} - // Convert full log event to JSON - bufLogEvent := new(bytes.Buffer) - if err := json.NewEncoder(bufLogEvent).Encode([]logsapi.LogEvent{logEvent}); err != nil { - l.Errorf("Could not encode record : %v", err) - return +func startLogSender(ctx context.Context, q <-chan logsapi.LogEvent, logsapiAddr string, l *zap.SugaredLogger) { + client := http.Client{ + Timeout: 10 * time.Millisecond, } + doSend := func(events []logsapi.LogEvent) error { + var buf bytes.Buffer + if err := json.NewEncoder(&buf).Encode(events); err != nil { + return err + } - req, err := http.NewRequest(http.MethodPost, "http://"+logsapiAddr, bufLogEvent) - if err != nil { - l.Errorf("Could not create logs api request: %v", err) - return - } + req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://%s", logsapiAddr), &buf) + if err != nil { + return err + } - client := http.Client{} - if _, err := client.Do(req); err != nil { - l.Errorf("Could not send log event : %v", err) - return + resp, err := client.Do(req) + if err != nil { + return err + } + if resp.StatusCode/100 != 2 { + return fmt.Errorf("received a non 2xx status code: %d", resp.StatusCode) + } + return nil + } + + var batch []logsapi.LogEvent + flushInterval := time.NewTicker(100 * time.Millisecond) + defer flushInterval.Stop() + for { + select { + case <-flushInterval.C: + var trySend bool + for !trySend { + // TODO: @lahsivjar mock dropping of logs, batch age and batch size + // TODO: @lahsivjar is it possible for one batch to have platform.runtimeDone + // event in middle of the batch? 
+ select { + case e := <-q: + batch = append(batch, e) + default: + trySend = true + if len(batch) > 0 { + if err := doSend(batch); err != nil { + l.Warnf("mock lambda API failed to send logs to the extension: %v", err) + } else { + batch = batch[:0] + } + } + } + } + case <-ctx.Done(): + return + } } } @@ -513,6 +563,8 @@ func TestAPMServerHangs(t *testing.T) { newMockLambdaServer(t, logsapiAddr, eventsChannel, l) eventsChain := []MockEvent{ + // The first response sets the metadata so that the lambda logs handler is accepting requests + {Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, {Type: InvokeStandard, APMServerBehavior: Hangs, ExecutionDuration: 1, Timeout: 500}, } eventQueueGenerator(eventsChain, eventsChannel) @@ -692,7 +744,7 @@ func TestInfoRequestHangs(t *testing.T) { lambdaServerInternals := newMockLambdaServer(t, logsapiAddr, eventsChannel, l) eventsChain := []MockEvent{ - {Type: InvokeStandardInfo, APMServerBehavior: Hangs, ExecutionDuration: 1, Timeout: 500}, + {Type: InvokeStandardInfo, APMServerBehavior: Hangs, ExecutionDuration: 1, Timeout: 5}, } eventQueueGenerator(eventsChain, eventsChannel) select { @@ -772,7 +824,6 @@ func TestMetricsWithMetadata(t *testing.T) { func runApp(t *testing.T, logsapiAddr string) <-chan struct{} { ctx, cancel := context.WithCancel(context.Background()) - app, err := app.New(ctx, app.WithExtensionName("apm-lambda-extension"), app.WithLambdaRuntimeAPI(os.Getenv("AWS_LAMBDA_RUNTIME_API")),