diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 06fa4eb8..7f8087eb 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -22,6 +22,10 @@ https://github.com/elastic/apm-aws-lambda/compare/v1.2.0...main[View commits] +[float] +===== Features +- Create proxy transaction with error results if not reported by agent {lambda-pull}315[315] + [float] [[lambda-1.2.0]] === 1.2.0 - 2022/11/01 diff --git a/NOTICE.txt b/NOTICE.txt index 63c735e6..fe042a4a 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -2567,6 +2567,127 @@ OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +-------------------------------------------------------------------------------- +Module : github.com/tidwall/gjson +Version : v1.14.3 +Time : 2022-08-16T13:44:06Z +Licence : MIT + +Contents of probable licence file $GOMODCACHE/github.com/tidwall/gjson@v1.14.3/LICENSE: + +The MIT License (MIT) + +Copyright (c) 2016 Josh Baker + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + + +-------------------------------------------------------------------------------- +Module : github.com/tidwall/match +Version : v1.1.1 +Time : 2021-10-08T14:36:13Z +Licence : MIT + +Contents of probable licence file $GOMODCACHE/github.com/tidwall/match@v1.1.1/LICENSE: + +The MIT License (MIT) + +Copyright (c) 2016 Josh Baker + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + + +-------------------------------------------------------------------------------- +Module : github.com/tidwall/pretty +Version : v1.2.1 +Time : 2022-10-01T20:21:24Z +Licence : MIT + +Contents of probable licence file $GOMODCACHE/github.com/tidwall/pretty@v1.2.1/LICENSE: + +The MIT License (MIT) + +Copyright (c) 2017 Josh Baker + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + + +-------------------------------------------------------------------------------- +Module : github.com/tidwall/sjson +Version : v1.2.5 +Time : 2022-08-05T01:15:59Z +Licence : MIT + +Contents of probable licence file $GOMODCACHE/github.com/tidwall/sjson@v1.2.5/LICENSE: + +The MIT License (MIT) + +Copyright (c) 2016 Josh Baker + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + + + -------------------------------------------------------------------------------- Module : go.elastic.co/apm/v2 Version : v2.1.1-0.20220617022209-90f624fe11b0 diff --git a/accumulator/apmdata.go b/accumulator/apmdata.go new file mode 100644 index 00000000..6b01dc58 --- /dev/null +++ b/accumulator/apmdata.go @@ -0,0 +1,26 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package accumulator + +// APMData represents data to be sent to APMServer. `Agent` type +// data will have `metadata` as ndjson whereas `lambda` type data +// will be without metadata. +type APMData struct { + Data []byte + ContentEncoding string +} diff --git a/accumulator/batch.go b/accumulator/batch.go new file mode 100644 index 00000000..1eef7056 --- /dev/null +++ b/accumulator/batch.go @@ -0,0 +1,283 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package accumulator + +import ( + "bytes" + "errors" + "fmt" + "sync" + "time" + + "github.com/tidwall/gjson" +) + +var ( + // ErrMetadataUnavailable is returned when a lambda data is added to + // the batch without metadata being set. + ErrMetadataUnavailable = errors.New("metadata is not yet available") + // ErrBatchFull signfies that the batch has reached full capacity + // and cannot accept more entries. + ErrBatchFull = errors.New("batch is full") + // ErrInvalidEncoding is returned for any APMData that is encoded + // with any encoding format + ErrInvalidEncoding = errors.New("encoded data not supported") +) + +var ( + maxSizeThreshold = 0.9 + zeroTime = time.Time{} + newLineSep = []byte("\n") + transactionKey = []byte("transaction") +) + +// Batch manages the data that needs to be shipped to APM Server. It holds +// all the invocations that have not yet been shipped to the APM Server and +// is responsible for correlating the invocation with the APM data collected +// from all sources (logs API & APM Agents). As the batch gets the required +// data it marks the data ready for shipping to APM Server. +type Batch struct { + mu sync.RWMutex + // metadataBytes is the size of the metadata in bytes + metadataBytes int + // buf holds data that is ready to be shipped to APM-Server + buf bytes.Buffer + // invocations holds the data for a specific invocation with + // request ID as the key. + invocations map[string]*Invocation + count int + age time.Time + maxSize int + maxAge time.Duration + currentlyExecutingRequestID string +} + +// NewBatch creates a new BatchData which can accept a +// maximum number of entries as specified by the arguments. +func NewBatch(maxSize int, maxAge time.Duration) *Batch { + return &Batch{ + invocations: make(map[string]*Invocation), + maxSize: maxSize, + maxAge: maxAge, + } +} + +// RegisterInvocation registers a new function invocation against its request +// ID. It also updates the caches for currently executing request ID. +func (b *Batch) RegisterInvocation( + requestID, functionARN string, + deadlineMs int64, + timestamp time.Time, +) { + b.mu.Lock() + defer b.mu.Unlock() + b.invocations[requestID] = &Invocation{ + RequestID: requestID, + FunctionARN: functionARN, + DeadlineMs: deadlineMs, + Timestamp: timestamp, + } + b.currentlyExecutingRequestID = requestID +} + +// OnAgentInit caches the transactionID and the payload for the currently +// executing invocation as reported by the agent. The traceID and transactionID +// will be used to create a new transaction in an event the actual transaction +// is not reported by the agent due to unexpected termination. +func (b *Batch) OnAgentInit(transactionID string, payload []byte) error { + b.mu.Lock() + defer b.mu.Unlock() + i, ok := b.invocations[b.currentlyExecutingRequestID] + if !ok { + return fmt.Errorf("invocation for requestID %s does not exist", b.currentlyExecutingRequestID) + } + if !isTransactionEvent(payload) { + return errors.New("invalid payload") + } + i.TransactionID, i.AgentPayload = transactionID, payload + return nil +} + +// AddAgentData adds a data received from agent. For a specific invocation +// agent data is always received in the same invocation. +func (b *Batch) AddAgentData(apmData APMData) error { + if len(apmData.Data) == 0 { + return nil + } + raw, err := GetUncompressedBytes(apmData.Data, apmData.ContentEncoding) + if err != nil { + return err + } + + b.mu.Lock() + defer b.mu.Unlock() + if b.currentlyExecutingRequestID == "" { + return fmt.Errorf("lifecycle error, currently executing requestID is not set") + } + inc, ok := b.invocations[b.currentlyExecutingRequestID] + if !ok { + return fmt.Errorf("invocation for current requestID %s does not exist", b.currentlyExecutingRequestID) + } + + // A request body can either be empty or have a ndjson content with + // first line being metadata. + data, after, _ := bytes.Cut(raw, newLineSep) + if b.metadataBytes == 0 { + b.metadataBytes, _ = b.buf.Write(data) + } + for { + data, after, _ = bytes.Cut(after, newLineSep) + if inc.NeedProxyTransaction() && isTransactionEvent(data) { + res := gjson.GetBytes(data, "transaction.id") + if res.Str != "" && inc.TransactionID == res.Str { + inc.TransactionObserved = true + } + } + if err := b.addData(data); err != nil { + return err + } + if len(after) == 0 { + break + } + } + return nil +} + +// OnLambdaLogRuntimeDone prepares the data for the invocation to be shipped +// to APM Server. It accepts requestID and status of the invocation both of +// which can be retrieved after parsing `platform.runtimeDone` event. +func (b *Batch) OnLambdaLogRuntimeDone(reqID, status string) error { + b.mu.Lock() + defer b.mu.Unlock() + return b.finalizeInvocation(reqID, status) +} + +// OnShutdown flushes the data for shipping to APM Server by finalizing all +// the invocation in the batch. If we haven't received a platform.runtimeDone +// event for an invocation so far we won't be able to recieve it in time thus +// the status needs to be guessed based on the available information. +func (b *Batch) OnShutdown(status string) error { + b.mu.Lock() + defer b.mu.Unlock() + for _, inc := range b.invocations { + if err := b.finalizeInvocation(inc.RequestID, status); err != nil { + return err + } + } + return nil +} + +// AddLambdaData adds a new entry to the batch. Returns ErrBatchFull +// if batch has reached its maximum size. +func (b *Batch) AddLambdaData(d []byte) error { + b.mu.Lock() + defer b.mu.Unlock() + return b.addData(d) +} + +// Count return the number of APMData entries in batch. +func (b *Batch) Count() int { + b.mu.RLock() + defer b.mu.RUnlock() + return b.count +} + +// ShouldShip indicates when a batch is ready for sending. +// A batch is marked as ready for flush when one of the +// below conditions is reached: +// 1. max size is greater than threshold (90% of maxSize) +// 2. batch is older than maturity age +func (b *Batch) ShouldShip() bool { + b.mu.RLock() + defer b.mu.RUnlock() + return (b.count >= int(float64(b.maxSize)*maxSizeThreshold)) || + (!b.age.IsZero() && time.Since(b.age) > b.maxAge) +} + +// Reset resets the batch to prepare for new set of data +func (b *Batch) Reset() { + b.mu.Lock() + defer b.mu.Unlock() + b.count, b.age = 0, zeroTime + b.buf.Truncate(b.metadataBytes) +} + +// ToAPMData returns APMData with metadata and the accumulated batch +func (b *Batch) ToAPMData() APMData { + b.mu.RLock() + defer b.mu.RUnlock() + return APMData{ + Data: b.buf.Bytes(), + } +} + +func (b *Batch) finalizeInvocation(reqID, status string) error { + inc, ok := b.invocations[reqID] + if !ok { + return fmt.Errorf("invocation for requestID %s does not exist", reqID) + } + defer delete(b.invocations, reqID) + proxyTxn, err := inc.Finalize(status) + if err != nil { + return err + } + return b.addData(proxyTxn) +} + +func (b *Batch) addData(data []byte) error { + if len(data) == 0 { + return nil + } + if b.metadataBytes == 0 { + return ErrMetadataUnavailable + } + if b.count == b.maxSize { + return ErrBatchFull + } + if err := b.buf.WriteByte('\n'); err != nil { + return err + } + if _, err := b.buf.Write(data); err != nil { + return err + } + if b.count == 0 { + // For first entry, set the age of the batch + b.age = time.Now() + } + b.count++ + return nil +} + +func isTransactionEvent(body []byte) bool { + var key []byte + for i, r := range body { + if r == '"' || r == '\'' { + key = body[i+1:] + break + } + } + if len(key) < len(transactionKey) { + return false + } + for i := 0; i < len(transactionKey); i++ { + if transactionKey[i] != key[i] { + return false + } + } + return true +} diff --git a/accumulator/batch_test.go b/accumulator/batch_test.go new file mode 100644 index 00000000..be75ff39 --- /dev/null +++ b/accumulator/batch_test.go @@ -0,0 +1,234 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package accumulator + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tidwall/sjson" +) + +const metadata = `{"metadata":{"service":{"agent":{"name":"apm-lambda-extension","version":"1.1.0"},"framework":{"name":"AWS Lambda","version":""},"language":{"name":"python","version":"3.9.8"},"runtime":{"name":"","version":""},"node":{}},"user":{},"process":{"pid":0},"system":{"container":{"id":""},"kubernetes":{"node":{},"pod":{}}},"cloud":{"provider":"","instance":{},"machine":{},"account":{},"project":{},"service":{}}}}` + +func TestAdd(t *testing.T) { + t.Run("empty-without-metadata", func(t *testing.T) { + b := NewBatch(1, time.Hour) + assert.Error(t, b.AddLambdaData([]byte(`{"log":{}}`)), ErrMetadataUnavailable) + }) + t.Run("empty-with-metadata", func(t *testing.T) { + b := NewBatch(1, time.Hour) + b.RegisterInvocation("test", "arn", 500, time.Now()) + require.NoError(t, b.AddAgentData(APMData{Data: []byte(metadata)})) + assert.NoError(t, b.AddLambdaData([]byte(`{"log":{}}`))) + }) + t.Run("full", func(t *testing.T) { + b := NewBatch(1, time.Hour) + b.RegisterInvocation("test", "arn", 500, time.Now()) + require.NoError(t, b.AddAgentData(APMData{Data: []byte(metadata)})) + require.NoError(t, b.AddLambdaData([]byte(`{"log":{}}`))) + + assert.ErrorIs(t, ErrBatchFull, b.AddLambdaData([]byte(`{"log":{}}`))) + }) +} + +func TestReset(t *testing.T) { + b := NewBatch(1, time.Hour) + b.RegisterInvocation("test", "arn", 500, time.Now()) + require.NoError(t, b.AddAgentData(APMData{Data: []byte(metadata)})) + require.NoError(t, b.AddLambdaData([]byte(`{"log":{}}`))) + require.Equal(t, 1, b.Count()) + b.Reset() + + assert.Equal(t, 0, b.Count()) + assert.True(t, b.age.IsZero()) +} + +func TestShouldShip_ReasonSize(t *testing.T) { + b := NewBatch(10, time.Hour) + b.RegisterInvocation("test", "arn", 500, time.Now()) + require.NoError(t, b.AddAgentData(APMData{Data: []byte(metadata)})) + + // Should flush at 90% full + for i := 0; i < 9; i++ { + assert.False(t, b.ShouldShip()) + require.NoError(t, b.AddLambdaData([]byte(`{"log":{}}`))) + } + + require.Equal(t, 9, b.Count()) + assert.True(t, b.ShouldShip()) +} + +func TestShouldShip_ReasonAge(t *testing.T) { + b := NewBatch(10, time.Second) + b.RegisterInvocation("test", "arn", 500, time.Now()) + require.NoError(t, b.AddAgentData(APMData{Data: []byte(metadata)})) + + assert.False(t, b.ShouldShip()) + require.NoError(t, b.AddLambdaData([]byte(`{"log":{}}`))) + + time.Sleep(time.Second + time.Millisecond) + + // Should be ready to send now + require.Equal(t, 1, b.Count()) + assert.True(t, b.ShouldShip()) +} + +func TestLifecycle(t *testing.T) { + reqID := "test-req-id" + fnARN := "test-fn-arn" + txnID := "023d90ff77f13b9f" + lambdaData := `{"log":{"message":"this is log"}}` + txnData := fmt.Sprintf(`{"transaction":{"id":"%s"}}`, txnID) + ts := time.Date(2022, time.October, 1, 1, 1, 1, 0, time.UTC) + + for _, tc := range []struct { + name string + agentInit bool + receiveAgentRootTxn bool + receiveLambdaLogRuntime bool + expected string + }{ + { + name: "without_agent_init_without_root_txn", + agentInit: false, + receiveAgentRootTxn: false, + receiveLambdaLogRuntime: false, + // Without agent init no proxy txn is created if root txn is not reported + expected: fmt.Sprintf( + "%s\n%s", + metadata, + lambdaData, + ), + }, + { + name: "without_agent_init_with_root_txn", + agentInit: false, + receiveAgentRootTxn: true, + receiveLambdaLogRuntime: false, + expected: fmt.Sprintf( + "%s\n%s\n%s", + metadata, + generateCompleteTxn(t, txnData, "success", ""), + lambdaData, + ), + }, + { + name: "with_agent_init_with_root_txn", + agentInit: true, + receiveAgentRootTxn: true, + receiveLambdaLogRuntime: false, + expected: fmt.Sprintf( + "%s\n%s\n%s", + metadata, + generateCompleteTxn(t, txnData, "success", ""), + lambdaData, + ), + }, + { + name: "with_agent_init_without_root_txn_with_runtimeDone", + agentInit: true, + receiveAgentRootTxn: false, + receiveLambdaLogRuntime: true, + // With agent init proxy txn is created if root txn is not reported. + // Details in runtimeDone event is used to find the result of the txn. + expected: fmt.Sprintf( + "%s\n%s\n%s", + metadata, + lambdaData, + generateCompleteTxn(t, txnData, "failure", "failure"), + ), + }, + { + name: "with_agent_init_without_root_txn", + agentInit: true, + receiveAgentRootTxn: false, + receiveLambdaLogRuntime: false, + // With agent init proxy txn is created if root txn is not reported. + // If runtimeDone event is not available `timeout` is used as the + // result of the transaction. + expected: fmt.Sprintf( + "%s\n%s\n%s", + metadata, + lambdaData, + generateCompleteTxn(t, txnData, "timeout", "failure"), + ), + }, + } { + t.Run(tc.name, func(t *testing.T) { + b := NewBatch(100, time.Hour) + // NEXT API response creates a new invocation cache + b.RegisterInvocation(reqID, fnARN, 100, ts) + // Agent creates and registers a partial transaction in the extn + if tc.agentInit { + require.NoError(t, b.OnAgentInit(txnID, []byte(txnData))) + } + // Agent sends a request with metadata + require.NoError(t, b.AddAgentData(APMData{ + Data: []byte(metadata), + })) + if tc.receiveAgentRootTxn { + require.NoError(t, b.AddAgentData(APMData{ + Data: []byte(fmt.Sprintf( + "%s\n%s", + metadata, + generateCompleteTxn(t, txnData, "success", "")), + ), + })) + } + // Lambda API receives a platform.Start event followed by + // function events. + require.NoError(t, b.AddLambdaData([]byte(lambdaData))) + if tc.receiveLambdaLogRuntime { + // Lambda API receives a platform.runtimeDone event + require.NoError(t, b.OnLambdaLogRuntimeDone(reqID, "failure")) + } + // Instance shutdown + require.NoError(t, b.OnShutdown("timeout")) + assert.Equal(t, tc.expected, string(b.ToAPMData().Data)) + }) + } +} + +func TestIsTransactionEvent(t *testing.T) { + for _, tc := range []struct { + body []byte + expected bool + }{ + {body: []byte(`{}`), expected: false}, + {body: []byte(`{"tran":{}}`), expected: false}, + {body: []byte(`{"span":{}}`), expected: false}, + {body: []byte(`{"transaction":{}}`), expected: true}, + } { + assert.Equal(t, tc.expected, isTransactionEvent(tc.body)) + } +} + +func generateCompleteTxn(t *testing.T, src, result, outcome string) string { + t.Helper() + tmp, err := sjson.SetBytes([]byte(src), "transaction.result", result) + assert.NoError(t, err) + if outcome != "" { + tmp, err = sjson.SetBytes(tmp, "transaction.outcome", outcome) + assert.NoError(t, err) + } + return string(tmp) +} diff --git a/accumulator/invocation.go b/accumulator/invocation.go new file mode 100644 index 00000000..0e1b69a9 --- /dev/null +++ b/accumulator/invocation.go @@ -0,0 +1,75 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package accumulator + +import ( + "time" + + "github.com/tidwall/sjson" +) + +// Invocation holds data for each function invocation and finalizes +// the data when `platform.report` type log is received for the +// specific invocation identified by request ID. +type Invocation struct { + // RequestID is the id to identify the invocation. + RequestID string + // Timestamp is the time of the invocation. + Timestamp time.Time + // DeadlineMs is the function execution deadline. + DeadlineMs int64 + // FunctionARN requested. Can be different in each invoke that + // executes the same version. + FunctionARN string + // TransactionID is the ID generated for a transaction for the + // current invocation. It is populated by the request from agent. + TransactionID string + // AgentPayload is the partial transaction registered at agent init. + // It will be used to create a proxy transaction by enriching the + // payload with data from `platform.runtimeDone` event if agent fails + // to report the actual transaction. + AgentPayload []byte + // TransactionObserved is true if the root transaction ID for the + // invocation is observed by the extension. + TransactionObserved bool +} + +// NeedProxyTransaction returns true if a proxy transaction needs to be +// created based on the information available. +func (inc *Invocation) NeedProxyTransaction() bool { + return inc.TransactionID != "" && !inc.TransactionObserved +} + +// Finalize creates a proxy transaction for an invocation if required. +// A proxy transaction will be required to be created if the agent has +// registered a transaction for the invocation but has not sent the +// corresponding transaction to the extension. +func (inc *Invocation) Finalize(status string) ([]byte, error) { + if !inc.NeedProxyTransaction() { + return nil, nil + } + return inc.createProxyTxn(status) +} + +func (inc *Invocation) createProxyTxn(status string) (txn []byte, err error) { + txn, err = sjson.SetBytes(inc.AgentPayload, "transaction.result", status) + if status != "success" { + txn, err = sjson.SetBytes(txn, "transaction.outcome", "failure") + } + return +} diff --git a/accumulator/invocation_test.go b/accumulator/invocation_test.go new file mode 100644 index 00000000..7aaed6fe --- /dev/null +++ b/accumulator/invocation_test.go @@ -0,0 +1,110 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package accumulator + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestFinalize(t *testing.T) { + for _, tc := range []struct { + name string + txnID string + payload string + txnObserved bool + runtimeDoneStatus string + output string + }{ + { + name: "no_txn_registered", + }, + { + name: "txn_registered_observed", + txnID: "test-txn-id", + payload: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id"}}`, + txnObserved: true, + runtimeDoneStatus: "success", + }, + { + name: "txn_registered_not_observed_runtime_failure", + txnID: "test-txn-id", + payload: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id"}}`, + txnObserved: false, + runtimeDoneStatus: "failure", + output: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id","result":"failure","outcome":"failure"}}`, + }, + { + name: "txn_registered_not_observed_runtime_timeout", + txnID: "test-txn-id", + payload: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id"}}`, + txnObserved: false, + runtimeDoneStatus: "timeout", + output: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id","result":"timeout","outcome":"failure"}}`, + }, + { + name: "txn_registered_not_observed_runtime_success", + txnID: "test-txn-id", + payload: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id"}}`, + txnObserved: false, + runtimeDoneStatus: "success", + output: `{"transaction":{"id":"test-txn-id","trace_id":"test-trace-id","result":"success"}}`, + }, + } { + t.Run(tc.name, func(t *testing.T) { + ts := time.Date(2022, time.October, 1, 1, 0, 0, 0, time.UTC) + inc := &Invocation{ + Timestamp: ts, + DeadlineMs: ts.Add(time.Minute).UnixMilli(), + FunctionARN: "test-fn-arn", + TransactionID: tc.txnID, + AgentPayload: []byte(tc.payload), + TransactionObserved: tc.txnObserved, + } + result, err := inc.Finalize(tc.runtimeDoneStatus) + assert.Nil(t, err) + if len(tc.output) > 0 { + assert.JSONEq(t, tc.output, string(result)) + } else { + assert.Nil(t, result) + } + }) + } +} + +func BenchmarkCreateProxyTxn(b *testing.B) { + ts := time.Date(2022, time.October, 1, 1, 0, 0, 0, time.UTC) + inc := &Invocation{ + Timestamp: ts, + DeadlineMs: ts.Add(time.Minute).UnixMilli(), + FunctionARN: "test-fn-arn", + TransactionID: "txn-id", + AgentPayload: []byte(`{"transaction":{"id":"txn-id"}}`), + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := inc.createProxyTxn("success") + if err != nil { + b.Fail() + } + } +} diff --git a/apmproxy/metadata.go b/accumulator/metadata.go similarity index 99% rename from apmproxy/metadata.go rename to accumulator/metadata.go index 6d63b292..cfd32d30 100644 --- a/apmproxy/metadata.go +++ b/accumulator/metadata.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package apmproxy +package accumulator import ( "bytes" diff --git a/apmproxy/metadata_test.go b/accumulator/metadata_test.go similarity index 97% rename from apmproxy/metadata_test.go rename to accumulator/metadata_test.go index 0bc3b3c3..a3eadbdd 100644 --- a/apmproxy/metadata_test.go +++ b/accumulator/metadata_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package apmproxy_test +package accumulator import ( "bytes" @@ -24,8 +24,6 @@ import ( "io" "testing" - "github.com/elastic/apm-aws-lambda/apmproxy" - "github.com/stretchr/testify/require" ) @@ -134,8 +132,8 @@ func Test_processMetadata(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { - agentData := apmproxy.APMData{Data: tc.data(), ContentEncoding: tc.encodingType} - extractedMetadata, err := apmproxy.ProcessMetadata(agentData) + agentData := APMData{Data: tc.data(), ContentEncoding: tc.encodingType} + extractedMetadata, err := ProcessMetadata(agentData) if tc.expectError != nil { require.Nil(t, extractedMetadata) @@ -168,12 +166,12 @@ func BenchmarkProcessMetadata(b *testing.B) { } for _, bench := range benchmarks { - agentData := apmproxy.APMData{Data: bench.body, ContentEncoding: ""} + agentData := APMData{Data: bench.body, ContentEncoding: ""} b.Run(bench.name, func(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { - _, _ = apmproxy.ProcessMetadata(agentData) + _, _ = ProcessMetadata(agentData) } }) } diff --git a/apmproxy/apmserver.go b/apmproxy/apmserver.go index 251ad1dc..f901d56c 100644 --- a/apmproxy/apmserver.go +++ b/apmproxy/apmserver.go @@ -29,6 +29,8 @@ import ( "math/rand" "net/http" "time" + + "github.com/elastic/apm-aws-lambda/accumulator" ) type jsonResult struct { @@ -45,9 +47,9 @@ type jsonError struct { // has completed, signaled via a channel. func (c *Client) ForwardApmData(ctx context.Context) error { if c.IsUnhealthy() { + c.logger.Warn("Failed to start APM data forwarder due to client unhealthy") return nil } - var lambdaDataChan chan []byte for { select { @@ -70,7 +72,7 @@ func (c *Client) ForwardApmData(ctx context.Context) error { } // FlushAPMData reads all the apm data in the apm data channel and sends it to the APM server. -func (c *Client) FlushAPMData(ctx context.Context) { +func (c *Client) FlushAPMData(ctx context.Context, shutdown bool) { if c.IsUnhealthy() { c.logger.Debug("Flush skipped - Transport failing") return @@ -85,6 +87,17 @@ func (c *Client) FlushAPMData(ctx context.Context) { } } + if shutdown { + // At shutdown we can not expect platform.runtimeDone events to be reported + // for the remaining invocations. If we haven't received the transaction + // from agents at this point then it is safe to assume that the function + // timed out. We will flush all the cached agent data with no transaction + // assuming the outcome of the transaction to be `timeout`. + if err := c.batch.OnShutdown("timeout"); err != nil { + c.logger.Errorf("Error while flushing agent data from batch: %v", err) + } + } + // If metadata still not available then fail fast if c.batch == nil { c.logger.Warnf("Metadata not available at flush, skipping sending lambda data to APM Server") @@ -118,7 +131,7 @@ func (c *Client) FlushAPMData(ctx context.Context) { // The function compresses the APM agent data, if it's not already compressed. // It sets the APM transport status to failing upon errors, as part of the backoff // strategy. -func (c *Client) PostToApmServer(ctx context.Context, apmData APMData) error { +func (c *Client) PostToApmServer(ctx context.Context, apmData accumulator.APMData) error { // todo: can this be a streaming or streaming style call that keeps the // connection open across invocations? if c.IsUnhealthy() { @@ -328,24 +341,12 @@ func (c *Client) WaitForFlush() <-chan struct{} { return c.flushCh } -func (c *Client) forwardAgentData(ctx context.Context, apmData APMData) error { - if c.batch == nil { - metadata, err := ProcessMetadata(apmData) - if err != nil { - return fmt.Errorf("failed to extract metadata from agent payload %w", err) - } - c.batch = NewBatch(c.maxBatchSize, c.maxBatchAge, metadata) - } - return c.PostToApmServer(ctx, apmData) +func (c *Client) forwardAgentData(ctx context.Context, apmData accumulator.APMData) error { + return c.batch.AddAgentData(apmData) } func (c *Client) forwardLambdaData(ctx context.Context, data []byte) error { - if c.batch == nil { - // This state is not possible since we start processing lambda - // logs only after metadata is available and batch is created. - return errors.New("unexpected state, metadata not yet set") - } - if err := c.batch.Add(data); err != nil { + if err := c.batch.AddLambdaData(data); err != nil { c.logger.Warnf("Dropping data due to error: %v", err) } if c.batch.ShouldShip() { diff --git a/apmproxy/apmserver_test.go b/apmproxy/apmserver_test.go index 436ecf29..99e68680 100644 --- a/apmproxy/apmserver_test.go +++ b/apmproxy/apmserver_test.go @@ -30,10 +30,12 @@ import ( "testing" "time" + "github.com/elastic/apm-aws-lambda/accumulator" "github.com/elastic/apm-aws-lambda/apmproxy" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest" ) @@ -60,7 +62,7 @@ func TestPostToApmServerDataCompressed(t *testing.T) { // Create AgentData struct with compressed data data, _ := io.ReadAll(pr) - agentData := apmproxy.APMData{Data: data, ContentEncoding: "gzip"} + agentData := accumulator.APMData{Data: data, ContentEncoding: "gzip"} // Create apm server and handler apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -86,7 +88,7 @@ func TestPostToApmServerDataCompressed(t *testing.T) { func TestPostToApmServerDataNotCompressed(t *testing.T) { s := "A long time ago in a galaxy far, far away..." body := []byte(s) - agentData := apmproxy.APMData{Data: body, ContentEncoding: ""} + agentData := accumulator.APMData{Data: body, ContentEncoding: ""} // Compress the data, so it can be compared with what // the apm server receives @@ -254,7 +256,7 @@ func TestEnterBackoffFromHealthy(t *testing.T) { // Create AgentData struct with compressed data data, _ := io.ReadAll(pr) - agentData := apmproxy.APMData{Data: data, ContentEncoding: "gzip"} + agentData := accumulator.APMData{Data: data, ContentEncoding: "gzip"} // Create apm server and handler apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -305,7 +307,7 @@ func TestEnterBackoffFromFailing(t *testing.T) { // Create AgentData struct with compressed data data, _ := io.ReadAll(pr) - agentData := apmproxy.APMData{Data: data, ContentEncoding: "gzip"} + agentData := accumulator.APMData{Data: data, ContentEncoding: "gzip"} // Create apm server and handler apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -359,7 +361,7 @@ func TestAPMServerRecovery(t *testing.T) { // Create AgentData struct with compressed data data, _ := io.ReadAll(pr) - agentData := apmproxy.APMData{Data: data, ContentEncoding: "gzip"} + agentData := accumulator.APMData{Data: data, ContentEncoding: "gzip"} // Create apm server and handler apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -412,7 +414,7 @@ func TestAPMServerAuthFails(t *testing.T) { // Create AgentData struct with compressed data data, _ := io.ReadAll(pr) - agentData := apmproxy.APMData{Data: data, ContentEncoding: "gzip"} + agentData := accumulator.APMData{Data: data, ContentEncoding: "gzip"} // Create apm server and handler apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -456,7 +458,7 @@ func TestAPMServerRatelimit(t *testing.T) { // Create AgentData struct with compressed data data, _ := io.ReadAll(pr) - agentData := apmproxy.APMData{Data: data, ContentEncoding: "gzip"} + agentData := accumulator.APMData{Data: data, ContentEncoding: "gzip"} // Create apm server and handler var shouldSucceed atomic.Bool @@ -509,7 +511,7 @@ func TestAPMServerClientFail(t *testing.T) { // Create AgentData struct with compressed data data, _ := io.ReadAll(pr) - agentData := apmproxy.APMData{Data: data, ContentEncoding: "gzip"} + agentData := accumulator.APMData{Data: data, ContentEncoding: "gzip"} // Create apm server and handler var shouldSucceed atomic.Bool @@ -561,7 +563,7 @@ func TestContinuedAPMServerFailure(t *testing.T) { // Create AgentData struct with compressed data data, _ := io.ReadAll(pr) - agentData := apmproxy.APMData{Data: data, ContentEncoding: "gzip"} + agentData := accumulator.APMData{Data: data, ContentEncoding: "gzip"} // Create apm server and handler apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -612,7 +614,7 @@ func TestForwardApmData(t *testing.T) { require.NoError(t, err) assert.Equal(t, expected, string(out)) } - agentData := fmt.Sprintf("%s\n%s", metadata, `{"log": {"message": "test"}}`) + agentData := fmt.Sprintf("%s\n%s", metadata, `{"transaction":{"id":"0102030405060708","trace_id":"0102030405060708090a0b0c0d0e0f10"}}`) lambdaData := `{"log": {"message": "test"}}` maxBatchAge := 1 * time.Second apmClient, err := apmproxy.NewClient( @@ -620,7 +622,7 @@ func TestForwardApmData(t *testing.T) { apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()), apmproxy.WithAgentDataBufferSize(10), // Configure a small batch age for ease of testing - apmproxy.WithMaxBatchAge(maxBatchAge), + apmproxy.WithBatch(getReadyBatch(100, maxBatchAge)), ) require.NoError(t, err) @@ -634,14 +636,14 @@ func TestForwardApmData(t *testing.T) { }() // Populate metadata by sending agent data - apmClient.AgentDataChannel <- apmproxy.APMData{ + apmClient.AgentDataChannel <- accumulator.APMData{ Data: []byte(agentData), } - assertGzipBody(agentData) - // Send lambda logs API data + // Send lambda logs API data; the expected data will contain metadata + // and agent data both. var expected bytes.Buffer - expected.WriteString(metadata) + expected.WriteString(agentData) // Send multiple lambda logs to batch data for i := 0; i < 5; i++ { if i == 4 { @@ -675,9 +677,11 @@ func BenchmarkFlushAPMData(b *testing.B) { })) b.Cleanup(apmServer.Close) + batch := getReadyBatch(100, time.Minute) apmClient, err := apmproxy.NewClient( apmproxy.WithURL(apmServer.URL), - apmproxy.WithLogger(zaptest.NewLogger(b).Sugar()), + apmproxy.WithLogger(zaptest.NewLogger(b, zaptest.Level(zapcore.WarnLevel)).Sugar()), + apmproxy.WithBatch(batch), ) require.NoError(b, err) @@ -689,16 +693,16 @@ func BenchmarkFlushAPMData(b *testing.B) { {"transaction": { "id": "00xxxxFFaaaa1234", "trace_id": "0123456789abcdef0123456789abcdef", "name": "amqp receive", "parent_id": "abcdefabcdef01234567", "type": "messaging", "duration": 3, "span_count": { "started": 1 }, "context": {"message": {"queue": { "name": "new_users"}, "age":{ "ms": 1577958057123}, "headers": {"user_id": "1ax3", "involved_services": ["user", "auth"]}, "body": "user created", "routing_key": "user-created-transaction"}},"session":{"id":"sunday","sequence":123}}} {"transaction": { "name": "july-2021-delete-after-july-31", "type": "lambda", "result": "success", "id": "142e61450efb8574", "trace_id": "eb56529a1f461c5e7e2f66ecb075e983", "subtype": null, "action": null, "duration": 38.853, "timestamp": 1631736666365048, "sampled": true, "context": { "cloud": { "origin": { "account": { "id": "abc123" }, "provider": "aws", "region": "us-east-1", "service": { "name": "serviceName" } } }, "service": { "origin": { "id": "abc123", "name": "service-name", "version": "1.0" } }, "user": {}, "tags": {}, "custom": { } }, "sync": true, "span_count": { "started": 0 }, "outcome": "unknown", "faas": { "coldstart": false, "execution": "2e13b309-23e1-417f-8bf7-074fc96bc683", "trigger": { "request_id": "FuH2Cir_vHcEMUA=", "type": "http" } }, "sample_rate": 1 } } `) - agentAPMData := apmproxy.APMData{Data: agentData} + agentAPMData := accumulator.APMData{Data: agentData} b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { apmClient.AgentDataChannel <- agentAPMData for j := 0; j < 99; j++ { - apmClient.LambdaDataChannel <- []byte("this is test log") + apmClient.LambdaDataChannel <- []byte(`{"log":{"message":this is test log"}}`) } - apmClient.FlushAPMData(context.Background()) + apmClient.FlushAPMData(context.Background(), false) } } @@ -720,7 +724,7 @@ func BenchmarkPostToAPM(b *testing.B) { apmClient, err := apmproxy.NewClient( apmproxy.WithURL(apmServer.URL), - apmproxy.WithLogger(zaptest.NewLogger(b).Sugar()), + apmproxy.WithLogger(zaptest.NewLogger(b, zaptest.Level(zapcore.WarnLevel)).Sugar()), ) require.NoError(b, err) @@ -732,7 +736,7 @@ func BenchmarkPostToAPM(b *testing.B) { {"transaction": { "id": "00xxxxFFaaaa1234", "trace_id": "0123456789abcdef0123456789abcdef", "name": "amqp receive", "parent_id": "abcdefabcdef01234567", "type": "messaging", "duration": 3, "span_count": { "started": 1 }, "context": {"message": {"queue": { "name": "new_users"}, "age":{ "ms": 1577958057123}, "headers": {"user_id": "1ax3", "involved_services": ["user", "auth"]}, "body": "user created", "routing_key": "user-created-transaction"}},"session":{"id":"sunday","sequence":123}}} {"transaction": { "name": "july-2021-delete-after-july-31", "type": "lambda", "result": "success", "id": "142e61450efb8574", "trace_id": "eb56529a1f461c5e7e2f66ecb075e983", "subtype": null, "action": null, "duration": 38.853, "timestamp": 1631736666365048, "sampled": true, "context": { "cloud": { "origin": { "account": { "id": "abc123" }, "provider": "aws", "region": "us-east-1", "service": { "name": "serviceName" } } }, "service": { "origin": { "id": "abc123", "name": "service-name", "version": "1.0" } }, "user": {}, "tags": {}, "custom": { } }, "sync": true, "span_count": { "started": 0 }, "outcome": "unknown", "faas": { "coldstart": false, "execution": "2e13b309-23e1-417f-8bf7-074fc96bc683", "trigger": { "request_id": "FuH2Cir_vHcEMUA=", "type": "http" } }, "sample_rate": 1 } } `) - agentData := apmproxy.APMData{Data: benchBody, ContentEncoding: ""} + agentData := accumulator.APMData{Data: benchBody, ContentEncoding: ""} b.ReportAllocs() b.ResetTimer() @@ -742,3 +746,9 @@ func BenchmarkPostToAPM(b *testing.B) { } } } + +func getReadyBatch(maxSize int, maxAge time.Duration) *accumulator.Batch { + batch := accumulator.NewBatch(maxSize, maxAge) + batch.RegisterInvocation("test-req-id", "test-func-arn", 10_000, time.Now()) + return batch +} diff --git a/apmproxy/batch.go b/apmproxy/batch.go deleted file mode 100644 index fec8355c..00000000 --- a/apmproxy/batch.go +++ /dev/null @@ -1,109 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package apmproxy - -import ( - "bytes" - "errors" - "time" -) - -var ( - // ErrBatchFull signfies that the batch has reached full capacity - // and cannot accept more entries. - ErrBatchFull = errors.New("batch is full") - // ErrInvalidEncoding is returned for any APMData that is encoded - // with any encoding format - ErrInvalidEncoding = errors.New("encoded data not supported") -) - -var ( - maxSizeThreshold = 0.9 - zeroTime = time.Time{} -) - -// BatchData represents a batch of data without metadata -// that will be sent to APMServer. BatchData is not safe -// concurrent access. -type BatchData struct { - metadataBytes int - buf bytes.Buffer - count int - age time.Time - maxSize int - maxAge time.Duration -} - -// NewBatch creates a new BatchData which can accept a -// maximum number of entries as specified by the argument -func NewBatch(maxSize int, maxAge time.Duration, metadata []byte) *BatchData { - b := &BatchData{ - maxSize: maxSize, - maxAge: maxAge, - } - b.metadataBytes, _ = b.buf.Write(metadata) - return b -} - -// Add adds a new entry to the batch. Returns ErrBatchFull -// if batch has reached its maximum size. -func (b *BatchData) Add(d []byte) error { - if b.count == b.maxSize { - return ErrBatchFull - } - if err := b.buf.WriteByte('\n'); err != nil { - return err - } - if _, err := b.buf.Write(d); err != nil { - return err - } - if b.count == 0 { - // For first entry, set the age of the batch - b.age = time.Now() - } - b.count++ - return nil -} - -// Count return the number of APMData entries in batch. -func (b *BatchData) Count() int { - return b.count -} - -// ShouldShip indicates when a batch is ready for sending. -// A batch is marked as ready for flush when one of the -// below conditions is reached: -// 1. max size is greater than threshold (90% of maxSize) -// 2. batch is older than maturity age -func (b *BatchData) ShouldShip() bool { - return (b.count >= int(float64(b.maxSize)*maxSizeThreshold)) || - (!b.age.IsZero() && time.Since(b.age) > b.maxAge) -} - -// Reset resets the batch to prepare for new set of data -func (b *BatchData) Reset() { - b.count, b.age = 0, zeroTime - b.buf.Truncate(b.metadataBytes) -} - -// ToAPMData returns APMData with metadata and the accumulated batch -func (b *BatchData) ToAPMData() APMData { - return APMData{ - Data: b.buf.Bytes(), - } -} diff --git a/apmproxy/batch_test.go b/apmproxy/batch_test.go deleted file mode 100644 index 00df607c..00000000 --- a/apmproxy/batch_test.go +++ /dev/null @@ -1,78 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package apmproxy - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -const metadata = `{"metadata":{"service":{"agent":{"name":"apm-lambda-extension","version":"1.1.0"},"framework":{"name":"AWS Lambda","version":""},"language":{"name":"python","version":"3.9.8"},"runtime":{"name":"","version":""},"node":{}},"user":{},"process":{"pid":0},"system":{"container":{"id":""},"kubernetes":{"node":{},"pod":{}}},"cloud":{"provider":"","instance":{},"machine":{},"account":{},"project":{},"service":{}}}}` - -func TestAdd(t *testing.T) { - t.Run("empty", func(t *testing.T) { - b := NewBatch(1, time.Hour, []byte(metadata)) - - assert.NoError(t, b.Add([]byte{})) - }) - t.Run("full", func(t *testing.T) { - b := NewBatch(1, time.Hour, []byte(metadata)) - require.NoError(t, b.Add([]byte{})) - - assert.ErrorIs(t, ErrBatchFull, b.Add([]byte{})) - }) -} - -func TestReset(t *testing.T) { - b := NewBatch(1, time.Hour, []byte(metadata)) - require.NoError(t, b.Add([]byte{})) - require.Equal(t, 1, b.Count()) - b.Reset() - - assert.Equal(t, 0, b.Count()) - assert.True(t, b.age.IsZero()) -} - -func TestShouldShip_ReasonSize(t *testing.T) { - b := NewBatch(10, time.Hour, []byte(metadata)) - - // Should flush at 90% full - for i := 0; i < 9; i++ { - assert.False(t, b.ShouldShip()) - require.NoError(t, b.Add([]byte{})) - } - - require.Equal(t, 9, b.Count()) - assert.True(t, b.ShouldShip()) -} - -func TestShouldShip_ReasonAge(t *testing.T) { - b := NewBatch(10, time.Second, []byte(metadata)) - - assert.False(t, b.ShouldShip()) - require.NoError(t, b.Add([]byte{})) - - time.Sleep(time.Second + time.Millisecond) - - // Should be ready to send now - require.Equal(t, 1, b.Count()) - assert.True(t, b.ShouldShip()) -} diff --git a/apmproxy/client.go b/apmproxy/client.go index 8c5ba8fc..21f9df65 100644 --- a/apmproxy/client.go +++ b/apmproxy/client.go @@ -26,6 +26,7 @@ import ( "sync" "time" + "github.com/elastic/apm-aws-lambda/accumulator" "go.uber.org/zap" ) @@ -47,15 +48,13 @@ const ( defaultReceiverAddr = ":8200" defaultAgentBufferSize int = 100 defaultLambdaBufferSize int = 100 - defaultMaxBatchSize int = 50 - defaultMaxBatchAge time.Duration = 2 * time.Second ) // Client is the client used to communicate with the apm server. type Client struct { mu sync.RWMutex bufferPool sync.Pool - AgentDataChannel chan APMData + AgentDataChannel chan accumulator.APMData LambdaDataChannel chan []byte client *http.Client Status Status @@ -70,9 +69,7 @@ type Client struct { flushMutex sync.Mutex flushCh chan struct{} - batch *BatchData - maxBatchSize int - maxBatchAge time.Duration + batch *accumulator.Batch } func NewClient(opts ...Option) (*Client, error) { @@ -80,7 +77,7 @@ func NewClient(opts ...Option) (*Client, error) { bufferPool: sync.Pool{New: func() interface{} { return &bytes.Buffer{} }}, - AgentDataChannel: make(chan APMData, defaultAgentBufferSize), + AgentDataChannel: make(chan accumulator.APMData, defaultAgentBufferSize), LambdaDataChannel: make(chan []byte, defaultLambdaBufferSize), client: &http.Client{ Transport: http.DefaultTransport.(*http.Transport).Clone(), @@ -95,8 +92,6 @@ func NewClient(opts ...Option) (*Client, error) { }, sendStrategy: SyncFlush, flushCh: make(chan struct{}), - maxBatchSize: defaultMaxBatchSize, - maxBatchAge: defaultMaxBatchAge, } c.client.Timeout = defaultDataForwarderTimeout diff --git a/apmproxy/option.go b/apmproxy/option.go index f1862532..36f997d6 100644 --- a/apmproxy/option.go +++ b/apmproxy/option.go @@ -20,6 +20,7 @@ package apmproxy import ( "time" + "github.com/elastic/apm-aws-lambda/accumulator" "go.uber.org/zap" ) @@ -74,7 +75,7 @@ func WithSendStrategy(strategy SendStrategy) Option { // WithAgentDataBufferSize sets the agent data buffer size. func WithAgentDataBufferSize(size int) Option { return func(c *Client) { - c.AgentDataChannel = make(chan APMData, size) + c.AgentDataChannel = make(chan accumulator.APMData, size) } } @@ -86,24 +87,10 @@ func WithLogger(logger *zap.SugaredLogger) Option { } } -// WithMaxBatchSize configures the maximum batch size for -// the payload sent to the APMServer -func WithMaxBatchSize(size int) Option { +// WithBatch configures a batch to be used for batching data +// before sending to APM Server. +func WithBatch(batch *accumulator.Batch) Option { return func(c *Client) { - c.maxBatchSize = size - } -} - -// WithMaxBatchAge configures the maximum age of the batch -// before it is sent to APMServer. Age is measured from the -// time the first entry is added in the batch. -// -// It is possible for batch age to be greater than the -// configured max batch age when sending since a send is -// triggered by a new log event and log events can be delayed -// due to various factors. -func WithMaxBatchAge(age time.Duration) Option { - return func(c *Client) { - c.maxBatchAge = age + c.batch = batch } } diff --git a/apmproxy/receiver.go b/apmproxy/receiver.go index d4c40951..ed321b06 100644 --- a/apmproxy/receiver.go +++ b/apmproxy/receiver.go @@ -27,15 +27,12 @@ import ( "net/http/httputil" "net/url" "time" + + "github.com/elastic/apm-aws-lambda/accumulator" + "github.com/tidwall/gjson" ) -// APMData represents data to be sent to APMServer. `Agent` type -// data will have `metadata` as ndjson whereas `lambda` type data -// will be without metadata. -type APMData struct { - Data []byte - ContentEncoding string -} +const txnRegistrationContentType = "application/vnd.elastic.apm.transaction+json" // StartReceiver starts the server listening for APM agent data. func (c *Client) StartReceiver() error { @@ -48,6 +45,7 @@ func (c *Client) StartReceiver() error { mux.HandleFunc("/", handleInfoRequest) mux.HandleFunc("/intake/v2/events", c.handleIntakeV2Events()) + mux.HandleFunc("/register/transaction", c.handleTransactionRegistration()) c.receiver.Handler = mux @@ -126,7 +124,7 @@ func (c *Client) handleIntakeV2Events() func(w http.ResponseWriter, r *http.Requ agentFlushed := r.URL.Query().Get("flushed") == "true" - agentData := APMData{ + agentData := accumulator.APMData{ Data: rawBytes, ContentEncoding: r.Header.Get("Content-Encoding"), } @@ -164,3 +162,31 @@ func (c *Client) handleIntakeV2Events() func(w http.ResponseWriter, r *http.Requ } } } + +// URL: http://server/register/transaction +func (c *Client) handleTransactionRegistration() func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + if r.Header.Get("Content-Type") != txnRegistrationContentType { + w.WriteHeader(http.StatusUnsupportedMediaType) + return + } + rawBytes, err := io.ReadAll(r.Body) + defer r.Body.Close() + if err != nil { + c.logger.Warnf("Failed to read transaction registration body: %v", err) + w.WriteHeader(http.StatusBadRequest) + return + } + txnID := gjson.GetBytes(rawBytes, "transaction.id").String() + if txnID == "" { + c.logger.Warn("Could not parse transaction id from transaction registration body") + w.WriteHeader(http.StatusUnprocessableEntity) + return + } + if err := c.batch.OnAgentInit(txnID, rawBytes); err != nil { + c.logger.Warnf("Failed to update invocation for transaction ID %s", txnID) + w.WriteHeader(http.StatusUnprocessableEntity) + return + } + } +} diff --git a/app/app.go b/app/app.go index d2b9f808..b1b38219 100644 --- a/app/app.go +++ b/app/app.go @@ -25,6 +25,7 @@ import ( "strings" "time" + "github.com/elastic/apm-aws-lambda/accumulator" "github.com/elastic/apm-aws-lambda/apmproxy" "github.com/elastic/apm-aws-lambda/extension" "github.com/elastic/apm-aws-lambda/logger" @@ -34,6 +35,11 @@ import ( "go.uber.org/zap" ) +var ( + defaultMaxBatchSize int = 50 + defaultMaxBatchAge time.Duration = 2 * time.Second +) + // App is the main application. type App struct { extensionName string @@ -41,6 +47,7 @@ type App struct { logsClient *logsapi.Client apmClient *apmproxy.Client logger *zap.SugaredLogger + batch *accumulator.Batch } // New returns an App or an error if the @@ -54,6 +61,7 @@ func New(ctx context.Context, opts ...ConfigOption) (*App, error) { app := &App{ extensionName: c.extensionName, + batch: accumulator.NewBatch(defaultMaxBatchSize, defaultMaxBatchAge), } var err error @@ -86,6 +94,7 @@ func New(ctx context.Context, opts ...ConfigOption) (*App, error) { logsapi.WithLogBuffer(100), logsapi.WithLogger(app.logger), logsapi.WithSubscriptionTypes(subscriptionLogStreams...), + logsapi.WithInvocationLifecycler(app.batch), ) if err != nil { return nil, err @@ -132,6 +141,7 @@ func New(ctx context.Context, opts ...ConfigOption) (*App, error) { apmproxy.WithLogger(app.logger), apmproxy.WithAPIKey(apmServerAPIKey), apmproxy.WithSecretToken(apmServerSecretToken), + apmproxy.WithBatch(app.batch), ) ac, err := apmproxy.NewClient(apmOpts...) diff --git a/app/config.go b/app/config.go index 4311de5a..c05050c5 100644 --- a/app/config.go +++ b/app/config.go @@ -17,7 +17,9 @@ package app -import "github.com/aws/aws-sdk-go-v2/aws" +import ( + "github.com/aws/aws-sdk-go-v2/aws" +) type appConfig struct { awsLambdaRuntimeAPI string diff --git a/app/run.go b/app/run.go index cd3fa6e5..85c3db1a 100644 --- a/app/run.go +++ b/app/run.go @@ -76,7 +76,7 @@ func (app *App) Run(ctx context.Context) error { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - app.apmClient.FlushAPMData(ctx) + app.apmClient.FlushAPMData(ctx, true) }() // The previous event id is used to validate the received Lambda metrics @@ -104,8 +104,13 @@ func (app *App) Run(ctx context.Context) error { app.logger.Debug("Waiting for background data send to end") backgroundDataSendWg.Wait() if app.apmClient.ShouldFlush() { + // Use a new cancellable context for flushing APM data to make sure + // that the underlying transport is reset for next invocation without + // waiting for grace period if it got to unhealthy state. + flushCtx, cancel := context.WithCancel(ctx) // Flush APM data now that the function invocation has completed - app.apmClient.FlushAPMData(ctx) + app.apmClient.FlushAPMData(flushCtx, false) + cancel() } prevEvent = event } @@ -140,7 +145,12 @@ func (app *App) processEvent( app.logger.Infof("Exiting") return nil, err } - + app.batch.RegisterInvocation( + event.RequestID, + event.InvokedFunctionArn, + event.DeadlineMs, + event.Timestamp, + ) // Used to compute Lambda Timeout event.Timestamp = time.Now() app.logger.Debug("Received event.") diff --git a/dependencies.asciidoc b/dependencies.asciidoc index 86c0e87e..252a9ff3 100644 --- a/dependencies.asciidoc +++ b/dependencies.asciidoc @@ -27,6 +27,10 @@ This page lists the third-party dependencies used to build {n}. | link:https://github.com/aws/aws-sdk-go-v2[$$github.com/aws/aws-sdk-go-v2$$] | v1.16.7 | Apache-2.0 | link:https://github.com/aws/smithy-go[$$github.com/aws/smithy-go$$] | v1.12.0 | Apache-2.0 | link:https://github.com/pkg/errors[$$github.com/pkg/errors$$] | v0.9.1 | BSD-2-Clause +| link:https://github.com/tidwall/gjson[$$github.com/tidwall/gjson$$] | v1.14.3 | MIT +| link:https://github.com/tidwall/match[$$github.com/tidwall/match$$] | v1.1.1 | MIT +| link:https://github.com/tidwall/pretty[$$github.com/tidwall/pretty$$] | v1.2.1 | MIT +| link:https://github.com/tidwall/sjson[$$github.com/tidwall/sjson$$] | v1.2.5 | MIT | link:https://go.elastic.co/apm/v2[$$go.elastic.co/apm/v2$$] | v2.1.1-0.20220617022209-90f624fe11b0 | Apache-2.0 | link:https://go.elastic.co/ecszap[$$go.elastic.co/ecszap$$] | v1.0.1 | Apache-2.0 | link:https://go.elastic.co/fastjson[$$go.elastic.co/fastjson$$] | v1.1.0 | MIT diff --git a/e2e-testing/e2e_util.go b/e2e-testing/e2e_util.go index 42bb63a6..57cc61f8 100644 --- a/e2e-testing/e2e_util.go +++ b/e2e-testing/e2e_util.go @@ -29,8 +29,7 @@ import ( "path/filepath" "strings" - "github.com/elastic/apm-aws-lambda/apmproxy" - + "github.com/elastic/apm-aws-lambda/accumulator" "go.uber.org/zap" ) @@ -156,7 +155,7 @@ func GetDecompressedBytesFromRequest(req *http.Request) ([]byte, error) { if req.Body != nil { rawBytes, _ = io.ReadAll(req.Body) } - return apmproxy.GetUncompressedBytes(rawBytes, req.Header.Get("Content-Encoding")) + return accumulator.GetUncompressedBytes(rawBytes, req.Header.Get("Content-Encoding")) } // GetFreePort is a function that queries the kernel and obtains an unused port. diff --git a/go.mod b/go.mod index cc3892e2..24318e1e 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,8 @@ require ( github.com/aws/aws-sdk-go-v2 v1.16.7 github.com/aws/aws-sdk-go-v2/config v1.15.14 github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.15.13 + github.com/tidwall/gjson v1.14.3 + github.com/tidwall/sjson v1.2.5 go.elastic.co/apm/v2 v2.1.1-0.20220617022209-90f624fe11b0 go.elastic.co/fastjson v1.1.0 ) @@ -36,5 +38,7 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/santhosh-tekuri/jsonschema v1.2.4 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.1 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect ) diff --git a/go.sum b/go.sum index a3c02403..a8601b72 100644 --- a/go.sum +++ b/go.sum @@ -67,6 +67,16 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/gjson v1.14.3 h1:9jvXn7olKEHU1S9vwoMGliaT8jq1vJ7IH/n9zD9Dnlw= +github.com/tidwall/gjson v1.14.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= +github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= +github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= diff --git a/logsapi/client.go b/logsapi/client.go index b7d18ac0..ef3a015b 100644 --- a/logsapi/client.go +++ b/logsapi/client.go @@ -46,6 +46,10 @@ const ( // ClientOption is a config option for a Client. type ClientOption func(*Client) +type invocationLifecycler interface { + OnLambdaLogRuntimeDone(requestID, status string) error +} + // Client is the client used to subscribe to the Logs API. type Client struct { httpClient *http.Client @@ -55,6 +59,7 @@ type Client struct { listenerAddr string server *http.Server logger *zap.SugaredLogger + invocationLifecycler invocationLifecycler } // NewClient returns a new Client with the given URL. diff --git a/logsapi/event.go b/logsapi/event.go index a26d07c0..20290dbb 100644 --- a/logsapi/event.go +++ b/logsapi/event.go @@ -74,18 +74,20 @@ func (lc *Client) ProcessLogs( switch logEvent.Type { case PlatformStart: platformStartReqID = logEvent.Record.RequestID - // Check the logEvent for runtimeDone and compare the RequestID - // to the id that came in via the Next API case PlatformRuntimeDone: + if err := lc.invocationLifecycler.OnLambdaLogRuntimeDone(logEvent.Record.RequestID, logEvent.Record.Status); err != nil { + lc.logger.Warnf("Failed to finalize invocation with request ID %s", logEvent.Record.RequestID) + } + // For the current invocation the platform.runtimeDone would be the last event if logEvent.Record.RequestID == requestID { lc.logger.Info("Received runtimeDone event for this function invocation") runtimeDoneSignal <- struct{}{} return nil } - lc.logger.Debug("Log API runtimeDone event request id didn't match") // Check if the logEvent contains metrics and verify that they can be linked to the previous invocation case PlatformReport: + // TODO: @lahsivjar Refactor usage of prevEvent.RequestID (should now query the batch?) if prevEvent != nil && logEvent.Record.RequestID == prevEvent.RequestID { lc.logger.Debug("Received platform report for the previous function invocation") processedMetrics, err := ProcessPlatformReport(prevEvent, logEvent) diff --git a/logsapi/option.go b/logsapi/option.go index e431477e..b166fc02 100644 --- a/logsapi/option.go +++ b/logsapi/option.go @@ -17,7 +17,9 @@ package logsapi -import "go.uber.org/zap" +import ( + "go.uber.org/zap" +) // WithListenerAddress sets the listener address of the // server listening for logs event. @@ -55,3 +57,11 @@ func WithSubscriptionTypes(types ...SubscriptionType) ClientOption { c.logsAPISubscriptionTypes = types } } + +// WithInvocationLifecycler configures a lifecycler for acting on certain +// log events. +func WithInvocationLifecycler(l invocationLifecycler) ClientOption { + return func(c *Client) { + c.invocationLifecycler = l + } +} diff --git a/main_test.go b/main_test.go index 774d759a..a85fe890 100644 --- a/main_test.go +++ b/main_test.go @@ -26,6 +26,7 @@ import ( "net/http" "net/http/httptest" "os" + "regexp" "strings" "sync" "testing" @@ -51,7 +52,6 @@ const ( InvokeStandard MockEventType = "Standard" InvokeStandardInfo MockEventType = "StandardInfo" InvokeStandardFlush MockEventType = "StandardFlush" - InvokeStandardMetadata MockEventType = "StandardMetadata" InvokeLateFlush MockEventType = "LateFlush" InvokeWaitgroupsRace MockEventType = "InvokeWaitgroupsRace" InvokeMultipleTransactionsOverload MockEventType = "MultipleTransactionsOverload" @@ -103,24 +103,28 @@ func newMockApmServer(t *testing.T, l *zap.SugaredLogger) (*MockServerInternals, return } - l.Debugf("Event type received by mock APM server : %s", string(decompressedBytes)) - switch APMServerBehavior(decompressedBytes) { - case TimelyResponse: - l.Debug("Timely response signal received") - case SlowResponse: - l.Debug("Slow response signal received") - time.Sleep(2 * time.Second) - case Hangs: - l.Debug("Hang signal received") - apmServerMutex.Lock() - if apmServerInternals.WaitForUnlockSignal { - <-apmServerInternals.UnlockSignalChannel - apmServerInternals.WaitForUnlockSignal = false + sp := bytes.Split(decompressedBytes, []byte("\n")) + for i := 0; i < len(sp); i++ { + expectedBehavior := APMServerBehavior(sp[i]) + l.Debugf("Event type received by mock APM server : %s", string(expectedBehavior)) + switch expectedBehavior { + case TimelyResponse: + l.Debug("Timely response signal received") + case SlowResponse: + l.Debug("Slow response signal received") + time.Sleep(2 * time.Second) + case Hangs: + l.Debug("Hang signal received") + apmServerMutex.Lock() + if apmServerInternals.WaitForUnlockSignal { + <-apmServerInternals.UnlockSignalChannel + apmServerInternals.WaitForUnlockSignal = false + } + apmServerMutex.Unlock() + case Crashes: + panic("Server crashed") + default: } - apmServerMutex.Unlock() - case Crashes: - panic("Server crashed") - default: } if r.RequestURI == "/intake/v2/events" { @@ -248,30 +252,27 @@ func processMockEvent(q chan<- logsapi.LogEvent, currID string, event MockEvent, // float values were silently ignored (casted to int) // Multiply before casting to support more values. delay := time.Duration(event.ExecutionDuration * float64(time.Second)) + buf := bytes.NewBufferString(`{"metadata":{"service":{"name":"1234_service-12a3","version":"5.1.3","environment":"staging","agent":{"name":"elastic-node","version":"3.14.0"},"framework":{"name":"Express","version":"1.2.3"},"language":{"name":"ecmascript","version":"8"},"runtime":{"name":"node","version":"8.0.0"},"node":{"configured_name":"node-123"}},"user":{"username":"bar","id":"123user","email":"bar@user.com"},"labels":{"tag0":null,"tag1":"one","tag2":2},"process":{"pid":1234,"ppid":6789,"title":"node","argv":["node","server.js"]},"system":{"architecture":"x64","hostname":"prod1.example.com","platform":"darwin","container":{"id":"container-id"},"kubernetes":{"namespace":"namespace1","node":{"name":"node-name"},"pod":{"name":"pod-name","uid":"pod-uid"}}},"cloud":{"provider":"cloud_provider","region":"cloud_region","availability_zone":"cloud_availability_zone","instance":{"id":"instance_id","name":"instance_name"},"machine":{"type":"machine_type"},"account":{"id":"account_id","name":"account_name"},"project":{"id":"project_id","name":"project_name"},"service":{"name":"lambda"}}}}`) + buf.WriteByte('\n') + buf.WriteString(string(event.APMServerBehavior)) switch event.Type { case InvokeHang: time.Sleep(time.Duration(event.Timeout * float64(time.Second))) case InvokeStandard: time.Sleep(delay) - req, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), bytes.NewBuffer([]byte(event.APMServerBehavior))) - res, _ := client.Do(req) - l.Debugf("Response seen by the agent : %d", res.StatusCode) - case InvokeStandardMetadata: - time.Sleep(delay) - metadata := `{"metadata":{"service":{"name":"1234_service-12a3","version":"5.1.3","environment":"staging","agent":{"name":"elastic-node","version":"3.14.0"},"framework":{"name":"Express","version":"1.2.3"},"language":{"name":"ecmascript","version":"8"},"runtime":{"name":"node","version":"8.0.0"},"node":{"configured_name":"node-123"}},"user":{"username":"bar","id":"123user","email":"bar@user.com"},"labels":{"tag0":null,"tag1":"one","tag2":2},"process":{"pid":1234,"ppid":6789,"title":"node","argv":["node","server.js"]},"system":{"architecture":"x64","hostname":"prod1.example.com","platform":"darwin","container":{"id":"container-id"},"kubernetes":{"namespace":"namespace1","node":{"name":"node-name"},"pod":{"name":"pod-name","uid":"pod-uid"}}},"cloud":{"provider":"cloud_provider","region":"cloud_region","availability_zone":"cloud_availability_zone","instance":{"id":"instance_id","name":"instance_name"},"machine":{"type":"machine_type"},"account":{"id":"account_id","name":"account_name"},"project":{"id":"project_id","name":"project_name"},"service":{"name":"lambda"}}}}` - req, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), bytes.NewBuffer([]byte(metadata))) + req, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), buf) res, _ := client.Do(req) l.Debugf("Response seen by the agent : %d", res.StatusCode) case InvokeStandardFlush: time.Sleep(delay) - reqData, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events?flushed=true", extensionPort), bytes.NewBuffer([]byte(event.APMServerBehavior))) + reqData, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events?flushed=true", extensionPort), buf) if _, err := client.Do(reqData); err != nil { l.Error(err.Error()) } case InvokeLateFlush: time.Sleep(delay) - reqData, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events?flushed=true", extensionPort), bytes.NewBuffer([]byte(event.APMServerBehavior))) + reqData, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events?flushed=true", extensionPort), buf) internals.WaitGroup.Add(1) go func() { <-ch @@ -284,8 +285,8 @@ func processMockEvent(q chan<- logsapi.LogEvent, currID string, event MockEvent, sendMetrics = false case InvokeWaitgroupsRace: time.Sleep(delay) - reqData0, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), bytes.NewBuffer([]byte(event.APMServerBehavior))) - reqData1, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), bytes.NewBuffer([]byte(event.APMServerBehavior))) + reqData0, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), buf) + reqData1, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), buf) if _, err := client.Do(reqData0); err != nil { l.Error(err.Error()) } @@ -299,7 +300,7 @@ func processMockEvent(q chan<- logsapi.LogEvent, currID string, event MockEvent, wg.Add(1) go func() { time.Sleep(delay) - reqData, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), bytes.NewBuffer([]byte(event.APMServerBehavior))) + reqData, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), buf) if _, err := client.Do(reqData); err != nil { l.Error(err.Error()) } @@ -501,7 +502,11 @@ func TestLateFlush(t *testing.T) { eventQueueGenerator(eventsChain, eventsChannel) select { case <-runApp(t, logsapiAddr): - assert.Contains(t, apmServerInternals.Data, TimelyResponse+TimelyResponse) + assert.Regexp( + t, + regexp.MustCompile(fmt.Sprintf(".*\n%s.*\n%s", TimelyResponse, TimelyResponse)), // metadata followed by TimelyResponsex2 + apmServerInternals.Data, + ) case <-time.After(timeout): t.Fatalf("timed out waiting for app to finish") } @@ -563,8 +568,6 @@ func TestAPMServerHangs(t *testing.T) { newMockLambdaServer(t, logsapiAddr, eventsChannel, l) eventsChain := []MockEvent{ - // The first response sets the metadata so that the lambda logs handler is accepting requests - {Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, {Type: InvokeStandard, APMServerBehavior: Hangs, ExecutionDuration: 1, Timeout: 500}, } eventQueueGenerator(eventsChain, eventsChannel) @@ -596,12 +599,18 @@ func TestAPMServerRecovery(t *testing.T) { {Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, } eventQueueGenerator(eventsChain, eventsChannel) + var wg sync.WaitGroup + wg.Add(1) go func() { + defer wg.Done() time.Sleep(2500 * time.Millisecond) // Cannot multiply time.Second by a float apmServerInternals.UnlockSignalChannel <- struct{}{} }() select { case <-runApp(t, logsapiAddr): + // Make sure mock APM Server processes the Hangs request + wg.Wait() + time.Sleep(10 * time.Millisecond) assert.Contains(t, apmServerInternals.Data, Hangs) assert.Contains(t, apmServerInternals.Data, TimelyResponse) case <-time.After(10 * time.Second): @@ -757,9 +766,8 @@ func TestInfoRequestHangs(t *testing.T) { } } -// TestMetricsWithoutMetadata checks if the extension sends metrics corresponding to invocation n during invocation -// n+1, even if the metadata container was not populated -func TestMetricsWithoutMetadata(t *testing.T) { +// TestMetrics checks if the extension sends metrics corresponding to invocation n during invocation +func TestMetrics(t *testing.T) { l, err := logger.New(logger.WithLevel(zapcore.DebugLevel)) require.NoError(t, err) @@ -774,39 +782,6 @@ func TestMetricsWithoutMetadata(t *testing.T) { } eventQueueGenerator(eventsChain, eventsChannel) - select { - case <-runApp(t, logsapiAddr): - assert.Contains(t, apmServerInternals.Data, `faas.billed_duration":{"value":60`) - assert.Contains(t, apmServerInternals.Data, `faas.duration":{"value":59.9`) - assert.Contains(t, apmServerInternals.Data, `faas.coldstart_duration":{"value":500`) - assert.Contains(t, apmServerInternals.Data, `faas.timeout":{"value":5000}`) - assert.Contains(t, apmServerInternals.Data, `system.memory.actual.free":{"value":7.1303168e+07`) - assert.Contains(t, apmServerInternals.Data, `system.memory.total":{"value":1.34217728e+08`) - assert.Contains(t, apmServerInternals.Data, `coldstart":true`) - assert.Contains(t, apmServerInternals.Data, `execution":`) - assert.Contains(t, apmServerInternals.Data, `id":"arn:aws:lambda:eu-central-1:627286350134:function:main_unit_test"`) - case <-time.After(timeout): - t.Fatalf("timed out waiting for app to finish") - } -} - -// TestMetricsWithMetadata checks if the extension sends metrics corresponding to invocation n during invocation -// n+1, even if the metadata container was not populated -func TestMetricsWithMetadata(t *testing.T) { - l, err := logger.New(logger.WithLevel(zapcore.DebugLevel)) - require.NoError(t, err) - - eventsChannel := newTestStructs(t) - apmServerInternals, _ := newMockApmServer(t, l) - logsapiAddr := randomAddr() - newMockLambdaServer(t, logsapiAddr, eventsChannel, l) - - eventsChain := []MockEvent{ - {Type: InvokeStandardMetadata, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, - {Type: InvokeStandardMetadata, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, - } - eventQueueGenerator(eventsChain, eventsChannel) - select { case <-runApp(t, logsapiAddr): assert.Contains(t, apmServerInternals.Data, `{"metadata":{"service":{"name":"1234_service-12a3","version":"5.1.3","environment":"staging","agent":{"name":"elastic-node","version":"3.14.0"},"framework":{"name":"Express","version":"1.2.3"},"language":{"name":"ecmascript","version":"8"},"runtime":{"name":"node","version":"8.0.0"},"node":{"configured_name":"node-123"}},"user":{"username":"bar","id":"123user","email":"bar@user.com"},"labels":{"tag0":null,"tag1":"one","tag2":2},"process":{"pid":1234,"ppid":6789,"title":"node","argv":["node","server.js"]},"system":{"architecture":"x64","hostname":"prod1.example.com","platform":"darwin","container":{"id":"container-id"},"kubernetes":{"namespace":"namespace1","node":{"name":"node-name"},"pod":{"name":"pod-name","uid":"pod-uid"}}},"cloud":{"provider":"cloud_provider","region":"cloud_region","availability_zone":"cloud_availability_zone","instance":{"id":"instance_id","name":"instance_name"},"machine":{"type":"machine_type"},"account":{"id":"account_id","name":"account_name"},"project":{"id":"project_id","name":"project_name"},"service":{"name":"lambda"}}}}`)