diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 00000000..9d7e1f28 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,129 @@ +service: + golangci-lint-version: 1.59.x + +issues: + exclude-dirs: + - artifacts + - build-targets + - design + - docker-images + - docs + - etc + - experiments + - infrastructure + - legal + - libpf-rs + - mocks + - pf-code-indexing-service/cibackend/gomock_* + - pf-debug-metadata-service/dmsbackend/gomock_* + - pf-host-agent/support/ci-kernels + - pf-storage-backend/storagebackend/gomock_* + - scratch + - systemtests/benchmarks/_outdata + - target + - virt-tests + - vm-images + +linters: + enable-all: true + disable: + # Disabled because of + # - too many non-sensical warnings + # - not relevant for us + # - false positives + # + # "might be worth fixing" means we should investigate/fix in the mid term + - canonicalheader + - contextcheck # might be worth fixing + - cyclop + - depguard + - dupword + - err113 + - errorlint # might be worth fixing + - exhaustive + - exhaustruct + - forbidigo + - forcetypeassert # might be worth fixing + - funlen + - gci # might be worth fixing + - gochecknoglobals + - gochecknoinits + - gocognit + - goconst + - gocyclo + - godot + - godox # complains about TODO etc + - gofumpt + - gomnd + - gomoddirectives + - inamedparam + - interfacebloat + - ireturn + - lll + - maintidx + - makezero + - mnd + - nestif + - nlreturn + - noctx # might be worth fixing + - nolintlint + - nonamedreturns + - paralleltest + - protogetter + - tagalign + - tagliatelle + - testpackage + - thelper + - varnamelen + - wastedassign + - wsl + - wrapcheck + # the following linters are deprecated + - execinquery + # we don't want to change code to Go 1.22+ yet + - intrange + - copyloopvar + +linters-settings: + goconst: + min-len: 2 + min-occurrences: 2 + gocritic: + enabled-tags: + - diagnostic + - experimental + - opinionated + - performance + - style + disabled-checks: + - dupImport # https://github.com/go-critic/go-critic/issues/845 + - ifElseChain + - octalLiteral + - whyNoLint + - wrapperFunc + - sloppyReassign + - uncheckedInlineErr # Experimental rule with high false positive rate. + + # Broken with Go 1.18 feature (https://github.com/golangci/golangci-lint/issues/2649): + - hugeParam + - rangeValCopy + - typeDefFirst + - paramTypeCombine + gocyclo: + min-complexity: 15 + govet: + enable-all: true + disable: + - fieldalignment + settings: + printf: # analyzer name, run `go tool vet help` to see all analyzers + funcs: # run `go tool vet help printf` to see available settings for `printf` analyzer + - debug,debugf,debugln + - error,errorf,errorln + - fatal,fatalf,fataln + - info,infof,infoln + - log,logf,logln + - warn,warnf,warnln + - print,printf,println,sprint,sprintf,sprintln,fprint,fprintf,fprintln + misspell: + locale: US diff --git a/Makefile b/Makefile index dbe06da2..4a23491f 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ SHELL = /bin/bash -eo pipefail GORELEASER_VERSION = "v1.19.2" GO_LICENSER_VERSION = "v0.4.0" -GOLANGCI_LINT_VERSION = "v1.54.2" +GOLANGCI_LINT_VERSION = "v1.59.1" export DOCKER_IMAGE_NAME = observability/apm-lambda-extension export DOCKER_REGISTRY = docker.elastic.co @@ -39,8 +39,8 @@ lint-prep: .PHONY: lint lint: - @go run github.com/golangci/golangci-lint/cmd/golangci-lint@$(GOLANGCI_LINT_VERSION) version - @go run github.com/golangci/golangci-lint/cmd/golangci-lint@$(GOLANGCI_LINT_VERSION) run + @if [ "$(CI)" != "" ]; then go run github.com/golangci/golangci-lint/cmd/golangci-lint@$(GOLANGCI_LINT_VERSION) version; fi + @go run github.com/golangci/golangci-lint/cmd/golangci-lint@$(GOLANGCI_LINT_VERSION) run --build-tags tools NOTICE.txt: go.mod @bash ./scripts/notice.sh diff --git a/accumulator/batch.go b/accumulator/batch.go index d469a1c7..7b773058 100644 --- a/accumulator/batch.go +++ b/accumulator/batch.go @@ -31,7 +31,7 @@ var ( // ErrMetadataUnavailable is returned when a lambda data is added to // the batch without metadata being set. ErrMetadataUnavailable = errors.New("metadata is not yet available") - // ErrBatchFull signfies that the batch has reached full capacity + // ErrBatchFull signifies that the batch has reached full capacity // and cannot accept more entries. ErrBatchFull = errors.New("batch is full") // ErrInvalidEncoding is returned for any APMData that is encoded @@ -184,7 +184,7 @@ func (b *Batch) AddAgentData(apmData APMData) error { return ErrBatchFull } if b.currentlyExecutingRequestID == "" { - return fmt.Errorf("lifecycle error, currently executing requestID is not set") + return errors.New("lifecycle error, currently executing requestID is not set") } inc, ok := b.invocations[b.currentlyExecutingRequestID] if !ok { @@ -218,10 +218,10 @@ func (b *Batch) AddAgentData(apmData APMData) error { // OnLambdaLogRuntimeDone prepares the data for the invocation to be shipped // to APM Server. It accepts requestID and status of the invocation both of // which can be retrieved after parsing `platform.runtimeDone` event. -func (b *Batch) OnLambdaLogRuntimeDone(reqID, status string, time time.Time) error { +func (b *Batch) OnLambdaLogRuntimeDone(reqID, status string, endTime time.Time) error { b.mu.Lock() defer b.mu.Unlock() - return b.finalizeInvocation(reqID, status, time) + return b.finalizeInvocation(reqID, status, endTime) } func (b *Batch) OnPlatformStart(reqID string) { @@ -236,6 +236,8 @@ func (b *Batch) PlatformStartReqID() string { // platform.report event the batch will cleanup any datastructure for the request // ID. It will return some of the function metadata to allow the caller to enrich // the report metrics. +// +//nolint:gocritic func (b *Batch) OnPlatformReport(reqID string) (string, int64, time.Time, error) { b.mu.Lock() defer b.mu.Unlock() @@ -249,7 +251,7 @@ func (b *Batch) OnPlatformReport(reqID string) (string, int64, time.Time, error) // OnShutdown flushes the data for shipping to APM Server by finalizing all // the invocation in the batch. If we haven't received a platform.runtimeDone -// event for an invocation so far we won't be able to recieve it in time thus +// event for an invocation so far we won't be able to receive it in time thus // the status needs to be guessed based on the available information. func (b *Batch) OnShutdown(status string) error { b.mu.Lock() @@ -259,8 +261,8 @@ func (b *Batch) OnShutdown(status string) error { // TODO: @lahsivjar Is it possible to tweak the extension lifecycle in // a way that we receive the platform.report metric for a invocation // consistently and enrich the metrics with reported values? - time := time.Unix(0, inc.DeadlineMs*int64(time.Millisecond)) - if err := b.finalizeInvocation(inc.RequestID, status, time); err != nil { + endTime := time.Unix(0, inc.DeadlineMs*int64(time.Millisecond)) + if err := b.finalizeInvocation(inc.RequestID, status, endTime); err != nil { return err } delete(b.invocations, inc.RequestID) @@ -315,12 +317,12 @@ func (b *Batch) ToAPMData() APMData { } } -func (b *Batch) finalizeInvocation(reqID, status string, time time.Time) error { +func (b *Batch) finalizeInvocation(reqID, status string, endTime time.Time) error { inc, ok := b.invocations[reqID] if !ok { return fmt.Errorf("invocation for requestID %s does not exist", reqID) } - proxyTxn, err := inc.MaybeCreateProxyTxn(status, time) + proxyTxn, err := inc.MaybeCreateProxyTxn(status, endTime) if err != nil { return err } diff --git a/accumulator/batch_test.go b/accumulator/batch_test.go index 244d7e66..3e9ba690 100644 --- a/accumulator/batch_test.go +++ b/accumulator/batch_test.go @@ -32,7 +32,7 @@ const metadata = `{"metadata":{"service":{"agent":{"name":"apm-lambda-extension" func TestAdd(t *testing.T) { t.Run("empty-without-metadata", func(t *testing.T) { b := NewBatch(1, time.Hour) - assert.Error(t, b.AddLambdaData([]byte(`{"log":{}}`)), ErrMetadataUnavailable) + assert.ErrorIs(t, b.AddLambdaData([]byte(`{"log":{}}`)), ErrMetadataUnavailable) }) t.Run("empty-with-metadata", func(t *testing.T) { b := NewBatch(1, time.Hour) @@ -96,7 +96,7 @@ func TestLifecycle(t *testing.T) { reqID := "test-req-id" fnARN := "test-fn-arn" lambdaData := `{"log":{"message":"this is log"}}` - txnData := fmt.Sprintf(`{"transaction":{"id":"%s"}}`, "023d90ff77f13b9f") + txnData := `{"transaction":{"id":"023d90ff77f13b9f"}}` ts := time.Date(2022, time.October, 1, 1, 1, 1, 0, time.UTC) txnDur := time.Second @@ -276,12 +276,12 @@ func TestFindEventType(t *testing.T) { func generateCompleteTxn(t *testing.T, src, result, outcome string, d time.Duration) string { t.Helper() tmp, err := sjson.SetBytes([]byte(src), "transaction.result", result) - assert.NoError(t, err) + require.NoError(t, err) tmp, err = sjson.SetBytes(tmp, "transaction.duration", d.Milliseconds()) - assert.NoError(t, err) + require.NoError(t, err) if outcome != "" { tmp, err = sjson.SetBytes(tmp, "transaction.outcome", outcome) - assert.NoError(t, err) + require.NoError(t, err) } return string(tmp) } diff --git a/accumulator/invocation.go b/accumulator/invocation.go index 28950466..423be81f 100644 --- a/accumulator/invocation.go +++ b/accumulator/invocation.go @@ -63,7 +63,7 @@ func (inc *Invocation) NeedProxyTransaction() bool { // has not sent the corresponding transaction to the extension. The // proxy transaction will not be created if the invocation has // already been finalized or the agent has reported the transaction. -func (inc *Invocation) MaybeCreateProxyTxn(status string, time time.Time) ([]byte, error) { +func (inc *Invocation) MaybeCreateProxyTxn(status string, endTime time.Time) ([]byte, error) { if !inc.NeedProxyTransaction() { return nil, nil } @@ -74,7 +74,7 @@ func (inc *Invocation) MaybeCreateProxyTxn(status string, time time.Time) ([]byt // Transaction duration cannot be known in partial transaction payload. Estimate // the duration based on the time provided. Time can be based on the runtimeDone // log record or function deadline. - duration := time.Sub(inc.Timestamp) + duration := endTime.Sub(inc.Timestamp) txn, err = sjson.SetBytes(txn, "transaction.duration", duration.Milliseconds()) if err != nil { return nil, err diff --git a/accumulator/invocation_test.go b/accumulator/invocation_test.go index a73c27fc..9dbb29b3 100644 --- a/accumulator/invocation_test.go +++ b/accumulator/invocation_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestCreateProxyTransaction(t *testing.T) { @@ -90,8 +91,8 @@ func TestCreateProxyTransaction(t *testing.T) { TransactionObserved: tc.txnObserved, } result, err := inc.MaybeCreateProxyTxn(tc.runtimeDoneStatus, ts.Add(txnDur)) - assert.Nil(t, err) - if len(tc.output) > 0 { + require.NoError(t, err) + if tc.output != "" { assert.JSONEq(t, tc.output, string(result)) } else { assert.Nil(t, result) diff --git a/apmproxy/apmserver.go b/apmproxy/apmserver.go index 8a1ee983..0e517ffb 100644 --- a/apmproxy/apmserver.go +++ b/apmproxy/apmserver.go @@ -56,7 +56,7 @@ func (c *Client) ForwardApmData(ctx context.Context) error { for { select { case <-ctx.Done(): - c.logger.Debug("Invocation context cancelled, not processing any more agent data") + c.logger.Debug("Invocation context canceled, not processing any more agent data") return nil case data := <-c.AgentDataChannel: if err := c.forwardAgentData(ctx, data); err != nil { @@ -312,11 +312,11 @@ func (c *Client) ComputeGracePeriod() time.Duration { // The grace period for the first reconnection count was 0 but that // leads to collisions with multiple environments. if c.ReconnectionCount == 0 { - gracePeriod := rand.Float64() * 5 + gracePeriod := rand.Float64() * 5 //nolint:gosec return time.Duration(gracePeriod * float64(time.Second)) } gracePeriodWithoutJitter := math.Pow(math.Min(float64(c.ReconnectionCount), 6), 2) - jitter := rand.Float64()/5 - 0.1 + jitter := rand.Float64()/5 - 0.1 //nolint:gosec return time.Duration((gracePeriodWithoutJitter + jitter*gracePeriodWithoutJitter) * float64(time.Second)) } @@ -334,7 +334,7 @@ func (c *Client) ResetFlush() { c.flushCh = make(chan struct{}) } -// WaitForFlush returns a channel that is closed when the agent has signalled that +// WaitForFlush returns a channel that is closed when the agent has signaled that // the Lambda invocation has completed, and there is no more APM data coming. func (c *Client) WaitForFlush() <-chan struct{} { c.flushMutex.Lock() diff --git a/apmproxy/apmserver_test.go b/apmproxy/apmserver_test.go index 78d9f831..19d58b49 100644 --- a/apmproxy/apmserver_test.go +++ b/apmproxy/apmserver_test.go @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +//nolint:dupl package apmproxy_test import ( @@ -144,31 +145,31 @@ func TestGracePeriod(t *testing.T) { apmClient.ReconnectionCount = 1 val1 := apmClient.ComputeGracePeriod().Seconds() - assert.InDelta(t, val1, float64(1), 0.1*1) + assert.InDelta(t, float64(1), val1, 0.1*1) apmClient.ReconnectionCount = 2 val2 := apmClient.ComputeGracePeriod().Seconds() - assert.InDelta(t, val2, float64(4), 0.1*4) + assert.InDelta(t, float64(4), val2, 0.1*4) apmClient.ReconnectionCount = 3 val3 := apmClient.ComputeGracePeriod().Seconds() - assert.InDelta(t, val3, float64(9), 0.1*9) + assert.InDelta(t, float64(9), val3, 0.1*9) apmClient.ReconnectionCount = 4 val4 := apmClient.ComputeGracePeriod().Seconds() - assert.InDelta(t, val4, float64(16), 0.1*16) + assert.InDelta(t, float64(16), val4, 0.1*16) apmClient.ReconnectionCount = 5 val5 := apmClient.ComputeGracePeriod().Seconds() - assert.InDelta(t, val5, float64(25), 0.1*25) + assert.InDelta(t, float64(25), val5, 0.1*25) apmClient.ReconnectionCount = 6 val6 := apmClient.ComputeGracePeriod().Seconds() - assert.InDelta(t, val6, float64(36), 0.1*36) + assert.InDelta(t, float64(36), val6, 0.1*36) apmClient.ReconnectionCount = 7 val7 := apmClient.ComputeGracePeriod().Seconds() - assert.InDelta(t, val7, float64(36), 0.1*36) + assert.InDelta(t, float64(36), val7, 0.1*36) } func TestSetHealthyTransport(t *testing.T) { @@ -178,7 +179,7 @@ func TestSetHealthyTransport(t *testing.T) { ) require.NoError(t, err) apmClient.UpdateStatus(context.Background(), apmproxy.Healthy) - assert.True(t, apmClient.Status == apmproxy.Healthy) + assert.Equal(t, apmproxy.Healthy, apmClient.Status) assert.Equal(t, apmClient.ReconnectionCount, -1) } @@ -192,8 +193,8 @@ func TestSetFailingTransport(t *testing.T) { require.NoError(t, err) apmClient.ReconnectionCount = 0 apmClient.UpdateStatus(context.Background(), apmproxy.Failing) - assert.True(t, apmClient.Status == apmproxy.Failing) - assert.Equal(t, apmClient.ReconnectionCount, 1) + assert.Equal(t, apmproxy.Failing, apmClient.Status) + assert.Equal(t, 1, apmClient.ReconnectionCount) } func TestSetPendingTransport(t *testing.T) { @@ -207,8 +208,8 @@ func TestSetPendingTransport(t *testing.T) { require.Eventually(t, func() bool { return !apmClient.IsUnhealthy() }, 7*time.Second, 50*time.Millisecond) - assert.True(t, apmClient.Status == apmproxy.Started) - assert.Equal(t, apmClient.ReconnectionCount, 0) + assert.Equal(t, apmproxy.Started, apmClient.Status) + assert.Equal(t, 0, apmClient.ReconnectionCount) } func TestSetPendingTransportExplicitly(t *testing.T) { @@ -219,7 +220,7 @@ func TestSetPendingTransportExplicitly(t *testing.T) { require.NoError(t, err) apmClient.UpdateStatus(context.Background(), apmproxy.Healthy) apmClient.UpdateStatus(context.Background(), apmproxy.Started) - assert.True(t, apmClient.Status == apmproxy.Healthy) + assert.Equal(t, apmproxy.Healthy, apmClient.Status) assert.Equal(t, apmClient.ReconnectionCount, -1) } @@ -231,7 +232,7 @@ func TestSetInvalidTransport(t *testing.T) { require.NoError(t, err) apmClient.UpdateStatus(context.Background(), apmproxy.Healthy) apmClient.UpdateStatus(context.Background(), "Invalid") - assert.True(t, apmClient.Status == apmproxy.Healthy) + assert.Equal(t, apmproxy.Healthy, apmClient.Status) assert.Equal(t, apmClient.ReconnectionCount, -1) } @@ -282,8 +283,8 @@ func TestEnterBackoffFromHealthy(t *testing.T) { return } // No way to know for sure if failing or pending (0 sec grace period) - assert.True(t, apmClient.Status != apmproxy.Healthy) - assert.Equal(t, apmClient.ReconnectionCount, 0) + assert.NotEqual(t, apmproxy.Healthy, apmClient.Status) + assert.Equal(t, 0, apmClient.ReconnectionCount) } func TestEnterBackoffFromFailing(t *testing.T) { @@ -333,11 +334,11 @@ func TestEnterBackoffFromFailing(t *testing.T) { require.Eventually(t, func() bool { return !apmClient.IsUnhealthy() }, 7*time.Second, 50*time.Millisecond) - assert.Equal(t, apmClient.Status, apmproxy.Started) + assert.Equal(t, apmproxy.Started, apmClient.Status) - assert.Error(t, apmClient.PostToApmServer(context.Background(), agentData)) - assert.Equal(t, apmClient.Status, apmproxy.Failing) - assert.Equal(t, apmClient.ReconnectionCount, 1) + require.Error(t, apmClient.PostToApmServer(context.Background(), agentData)) + assert.Equal(t, apmproxy.Failing, apmClient.Status) + assert.Equal(t, 1, apmClient.ReconnectionCount) } func TestAPMServerRecovery(t *testing.T) { @@ -366,7 +367,7 @@ func TestAPMServerRecovery(t *testing.T) { // Create apm server and handler apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { bytes, err := io.ReadAll(r.Body) - require.NoError(t, err) + assert.NoError(t, err) assert.Equal(t, string(data), string(bytes)) assert.Equal(t, "gzip", r.Header.Get("Content-Encoding")) w.WriteHeader(http.StatusAccepted) @@ -387,10 +388,10 @@ func TestAPMServerRecovery(t *testing.T) { require.Eventually(t, func() bool { return !apmClient.IsUnhealthy() }, 7*time.Second, 50*time.Millisecond) - assert.Equal(t, apmClient.Status, apmproxy.Started) + assert.Equal(t, apmproxy.Started, apmClient.Status) - assert.NoError(t, apmClient.PostToApmServer(context.Background(), agentData)) - assert.Equal(t, apmClient.Status, apmproxy.Healthy) + require.NoError(t, apmClient.PostToApmServer(context.Background(), agentData)) + assert.Equal(t, apmproxy.Healthy, apmClient.Status) } func TestAPMServerAuthFails(t *testing.T) { @@ -417,7 +418,7 @@ func TestAPMServerAuthFails(t *testing.T) { agentData := accumulator.APMData{Data: data, ContentEncoding: "gzip"} // Create apm server and handler - apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusUnauthorized) })) defer apmServer.Close() @@ -432,9 +433,9 @@ func TestAPMServerAuthFails(t *testing.T) { require.Eventually(t, func() bool { return !apmClient.IsUnhealthy() }, 7*time.Second, 50*time.Millisecond) - assert.Equal(t, apmClient.Status, apmproxy.Started) - assert.NoError(t, apmClient.PostToApmServer(context.Background(), agentData)) - assert.NotEqual(t, apmClient.Status, apmproxy.Healthy) + assert.Equal(t, apmproxy.Started, apmClient.Status) + require.NoError(t, apmClient.PostToApmServer(context.Background(), agentData)) + assert.NotEqual(t, apmproxy.Healthy, apmClient.Status) } func TestAPMServerRatelimit(t *testing.T) { @@ -462,7 +463,7 @@ func TestAPMServerRatelimit(t *testing.T) { // Create apm server and handler var shouldSucceed atomic.Bool - apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { // Fail the first request if shouldSucceed.CompareAndSwap(false, true) { w.WriteHeader(http.StatusTooManyRequests) @@ -478,16 +479,15 @@ func TestAPMServerRatelimit(t *testing.T) { apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()), ) require.NoError(t, err) - assert.Equal(t, apmClient.Status, apmproxy.Started) + assert.Equal(t, apmproxy.Started, apmClient.Status) // First request fails but does not trigger the backoff - assert.NoError(t, apmClient.PostToApmServer(context.Background(), agentData)) - assert.Equal(t, apmClient.Status, apmproxy.RateLimited) - - // Followup request is succesful - assert.NoError(t, apmClient.PostToApmServer(context.Background(), agentData)) - assert.Equal(t, apmClient.Status, apmproxy.Healthy) + require.NoError(t, apmClient.PostToApmServer(context.Background(), agentData)) + assert.Equal(t, apmproxy.RateLimited, apmClient.Status) + // Followup request is successful + require.NoError(t, apmClient.PostToApmServer(context.Background(), agentData)) + assert.Equal(t, apmproxy.Healthy, apmClient.Status) } func TestAPMServerClientFail(t *testing.T) { @@ -515,7 +515,7 @@ func TestAPMServerClientFail(t *testing.T) { // Create apm server and handler var shouldSucceed atomic.Bool - apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { // Fail the first request if shouldSucceed.CompareAndSwap(false, true) { w.WriteHeader(http.StatusRequestEntityTooLarge) @@ -531,15 +531,15 @@ func TestAPMServerClientFail(t *testing.T) { apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()), ) require.NoError(t, err) - assert.Equal(t, apmClient.Status, apmproxy.Started) + assert.Equal(t, apmproxy.Started, apmClient.Status) // First request fails but does not trigger the backoff - assert.NoError(t, apmClient.PostToApmServer(context.Background(), agentData)) - assert.Equal(t, apmClient.Status, apmproxy.ClientFailing) + require.NoError(t, apmClient.PostToApmServer(context.Background(), agentData)) + assert.Equal(t, apmproxy.ClientFailing, apmClient.Status) - // Followup request is succesful - assert.NoError(t, apmClient.PostToApmServer(context.Background(), agentData)) - assert.Equal(t, apmClient.Status, apmproxy.Healthy) + // Followup request is successful + require.NoError(t, apmClient.PostToApmServer(context.Background(), agentData)) + assert.Equal(t, apmproxy.Healthy, apmClient.Status) } func TestContinuedAPMServerFailure(t *testing.T) { @@ -586,9 +586,9 @@ func TestContinuedAPMServerFailure(t *testing.T) { require.Eventually(t, func() bool { return !apmClient.IsUnhealthy() }, 7*time.Second, 50*time.Millisecond) - assert.Equal(t, apmClient.Status, apmproxy.Started) - assert.Error(t, apmClient.PostToApmServer(context.Background(), agentData)) - assert.Equal(t, apmClient.Status, apmproxy.Failing) + assert.Equal(t, apmproxy.Started, apmClient.Status) + require.Error(t, apmClient.PostToApmServer(context.Background(), agentData)) + assert.Equal(t, apmproxy.Failing, apmClient.Status) } func TestForwardApmData(t *testing.T) { @@ -614,7 +614,8 @@ func TestForwardApmData(t *testing.T) { require.NoError(t, err) assert.Equal(t, expected, string(out)) } - agentData := fmt.Sprintf("%s\n%s", metadata, `{"transaction":{"id":"0102030405060708","trace_id":"0102030405060708090a0b0c0d0e0f10"}}`) + agentData := fmt.Sprintf("%s\n%s", metadata, + `{"transaction":{"id":"0102030405060708","trace_id":"0102030405060708090a0b0c0d0e0f10"}}`) lambdaData := `{"log": {"message": "test"}}` maxBatchAge := 1 * time.Second apmClient, err := apmproxy.NewClient( @@ -670,7 +671,7 @@ func BenchmarkFlushAPMData(b *testing.B) { if err := r.Body.Close(); err != nil { return } - w.WriteHeader(202) + w.WriteHeader(http.StatusAccepted) if _, err := w.Write([]byte(`{}`)); err != nil { return } @@ -715,7 +716,7 @@ func BenchmarkPostToAPM(b *testing.B) { if err := r.Body.Close(); err != nil { return } - w.WriteHeader(202) + w.WriteHeader(http.StatusAccepted) if _, err := w.Write([]byte(`{}`)); err != nil { return } diff --git a/apmproxy/client.go b/apmproxy/client.go index 98af327d..8be8a185 100644 --- a/apmproxy/client.go +++ b/apmproxy/client.go @@ -109,7 +109,7 @@ func NewClient(opts ...Option) (*Client, error) { // normalize server URL if !strings.HasSuffix(c.serverURL, "/") { - c.serverURL = c.serverURL + "/" + c.serverURL += "/" } return &c, nil diff --git a/apmproxy/client_test.go b/apmproxy/client_test.go index 3f57c73b..72ad69de 100644 --- a/apmproxy/client_test.go +++ b/apmproxy/client_test.go @@ -53,7 +53,6 @@ func TestClient(t *testing.T) { } for name, tc := range testCases { t.Run(name, func(t *testing.T) { - _, err := apmproxy.NewClient(tc.opts...) if tc.expectedErr { require.Error(t, err) diff --git a/apmproxy/option.go b/apmproxy/option.go index f41f5049..9a8b6fbd 100644 --- a/apmproxy/option.go +++ b/apmproxy/option.go @@ -100,7 +100,7 @@ func WithBatch(batch *accumulator.Batch) Option { func WithRootCerts(certs string) Option { return func(c *Client) { - EnsureTlsConfig(c) + EnsureTlSConfig(c) transportClient := c.client.Transport.(*http.Transport) if transportClient.TLSClientConfig.RootCAs == nil { transportClient.TLSClientConfig.RootCAs = DefaultCertPool() @@ -111,7 +111,7 @@ func WithRootCerts(certs string) Option { func WithVerifyCerts(verify bool) Option { return func(c *Client) { - EnsureTlsConfig(c) + EnsureTlSConfig(c) transportClient := c.client.Transport.(*http.Transport) transportClient.TLSClientConfig.InsecureSkipVerify = !verify } @@ -124,9 +124,11 @@ func DefaultCertPool() *x509.CertPool { } return certPool } -func EnsureTlsConfig(c *Client) { +func EnsureTlSConfig(c *Client) { transportClient := c.client.Transport.(*http.Transport) if transportClient.TLSClientConfig == nil { - transportClient.TLSClientConfig = &tls.Config{} + transportClient.TLSClientConfig = &tls.Config{ + MinVersion: tls.VersionTLS12, + } } } diff --git a/apmproxy/receiver.go b/apmproxy/receiver.go index 40c34310..aa1e2681 100644 --- a/apmproxy/receiver.go +++ b/apmproxy/receiver.go @@ -85,12 +85,12 @@ func (c *Client) handleInfoRequest() (func(w http.ResponseWriter, r *http.Reques reverseProxy.Transport = c.client.Transport.(*http.Transport).Clone() - reverseProxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) { + reverseProxy.ErrorHandler = func(w http.ResponseWriter, _ *http.Request, err error) { // Don't update the status of the transport as it is possible that the extension // is frozen while processing the request and context is canceled due to timeout. c.logger.Errorf("Error querying version from the APM server: %v", err) - // Server is unreachable, return StatusBadGateway (default behaviour) to avoid + // Server is unreachable, return StatusBadGateway (default behavior) to avoid // returning a Status OK. w.WriteHeader(http.StatusBadGateway) } @@ -199,7 +199,7 @@ func (c *Client) handleTransactionRegistration() func(w http.ResponseWriter, r * if err := c.batch.OnAgentInit( reqID, r.Header.Get("Content-Encoding"), rawBytes, ); err != nil { - c.logger.Warnf("Failed to update invocation: %w", err) + c.logger.Warnf("Failed to update invocation: %v", err) w.WriteHeader(http.StatusUnprocessableEntity) return } diff --git a/apmproxy/receiver_test.go b/apmproxy/receiver_test.go index c5e08b6c..b91f4f6e 100644 --- a/apmproxy/receiver_test.go +++ b/apmproxy/receiver_test.go @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +//nolint:dupl package apmproxy_test import ( @@ -41,12 +42,12 @@ func TestInfoProxy(t *testing.T) { // Create apm server and handler apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { for key := range headers { - assert.Equal(t, 2, len(r.Header[key])) + assert.Len(t, r.Header[key], 2) assert.Equal(t, headers[key], r.Header[key][0]) } w.Header().Add("test", "header") _, err := w.Write([]byte(`{"foo": "bar"}`)) - require.NoError(t, err) + assert.NoError(t, err) })) defer apmServer.Close() @@ -70,7 +71,7 @@ func TestInfoProxy(t *testing.T) { url := "http://127.0.0.1:1234" // Create a request to send to the extension - req, err := http.NewRequest(http.MethodGet, url, nil) + req, err := http.NewRequest(http.MethodGet, url, http.NoBody) require.NoError(t, err) for name, value := range headers { req.Header.Add(name, value) @@ -83,7 +84,7 @@ func TestInfoProxy(t *testing.T) { body, err := io.ReadAll(resp.Body) require.NoError(t, err) - assert.Equal(t, string(body), wantResp) + assert.Equal(t, wantResp, string(body)) assert.Equal(t, "header", resp.Header.Get("test")) require.NoError(t, resp.Body.Close()) } @@ -122,7 +123,7 @@ func TestInfoProxyAuth(t *testing.T) { func TestInfoProxyErrorStatusCode(t *testing.T) { // Create apm server and handler - apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusUnauthorized) })) defer apmServer.Close() @@ -146,19 +147,20 @@ func TestInfoProxyErrorStatusCode(t *testing.T) { url := "http://127.0.0.1:1234" // Create a request to send to the extension - req, err := http.NewRequest(http.MethodGet, url, nil) + req, err := http.NewRequest(http.MethodGet, url, http.NoBody) require.NoError(t, err) // Send the request to the extension client := &http.Client{} resp, err := client.Do(req) require.NoError(t, err) + defer resp.Body.Close() assert.Equal(t, http.StatusUnauthorized, resp.StatusCode) } func TestInfoProxyUnreachable(t *testing.T) { // Create apm server and handler - apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) + apmServer := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {})) // Shutdown apmServer.Close() @@ -181,13 +183,14 @@ func TestInfoProxyUnreachable(t *testing.T) { url := "http://127.0.0.1:1234" // Create a request to send to the extension - req, err := http.NewRequest(http.MethodGet, url, nil) + req, err := http.NewRequest(http.MethodGet, url, http.NoBody) require.NoError(t, err) // Send the request to the extension client := &http.Client{} resp, err := client.Do(req) require.NoError(t, err) + defer resp.Body.Close() // Make sure we don't get a 200 OK assert.Equal(t, http.StatusBadGateway, resp.StatusCode) @@ -233,6 +236,7 @@ func Test_handleInfoRequest(t *testing.T) { client := &http.Client{} resp, err := client.Do(req) require.NoError(t, err) + defer resp.Body.Close() assert.Equal(t, http.StatusAccepted, resp.StatusCode) } @@ -240,8 +244,7 @@ func Test_handleIntakeV2EventsQueryParam(t *testing.T) { body := []byte(`{"metadata": {}`) // Create apm server and handler - apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - })) + apmServer := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {})) defer apmServer.Close() // Create extension config and start the server @@ -266,8 +269,11 @@ func Test_handleIntakeV2EventsQueryParam(t *testing.T) { // Send the request to the extension client := &http.Client{} go func() { - _, err := client.Do(req) - require.NoError(t, err) + resp, err := client.Do(req) + assert.NoError(t, err) + if err == nil { + resp.Body.Close() + } }() select { @@ -281,8 +287,7 @@ func Test_handleIntakeV2EventsNoQueryParam(t *testing.T) { body := []byte(`{"metadata": {}`) // Create apm server and handler - apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - })) + apmServer := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {})) defer apmServer.Close() // Create extension config and start the server @@ -310,6 +315,7 @@ func Test_handleIntakeV2EventsNoQueryParam(t *testing.T) { client := &http.Client{} resp, err := client.Do(req) require.NoError(t, err) + defer resp.Body.Close() select { case <-apmClient.AgentDataChannel: case <-time.After(1 * time.Second): @@ -323,8 +329,7 @@ func Test_handleIntakeV2EventsQueryParamEmptyData(t *testing.T) { body := []byte(``) // Create apm server and handler - apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - })) + apmServer := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {})) defer apmServer.Close() // Create extension config and start the server @@ -349,8 +354,11 @@ func Test_handleIntakeV2EventsQueryParamEmptyData(t *testing.T) { // Send the request to the extension client := &http.Client{} go func() { - _, err := client.Do(req) - require.NoError(t, err) + resp, err := client.Do(req) + assert.NoError(t, err) + if err == nil { + resp.Body.Close() + } }() select { @@ -365,10 +373,10 @@ func TestWithVerifyCerts(t *testing.T) { clientConnected := false // Create apm server and handler - apmServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + apmServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.Header().Add("test", "header") _, err := w.Write([]byte(`{"foo": "bar"}`)) - require.NoError(t, err) + assert.NoError(t, err) clientConnected = true })) defer apmServer.Close() @@ -393,7 +401,7 @@ func TestWithVerifyCerts(t *testing.T) { url := "http://127.0.0.1:1234" // Create a request to send to the extension - req, err := http.NewRequest(http.MethodGet, url, nil) + req, err := http.NewRequest(http.MethodGet, url, http.NoBody) require.NoError(t, err) for name, value := range headers { req.Header.Add(name, value) @@ -413,10 +421,10 @@ func TestWithRootCerts(t *testing.T) { clientConnected := false // Create apm server and handler - apmServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + apmServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.Header().Add("test", "header") _, err := w.Write([]byte(`{"foo": "bar"}`)) - require.NoError(t, err) + assert.NoError(t, err) clientConnected = true })) defer apmServer.Close() @@ -443,7 +451,7 @@ func TestWithRootCerts(t *testing.T) { url := "http://127.0.0.1:1234" // Create a request to send to the extension - req, err := http.NewRequest(http.MethodGet, url, nil) + req, err := http.NewRequest(http.MethodGet, url, http.NoBody) require.NoError(t, err) for name, value := range headers { req.Header.Add(name, value) diff --git a/app/app.go b/app/app.go index 8c76118c..bc44f2e1 100644 --- a/app/app.go +++ b/app/app.go @@ -35,9 +35,9 @@ import ( "go.uber.org/zap" ) -var ( - defaultMaxBatchSize int = 50 - defaultMaxBatchAge time.Duration = 2 * time.Second +const ( + defaultMaxBatchSize = 50 + defaultMaxBatchAge = 2 * time.Second ) // App is the main application. @@ -50,8 +50,9 @@ type App struct { batch *accumulator.Batch } -// New returns an App or an error if the -// creation failed. +// New returns an App or an error if the creation failed. +// +//nolint:govet func New(ctx context.Context, opts ...ConfigOption) (*App, error) { c := appConfig{} @@ -70,10 +71,7 @@ func New(ctx context.Context, opts ...ConfigOption) (*App, error) { return nil, err } - apmServerAPIKey, apmServerSecretToken, err := loadAWSOptions(ctx, c.awsConfig, app.logger) - if err != nil { - return nil, err - } + apmServerAPIKey, apmServerSecretToken := loadAWSOptions(ctx, c.awsConfig, app.logger) app.extensionClient = extension.NewClient(c.awsLambdaRuntimeAPI, app.logger) @@ -88,8 +86,8 @@ func New(ctx context.Context, opts ...ConfigOption) (*App, error) { subscriptionLogStreams = append(subscriptionLogStreams, logsapi.Function) } - lc, err := logsapi.NewClient( - logsapi.WithLogsAPIBaseURL(fmt.Sprintf("http://%s", c.awsLambdaRuntimeAPI)), + app.logsClient, err = logsapi.NewClient( + logsapi.WithLogsAPIBaseURL("http://"+c.awsLambdaRuntimeAPI), logsapi.WithListenerAddress(addr), logsapi.WithLogBuffer(100), logsapi.WithLogger(app.logger), @@ -99,8 +97,6 @@ func New(ctx context.Context, opts ...ConfigOption) (*App, error) { if err != nil { return nil, err } - - app.logsClient = lc } var apmOpts []apmproxy.Option @@ -120,7 +116,7 @@ func New(ctx context.Context, opts ...ConfigOption) (*App, error) { } if port := os.Getenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT"); port != "" { - apmOpts = append(apmOpts, apmproxy.WithReceiverAddress(fmt.Sprintf(":%s", port))) + apmOpts = append(apmOpts, apmproxy.WithReceiverAddress(":"+port)) } if strategy, ok := parseStrategy(os.Getenv("ELASTIC_APM_SEND_STRATEGY")); ok { @@ -163,7 +159,7 @@ func New(ctx context.Context, opts ...ConfigOption) (*App, error) { } if acmCertArn := os.Getenv("ELASTIC_APM_SERVER_CA_CERT_ACM_ID"); acmCertArn != "" { - cert, err := loadAcmCertificate(acmCertArn, c.awsConfig, ctx) + cert, err := loadAcmCertificate(ctx, acmCertArn, c.awsConfig) if err != nil { return nil, err } diff --git a/app/aws.go b/app/aws.go index a73c5894..c85917d5 100644 --- a/app/aws.go +++ b/app/aws.go @@ -29,26 +29,26 @@ import ( "go.uber.org/zap" ) -func loadAWSOptions(ctx context.Context, cfg aws.Config, logger *zap.SugaredLogger) (string, string, error) { +func loadAWSOptions(ctx context.Context, cfg aws.Config, logger *zap.SugaredLogger) (string, string) { //nolint:gocritic manager := secretsmanager.NewFromConfig(cfg) - apmServerApiKey := os.Getenv("ELASTIC_APM_API_KEY") - if apmServerApiKeySMSecretId, ok := os.LookupEnv("ELASTIC_APM_SECRETS_MANAGER_API_KEY_ID"); ok { - result, err := loadSecret(ctx, manager, apmServerApiKeySMSecretId) + apmServerAPIKey := os.Getenv("ELASTIC_APM_API_KEY") + if apmServerAPIKeySMSecretID, ok := os.LookupEnv("ELASTIC_APM_SECRETS_MANAGER_API_KEY_ID"); ok { + result, err := loadSecret(ctx, manager, apmServerAPIKeySMSecretID) if err != nil { - logger.Warnf("Could not load APM API key from AWS Secrets Manager. Reporting APM data will likely fail. Is 'ELASTIC_APM_SECRETS_MANAGER_API_KEY_ID=%s' correct? See https://www.elastic.co/guide/en/apm/lambda/current/aws-lambda-secrets-manager.html. Error message: %v", apmServerApiKeySMSecretId, err) - apmServerApiKey = "" + logger.Warnf("Could not load APM API key from AWS Secrets Manager. Reporting APM data will likely fail. Is 'ELASTIC_APM_SECRETS_MANAGER_API_KEY_ID=%s' correct? See https://www.elastic.co/guide/en/apm/lambda/current/aws-lambda-secrets-manager.html. Error message: %v", apmServerAPIKeySMSecretID, err) + apmServerAPIKey = "" } else { logger.Infof("Using the APM API key retrieved from AWS Secrets Manager.") - apmServerApiKey = result + apmServerAPIKey = result } } apmServerSecretToken := os.Getenv("ELASTIC_APM_SECRET_TOKEN") - if apmServerSecretTokenSMSecretId, ok := os.LookupEnv("ELASTIC_APM_SECRETS_MANAGER_SECRET_TOKEN_ID"); ok { - result, err := loadSecret(ctx, manager, apmServerSecretTokenSMSecretId) + if apmServerSecretTokenSMSecretID, ok := os.LookupEnv("ELASTIC_APM_SECRETS_MANAGER_SECRET_TOKEN_ID"); ok { + result, err := loadSecret(ctx, manager, apmServerSecretTokenSMSecretID) if err != nil { - logger.Warnf("Could not load APM secret token from AWS Secrets Manager. Reporting APM data will likely fail. Is 'ELASTIC_APM_SECRETS_MANAGER_SECRET_TOKEN_ID=%s' correct? See https://www.elastic.co/guide/en/apm/lambda/current/aws-lambda-secrets-manager.html. Error message: %v", apmServerSecretTokenSMSecretId, err) + logger.Warnf("Could not load APM secret token from AWS Secrets Manager. Reporting APM data will likely fail. Is 'ELASTIC_APM_SECRETS_MANAGER_SECRET_TOKEN_ID=%s' correct? See https://www.elastic.co/guide/en/apm/lambda/current/aws-lambda-secrets-manager.html. Error message: %v", apmServerSecretTokenSMSecretID, err) apmServerSecretToken = "" } else { logger.Infof("Using the APM secret token retrieved from AWS Secrets Manager.") @@ -56,7 +56,7 @@ func loadAWSOptions(ctx context.Context, cfg aws.Config, logger *zap.SugaredLogg } } - return apmServerApiKey, apmServerSecretToken, nil + return apmServerAPIKey, apmServerSecretToken } func loadSecret(ctx context.Context, manager *secretsmanager.Client, secretID string) (string, error) { @@ -82,7 +82,7 @@ func loadSecret(ctx context.Context, manager *secretsmanager.Client, secretID st return string(decodedBinarySecretBytes), nil } -func loadAcmCertificate(arn string, cfg aws.Config, ctx context.Context) (*string, error) { +func loadAcmCertificate(ctx context.Context, arn string, cfg aws.Config) (*string, error) { acmClient := acm.NewFromConfig(cfg) getCertificateInput := acm.GetCertificateInput{ CertificateArn: &arn, diff --git a/app/run.go b/app/run.go index 4b021d64..17936025 100644 --- a/app/run.go +++ b/app/run.go @@ -57,10 +57,10 @@ func (app *App) Run(ctx context.Context) error { // Flush all data before shutting down. defer func() { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + flushCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - app.apmClient.FlushAPMData(ctx) + app.apmClient.FlushAPMData(flushCtx) }() if app.logsClient != nil { diff --git a/e2e-testing/e2e_test.go b/e2e-testing/e2e_test.go index 8481f15d..6cc50fe1 100644 --- a/e2e-testing/e2e_test.go +++ b/e2e-testing/e2e_test.go @@ -15,10 +15,9 @@ // specific language governing permissions and limitations // under the License. -package e2eTesting +package e2etesting import ( - "github.com/elastic/apm-aws-lambda/logger" "flag" "fmt" "io" @@ -26,10 +25,13 @@ import ( "net/http/httptest" "os" "path/filepath" + "strconv" "strings" "testing" "time" + "github.com/elastic/apm-aws-lambda/logger" + "github.com/google/uuid" "github.com/joho/godotenv" "github.com/stretchr/testify/assert" @@ -65,7 +67,7 @@ func TestEndToEnd(t *testing.T) { languageName := strings.ToLower(*langPtr) supportedLanguages := []string{"nodejs", "python", "java"} if !IsStringInSlice(languageName, supportedLanguages) { - ProcessError(l, fmt.Errorf(fmt.Sprintf("Unsupported language %s ! Supported languages are %v", languageName, supportedLanguages))) + ProcessError(l, fmt.Errorf("unsupported language %s ! Supported languages are %v", languageName, supportedLanguages)) } samPath := "sam-" + languageName @@ -85,7 +87,7 @@ func TestEndToEnd(t *testing.T) { // Initialize Mock APM Server mockAPMServerLog := "" - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ts := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) { if r.RequestURI == "/intake/v2/events" { bytesRes, _ := GetDecompressedBytesFromRequest(r) mockAPMServerLog += fmt.Sprintf("%s\n", bytesRes) @@ -95,13 +97,13 @@ func TestEndToEnd(t *testing.T) { resultsChan := make(chan string, 1) - testUuid := runTestWithTimer(l, samPath, samServiceName, ts.URL, *rebuildPtr, *timerPtr, resultsChan) - l.Infof("UUID generated during the test : %s", testUuid) - if testUuid == "" { + testUUID := runTestWithTimer(l, samPath, samServiceName, ts.URL, *rebuildPtr, *timerPtr, resultsChan) + l.Infof("UUID generated during the test : %s", testUUID) + if testUUID == "" { t.Fail() } l.Infof("Querying the mock server for transaction bound to %s...", samServiceName) - assert.True(t, strings.Contains(mockAPMServerLog, testUuid)) + assert.True(t, strings.Contains(mockAPMServerLog, testUUID)) } func runTestWithTimer(l *zap.SugaredLogger, path string, serviceName string, serverURL string, buildFlag bool, lambdaFuncTimeout int, resultsChan chan string) string { @@ -109,8 +111,8 @@ func runTestWithTimer(l *zap.SugaredLogger, path string, serviceName string, ser defer timer.Stop() go runTest(l, path, serviceName, serverURL, buildFlag, lambdaFuncTimeout, resultsChan) select { - case testUuid := <-resultsChan: - return testUuid + case testUUID := <-resultsChan: + return testUUID case <-timer.C: return "" } @@ -133,9 +135,9 @@ func runTest(l *zap.SugaredLogger, path string, serviceName string, serverURL st urlSlice := strings.Split(serverURL, ":") port := urlSlice[len(urlSlice)-1] RunCommandInDir(l, "sam", []string{"local", "invoke", "--parameter-overrides", - fmt.Sprintf("ParameterKey=ApmServerURL,ParameterValue=http://host.docker.internal:%s", port), - fmt.Sprintf("ParameterKey=TestUUID,ParameterValue=%s", uuidWithHyphen), - fmt.Sprintf("ParameterKey=TimeoutParam,ParameterValue=%d", lambdaFuncTimeout)}, + "ParameterKey=ApmServerURL,ParameterValue=http://host.docker.internal:" + port, + "ParameterKey=TestUUID,ParameterValue=" + uuidWithHyphen, + "ParameterKey=TimeoutParam,ParameterValue=" + strconv.Itoa(lambdaFuncTimeout)}, path) l.Infof("%s execution complete", serviceName) @@ -143,7 +145,6 @@ func runTest(l *zap.SugaredLogger, path string, serviceName string, serverURL st } func retrieveJavaAgent(l *zap.SugaredLogger, samJavaPath string, version string) { - agentFolderPath := filepath.Join(samJavaPath, "agent") agentArchivePath := filepath.Join(samJavaPath, "agent.zip") diff --git a/e2e-testing/e2e_util.go b/e2e-testing/e2e_util.go index 57cc61f8..f8a9e3d5 100644 --- a/e2e-testing/e2e_util.go +++ b/e2e-testing/e2e_util.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package e2eTesting +package e2etesting import ( "archive/zip" @@ -66,7 +66,6 @@ func RunCommandInDir(l *zap.SugaredLogger, command string, args []string, dir st if err := e.Wait(); err != nil { l.Errorf("Could not wait for the execution of %s : %v", command, err) } - } // FolderExists returns true if the specified folder exists, and false else. @@ -85,7 +84,6 @@ func ProcessError(l *zap.SugaredLogger, err error) { // Unzip is a utility function that unzips a specified zip archive to a specified destination. func Unzip(l *zap.SugaredLogger, archivePath string, destinationFolderPath string) { - openedArchive, err := zip.OpenReader(archivePath) ProcessError(l, err) defer openedArchive.Close() @@ -108,7 +106,7 @@ func Unzip(l *zap.SugaredLogger, archivePath string, destinationFolderPath strin } }() - path := filepath.Join(destinationFolderPath, f.Name) + path := filepath.Join(destinationFolderPath, f.Name) //nolint:gosec // Check for ZipSlip (Directory traversal) if !strings.HasPrefix(path, filepath.Clean(destinationFolderPath)+string(os.PathSeparator)) { @@ -116,7 +114,7 @@ func Unzip(l *zap.SugaredLogger, archivePath string, destinationFolderPath strin } if f.FileInfo().IsDir() { - if err := os.MkdirAll(path, f.Mode()); err != nil { + if err = os.MkdirAll(path, f.Mode()); err != nil { l.Errorf("Could not unzip folder : %v", err) } } else { @@ -126,7 +124,7 @@ func Unzip(l *zap.SugaredLogger, archivePath string, destinationFolderPath strin f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode()) ProcessError(l, err) defer f.Close() - _, err = io.Copy(f, rc) + _, err = io.Copy(f, rc) //nolint:gosec ProcessError(l, err) } return nil diff --git a/extension/client.go b/extension/client.go index 67d1eb25..e02df84b 100644 --- a/extension/client.go +++ b/extension/client.go @@ -58,7 +58,7 @@ type StatusResponse struct { Status string `json:"status"` } -// EventType represents the type of events recieved from /event/next +// EventType represents the type of events received from /event/next type EventType string const ( @@ -131,7 +131,7 @@ func (e *Client) NextEvent(ctx context.Context) (*NextEventResponse, error) { const action = "/event/next" url := e.baseURL + action - httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, url, http.NoBody) if err != nil { return nil, fmt.Errorf("failed to create next event request: %w", err) } @@ -154,11 +154,13 @@ func (e *Client) NextEvent(ctx context.Context) (*NextEventResponse, error) { } // InitError reports an initialization error to the platform. Call it when you registered but failed to initialize +// +//nolint:dupl func (e *Client) InitError(ctx context.Context, errorType string) (*StatusResponse, error) { const action = "/init/error" url := e.baseURL + action - httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, nil) + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, http.NoBody) if err != nil { return nil, fmt.Errorf("failed to create init error request: %w", err) } @@ -182,11 +184,13 @@ func (e *Client) InitError(ctx context.Context, errorType string) (*StatusRespon } // ExitError reports an error to the platform before exiting. Call it when you encounter an unexpected failure +// +//nolint:dupl func (e *Client) ExitError(ctx context.Context, errorType string) (*StatusResponse, error) { const action = "/exit/error" url := e.baseURL + action - httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, nil) + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, http.NoBody) if err != nil { return nil, fmt.Errorf("failed to create exit error request: %w", err) } diff --git a/extension/client_test.go b/extension/client_test.go index d548facd..db08d91f 100644 --- a/extension/client_test.go +++ b/extension/client_test.go @@ -76,7 +76,7 @@ func TestNextEvent(t *testing.T) { } `) - runtimeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + runtimeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { if _, err := w.Write(response); err != nil { t.Fail() return diff --git a/logger/logger.go b/logger/logger.go index faa7f89a..6b1b9a87 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -27,7 +27,7 @@ import ( ) // New returns a logger. -func New(opts ...option) (*zap.SugaredLogger, error) { +func New(opts ...Option) (*zap.SugaredLogger, error) { conf := zap.NewProductionConfig() for _, opt := range opts { diff --git a/logger/logger_test.go b/logger/logger_test.go index 0233723a..cea5c4fa 100644 --- a/logger/logger_test.go +++ b/logger/logger_test.go @@ -18,10 +18,10 @@ package logger_test import ( - "github.com/elastic/apm-aws-lambda/logger" "os" "testing" + "github.com/elastic/apm-aws-lambda/logger" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.elastic.co/ecszap" diff --git a/logger/option.go b/logger/option.go index 0d03bd32..a9e8fadf 100644 --- a/logger/option.go +++ b/logger/option.go @@ -22,21 +22,21 @@ import ( "go.uber.org/zap/zapcore" ) -type option func(*zap.Config) +type Option func(*zap.Config) -func WithLevel(level zapcore.Level) option { +func WithLevel(level zapcore.Level) Option { return func(c *zap.Config) { c.Level.SetLevel(level) } } -func WithEncoderConfig(encoderConfig zapcore.EncoderConfig) option { +func WithEncoderConfig(encoderConfig zapcore.EncoderConfig) Option { return func(c *zap.Config) { c.EncoderConfig = encoderConfig } } -func WithOutputPaths(path string) option { +func WithOutputPaths(path string) Option { return func(c *zap.Config) { c.OutputPaths = append(c.OutputPaths, path) } diff --git a/logsapi/client.go b/logsapi/client.go index ac3433bb..df884cef 100644 --- a/logsapi/client.go +++ b/logsapi/client.go @@ -73,7 +73,10 @@ type Client struct { // NewClient returns a new Client with the given URL. func NewClient(opts ...ClientOption) (*Client, error) { c := Client{ - server: &http.Server{}, + server: &http.Server{ + // Fixes "Potential Slowloris Attack because ReadHeaderTimeout is not configured in the http.Server" + ReadHeaderTimeout: time.Second * 5, + }, httpClient: &http.Client{}, } @@ -106,25 +109,25 @@ func (lc *Client) StartService(extensionID string) error { _, port, err := net.SplitHostPort(addr) if err != nil { - if err := lc.Shutdown(); err != nil { - lc.logger.Warnf("failed to shutdown the server: %v", err) + if err2 := lc.Shutdown(); err2 != nil { + lc.logger.Warnf("failed to shutdown the server: %v", err2) } return fmt.Errorf("failed to retrieve port from address %s: %w", addr, err) } host, _, err := net.SplitHostPort(lc.listenerAddr) if err != nil { - if err := lc.Shutdown(); err != nil { - lc.logger.Warnf("failed to shutdown the server: %v", err) + if err2 := lc.Shutdown(); err2 != nil { + lc.logger.Warnf("failed to shutdown the server: %v", err2) } return fmt.Errorf("failed to retrieve host from address %s: %w", lc.listenerAddr, err) } - uri := fmt.Sprintf("http://%s", net.JoinHostPort(host, port)) + uri := "http://" + net.JoinHostPort(host, port) if err := lc.subscribe(lc.logsAPISubscriptionTypes, extensionID, uri); err != nil { - if err := lc.Shutdown(); err != nil { - lc.logger.Warnf("failed to shutdown the server: %v", err) + if err2 := lc.Shutdown(); err2 != nil { + lc.logger.Warnf("failed to shutdown the server: %v", err2) } return err } diff --git a/logsapi/client_test.go b/logsapi/client_test.go index b29d8cb8..cdd3736e 100644 --- a/logsapi/client_test.go +++ b/logsapi/client_test.go @@ -26,6 +26,7 @@ import ( "testing" "github.com/elastic/apm-aws-lambda/logsapi" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" @@ -97,15 +98,19 @@ func TestSubscribe(t *testing.T) { t.Run(name, func(t *testing.T) { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var subRequest logsapi.SubscribeRequest - require.NoError(t, json.NewDecoder(r.Body).Decode(&subRequest)) + assert.NoError(t, json.NewDecoder(r.Body).Decode(&subRequest)) _, err := url.ParseRequestURI(subRequest.Destination.URI) - require.NoError(t, err) + assert.NoError(t, err) w.WriteHeader(tc.responseHeader) })) defer s.Close() - cOpts := append(tc.opts, logsapi.WithLogsAPIBaseURL(s.URL), logsapi.WithSubscriptionTypes(logsapi.Platform)) - c, err := logsapi.NewClient(cOpts...) + c, err := logsapi.NewClient( + append(tc.opts, + logsapi.WithLogsAPIBaseURL(s.URL), + logsapi.WithSubscriptionTypes(logsapi.Platform), + )..., + ) require.NoError(t, err) if tc.expectedErr { @@ -136,20 +141,21 @@ func TestSubscribeAWSRequest(t *testing.T) { t.Run(name, func(t *testing.T) { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var subRequest logsapi.SubscribeRequest - require.NoError(t, json.NewDecoder(r.Body).Decode(&subRequest)) + assert.NoError(t, json.NewDecoder(r.Body).Decode(&subRequest)) _, err := url.ParseRequestURI(subRequest.Destination.URI) - require.NoError(t, err) + assert.NoError(t, err) w.WriteHeader(http.StatusOK) })) defer s.Close() - cOpts := append( - tc.opts, - logsapi.WithLogsAPIBaseURL(s.URL), - logsapi.WithLogBuffer(1), - logsapi.WithSubscriptionTypes(logsapi.Platform, logsapi.Function), + c, err := logsapi.NewClient( + append( + tc.opts, + logsapi.WithLogsAPIBaseURL(s.URL), + logsapi.WithLogBuffer(1), + logsapi.WithSubscriptionTypes(logsapi.Platform, logsapi.Function), + )..., ) - c, err := logsapi.NewClient(cOpts...) require.NoError(t, err) require.NoError(t, c.StartService("testID")) diff --git a/logsapi/event.go b/logsapi/event.go index e3d9975c..2dc79f60 100644 --- a/logsapi/event.go +++ b/logsapi/event.go @@ -52,7 +52,7 @@ type LogEventRecord struct { } // ProcessLogs consumes log events until there are no more log events that -// can be consumed or ctx is cancelled. For INVOKE event this state is +// can be consumed or ctx is canceled. For INVOKE event this state is // reached when runtimeDone event for the current requestID is processed // whereas for SHUTDOWN event this state is reached when the platformReport // event for the previous requestID is processed. @@ -66,7 +66,7 @@ func (lc *Client) ProcessLogs( for { select { case logEvent := <-lc.logsChannel: - if shouldExit := lc.handleEvent(logEvent, ctx, requestID, invokedFnArn, dataChan, isShutdown); shouldExit { + if shouldExit := lc.handleEvent(ctx, logEvent, requestID, invokedFnArn, dataChan, isShutdown); shouldExit { return } case <-ctx.Done(): @@ -87,7 +87,7 @@ func (lc *Client) FlushData( for { select { case logEvent := <-lc.logsChannel: - if shouldExit := lc.handleEvent(logEvent, ctx, requestID, invokedFnArn, dataChan, isShutdown); shouldExit { + if shouldExit := lc.handleEvent(ctx, logEvent, requestID, invokedFnArn, dataChan, isShutdown); shouldExit { return } case <-ctx.Done(): @@ -102,8 +102,8 @@ func (lc *Client) FlushData( } } -func (lc *Client) handleEvent(logEvent LogEvent, - ctx context.Context, +func (lc *Client) handleEvent(ctx context.Context, + logEvent LogEvent, requestID string, invokedFnArn string, dataChan chan []byte, diff --git a/logsapi/functionlogs_test.go b/logsapi/functionlogs_test.go index ff31c389..1ea00ea3 100644 --- a/logsapi/functionlogs_test.go +++ b/logsapi/functionlogs_test.go @@ -35,7 +35,7 @@ func TestProcessFunctionLog(t *testing.T) { reqID := "8476a536-e9f4-11e8-9739-2dfe598c3fcd" invokedFnArn := "arn:aws:lambda:us-east-2:123456789012:function:custom-runtime" expectedData := fmt.Sprintf( - "{\"log\":{\"@timestamp\":%d,\"message\":\"%s\",\"faas\":{\"execution\":\"%s\",\"id\":\"%s\"}}}", + "{\"log\":{\"@timestamp\":%d,\"message\":%q,\"faas\":{\"execution\":%q,\"id\":%q}}}", event.Time.UnixNano()/int64(time.Microsecond), event.StringRecord, reqID, diff --git a/logsapi/metrics_test.go b/logsapi/metrics_test.go index 468dbc9e..7e0463df 100644 --- a/logsapi/metrics_test.go +++ b/logsapi/metrics_test.go @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +//nolint:dupl package logsapi import ( @@ -54,9 +55,7 @@ func TestProcessPlatformReport_Coldstart(t *testing.T) { event := extension.NextEventResponse{ Timestamp: timestamp, - EventType: extension.Invoke, DeadlineMs: timestamp.UnixNano()/1e6 + 4584, // Milliseconds - RequestID: "8476a536-e9f4-11e8-9739-2dfe598c3fcd", InvokedFunctionArn: "arn:aws:lambda:us-east-2:123456789012:function:custom-runtime", Tracing: extension.Tracing{ Type: "None", @@ -98,9 +97,7 @@ func TestProcessPlatformReport_NoColdstart(t *testing.T) { event := extension.NextEventResponse{ Timestamp: timestamp, - EventType: extension.Invoke, DeadlineMs: timestamp.UnixNano()/1e6 + 4584, // Milliseconds - RequestID: "8476a536-e9f4-11e8-9739-2dfe598c3fcd", InvokedFunctionArn: "arn:aws:lambda:us-east-2:123456789012:function:custom-runtime", Tracing: extension.Tracing{ Type: "None", @@ -136,8 +133,6 @@ func BenchmarkPlatformReport(b *testing.B) { } nextEventResp := &extension.NextEventResponse{ Timestamp: timestamp, - EventType: extension.Invoke, - RequestID: reqID, InvokedFunctionArn: invokedFnArn, } diff --git a/logsapi/model/model.go b/logsapi/model/model.go index 5d3f42de..a65dd0ba 100644 --- a/logsapi/model/model.go +++ b/logsapi/model/model.go @@ -43,7 +43,7 @@ func (t Time) MarshalFastJSON(w *fastjson.Writer) error { // faas struct is a subset of go.elastic.co/apm/v2/model#FAAS // // The purpose of having a separate struct is to have a custom -// marshalling logic that is targeted for the faas fields +// marshaling logic that is targeted for the faas fields // available for function logs. For example: `coldstart` value // cannot be inferred for function logs so this struct drops // the field entirely. diff --git a/logsapi/route_handlers_test.go b/logsapi/route_handlers_test.go index c64069ef..ec8026fb 100644 --- a/logsapi/route_handlers_test.go +++ b/logsapi/route_handlers_test.go @@ -57,7 +57,6 @@ func TestLogEventUnmarshalReport(t *testing.T) { }, } assert.Equal(t, rec, le.Record) - } func TestLogEventUnmarshalFault(t *testing.T) { @@ -74,7 +73,6 @@ func TestLogEventUnmarshalFault(t *testing.T) { assert.Equal(t, "2020-08-20T12:31:32.123Z", le.Time.Format(time.RFC3339Nano)) rec := "RequestId: d783b35e-a91d-4251-af17-035953428a2c Process exited before completing request" assert.Equal(t, rec, le.StringRecord) - } func Test_unmarshalRuntimeDoneRecordObject(t *testing.T) { diff --git a/logsapi/subscribe.go b/logsapi/subscribe.go index ecf4b309..5182ca23 100644 --- a/logsapi/subscribe.go +++ b/logsapi/subscribe.go @@ -102,7 +102,7 @@ func (lc *Client) subscribe(types []SubscriptionType, extensionID string, uri st return fmt.Errorf("failed to marshal SubscribeRequest: %w", err) } - url := fmt.Sprintf("%s/2020-08-15/logs", lc.logsAPIBaseURL) + url := lc.logsAPIBaseURL + "/2020-08-15/logs" resp, err := lc.sendRequest(url, data, extensionID) if err != nil { return err diff --git a/main.go b/main.go index 23ec9511..478674ac 100644 --- a/main.go +++ b/main.go @@ -19,6 +19,7 @@ package main import ( "context" + "fmt" "log" "os" "os/signal" @@ -31,13 +32,19 @@ import ( ) func main() { + if err := mainWithError(); err != nil { + log.Fatal(err) + } +} + +func mainWithError() error { // Global context ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) defer cancel() cfg, err := config.LoadDefaultConfig(ctx) if err != nil { - log.Fatalf("failed to load AWS default config: %v", err) + return fmt.Errorf("failed to load AWS default config: %v", err) } appConfigs := []app.ConfigOption{ @@ -68,12 +75,14 @@ func main() { appConfigs = append(appConfigs, app.WithFunctionLogSubscription()) } - app, err := app.New(ctx, appConfigs...) + application, err := app.New(ctx, appConfigs...) if err != nil { - log.Fatalf("failed to create the app: %v", err) + return fmt.Errorf("failed to create the app: %v", err) } - if err := app.Run(ctx); err != nil { - log.Fatalf("error while running: %v", err) + if err := application.Run(ctx); err != nil { + return fmt.Errorf("error while running: %v", err) } + + return nil } diff --git a/main_test.go b/main_test.go index c71422a4..bb27b3f4 100644 --- a/main_test.go +++ b/main_test.go @@ -27,6 +27,7 @@ import ( "net/http/httptest" "os" "regexp" + "strconv" "strings" "sync" "testing" @@ -195,7 +196,9 @@ func newMockLambdaServer(t *testing.T, logsapiAddr string, eventsChannel chan Mo case nextEvent := <-eventsChannel: sendNextEventInfo(w, currID, nextEvent.Timeout, nextEvent.Type == Shutdown, l) wg.Add(1) - go processMockEvent(mockLogEventQ, currID, nextEvent, os.Getenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT"), &lambdaServerInternals, l, &wg) + go processMockEvent(mockLogEventQ, currID, nextEvent, + os.Getenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT"), &lambdaServerInternals, l, + &wg) default: finalShutDown := MockEvent{ Type: Shutdown, @@ -204,7 +207,9 @@ func newMockLambdaServer(t *testing.T, logsapiAddr string, eventsChannel chan Mo } sendNextEventInfo(w, currID, finalShutDown.Timeout, true, l) wg.Add(1) - go processMockEvent(mockLogEventQ, currID, finalShutDown, os.Getenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT"), &lambdaServerInternals, l, &wg) + go processMockEvent(mockLogEventQ, currID, finalShutDown, + os.Getenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT"), &lambdaServerInternals, l, + &wg) } // Logs API subscription request case "/2020-08-15/logs": @@ -222,13 +227,13 @@ func newMockLambdaServer(t *testing.T, logsapiAddr string, eventsChannel chan Mo l.Errorf("Could not find free port for the extension to listen on : %v", err) extensionPort = 8200 } - t.Setenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT", fmt.Sprint(extensionPort)) + t.Setenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT", strconv.Itoa(extensionPort)) t.Cleanup(func() { lambdaServer.Close() }) return &lambdaServerInternals } -func newTestStructs(t *testing.T) chan MockEvent { +func newTestStructs(_ *testing.T) chan MockEvent { http.DefaultServeMux = new(http.ServeMux) eventsChannel := make(chan MockEvent, 100) return eventsChannel @@ -264,7 +269,8 @@ func processMockEvent(q chan<- logsapi.LogEvent, currID string, event MockEvent, time.Sleep(time.Duration(event.Timeout * float64(time.Second))) case InvokeStandard: time.Sleep(delay) - req, err := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), buf) + req, err := http.NewRequest(http.MethodPost, + fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), buf) if err != nil { l.Error(err.Error()) } @@ -276,13 +282,17 @@ func processMockEvent(q chan<- logsapi.LogEvent, currID string, event MockEvent, l.Debugf("Response seen by the agent : %d", res.StatusCode) case InvokeStandardFlush: time.Sleep(delay) - reqData, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events?flushed=true", extensionPort), buf) - if _, err := client.Do(reqData); err != nil { + reqData, _ := http.NewRequest(http.MethodPost, + fmt.Sprintf("http://localhost:%s/intake/v2/events?flushed=true", extensionPort), buf) + res, err := client.Do(reqData) + if err != nil { l.Error(err.Error()) } + res.Body.Close() case InvokeLateFlush: time.Sleep(delay) - reqData, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events?flushed=true", extensionPort), buf) + reqData, _ := http.NewRequest(http.MethodPost, + fmt.Sprintf("http://localhost:%s/intake/v2/events?flushed=true", extensionPort), buf) internals.WaitGroup.Add(1) go func() { <-ch @@ -300,8 +310,12 @@ func processMockEvent(q chan<- logsapi.LogEvent, currID string, event MockEvent, // we can't share a bytes.Buffer with two http requests // create two bytes.Reader to avoid a race condition body := buf.Bytes() - reqData0, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), bytes.NewReader(body)) - reqData1, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), bytes.NewReader(body)) + reqData0, _ := http.NewRequest(http.MethodPost, + fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), + bytes.NewReader(body)) + reqData1, _ := http.NewRequest(http.MethodPost, + fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), + bytes.NewReader(body)) res, err := client.Do(reqData0) if err != nil { l.Error(err.Error()) @@ -322,7 +336,9 @@ func processMockEvent(q chan<- logsapi.LogEvent, currID string, event MockEvent, wg.Add(1) go func() { time.Sleep(delay) - reqData, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), bytes.NewReader(body)) + reqData, _ := http.NewRequest(http.MethodPost, + fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), + bytes.NewReader(body)) res, err := client.Do(reqData) if err != nil { l.Error(err.Error()) @@ -334,7 +350,9 @@ func processMockEvent(q chan<- logsapi.LogEvent, currID string, event MockEvent, wg.Wait() case InvokeStandardInfo: time.Sleep(delay) - req, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/", extensionPort), bytes.NewBuffer([]byte(event.APMServerBehavior))) + req, _ := http.NewRequest(http.MethodPost, + fmt.Sprintf("http://localhost:%s/", extensionPort), + bytes.NewBuffer([]byte(event.APMServerBehavior))) res, err := client.Do(req) if err != nil { l.Errorf("No response following info request : %v", err) @@ -407,11 +425,11 @@ func startLogSender(ctx context.Context, q <-chan logsapi.LogEvent, logsapiAddr } doSend := func(events []logsapi.LogEvent) error { var buf bytes.Buffer - if err := json.NewEncoder(&buf).Encode(events); err != nil { + if err := json.NewEncoder(&buf).Encode(events); err != nil { //nolint:musttag return err } - req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://%s", logsapiAddr), &buf) + req, err := http.NewRequest(http.MethodPost, "http://"+logsapiAddr, &buf) if err != nil { return err } @@ -420,7 +438,8 @@ func startLogSender(ctx context.Context, q <-chan logsapi.LogEvent, logsapiAddr if err != nil { return err } - if resp.StatusCode/100 != 2 { + defer resp.Body.Close() + if resp.StatusCode/100 != 2 { //nolint:usestdlibvars return fmt.Errorf("received a non 2xx status code: %d", resp.StatusCode) } return nil @@ -550,7 +569,8 @@ func TestLateFlush(t *testing.T) { case <-runApp(t, logsapiAddr): assert.Regexp( t, - regexp.MustCompile(fmt.Sprintf(".*\n%s.*\n%s", TimelyResponse, TimelyResponse)), // metadata followed by TimelyResponsex2 + regexp.MustCompile(fmt.Sprintf(".*\n%s.*\n%s", TimelyResponse, + TimelyResponse)), // metadata followed by TimelyResponsex2 apmServerInternals.Data, ) case <-time.After(timeout): @@ -662,7 +682,6 @@ func TestAPMServerRecovery(t *testing.T) { case <-time.After(10 * time.Second): t.Fatalf("timed out waiting for app to finish") } - } // TestGracePeriodHangs verifies that the WaitforGracePeriod goroutine ends when main() ends. @@ -687,7 +706,6 @@ func TestGracePeriodHangs(t *testing.T) { case <-time.After(timeout): t.Fatalf("timed out waiting for app to finish") } - } // TestAPMServerCrashesDuringExecution tests that main does not panic nor runs indefinitely when the APM server crashes @@ -805,7 +823,8 @@ func TestInfoRequestHangs(t *testing.T) { select { case <-runApp(t, logsapiAddr): time.Sleep(2 * time.Second) - assert.NotContains(t, lambdaServerInternals.Data, "7814d524d3602e70b703539c57568cba6964fc20") + assert.NotContains(t, lambdaServerInternals.Data, + "7814d524d3602e70b703539c57568cba6964fc20") apmServerInternals.UnlockSignalChannel <- struct{}{} case <-time.After(timeout): t.Fatalf("timed out waiting for app to finish") @@ -830,14 +849,16 @@ func TestMetrics(t *testing.T) { select { case <-runApp(t, logsapiAddr): - assert.Contains(t, apmServerInternals.Data, `{"metadata":{"service":{"name":"1234_service-12a3","version":"5.1.3","environment":"staging","agent":{"name":"elastic-node","version":"3.14.0"},"framework":{"name":"Express","version":"1.2.3"},"language":{"name":"ecmascript","version":"8"},"runtime":{"name":"node","version":"8.0.0"},"node":{"configured_name":"node-123"}},"user":{"username":"bar","id":"123user","email":"bar@user.com"},"labels":{"tag0":null,"tag1":"one","tag2":2},"process":{"pid":1234,"ppid":6789,"title":"node","argv":["node","server.js"]},"system":{"architecture":"x64","hostname":"prod1.example.com","platform":"darwin","container":{"id":"container-id"},"kubernetes":{"namespace":"namespace1","node":{"name":"node-name"},"pod":{"name":"pod-name","uid":"pod-uid"}}},"cloud":{"provider":"cloud_provider","region":"cloud_region","availability_zone":"cloud_availability_zone","instance":{"id":"instance_id","name":"instance_name"},"machine":{"type":"machine_type"},"account":{"id":"account_id","name":"account_name"},"project":{"id":"project_id","name":"project_name"},"service":{"name":"lambda"}}}}`) + assert.Contains(t, apmServerInternals.Data, + `{"metadata":{"service":{"name":"1234_service-12a3","version":"5.1.3","environment":"staging","agent":{"name":"elastic-node","version":"3.14.0"},"framework":{"name":"Express","version":"1.2.3"},"language":{"name":"ecmascript","version":"8"},"runtime":{"name":"node","version":"8.0.0"},"node":{"configured_name":"node-123"}},"user":{"username":"bar","id":"123user","email":"bar@user.com"},"labels":{"tag0":null,"tag1":"one","tag2":2},"process":{"pid":1234,"ppid":6789,"title":"node","argv":["node","server.js"]},"system":{"architecture":"x64","hostname":"prod1.example.com","platform":"darwin","container":{"id":"container-id"},"kubernetes":{"namespace":"namespace1","node":{"name":"node-name"},"pod":{"name":"pod-name","uid":"pod-uid"}}},"cloud":{"provider":"cloud_provider","region":"cloud_region","availability_zone":"cloud_availability_zone","instance":{"id":"instance_id","name":"instance_name"},"machine":{"type":"machine_type"},"account":{"id":"account_id","name":"account_name"},"project":{"id":"project_id","name":"project_name"},"service":{"name":"lambda"}}}}`) assert.Contains(t, apmServerInternals.Data, `faas.billed_duration":{"value":60`) assert.Contains(t, apmServerInternals.Data, `faas.duration":{"value":59.9`) assert.Contains(t, apmServerInternals.Data, `faas.coldstart_duration":{"value":500`) assert.Contains(t, apmServerInternals.Data, `faas.timeout":{"value":5000}`) assert.Contains(t, apmServerInternals.Data, `coldstart":true`) assert.Contains(t, apmServerInternals.Data, `execution"`) - assert.Contains(t, apmServerInternals.Data, `id":"arn:aws:lambda:eu-central-1:627286350134:function:main_unit_test"`) + assert.Contains(t, apmServerInternals.Data, + `id":"arn:aws:lambda:eu-central-1:627286350134:function:main_unit_test"`) case <-time.After(timeout): t.Fatalf("timed out waiting for app to finish") } @@ -858,11 +879,11 @@ func runAppFull(t *testing.T, logsapiAddr string, disableLogsAPI bool) <-chan st if disableLogsAPI { opts = append(opts, app.WithoutLogsAPI()) } - app, err := app.New(ctx, opts...) + application, err := app.New(ctx, opts...) require.NoError(t, err) go func() { - require.NoError(t, app.Run(ctx)) + assert.NoError(t, application.Run(ctx)) cancel() }()