diff --git a/apmproxy/apmserver.go b/apmproxy/apmserver.go index 08a06ec4..35fb4396 100644 --- a/apmproxy/apmserver.go +++ b/apmproxy/apmserver.go @@ -21,6 +21,7 @@ import ( "bytes" "compress/gzip" "context" + "encoding/json" "errors" "fmt" "io" @@ -30,11 +31,20 @@ import ( "time" ) +type jsonResult struct { + Errors []jsonError `json:"errors,omitempty"` +} + +type jsonError struct { + Message string `json:"message"` + Document string `json:"document,omitempty"` +} + // ForwardApmData receives agent data as it comes in and posts it to the APM server. // Stop checking for, and sending agent data when the function invocation // has completed, signaled via a channel. func (c *Client) ForwardApmData(ctx context.Context, metadataContainer *MetadataContainer) error { - if c.Status == Failing { + if c.IsUnhealthy() { return nil } for { @@ -59,7 +69,7 @@ func (c *Client) ForwardApmData(ctx context.Context, metadataContainer *Metadata // FlushAPMData reads all the apm data in the apm data channel and sends it to the APM server. func (c *Client) FlushAPMData(ctx context.Context) { - if c.Status == Failing { + if c.IsUnhealthy() { c.logger.Debug("Flush skipped - Transport failing") return } @@ -86,7 +96,7 @@ func (c *Client) FlushAPMData(ctx context.Context) { func (c *Client) PostToApmServer(ctx context.Context, agentData AgentData) error { // todo: can this be a streaming or streaming style call that keeps the // connection open across invocations? 
- if c.Status == Failing { + if c.IsUnhealthy() { return errors.New("transport status is unhealthy") } @@ -131,46 +141,106 @@ func (c *Client) PostToApmServer(ctx context.Context, agentData AgentData) error c.logger.Debug("Sending data chunk to APM server") resp, err := c.client.Do(req) if err != nil { - c.SetApmServerTransportState(ctx, Failing) + c.UpdateStatus(ctx, Failing) return fmt.Errorf("failed to post to APM server: %v", err) } - - //Read the response body defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) - if err != nil { - c.SetApmServerTransportState(ctx, Failing) - return fmt.Errorf("failed to read the response body after posting to the APM server") + + // On success, the server will respond with a 202 Accepted status code and no body. + if resp.StatusCode == http.StatusAccepted { + c.UpdateStatus(ctx, Healthy) + return nil + } + + // RateLimited + if resp.StatusCode == http.StatusTooManyRequests { + c.logger.Warnf("Transport has been rate limited: response status code: %d", resp.StatusCode) + c.UpdateStatus(ctx, RateLimited) + return nil } - if resp.StatusCode == http.StatusUnauthorized { + jErr := jsonResult{} + if err := json.NewDecoder(resp.Body).Decode(&jErr); err != nil { + // non critical error. + // Log a warning and continue. 
+ c.logger.Warnf("failed to decode response body: %v", err) + } + + // Auth errors + if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden { c.logger.Warnf("Authentication with the APM server failed: response status code: %d", resp.StatusCode) - c.logger.Debugf("APM server response body: %v", string(body)) + for _, err := range jErr.Errors { + c.logger.Warnf("failed to authenticate: document %s: message: %s", err.Document, err.Message) + } + c.UpdateStatus(ctx, Failing) + return nil + } + + // ClientErrors + if resp.StatusCode >= 400 && resp.StatusCode < 500 { + c.logger.Warnf("client error: response status code: %d", resp.StatusCode) + for _, err := range jErr.Errors { + c.logger.Warnf("client error: document %s: message: %s", err.Document, err.Message) + } + c.UpdateStatus(ctx, ClientFailing) return nil } - c.SetApmServerTransportState(ctx, Healthy) - c.logger.Debug("Transport status set to healthy") - c.logger.Debugf("APM server response body: %v", string(body)) - c.logger.Debugf("APM server response status code: %v", resp.StatusCode) + // critical errors + if resp.StatusCode == http.StatusInternalServerError || resp.StatusCode == http.StatusServiceUnavailable { + c.logger.Warnf("failed to post data to APM server: response status code: %d", resp.StatusCode) + for _, err := range jErr.Errors { + c.logger.Warnf("critical error: document %s: message: %s", err.Document, err.Message) + } + c.UpdateStatus(ctx, Failing) + return nil + } + + c.logger.Warnf("unhandled status code: %d", resp.StatusCode) return nil } -// SetApmServerTransportState takes a state of the APM server transport and updates +// IsUnhealthy returns true if the apmproxy is not healthy. +func (c *Client) IsUnhealthy() bool { + c.mu.RLock() + defer c.mu.RUnlock() + return c.Status == Failing +} + +// UpdateStatus takes a state of the APM server transport and updates // the current state of the transport. 
For a change to a failing state // is calculated and a go routine is started that waits for that period to complete // before changing the status to "pending". This would allow a subsequent send attempt // to the APM server. // // This function is public for use in tests. -func (c *Client) SetApmServerTransportState(ctx context.Context, status Status) { +func (c *Client) UpdateStatus(ctx context.Context, status Status) { + // Reduce lock contention as UpdateStatus is called on every + // successful request + c.mu.RLock() + if status == c.Status { + c.mu.RUnlock() + return + } + c.mu.RUnlock() + switch status { case Healthy: c.mu.Lock() + if c.Status == status { + c.mu.Unlock(); return + } c.Status = status c.logger.Debugf("APM server Transport status set to %s", c.Status) c.ReconnectionCount = -1 c.mu.Unlock() + case RateLimited, ClientFailing: + // No need to start backoff, this is a temporary status. It usually + // means we went over the limit of events/s. + c.mu.Lock() + c.Status = status + c.logger.Debugf("APM server Transport status set to %s", c.Status) + c.mu.Unlock() case Failing: c.mu.Lock() c.Status = status @@ -178,6 +248,8 @@ func (c *Client) SetApmServerTransportState(ctx context.Context, status Status) c.ReconnectionCount++ gracePeriodTimer := time.NewTimer(c.ComputeGracePeriod()) c.logger.Debugf("Grace period entered, reconnection count : %d", c.ReconnectionCount) + c.mu.Unlock() + go func() { select { case <-gracePeriodTimer.C: @@ -185,7 +257,8 @@ func (c *Client) SetApmServerTransportState(ctx context.Context, status Status) case <-ctx.Done(): c.logger.Debug("Grace period over - context done") } - c.Status = Pending + c.mu.Lock() + c.Status = Started c.logger.Debugf("APM server Transport status set to %s", c.Status) c.mu.Unlock() }() diff --git a/apmproxy/apmserver_test.go b/apmproxy/apmserver_test.go index 3e5cc952..3f028ff0 100644 --- a/apmproxy/apmserver_test.go +++ b/apmproxy/apmserver_test.go @@ -20,13 +20,15 @@ package apmproxy_test
import ( "compress/gzip" "context" - "github.com/elastic/apm-aws-lambda/apmproxy" "io" "net/http" "net/http/httptest" + "sync/atomic" "testing" "time" + "github.com/elastic/apm-aws-lambda/apmproxy" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" @@ -170,7 +172,7 @@ func TestSetHealthyTransport(t *testing.T) { apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()), ) require.NoError(t, err) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Healthy) + apmClient.UpdateStatus(context.Background(), apmproxy.Healthy) assert.True(t, apmClient.Status == apmproxy.Healthy) assert.Equal(t, apmClient.ReconnectionCount, -1) } @@ -184,7 +186,7 @@ func TestSetFailingTransport(t *testing.T) { ) require.NoError(t, err) apmClient.ReconnectionCount = 0 - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Failing) + apmClient.UpdateStatus(context.Background(), apmproxy.Failing) assert.True(t, apmClient.Status == apmproxy.Failing) assert.Equal(t, apmClient.ReconnectionCount, 1) } @@ -195,12 +197,12 @@ func TestSetPendingTransport(t *testing.T) { apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()), ) require.NoError(t, err) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Healthy) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Failing) + apmClient.UpdateStatus(context.Background(), apmproxy.Healthy) + apmClient.UpdateStatus(context.Background(), apmproxy.Failing) require.Eventually(t, func() bool { - return apmClient.Status != apmproxy.Failing - }, 5*time.Second, 50*time.Millisecond) - assert.True(t, apmClient.Status == apmproxy.Pending) + return !apmClient.IsUnhealthy() + }, 7*time.Second, 50*time.Millisecond) + assert.True(t, apmClient.Status == apmproxy.Started) assert.Equal(t, apmClient.ReconnectionCount, 0) } @@ -210,8 +212,8 @@ func TestSetPendingTransportExplicitly(t *testing.T) { apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()), ) require.NoError(t, 
err) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Healthy) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Pending) + apmClient.UpdateStatus(context.Background(), apmproxy.Healthy) + apmClient.UpdateStatus(context.Background(), apmproxy.Started) assert.True(t, apmClient.Status == apmproxy.Healthy) assert.Equal(t, apmClient.ReconnectionCount, -1) } @@ -222,8 +224,8 @@ func TestSetInvalidTransport(t *testing.T) { apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()), ) require.NoError(t, err) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Healthy) - apmClient.SetApmServerTransportState(context.Background(), "Invalid") + apmClient.UpdateStatus(context.Background(), apmproxy.Healthy) + apmClient.UpdateStatus(context.Background(), "Invalid") assert.True(t, apmClient.Status == apmproxy.Healthy) assert.Equal(t, apmClient.ReconnectionCount, -1) } @@ -266,7 +268,7 @@ func TestEnterBackoffFromHealthy(t *testing.T) { apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()), ) require.NoError(t, err) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Healthy) + apmClient.UpdateStatus(context.Background(), apmproxy.Healthy) // Close the APM server early so that POST requests fail and that backoff is enabled apmServer.Close() @@ -321,12 +323,12 @@ func TestEnterBackoffFromFailing(t *testing.T) { ) require.NoError(t, err) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Healthy) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Failing) + apmClient.UpdateStatus(context.Background(), apmproxy.Healthy) + apmClient.UpdateStatus(context.Background(), apmproxy.Failing) require.Eventually(t, func() bool { - return apmClient.Status != apmproxy.Failing - }, 5*time.Second, 50*time.Millisecond) - assert.Equal(t, apmClient.Status, apmproxy.Pending) + return !apmClient.IsUnhealthy() + }, 7*time.Second, 50*time.Millisecond) + assert.Equal(t, apmClient.Status, 
apmproxy.Started) assert.Error(t, apmClient.PostToApmServer(context.Background(), agentData)) assert.Equal(t, apmClient.Status, apmproxy.Failing) @@ -375,12 +377,12 @@ func TestAPMServerRecovery(t *testing.T) { ) require.NoError(t, err) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Healthy) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Failing) + apmClient.UpdateStatus(context.Background(), apmproxy.Healthy) + apmClient.UpdateStatus(context.Background(), apmproxy.Failing) require.Eventually(t, func() bool { - return apmClient.Status != apmproxy.Failing - }, 5*time.Second, 50*time.Millisecond) - assert.Equal(t, apmClient.Status, apmproxy.Pending) + return !apmClient.IsUnhealthy() + }, 7*time.Second, 50*time.Millisecond) + assert.Equal(t, apmClient.Status, apmproxy.Started) assert.NoError(t, apmClient.PostToApmServer(context.Background(), agentData)) assert.Equal(t, apmClient.Status, apmproxy.Healthy) @@ -420,16 +422,121 @@ func TestAPMServerAuthFails(t *testing.T) { apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()), ) require.NoError(t, err) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Healthy) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Failing) + apmClient.UpdateStatus(context.Background(), apmproxy.Healthy) + apmClient.UpdateStatus(context.Background(), apmproxy.Failing) require.Eventually(t, func() bool { - return apmClient.Status != apmproxy.Failing - }, 5*time.Second, 50*time.Millisecond) - assert.Equal(t, apmClient.Status, apmproxy.Pending) + return !apmClient.IsUnhealthy() + }, 7*time.Second, 50*time.Millisecond) + assert.Equal(t, apmClient.Status, apmproxy.Started) assert.NoError(t, apmClient.PostToApmServer(context.Background(), agentData)) assert.NotEqual(t, apmClient.Status, apmproxy.Healthy) } +func TestAPMServerRatelimit(t *testing.T) { + // Compress the data + pr, pw := io.Pipe() + gw, _ := gzip.NewWriterLevel(pw, gzip.BestSpeed) + go func() { + if _, 
err := gw.Write([]byte("")); err != nil { + t.Fail() + return + } + if err := gw.Close(); err != nil { + t.Fail() + return + } + if err := pw.Close(); err != nil { + t.Fail() + return + } + }() + + // Create AgentData struct with compressed data + data, _ := io.ReadAll(pr) + agentData := apmproxy.AgentData{Data: data, ContentEncoding: "gzip"} + + // Create apm server and handler + var shouldSucceed atomic.Bool + apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Fail the first request + if shouldSucceed.CompareAndSwap(false, true) { + w.WriteHeader(http.StatusTooManyRequests) + return + } + + w.WriteHeader(http.StatusAccepted) + })) + defer apmServer.Close() + + apmClient, err := apmproxy.NewClient( + apmproxy.WithURL(apmServer.URL), + apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()), + ) + require.NoError(t, err) + assert.Equal(t, apmClient.Status, apmproxy.Started) + + // First request fails but does not trigger the backoff + assert.NoError(t, apmClient.PostToApmServer(context.Background(), agentData)) + assert.Equal(t, apmClient.Status, apmproxy.RateLimited) + + // Followup request is successful + assert.NoError(t, apmClient.PostToApmServer(context.Background(), agentData)) + assert.Equal(t, apmClient.Status, apmproxy.Healthy) + +} + +func TestAPMServerClientFail(t *testing.T) { + // Compress the data + pr, pw := io.Pipe() + gw, _ := gzip.NewWriterLevel(pw, gzip.BestSpeed) + go func() { + if _, err := gw.Write([]byte("")); err != nil { + t.Fail() + return + } + if err := gw.Close(); err != nil { + t.Fail() + return + } + if err := pw.Close(); err != nil { + t.Fail() + return + } + }() + + // Create AgentData struct with compressed data + data, _ := io.ReadAll(pr) + agentData := apmproxy.AgentData{Data: data, ContentEncoding: "gzip"} + + // Create apm server and handler + var shouldSucceed atomic.Bool + apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Fail the
first request + if shouldSucceed.CompareAndSwap(false, true) { + w.WriteHeader(http.StatusRequestEntityTooLarge) + return + } + + w.WriteHeader(http.StatusAccepted) + })) + defer apmServer.Close() + + apmClient, err := apmproxy.NewClient( + apmproxy.WithURL(apmServer.URL), + apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()), + ) + require.NoError(t, err) + assert.Equal(t, apmClient.Status, apmproxy.Started) + + // First request fails but does not trigger the backoff + assert.NoError(t, apmClient.PostToApmServer(context.Background(), agentData)) + assert.Equal(t, apmClient.Status, apmproxy.ClientFailing) + + // Followup request is successful + assert.NoError(t, apmClient.PostToApmServer(context.Background(), agentData)) + assert.Equal(t, apmClient.Status, apmproxy.Healthy) +} + func TestContinuedAPMServerFailure(t *testing.T) { // Compress the data pr, pw := io.Pipe() @@ -469,12 +576,12 @@ func TestContinuedAPMServerFailure(t *testing.T) { apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()), ) require.NoError(t, err) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Healthy) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Failing) + apmClient.UpdateStatus(context.Background(), apmproxy.Healthy) + apmClient.UpdateStatus(context.Background(), apmproxy.Failing) require.Eventually(t, func() bool { - return apmClient.Status != apmproxy.Failing - }, 5*time.Second, 50*time.Millisecond) - assert.Equal(t, apmClient.Status, apmproxy.Pending) + return !apmClient.IsUnhealthy() + }, 7*time.Second, 50*time.Millisecond) + assert.Equal(t, apmClient.Status, apmproxy.Started) assert.Error(t, apmClient.PostToApmServer(context.Background(), agentData)) assert.Equal(t, apmClient.Status, apmproxy.Failing) } diff --git a/apmproxy/client.go b/apmproxy/client.go index 5b2bc08c..7b161abe 100644 --- a/apmproxy/client.go +++ b/apmproxy/client.go @@ -50,18 +50,18 @@ // Client is the client used to communicate with the apm server.
type Client struct { - mu sync.Mutex - bufferPool sync.Pool - DataChannel chan AgentData - client *http.Client - Status Status - ReconnectionCount int - ServerAPIKey string - ServerSecretToken string - serverURL string - receiver *http.Server - sendStrategy SendStrategy - logger *zap.SugaredLogger + mu sync.RWMutex + bufferPool sync.Pool + DataChannel chan AgentData + client *http.Client + Status Status + ReconnectionCount int + ServerAPIKey string + ServerSecretToken string + serverURL string + receiver *http.Server + sendStrategy SendStrategy + logger *zap.SugaredLogger flushMutex sync.Mutex flushCh chan struct{} @@ -76,8 +76,8 @@ func NewClient(opts ...Option) (*Client, error) { client: &http.Client{ Transport: http.DefaultTransport.(*http.Transport).Clone(), }, - ReconnectionCount: -1, - Status: Healthy, + ReconnectionCount: -1, + Status: Started, receiver: &http.Server{ Addr: defaultReceiverAddr, ReadTimeout: defaultDataReceiverTimeout, diff --git a/apmproxy/receiver.go b/apmproxy/receiver.go index 663e0ee5..aca911f4 100644 --- a/apmproxy/receiver.go +++ b/apmproxy/receiver.go @@ -87,7 +87,7 @@ func (c *Client) handleInfoRequest() (func(w http.ResponseWriter, r *http.Reques reverseProxy.Transport = customTransport reverseProxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) { - c.SetApmServerTransportState(r.Context(), Failing) + c.UpdateStatus(r.Context(), Failing) c.logger.Errorf("Error querying version from the APM server: %v", err) } diff --git a/apmproxy/status.go b/apmproxy/status.go index 84d9fb46..e32c74fd 100644 --- a/apmproxy/status.go +++ b/apmproxy/status.go @@ -22,7 +22,24 @@ package apmproxy type Status string const ( - Failing Status = "Failing" - Pending Status = "Pending" + // The apmproxy started but no information can be + // inferred on the status of the transport. + // Either because the apmproxy just started and no + // request was forwarded or because it recovered + // from a failure. 
+ Started Status = "Started" + + // Last request completed successfully. Healthy Status = "Healthy" + + // Last request failed. + Failing Status = "Failing" + + // The APM Server returned status 429 and the extension + // was ratelimited. + RateLimited Status = "RateLimited" + + // A failure on the client was observed. This does not + // trigger any backoff mechanism. + ClientFailing Status = "ClientFailing" )