From e7bfb87c060e05b9a40dfe82547550104039bca2 Mon Sep 17 00:00:00 2001 From: kruskal <99559985+kruskall@users.noreply.github.com> Date: Mon, 22 Aug 2022 19:47:40 +0200 Subject: [PATCH 1/8] fix: guard status operations behind a mutex to avoid race conditions Any change to the transport status is guarded by a mutex but some methods were using the field directly, causing race conditions. apmproxy tests are using a timeout of 5s to wait for the status to change during backoff. However there is a subtle bug that causes the test to fail because the initial delay is exactly 5s. To avoid this, the tests wait 7s now. Update status name and documentation. --- apmproxy/apmserver.go | 28 +++++++++++------ apmproxy/apmserver_test.go | 64 +++++++++++++++++++------------------- apmproxy/client.go | 28 ++++++++--------- apmproxy/receiver.go | 2 +- apmproxy/status.go | 13 ++++++-- 5 files changed, 77 insertions(+), 58 deletions(-) diff --git a/apmproxy/apmserver.go b/apmproxy/apmserver.go index 08a06ec4..274e2f07 100644 --- a/apmproxy/apmserver.go +++ b/apmproxy/apmserver.go @@ -34,7 +34,7 @@ import ( // Stop checking for, and sending agent data when the function invocation // has completed, signaled via a channel. func (c *Client) ForwardApmData(ctx context.Context, metadataContainer *MetadataContainer) error { - if c.Status == Failing { + if c.IsUnhealthy() { return nil } for { @@ -59,7 +59,7 @@ func (c *Client) ForwardApmData(ctx context.Context, metadataContainer *Metadata // FlushAPMData reads all the apm data in the apm data channel and sends it to the APM server. func (c *Client) FlushAPMData(ctx context.Context) { - if c.Status == Failing { + if c.IsUnhealthy() { c.logger.Debug("Flush skipped - Transport failing") return } @@ -86,7 +86,7 @@ func (c *Client) FlushAPMData(ctx context.Context) { func (c *Client) PostToApmServer(ctx context.Context, agentData AgentData) error { // todo: can this be a streaming or streaming style call that keeps the // connection open across invocations? - if c.Status == Failing { + if c.IsUnhealthy() { return errors.New("transport status is unhealthy") } @@ -131,7 +131,7 @@ func (c *Client) PostToApmServer(ctx context.Context, agentData AgentData) error c.logger.Debug("Sending data chunk to APM server") resp, err := c.client.Do(req) if err != nil { - c.SetApmServerTransportState(ctx, Failing) + c.UpdateStatus(ctx, Failing) return fmt.Errorf("failed to post to APM server: %v", err) } @@ -139,7 +139,7 @@ func (c *Client) PostToApmServer(ctx context.Context, agentData AgentData) error defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { - c.SetApmServerTransportState(ctx, Failing) + c.UpdateStatus(ctx, Failing) return fmt.Errorf("failed to read the response body after posting to the APM server") } @@ -149,21 +149,28 @@ func (c *Client) PostToApmServer(ctx context.Context, agentData AgentData) error return nil } - c.SetApmServerTransportState(ctx, Healthy) + c.UpdateStatus(ctx, Healthy) c.logger.Debug("Transport status set to healthy") c.logger.Debugf("APM server response body: %v", string(body)) c.logger.Debugf("APM server response status code: %v", resp.StatusCode) return nil } -// SetApmServerTransportState takes a state of the APM server transport and updates +// IsUnhealthy returns true if the apmproxy is not healthy. +func (c *Client) IsUnhealthy() bool { + c.mu.Lock() + defer c.mu.Unlock() + return c.Status == Failing +} + +// UpdateStatus takes a state of the APM server transport and updates // the current state of the transport. For a change to a failing state, the grace period // is calculated and a go routine is started that waits for that period to complete // before changing the status to "pending". This would allow a subsequent send attempt // to the APM server. // // This function is public for use in tests. -func (c *Client) SetApmServerTransportState(ctx context.Context, status Status) { +func (c *Client) UpdateStatus(ctx context.Context, status Status) { switch status { case Healthy: c.mu.Lock() @@ -178,6 +185,8 @@ func (c *Client) SetApmServerTransportState(ctx context.Context, status Status) c.ReconnectionCount++ gracePeriodTimer := time.NewTimer(c.ComputeGracePeriod()) c.logger.Debugf("Grace period entered, reconnection count : %d", c.ReconnectionCount) + c.mu.Unlock() + go func() { select { case <-gracePeriodTimer.C: @@ -185,7 +194,8 @@ func (c *Client) SetApmServerTransportState(ctx context.Context, status Status) case <-ctx.Done(): c.logger.Debug("Grace period over - context done") } - c.Status = Pending + c.mu.Lock() + c.Status = Started c.logger.Debugf("APM server Transport status set to %s", c.Status) c.mu.Unlock() }() diff --git a/apmproxy/apmserver_test.go b/apmproxy/apmserver_test.go index 3e5cc952..ff4b1bd5 100644 --- a/apmproxy/apmserver_test.go +++ b/apmproxy/apmserver_test.go @@ -170,7 +170,7 @@ func TestSetHealthyTransport(t *testing.T) { apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()), ) require.NoError(t, err) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Healthy) + apmClient.UpdateStatus(context.Background(), apmproxy.Healthy) assert.True(t, apmClient.Status == apmproxy.Healthy) assert.Equal(t, apmClient.ReconnectionCount, -1) } @@ -184,7 +184,7 @@ func TestSetFailingTransport(t *testing.T) { ) require.NoError(t, err) apmClient.ReconnectionCount = 0 - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Failing) + apmClient.UpdateStatus(context.Background(), apmproxy.Failing) assert.True(t, apmClient.Status == apmproxy.Failing) assert.Equal(t, apmClient.ReconnectionCount, 1) } @@ -195,12 +195,12 @@ func TestSetPendingTransport(t *testing.T) { apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()), ) require.NoError(t, err) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Healthy) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Failing) + apmClient.UpdateStatus(context.Background(), apmproxy.Healthy) + apmClient.UpdateStatus(context.Background(), apmproxy.Failing) require.Eventually(t, func() bool { - return apmClient.Status != apmproxy.Failing - }, 5*time.Second, 50*time.Millisecond) - assert.True(t, apmClient.Status == apmproxy.Pending) + return !apmClient.IsUnhealthy() + }, 7*time.Second, 50*time.Millisecond) + assert.True(t, apmClient.Status == apmproxy.Started) assert.Equal(t, apmClient.ReconnectionCount, 0) } @@ -210,8 +210,8 @@ func TestSetPendingTransportExplicitly(t *testing.T) { apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()), ) require.NoError(t, err) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Healthy) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Pending) + apmClient.UpdateStatus(context.Background(), apmproxy.Healthy) + apmClient.UpdateStatus(context.Background(), apmproxy.Started) assert.True(t, apmClient.Status == apmproxy.Healthy) assert.Equal(t, apmClient.ReconnectionCount, -1) } @@ -222,8 +222,8 @@ func TestSetInvalidTransport(t *testing.T) { apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()), ) require.NoError(t, err) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Healthy) - apmClient.SetApmServerTransportState(context.Background(), "Invalid") + apmClient.UpdateStatus(context.Background(), apmproxy.Healthy) + apmClient.UpdateStatus(context.Background(), "Invalid") assert.True(t, apmClient.Status == apmproxy.Healthy) assert.Equal(t, apmClient.ReconnectionCount, -1) } @@ -266,7 +266,7 @@ func TestEnterBackoffFromHealthy(t *testing.T) { apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()), ) require.NoError(t, err) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Healthy) + apmClient.UpdateStatus(context.Background(), apmproxy.Healthy) // Close the APM server early so that POST requests fail and that backoff is enabled apmServer.Close() @@ -321,12 +321,12 @@ func TestEnterBackoffFromFailing(t *testing.T) { ) require.NoError(t, err) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Healthy) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Failing) + apmClient.UpdateStatus(context.Background(), apmproxy.Healthy) + apmClient.UpdateStatus(context.Background(), apmproxy.Failing) require.Eventually(t, func() bool { - return apmClient.Status != apmproxy.Failing - }, 5*time.Second, 50*time.Millisecond) - assert.Equal(t, apmClient.Status, apmproxy.Pending) + return !apmClient.IsUnhealthy() + }, 7*time.Second, 50*time.Millisecond) + assert.Equal(t, apmClient.Status, apmproxy.Started) assert.Error(t, apmClient.PostToApmServer(context.Background(), agentData)) assert.Equal(t, apmClient.Status, apmproxy.Failing) @@ -375,12 +375,12 @@ func TestAPMServerRecovery(t *testing.T) { ) require.NoError(t, err) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Healthy) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Failing) + apmClient.UpdateStatus(context.Background(), apmproxy.Healthy) + apmClient.UpdateStatus(context.Background(), apmproxy.Failing) require.Eventually(t, func() bool { - return apmClient.Status != apmproxy.Failing - }, 5*time.Second, 50*time.Millisecond) - assert.Equal(t, apmClient.Status, apmproxy.Pending) + return !apmClient.IsUnhealthy() + }, 7*time.Second, 50*time.Millisecond) + assert.Equal(t, apmClient.Status, apmproxy.Started) assert.NoError(t, apmClient.PostToApmServer(context.Background(), agentData)) assert.Equal(t, apmClient.Status, apmproxy.Healthy) @@ -420,12 +420,12 @@ func TestAPMServerAuthFails(t *testing.T) { apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()), ) require.NoError(t, err) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Healthy) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Failing) + apmClient.UpdateStatus(context.Background(), apmproxy.Healthy) + apmClient.UpdateStatus(context.Background(), apmproxy.Failing) require.Eventually(t, func() bool { - return apmClient.Status != apmproxy.Failing - }, 5*time.Second, 50*time.Millisecond) - assert.Equal(t, apmClient.Status, apmproxy.Pending) + return !apmClient.IsUnhealthy() + }, 7*time.Second, 50*time.Millisecond) + assert.Equal(t, apmClient.Status, apmproxy.Started) assert.NoError(t, apmClient.PostToApmServer(context.Background(), agentData)) assert.NotEqual(t, apmClient.Status, apmproxy.Healthy) } @@ -469,12 +469,12 @@ func TestContinuedAPMServerFailure(t *testing.T) { apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()), ) require.NoError(t, err) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Healthy) - apmClient.SetApmServerTransportState(context.Background(), apmproxy.Failing) + apmClient.UpdateStatus(context.Background(), apmproxy.Healthy) + apmClient.UpdateStatus(context.Background(), apmproxy.Failing) require.Eventually(t, func() bool { - return apmClient.Status != apmproxy.Failing - }, 5*time.Second, 50*time.Millisecond) - assert.Equal(t, apmClient.Status, apmproxy.Pending) + return !apmClient.IsUnhealthy() + }, 7*time.Second, 50*time.Millisecond) + assert.Equal(t, apmClient.Status, apmproxy.Started) assert.Error(t, apmClient.PostToApmServer(context.Background(), agentData)) assert.Equal(t, apmClient.Status, apmproxy.Failing) } diff --git a/apmproxy/client.go b/apmproxy/client.go index 5b2bc08c..abfb5c06 100644 --- a/apmproxy/client.go +++ b/apmproxy/client.go @@ -50,18 +50,18 @@ const ( // Client is the client used to communicate with the apm server. type Client struct { - mu sync.Mutex - bufferPool sync.Pool - DataChannel chan AgentData - client *http.Client - Status Status - ReconnectionCount int - ServerAPIKey string - ServerSecretToken string - serverURL string - receiver *http.Server - sendStrategy SendStrategy - logger *zap.SugaredLogger + mu sync.Mutex + bufferPool sync.Pool + DataChannel chan AgentData + client *http.Client + Status Status + ReconnectionCount int + ServerAPIKey string + ServerSecretToken string + serverURL string + receiver *http.Server + sendStrategy SendStrategy + logger *zap.SugaredLogger flushMutex sync.Mutex flushCh chan struct{} @@ -76,8 +76,8 @@ func NewClient(opts ...Option) (*Client, error) { client: &http.Client{ Transport: http.DefaultTransport.(*http.Transport).Clone(), }, - ReconnectionCount: -1, - Status: Healthy, + ReconnectionCount: -1, + Status: Started, receiver: &http.Server{ Addr: defaultReceiverAddr, ReadTimeout: defaultDataReceiverTimeout, diff --git a/apmproxy/receiver.go b/apmproxy/receiver.go index 663e0ee5..aca911f4 100644 --- a/apmproxy/receiver.go +++ b/apmproxy/receiver.go @@ -87,7 +87,7 @@ func (c *Client) handleInfoRequest() (func(w http.ResponseWriter, r *http.Reques reverseProxy.Transport = customTransport reverseProxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) { - c.SetApmServerTransportState(r.Context(), Failing) + c.UpdateStatus(r.Context(), Failing) c.logger.Errorf("Error querying version from the APM server: %v", err) } diff --git a/apmproxy/status.go b/apmproxy/status.go index 84d9fb46..45b1edbd 100644 --- a/apmproxy/status.go +++ b/apmproxy/status.go @@ -22,7 +22,16 @@ package apmproxy type Status string const ( - Failing Status = "Failing" - Pending Status = "Pending" + // The apmproxy started but no information can be + // inferred on the status of the transport. + // Either because the apmproxy just started and no + // request was forwarded or because it recovered + // from a failure. + Started Status = "Started" + + // Last request completed successfully. Healthy Status = "Healthy" + + // Last request failed. + Failing Status = "Failing" ) From ab0e3c58d68bff7bd6f0b2629143ef897b2d62a9 Mon Sep 17 00:00:00 2001 From: kruskal <99559985+kruskall@users.noreply.github.com> Date: Mon, 22 Aug 2022 20:08:25 +0200 Subject: [PATCH 2/8] feat: handle properly ratelimited requests --- apmproxy/apmserver.go | 13 +++++++++++++ apmproxy/status.go | 4 ++++ 2 files changed, 17 insertions(+) diff --git a/apmproxy/apmserver.go b/apmproxy/apmserver.go index 274e2f07..54aacd60 100644 --- a/apmproxy/apmserver.go +++ b/apmproxy/apmserver.go @@ -143,6 +143,12 @@ func (c *Client) PostToApmServer(ctx context.Context, agentData AgentData) error return fmt.Errorf("failed to read the response body after posting to the APM server") } + if resp.StatusCode == http.StatusTooManyRequests { + c.logger.Warnf("Transport has been rate limited: response status code: %d", resp.StatusCode) + c.UpdateStatus(ctx, RateLimited) + return nil + } + if resp.StatusCode == http.StatusUnauthorized { c.logger.Warnf("Authentication with the APM server failed: response status code: %d", resp.StatusCode) c.logger.Debugf("APM server response body: %v", string(body)) @@ -178,6 +184,13 @@ func (c *Client) UpdateStatus(ctx context.Context, status Status) { c.logger.Debugf("APM server Transport status set to %s", c.Status) c.ReconnectionCount = -1 c.mu.Unlock() + case RateLimited: + // No need to start backoff, this is a temporary status. It usually + // means we went over the limit of events/s. + c.mu.Lock() + c.Status = status + c.logger.Debugf("APM server Transport status set to %s", c.Status) + c.mu.Unlock() case Failing: c.mu.Lock() c.Status = status diff --git a/apmproxy/status.go b/apmproxy/status.go index 45b1edbd..b40aa425 100644 --- a/apmproxy/status.go +++ b/apmproxy/status.go @@ -34,4 +34,8 @@ const ( // Last request failed. Failing Status = "Failing" + + // The APM Server returned status 429 and the extension + // was ratelimited. + RateLimited Status = "RateLimited" ) From 6d5679d4b28ae256707ed7354e4613520d3d9e42 Mon Sep 17 00:00:00 2001 From: kruskal <99559985+kruskall@users.noreply.github.com> Date: Mon, 22 Aug 2022 21:31:23 +0200 Subject: [PATCH 3/8] feat: parse server response and react to error messages Start backoff mechanism on critical error and log a warning on client errors. --- apmproxy/apmserver.go | 65 ++++++++++++++++++++++++++++++++++--------- apmproxy/status.go | 4 +++ 2 files changed, 56 insertions(+), 13 deletions(-) diff --git a/apmproxy/apmserver.go b/apmproxy/apmserver.go index 54aacd60..7e54efbe 100644 --- a/apmproxy/apmserver.go +++ b/apmproxy/apmserver.go @@ -21,6 +21,7 @@ import ( "bytes" "compress/gzip" "context" + "encoding/json" "errors" "fmt" "io" @@ -30,6 +31,16 @@ import ( "time" ) +type jsonResult struct { + Accepted int `json:"accepted"` + Errors []jsonError `json:"errors,omitempty"` +} + +type jsonError struct { + Message string `json:"message"` + Document string `json:"document,omitempty"` +} + // ForwardApmData receives agent data as it comes in and posts it to the APM server. // Stop checking for, and sending agent data when the function invocation // has completed, signaled via a channel. @@ -134,31 +145,59 @@ func (c *Client) PostToApmServer(ctx context.Context, agentData AgentData) error c.UpdateStatus(ctx, Failing) return fmt.Errorf("failed to post to APM server: %v", err) } - - //Read the response body defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) - if err != nil { - c.UpdateStatus(ctx, Failing) - return fmt.Errorf("failed to read the response body after posting to the APM server") + + // On success, the server will respond with a 202 Accepted status code and no body. + if resp.StatusCode == http.StatusAccepted { + c.UpdateStatus(ctx, Healthy) + return nil } + // RateLimited if resp.StatusCode == http.StatusTooManyRequests { c.logger.Warnf("Transport has been rate limited: response status code: %d", resp.StatusCode) c.UpdateStatus(ctx, RateLimited) return nil } - if resp.StatusCode == http.StatusUnauthorized { + jErr := jsonResult{} + if err := json.NewDecoder(resp.Body).Decode(&jErr); err != nil { + // non critical error. + // Log a warning and continue. + c.logger.Warnf("failed to decode response body: %v", err) + } + + // Auth errors + if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden { c.logger.Warnf("Authentication with the APM server failed: response status code: %d", resp.StatusCode) - c.logger.Debugf("APM server response body: %v", string(body)) + for _, err := range jErr.Errors { + c.logger.Warnf("failed to authenticate: document %s: message: %s", err.Document, err.Message) + } + c.UpdateStatus(ctx, ClientFailing) + return nil + } + + // ClientErrors + if resp.StatusCode >= 400 && resp.StatusCode < 500 { + c.logger.Warnf("client error: response status code: %d", resp.StatusCode) + for _, err := range jErr.Errors { + c.logger.Warnf("client error: document %s: message: %s", err.Document, err.Message) + } + c.UpdateStatus(ctx, ClientFailing) + return nil + } + + // critical errors + if resp.StatusCode == http.StatusInternalServerError || resp.StatusCode == http.StatusServiceUnavailable { + c.logger.Warnf("failed to post data to APM server: response status code: %d", resp.StatusCode) + for _, err := range jErr.Errors { + c.logger.Warnf("critical error: document %s: message: %s", err.Document, err.Message) + } + c.UpdateStatus(ctx, Failing) return nil } - c.UpdateStatus(ctx, Healthy) - c.logger.Debug("Transport status set to healthy") - c.logger.Debugf("APM server response body: %v", string(body)) - c.logger.Debugf("APM server response status code: %v", resp.StatusCode) + c.logger.Warnf("unhandled status code: %d", resp.StatusCode) return nil } @@ -184,7 +223,7 @@ func (c *Client) UpdateStatus(ctx context.Context, status Status) { c.logger.Debugf("APM server Transport status set to %s", c.Status) c.ReconnectionCount = -1 c.mu.Unlock() - case RateLimited: + case RateLimited, ClientFailing: // No need to start backoff, this is a temporary status. It usually // means we went over the limit of events/s. c.mu.Lock() diff --git a/apmproxy/status.go b/apmproxy/status.go index b40aa425..e32c74fd 100644 --- a/apmproxy/status.go +++ b/apmproxy/status.go @@ -38,4 +38,8 @@ const ( // The APM Server returned status 429 and the extension // was ratelimited. RateLimited Status = "RateLimited" + + // A failure on the client was observed. This does not + // trigger any backoff mechanism. + ClientFailing Status = "ClientFailing" ) From 161bb54e00754ed26ee9e47a6f3e5c2335fd2ffe Mon Sep 17 00:00:00 2001 From: kruskal <99559985+kruskall@users.noreply.github.com> Date: Sat, 27 Aug 2022 15:09:58 +0200 Subject: [PATCH 4/8] refactor: remove unused struct field --- apmproxy/apmserver.go | 1 - 1 file changed, 1 deletion(-) diff --git a/apmproxy/apmserver.go b/apmproxy/apmserver.go index 7e54efbe..28401c79 100644 --- a/apmproxy/apmserver.go +++ b/apmproxy/apmserver.go @@ -32,7 +32,6 @@ import ( ) type jsonResult struct { - Accepted int `json:"accepted"` Errors []jsonError `json:"errors,omitempty"` } From fc16906a5c530e1da762640acb44c23e694c79be Mon Sep 17 00:00:00 2001 From: kruskal <99559985+kruskall@users.noreply.github.com> Date: Sat, 27 Aug 2022 21:32:07 +0200 Subject: [PATCH 5/8] fix: make auth failure a trigger the backoff mechanism auth errors are not temporary failures --- apmproxy/apmserver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apmproxy/apmserver.go b/apmproxy/apmserver.go index 28401c79..89492431 100644 --- a/apmproxy/apmserver.go +++ b/apmproxy/apmserver.go @@ -172,7 +172,7 @@ func (c *Client) PostToApmServer(ctx context.Context, agentData AgentData) error for _, err := range jErr.Errors { c.logger.Warnf("failed to authenticate: document %s: message: %s", err.Document, err.Message) } - c.UpdateStatus(ctx, ClientFailing) + c.UpdateStatus(ctx, Failing) return nil } From 8923a9f173ef6b5a1df71d988822bbbfebb8cb49 Mon Sep 17 00:00:00 2001 From: kruskal <99559985+kruskall@users.noreply.github.com> Date: Tue, 30 Aug 2022 21:38:11 +0200 Subject: [PATCH 6/8] refactor: switch to RWMutex and reduce lock contention --- apmproxy/apmserver.go | 16 +++++++++++++--- apmproxy/client.go | 2 +- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/apmproxy/apmserver.go b/apmproxy/apmserver.go index 89492431..6b2541b9 100644 --- a/apmproxy/apmserver.go +++ b/apmproxy/apmserver.go @@ -32,7 +32,7 @@ import ( ) type jsonResult struct { - Errors []jsonError `json:"errors,omitempty"` + Errors []jsonError `json:"errors,omitempty"` } type jsonError struct { @@ -202,8 +202,8 @@ func (c *Client) PostToApmServer(ctx context.Context, agentData AgentData) error // IsUnhealthy returns true if the apmproxy is not healthy. func (c *Client) IsUnhealthy() bool { - c.mu.Lock() - defer c.mu.Unlock() + c.mu.RLock() + defer c.mu.RUnlock() return c.Status == Failing } @@ -215,6 +215,16 @@ func (c *Client) IsUnhealthy() bool { // // This function is public for use in tests. func (c *Client) UpdateStatus(ctx context.Context, status Status) { + // Reduce lock contention as UpdateStatus is called on every + // successful request + c.mu.RLock() + if status == c.Status { + c.logger.Debugf("APM server Transport status not changed: %s", status) + c.mu.RUnlock() + return + } + c.mu.RUnlock() + switch status { case Healthy: c.mu.Lock() diff --git a/apmproxy/client.go b/apmproxy/client.go index abfb5c06..7b161abe 100644 --- a/apmproxy/client.go +++ b/apmproxy/client.go @@ -50,7 +50,7 @@ const ( // Client is the client used to communicate with the apm server. type Client struct { - mu sync.Mutex + mu sync.RWMutex bufferPool sync.Pool DataChannel chan AgentData client *http.Client From b4a0c112822666aa9cffcb29d10f6da56113eb42 Mon Sep 17 00:00:00 2001 From: kruskal <99559985+kruskall@users.noreply.github.com> Date: Wed, 31 Aug 2022 19:05:51 +0200 Subject: [PATCH 7/8] refactor: skip update debug message if status has not changed --- apmproxy/apmserver.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/apmproxy/apmserver.go b/apmproxy/apmserver.go index 6b2541b9..35fb4396 100644 --- a/apmproxy/apmserver.go +++ b/apmproxy/apmserver.go @@ -219,7 +219,6 @@ func (c *Client) UpdateStatus(ctx context.Context, status Status) { // successful request c.mu.RLock() if status == c.Status { - c.logger.Debugf("APM server Transport status not changed: %s", status) c.mu.RUnlock() return } @@ -228,6 +227,9 @@ func (c *Client) UpdateStatus(ctx context.Context, status Status) { switch status { case Healthy: c.mu.Lock() + if c.Status == status { + return + } c.Status = status c.logger.Debugf("APM server Transport status set to %s", c.Status) c.ReconnectionCount = -1 From e2fbd6ef4b92549a1d8dc7eb33e9335eaed45bee Mon Sep 17 00:00:00 2001 From: kruskal <99559985+kruskall@users.noreply.github.com> Date: Thu, 1 Sep 2022 12:49:57 +0200 Subject: [PATCH 8/8] test: add ratelimit and clientfailing test --- apmproxy/apmserver_test.go | 109 ++++++++++++++++++++++++++++++++++++- 1 file changed, 108 insertions(+), 1 deletion(-) diff --git a/apmproxy/apmserver_test.go b/apmproxy/apmserver_test.go index ff4b1bd5..3f028ff0 100644 --- a/apmproxy/apmserver_test.go +++ b/apmproxy/apmserver_test.go @@ -20,13 +20,15 @@ package apmproxy_test import ( "compress/gzip" "context" - "github.com/elastic/apm-aws-lambda/apmproxy" "io" "net/http" "net/http/httptest" + "sync/atomic" "testing" "time" + "github.com/elastic/apm-aws-lambda/apmproxy" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" @@ -430,6 +432,111 @@ func TestAPMServerAuthFails(t *testing.T) { assert.NotEqual(t, apmClient.Status, apmproxy.Healthy) } +func TestAPMServerRatelimit(t *testing.T) { + // Compress the data + pr, pw := io.Pipe() + gw, _ := gzip.NewWriterLevel(pw, gzip.BestSpeed) + go func() { + if _, err := gw.Write([]byte("")); err != nil { + t.Fail() + return + } + if err := gw.Close(); err != nil { + t.Fail() + return + } + if err := pw.Close(); err != nil { + t.Fail() + return + } + }() + + // Create AgentData struct with compressed data + data, _ := io.ReadAll(pr) + agentData := apmproxy.AgentData{Data: data, ContentEncoding: "gzip"} + + // Create apm server and handler + var shouldSucceed atomic.Bool + apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Fail the first request + if shouldSucceed.CompareAndSwap(false, true) { + w.WriteHeader(http.StatusTooManyRequests) + return + } + + w.WriteHeader(http.StatusAccepted) + })) + defer apmServer.Close() + + apmClient, err := apmproxy.NewClient( + apmproxy.WithURL(apmServer.URL), + apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()), + ) + require.NoError(t, err) + assert.Equal(t, apmClient.Status, apmproxy.Started) + + // First request fails but does not trigger the backoff + assert.NoError(t, apmClient.PostToApmServer(context.Background(), agentData)) + assert.Equal(t, apmClient.Status, apmproxy.RateLimited) + + // Followup request is succesful + assert.NoError(t, apmClient.PostToApmServer(context.Background(), agentData)) + assert.Equal(t, apmClient.Status, apmproxy.Healthy) + +} + +func TestAPMServerClientFail(t *testing.T) { + // Compress the data + pr, pw := io.Pipe() + gw, _ := gzip.NewWriterLevel(pw, gzip.BestSpeed) + go func() { + if _, err := gw.Write([]byte("")); err != nil { + t.Fail() + return + } + if err := gw.Close(); err != nil { + t.Fail() + return + } + if err := pw.Close(); err != nil { + t.Fail() + return + } + }() + + // Create AgentData struct with compressed data + data, _ := io.ReadAll(pr) + agentData := apmproxy.AgentData{Data: data, ContentEncoding: "gzip"} + + // Create apm server and handler + var shouldSucceed atomic.Bool + apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Fail the first request + if shouldSucceed.CompareAndSwap(false, true) { + w.WriteHeader(http.StatusRequestEntityTooLarge) + return + } + + w.WriteHeader(http.StatusAccepted) + })) + defer apmServer.Close() + + apmClient, err := apmproxy.NewClient( + apmproxy.WithURL(apmServer.URL), + apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()), + ) + require.NoError(t, err) + assert.Equal(t, apmClient.Status, apmproxy.Started) + + // First request fails but does not trigger the backoff + assert.NoError(t, apmClient.PostToApmServer(context.Background(), agentData)) + assert.Equal(t, apmClient.Status, apmproxy.ClientFailing) + + // Followup request is succesful + assert.NoError(t, apmClient.PostToApmServer(context.Background(), agentData)) + assert.Equal(t, apmClient.Status, apmproxy.Healthy) +} + func TestContinuedAPMServerFailure(t *testing.T) { // Compress the data pr, pw := io.Pipe()