Skip to content
Merged
111 changes: 92 additions & 19 deletions apmproxy/apmserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand All @@ -30,11 +31,20 @@ import (
"time"
)

type jsonResult struct {
Errors []jsonError `json:"errors,omitempty"`
}

type jsonError struct {
Message string `json:"message"`
Document string `json:"document,omitempty"`
}

// ForwardApmData receives agent data as it comes in and posts it to the APM server.
// Stop checking for, and sending agent data when the function invocation
// has completed, signaled via a channel.
func (c *Client) ForwardApmData(ctx context.Context, metadataContainer *MetadataContainer) error {
if c.Status == Failing {
if c.IsUnhealthy() {
return nil
}
for {
Expand All @@ -59,7 +69,7 @@ func (c *Client) ForwardApmData(ctx context.Context, metadataContainer *Metadata

// FlushAPMData reads all the apm data in the apm data channel and sends it to the APM server.
func (c *Client) FlushAPMData(ctx context.Context) {
if c.Status == Failing {
if c.IsUnhealthy() {
c.logger.Debug("Flush skipped - Transport failing")
return
}
Expand All @@ -86,7 +96,7 @@ func (c *Client) FlushAPMData(ctx context.Context) {
func (c *Client) PostToApmServer(ctx context.Context, agentData AgentData) error {
// todo: can this be a streaming or streaming style call that keeps the
// connection open across invocations?
if c.Status == Failing {
if c.IsUnhealthy() {
return errors.New("transport status is unhealthy")
}

Expand Down Expand Up @@ -131,61 +141,124 @@ func (c *Client) PostToApmServer(ctx context.Context, agentData AgentData) error
c.logger.Debug("Sending data chunk to APM server")
resp, err := c.client.Do(req)
if err != nil {
c.SetApmServerTransportState(ctx, Failing)
c.UpdateStatus(ctx, Failing)
return fmt.Errorf("failed to post to APM server: %v", err)
}

//Read the response body
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
c.SetApmServerTransportState(ctx, Failing)
return fmt.Errorf("failed to read the response body after posting to the APM server")

// On success, the server will respond with a 202 Accepted status code and no body.
if resp.StatusCode == http.StatusAccepted {
c.UpdateStatus(ctx, Healthy)
return nil
}

// RateLimited
if resp.StatusCode == http.StatusTooManyRequests {
c.logger.Warnf("Transport has been rate limited: response status code: %d", resp.StatusCode)
c.UpdateStatus(ctx, RateLimited)
return nil
}

if resp.StatusCode == http.StatusUnauthorized {
jErr := jsonResult{}
if err := json.NewDecoder(resp.Body).Decode(&jErr); err != nil {
// non critical error.
// Log a warning and continue.
c.logger.Warnf("failed to decode response body: %v", err)
}

// Auth errors
if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
c.logger.Warnf("Authentication with the APM server failed: response status code: %d", resp.StatusCode)
c.logger.Debugf("APM server response body: %v", string(body))
for _, err := range jErr.Errors {
c.logger.Warnf("failed to authenticate: document %s: message: %s", err.Document, err.Message)
}
c.UpdateStatus(ctx, Failing)
return nil
}

// ClientErrors
if resp.StatusCode >= 400 && resp.StatusCode < 500 {
c.logger.Warnf("client error: response status code: %d", resp.StatusCode)
for _, err := range jErr.Errors {
c.logger.Warnf("client error: document %s: message: %s", err.Document, err.Message)
}
c.UpdateStatus(ctx, ClientFailing)
return nil
}

c.SetApmServerTransportState(ctx, Healthy)
c.logger.Debug("Transport status set to healthy")
c.logger.Debugf("APM server response body: %v", string(body))
c.logger.Debugf("APM server response status code: %v", resp.StatusCode)
// critical errors
if resp.StatusCode == http.StatusInternalServerError || resp.StatusCode == http.StatusServiceUnavailable {
c.logger.Warnf("failed to post data to APM server: response status code: %d", resp.StatusCode)
for _, err := range jErr.Errors {
c.logger.Warnf("critical error: document %s: message: %s", err.Document, err.Message)
}
c.UpdateStatus(ctx, Failing)
return nil
}

c.logger.Warnf("unhandled status code: %d", resp.StatusCode)
return nil
}

// SetApmServerTransportState takes a state of the APM server transport and updates
// IsUnhealthy returns true if the apmproxy is not healthy.
func (c *Client) IsUnhealthy() bool {
c.mu.RLock()
defer c.mu.RUnlock()
return c.Status == Failing
}

// UpdateStatus takes a state of the APM server transport and updates
// the current state of the transport. For a change to a failing state, the grace period
// is calculated and a go routine is started that waits for that period to complete
// before changing the status to "pending". This would allow a subsequent send attempt
// to the APM server.
//
// This function is public for use in tests.
func (c *Client) SetApmServerTransportState(ctx context.Context, status Status) {
func (c *Client) UpdateStatus(ctx context.Context, status Status) {
// Reduce lock contention as UpdateStatus is called on every
// successful request
c.mu.RLock()
if status == c.Status {
c.mu.RUnlock()
return
}
c.mu.RUnlock()

switch status {
case Healthy:
c.mu.Lock()
if c.Status == status {
return
}
c.Status = status
c.logger.Debugf("APM server Transport status set to %s", c.Status)
c.ReconnectionCount = -1
c.mu.Unlock()
case RateLimited, ClientFailing:
// No need to start backoff, this is a temporary status. It usually
// means we went over the limit of events/s.
Comment on lines +238 to +239
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When would ClientFailing be temporary? From what I can see above, it's either due to auth failure (probably not temporary without user intervention?) or some validation error (probably implies a bug in the agent or server?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I can see above, it's either due to auth failure (probably not temporary without user intervention?)

Ah, good point! I've made auth errors a critical failure

When would ClientFailing be temporary?

From what I could see from the middlewares in the APM server repository, this would happen on data decoding/validation errors, request body too large or invalid query. Those are errors tied to a specific request, I don't think we should trigger a backoff and associated delay.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I could see from the middlewares in the APM server repository, this would happen on data decoding/validation errors, request body too large or invalid query. Those are errors tied to a specific request, I don't think we should trigger a backoff and associated delay.

good point, I think this is fine as is now.

c.mu.Lock()
c.Status = status
c.logger.Debugf("APM server Transport status set to %s", c.Status)
c.mu.Unlock()
case Failing:
c.mu.Lock()
c.Status = status
c.logger.Debugf("APM server Transport status set to %s", c.Status)
c.ReconnectionCount++
gracePeriodTimer := time.NewTimer(c.ComputeGracePeriod())
c.logger.Debugf("Grace period entered, reconnection count : %d", c.ReconnectionCount)
c.mu.Unlock()

go func() {
select {
case <-gracePeriodTimer.C:
c.logger.Debug("Grace period over - timer timed out")
case <-ctx.Done():
c.logger.Debug("Grace period over - context done")
}
c.Status = Pending
c.mu.Lock()
c.Status = Started
c.logger.Debugf("APM server Transport status set to %s", c.Status)
c.mu.Unlock()
}()
Expand Down
Loading